iqrfpy.utils.sensor_parser

Sensor parser module.

Provides methods for parsing of sensor data collected by DPA and JSON API requests.

  1"""Sensor parser module.
  2
  3Provides methods for parsing of sensor data collected by DPA and JSON API requests.
  4"""
  5
  6import math
  7import struct
  8from typing import List, Union, Optional
  9from iqrfpy.exceptions import UnknownSensorTypeError
 10from iqrfpy.objects.sensor_data import SensorData
 11from iqrfpy.utils.sensor_constants import SensorDataSize, SensorTypes, SensorFrcCommands, SensorFrcErrors
 12from iqrfpy.utils.common import Common
 13from iqrfpy.utils.quantity_data import get_sensor_class
 14
 15
 16class SensorParser:
 17    """Class for parsing data from Sensor standard response data."""
 18
 19    @classmethod
 20    def enumerate_from_dpa(cls, dpa: List[int]) -> List[SensorData]:
 21        """Process data from Enumerate DPA response into a list of SensorData objects.
 22
 23        Note that Enumerate request only provides sensor types (quantities), as such, the SensorData objects
 24        produced by this method will not carry a value.
 25
 26        Args:
 27            dpa (List[int]): List of pdata bytes from DPA response
 28        Returns:
 29            :obj:`list` of :obj:`SensorData`: List of SensorData objects containing parsed data
 30        Raises:
 31            UnknownSensorTypeError: Raised if sensor type is passed as integer and the value is not recognized
 32        """
 33        sensor_data = []
 34        for sensor_type_value in dpa:
 35            if sensor_type_value not in SensorTypes:
 36                raise UnknownSensorTypeError('Unsupported sensor type.')
 37            sensor_type = SensorTypes(sensor_type_value)
 38            sensor_class = get_sensor_class(sensor_type)
 39            sensor_data.append(
 40                SensorData(
 41                    sensor_type=sensor_type,
 42                    index=len(sensor_data),
 43                    name=sensor_class.name,
 44                    short_name=sensor_class.short_name,
 45                    unit=sensor_class.unit,
 46                    decimal_places=sensor_class.decimal_places,
 47                    frc_commands=sensor_class.frc_commands
 48                )
 49            )
 50        return sensor_data
 51
 52    @classmethod
 53    def enumerate_from_json(cls, json_data: List[dict]) -> List[SensorData]:
 54        """Process data from Enumerate API response into a list of SensorData objects.
 55
 56        Note that Enumerate request only provides sensor types (quantities), as such, the SensorData objects
 57        produced by this method will not carry a value.
 58
 59        Args:
 60            json_data (List[dict]): List of json objects from JSON API response
 61        Returns:
 62            :obj:`list` of :obj:`SensorData`: List of SensorData objects containing parsed data
 63        Raises:
 64            UnknownSensorTypeError: Raised if sensor type is passed as integer and the value is not recognized
 65        """
 66        sensor_data = []
 67        for data in json_data:
 68            sensor_type_value = data['type']
 69            if sensor_type_value not in SensorTypes:
 70                raise UnknownSensorTypeError('Unsupported sensor type.')
 71            sensor_type = SensorTypes(sensor_type_value)
 72            sensor_class = get_sensor_class(sensor_type)
 73            sensor_data.append(
 74                SensorData(
 75                    sensor_type=sensor_type,
 76                    index=len(sensor_data),
 77                    name=sensor_class.name,
 78                    short_name=sensor_class.short_name,
 79                    unit=sensor_class.unit,
 80                    decimal_places=sensor_class.decimal_places,
 81                    frc_commands=sensor_class.frc_commands
 82                )
 83            )
 84        return sensor_data
 85
 86    @classmethod
 87    def read_sensors_dpa(cls, sensor_types: List[int], dpa: List[int]) -> List[SensorData]:
 88        """Process data from ReadSensor DPA response into a list of SensorData objects.
 89
 90        Because the ReadSensors DPA response does not carry information about sensor types,
 91        it is necessary to provide sensor types for response data.
 92
 93        Args:
 94            sensor_types (List[int]): List of sensor types
 95            dpa (List[int]): List of pdata bytes from DPA response
 96        Returns:
 97            :obj:`list` of :obj:`SensorData`: List of SensorData objects containing parsed data
 98        Raises:
 99            UnknownSensorTypeError: Raised if sensor type is passed as integer and the value is not recognized
100            ValueError: Raised if passed data is shorter than required to process all sensors
101        """
102        sensor_data = []
103        data_index = 0
104        sensor_index = 0
105        while data_index < len(dpa):
106            if sensor_index >= len(sensor_types):
107                raise ValueError('Too little sensor types provided for the amount of sensor data.')
108            sensor_type_value = sensor_types[sensor_index]
109            if sensor_type_value not in SensorTypes:
110                raise UnknownSensorTypeError('Unsupported sensor type.')
111            sensor_type = SensorTypes(sensor_type_value)
112            if sensor_type == SensorTypes.DATA_BLOCK:
113                data_len = dpa[data_index] + 1
114                if data_index + data_len - 1 >= len(dpa):
115                    raise ValueError('Data length longer than actual data.')
116            else:
117                data_len = _data_len_from_type(sensor_type)
118                if data_index + data_len > len(dpa):
119                    raise ValueError('Data length longer than actual data.')
120            sensor_data.extend([sensor_type_value] + dpa[data_index:data_index + data_len])
121            data_index += data_len
122            sensor_index += 1
123        return cls.read_sensors_with_types_from_dpa(sensor_data)
124
125    @classmethod
126    def read_sensors_with_types_from_dpa(cls, dpa: List[int]) -> List[SensorData]:
127        """Process data from ReadSensorWithTypes DPA response into a list of SensorData objects.
128
129        Args:
130            dpa (List[int]): List of pdata bytes from DPA response
131        Returns:
132            :obj:`list` of :obj:`SensorData`: List of SensorData objects containing parsed data
133        Raises:
134            UnknownSensorTypeError: Raised if sensor type is passed as integer and the value is not recognized
135            ValueError: Raised if passed data is shorter than required to process all sensors
136        """
137        sensor_data = []
138        index = 0
139        while index < len(dpa):
140            sensor_type_value = dpa[index]
141            if sensor_type_value not in SensorTypes:
142                raise UnknownSensorTypeError(f'Unsupported sensor type: {sensor_type_value}.')
143            sensor_type = SensorTypes(sensor_type_value)
144            if sensor_type == SensorTypes.DATA_BLOCK:
145                data_length = dpa[index + 1] + 1
146                if index + data_length >= len(dpa):
147                    raise ValueError('Data length is less than expected to process all sensors.')
148                data = dpa[index + 2:index + 2 + data_length - 1]
149            else:
150                data_length = _data_len_from_type(sensor_type)
151                if index + data_length >= len(dpa):
152                    raise ValueError('Data length is less than expected to process all sensors.')
153                data = cls.convert(sensor_type, dpa[index + 1:index + 1 + data_length])
154            sensor_class = get_sensor_class(sensor_type)
155            sensor_data.append(
156                SensorData(
157                    sensor_type=sensor_type,
158                    index=len(sensor_data),
159                    name=sensor_class.name,
160                    short_name=sensor_class.short_name,
161                    unit=sensor_class.unit,
162                    decimal_places=sensor_class.decimal_places,
163                    frc_commands=sensor_class.frc_commands,
164                    value=(
165                        round(data, sensor_class.decimal_places)
166                        if data is not None and not isinstance(data, list)
167                        else data
168                    )
169                )
170            )
171            index += (data_length + 1)
172        return sensor_data
173
174    @classmethod
175    def read_sensors_with_types_from_json(cls, json_data: List[dict]) -> List[SensorData]:
176        """Process data from ReadSensorWithTypes API response into a list of SensorData objects.
177
178        Args:
179            json_data (List[dict]): List of json objects from JSON API response
180        Returns:
181            :obj:`list` of SensorData`: List of SensorData objects containing parsed data
182        Raises:
183            UnknownSensorTypeError: Raised if sensor type is passed as integer and the value is not recognized
184        """
185        sensor_data = []
186        for data in json_data:
187            sensor_type_value = data['type']
188            if sensor_type_value not in SensorTypes:
189                raise UnknownSensorTypeError('Unsupported sensor type.')
190            sensor_type = SensorTypes(sensor_type_value)
191            sensor_class = get_sensor_class(sensor_type)
192            sensor_data.append(
193                SensorData(
194                    sensor_type=sensor_type,
195                    index=len(sensor_data),
196                    name=sensor_class.name,
197                    short_name=sensor_class.short_name,
198                    unit=sensor_class.unit,
199                    decimal_places=sensor_class.decimal_places,
200                    frc_commands=sensor_class.frc_commands,
201                    value=data['value']
202                )
203            )
204        return sensor_data
205
206    @classmethod
207    def frc_dpa(cls, sensor_type: Union[SensorTypes, int], sensor_index: int,
208                frc_command: Union[SensorFrcCommands, int], data: List[int], extra_result: Optional[List[int]] = None,
209                count: Optional[int] = None) -> List[SensorData]:
210        """Process data from DPA FRC response into a list of SensorData.
211
212        SensorData object contains information about the measured quantity and converted value.
213        The data argument expects only FRC data bytes, without the status byte.
214        The extra_result argument can be omitted if the processed data fit into just the Send or SendSelective response.
215
216        If count is specified, only that number of node data is processed.
217        For example, if total length of passed data is 64 bytes (which includes extra result data) and data length
218        per node is 2 bytes, and only 3 nodes are to be processed, then only first 6 bytes of the passed data will be
219        processed and returned as SensorData objects.
220
221        If count is not specified and combined length of passed FRC and extra result data is not equal to the number
222        of bytes required to process as many nodes as a single Send (SendSelective) and ExtraResult request can carry,
223        a ValueError is raised.
224
225        Args:
226            sensor_type (Union[SensorTypes, int]): Sensor type (represents a quantity)
227            sensor_index (int): Index of sensor
228            frc_command (int): FRC command used to collect data
229            data (List[int]): Data collected from Send or SendSelective message
230            extra_result (List[int]): Data collected from ExtraResult message
231            count (Union[int, None]): Specifies number of nodes to process
232        Returns:
233            :obj:`list` of :obj:`SensorData`: List of SensorData objects containing parsed data
234        Raises:
235            UnknownSensorTypeError: Raised if sensor type is passed as integer and the value is not recognized
236            ValueError: Raised if combined length of frc data and extra result does not match the required data
237                        length to process nodes regardless of count argument value
238        """
239        if isinstance(sensor_type, int):
240            if sensor_type not in SensorTypes:
241                raise UnknownSensorTypeError('Unknown or unsupported sensor type.')
242            sensor_type = SensorTypes(sensor_type)
243        sensor_class = get_sensor_class(sensor_type)
244        dpa = data
245        if frc_command == SensorFrcCommands.FRC_1BYTE:
246            dpa = data[1:]
247        elif frc_command == SensorFrcCommands.FRC_2BYTES:
248            dpa = data[2:]
249        elif frc_command == SensorFrcCommands.FRC_4BYTES:
250            dpa = data[4:]
251        if extra_result is not None:
252            dpa.extend(extra_result)
253        data_len = _data_len_from_frc_command(frc_command=frc_command)
254        if count is None:
255            if frc_command != SensorFrcCommands.FRC_2BITS and len(dpa) % data_len != 0:
256                raise ValueError('Invalid length of combined frc data and extra result data.')
257        else:
258            if frc_command != SensorFrcCommands.FRC_2BITS:
259                if len(dpa) < count * data_len:
260                    raise ValueError(f'Combined length of frc data and extra result is less than length of data'
261                                     f'required to process {count} devices.')
262                dpa = dpa[:count * data_len]
263        if data_len == 0.25:
264            itr = count + 1 if count is not None else 240
265            frc_values = []
266            for i in range(1, itr):
267                mask = 1 << (i % 8)
268                idx = math.floor(i / 8)
269                if idx + 32 >= len(dpa):
270                    raise ValueError('Combined length of frc data and extra result is too short.')
271                val = 0
272                if (dpa[idx] & mask) != 0:
273                    val = 1
274                if (dpa[idx + 32] & mask) != 0:
275                    val |= 2
276                frc_values.append(val)
277        elif data_len == 1:
278            frc_values = dpa
279        elif data_len == 2:
280            frc_values = [(dpa[i + 1] << 8) + dpa[i] for i in range(0, len(dpa), 2)]
281        else:
282            frc_values = [(dpa[i + 3] << 24) + (dpa[i + 2] << 16) + (dpa[i + 1] << 8) + dpa[i] for i in
283                          range(0, len(dpa), 4)]
284        sensor_data = []
285        for frc_value in frc_values:
286            value = cls.frc_convert(sensor_type, frc_command, frc_value)
287            sensor_data.append(
288                SensorData(
289                    sensor_type=sensor_type,
290                    index=sensor_index,
291                    name=sensor_class.name,
292                    short_name=sensor_class.short_name,
293                    unit=sensor_class.unit,
294                    decimal_places=sensor_class.decimal_places,
295                    frc_commands=sensor_class.frc_commands,
296                    value=(
297                        round(value, sensor_class.decimal_places)
298                        if value is not None and not isinstance(value, SensorFrcErrors)
299                        else value
300                    )
301                )
302            )
303        return sensor_data
304
305    @staticmethod
306    def convert(sensor_type: Union[SensorTypes, int], values: List[int]) -> Union[int, float, List[int], None]:
307        """Convert sensor data to a value within the range of quantity specified by sensor type.
308
309        Args:
310            sensor_type (Union[SensorTypes, int]): Sensor type (represents a quantity)
311            values: (List[int]): Collected data to convert
312        Returns:
313            :obj:`int`, :obj:`float`, :obj:`list` of :obj:`int` or :obj:`None`: Converted value
314        """
315        match sensor_type:
316            case SensorTypes.TEMPERATURE | SensorTypes.LOW_VOLTAGE:
317                sensor_value = values[0] + (values[1] << 8)
318                return Common.word_complement(sensor_value) / 16.0 if sensor_value != 0x8000 else None
319            case SensorTypes.ATMOSPHERIC_PRESSURE:
320                sensor_value = values[0] + (values[1] << 8)
321                return sensor_value / 16.0 if sensor_value != 0xFFFF else None
322            case SensorTypes.CO2 | SensorTypes.VOC | SensorTypes.COLOR_TEMPERATURE:
323                sensor_value = values[0] + (values[1] << 8)
324                return sensor_value if sensor_value != 0x8000 else None
325            case SensorTypes.TIMESPAN | SensorTypes.ILLUMINANCE | SensorTypes.TVOC | \
326                    SensorTypes.NOX | SensorTypes.ACTIVITY_CONCENTRATION:
327                sensor_value = values[0] + (values[1] << 8)
328                return sensor_value if sensor_value != 0xFFFF else None
329            case SensorTypes.EXTRA_LOW_VOLTAGE | SensorTypes.CURRENT:
330                sensor_value = values[0] + (values[1] << 8)
331                return Common.word_complement(sensor_value) / 1000.0 if sensor_value != 0x8000 else None
332            case SensorTypes.MAINS_FREQUENCY | SensorTypes.NO2 | SensorTypes.SO2 | \
333                    SensorTypes.METHANE | SensorTypes.SHORT_LENGTH:
334                sensor_value = values[0] + (values[1] << 8)
335                return sensor_value / 1000.0 if sensor_value != 0xFFFF else None
336            case SensorTypes.EARTHS_MAGNETIC_FIELD:
337                sensor_value = values[0] + (values[1] << 8)
338                return Common.word_complement(sensor_value) / 10000000.0 if sensor_value != 0x8000 else None
339            case SensorTypes.POWER:
340                sensor_value = values[0] + (values[1] << 8)
341                return sensor_value / 4.0 if sensor_value != 0xFFFF else None
342            case SensorTypes.CO:
343                sensor_value = values[0] + (values[1] << 8)
344                return sensor_value / 100.0 if sensor_value != 0xFFFF else None
345            case SensorTypes.O3:
346                sensor_value = values[0] + (values[1] << 8)
347                return sensor_value / 10000.0 if sensor_value != 0xFFFF else None
348            case SensorTypes.PARTICULATES_PM2_5 | SensorTypes.PARTICULATES_PM1 | SensorTypes.PARTICULATES_PM4 | \
349                    SensorTypes.PARTICULATES_PM10 | SensorTypes.PARTICULATES_PM40:
350                sensor_value = values[0] + (values[1] << 8)
351                return sensor_value / 4.0 if sensor_value != 0x8000 else None
352            case SensorTypes.SOUND_PRESSURE_LEVEL:
353                sensor_value = values[0] + (values[1] << 8)
354                return sensor_value / 16.0 if sensor_value != 0x8000 else None
355            case SensorTypes.ALTITUDE:
356                sensor_value = values[0] + (values[1] << 8)
357                return (sensor_value / 4.0 - 1024) if sensor_value != 0xFFFF else None
358            case SensorTypes.ACCELERATION:
359                sensor_value = values[0] + (values[1] << 8)
360                return Common.word_complement(sensor_value) / 256.0 if sensor_value != 0x8000 else None
361            case SensorTypes.NH3:
362                sensor_value = values[0] + (values[1] << 8)
363                return sensor_value / 10.0 if sensor_value != 0xFFFF else None
364            case SensorTypes.RELATIVE_HUMIDITY:
365                return values[0] / 2.0 if values[0] != 0xEE else None
366            case SensorTypes.BINARYDATA7:
367                aux = values[0] & 0x80
368                return values[0] if aux == 0 else None
369            case SensorTypes.POWER_FACTOR:
370                return values[0] / 200.0 if values[0] != 0xEE else None
371            case SensorTypes.UV_INDEX:
372                return values[0] / 8.0 if values[0] != 0xFF else None
373            case SensorTypes.PH:
374                return values[0] / 16.0 if values[0] != 0xFF else None
375            case SensorTypes.RSSI:
376                return (values[0] - 254) / 2.0 if values[0] != 0xFF else None
377            case SensorTypes.ACTION:
378                return values[0] if values[0] != 0xFB else None
379            case SensorTypes.BINARYDATA30:
380                sensor_value = values[0] + (values[1] << 8) + (values[2] << 16) + (values[3] << 24)
381                return sensor_value if (values[3] & 0x80) == 0 else None
382            case SensorTypes.CONSUMPTION | SensorTypes.DATETIME | SensorTypes.FOUR_BYTES:
383                sensor_value = values[0] + (values[1] << 8) + (values[2] << 16) + (values[3] << 24)
384                return sensor_value if sensor_value != 0xFFFFFFFF else None
385            case SensorTypes.TIMESPAN_LONG:
386                sensor_value = values[0] + (values[1] << 8) + (values[2] << 16) + (values[3] << 24)
387                return sensor_value / 16.0 if sensor_value != 0xFFFFFFFF else None
388            case SensorTypes.LATITUDE | SensorTypes.LONGITUDE:
389                if values[0] == 0xFF or (values[2] & 0x40) == 0:
390                    return None
391                sensor_value = values[3] + ((values[2] & 0x3F) + (values[0] + (values[1] << 8)) / 10000) / 60
392                if (values[2] & 0x80) != 0:
393                    sensor_value = -sensor_value
394                return sensor_value
395            case SensorTypes.TEMPERATURE_FLOAT | SensorTypes.LENGTH:
396                sensor_value = struct.unpack('f', bytearray(values))[0]
397                if math.isnan(sensor_value):
398                    return None
399                return sensor_value
400            case SensorTypes.DATA_BLOCK:
401                length = values[0]
402                return values[1:1 + length]
403            case _:
404                return None
405
406    @staticmethod
407    def frc_convert(sensor_type: Union[SensorTypes, int], frc_command: int, frc_value: int) -> Union[
408            int, float, SensorFrcErrors, None]:
409        """Convert data collected from FRC to a value within the range of quantity specified by sensor type.
410
411        Args:
412            sensor_type (Union[SensorTypes, int]): Sensor type (represents a quantity)
413            frc_command (int): FRC command used when collecting data
414            frc_value (int): Raw data to convert
415
416        Returns:
417            :obj:`int`, :obj:`float` or :obj:`None`: Converted value
418        """
419        value = None
420        if frc_command == SensorFrcCommands.FRC_2BITS:
421            if 0 <= frc_value <= 1:
422                return SensorFrcErrors.from_int(frc_value)
423        else:
424            if 0 <= frc_value <= 3:
425                return SensorFrcErrors.from_int(frc_value)
426        match sensor_type:
427            case SensorTypes.TEMPERATURE:
428                if frc_command == SensorFrcCommands.FRC_1BYTE:
429                    value = frc_value / 2.0 - 22
430                elif frc_command == SensorFrcCommands.FRC_2BYTES:
431                    value = Common.word_complement(frc_value ^ 0x8000) / 16.0
432            case SensorTypes.LOW_VOLTAGE:
433                value = Common.word_complement(frc_value ^ 0x8000) / 16.0
434            case SensorTypes.ATMOSPHERIC_PRESSURE | SensorTypes.SOUND_PRESSURE_LEVEL | SensorTypes.TIMESPAN_LONG:
435                value = (frc_value - 4) / 16.0
436            case SensorTypes.CO2 | SensorTypes.VOC:
437                if frc_command == SensorFrcCommands.FRC_1BYTE:
438                    value = (frc_value - 4) * 16
439                elif frc_command == SensorFrcCommands.FRC_2BYTES:
440                    value = frc_value - 4
441            case SensorTypes.COLOR_TEMPERATURE | SensorTypes.TIMESPAN | SensorTypes.ILLUMINANCE | \
442                    SensorTypes.CONSUMPTION | SensorTypes.DATETIME | SensorTypes.TVOC | \
443                    SensorTypes.NOX | SensorTypes.ACTIVITY_CONCENTRATION | SensorTypes.ACTION | \
444                    SensorTypes.BINARYDATA30 | SensorTypes.FOUR_BYTES:
445                value = frc_value - 4
446            case SensorTypes.EXTRA_LOW_VOLTAGE | SensorTypes.CURRENT:
447                value = Common.word_complement(frc_value ^ 0x8000) / 1000.0
448            case SensorTypes.MAINS_FREQUENCY | SensorTypes.NO2 | SensorTypes.SO2 | \
449                    SensorTypes.METHANE | SensorTypes.SHORT_LENGTH:
450                value = (frc_value - 4) / 1000.0
451            case SensorTypes.EARTHS_MAGNETIC_FIELD:
452                value = Common.word_complement(frc_value ^ 0x8000) / 10000000.0
453            case SensorTypes.POWER | SensorTypes.PARTICULATES_PM1 | SensorTypes.PARTICULATES_PM2_5 | \
454                    SensorTypes.PARTICULATES_PM4 | SensorTypes.PARTICULATES_PM10 | SensorTypes.PARTICULATES_PM40:
455                value = (frc_value - 4) / 4.0
456            case SensorTypes.CO:
457                value = (frc_value - 4) / 100.0
458            case SensorTypes.O3:
459                value = (frc_value - 4) / 10000.0
460            case SensorTypes.ALTITUDE:
461                value = (Common.word_complement(frc_value - 4) / 4.0) - 1024
462            case SensorTypes.ACCELERATION:
463                value = (Common.word_complement(frc_value ^ 0x8000)) / 256.0
464            case SensorTypes.NH3:
465                value = (frc_value - 4) / 10.0
466            case SensorTypes.RELATIVE_HUMIDITY:
467                value = (frc_value - 4) / 2.0
468            case SensorTypes.BINARYDATA7:
469                if frc_command == SensorFrcCommands.FRC_2BITS:
470                    value = frc_value & 0x01
471                elif frc_command == SensorFrcCommands.FRC_1BYTE:
472                    value = frc_value - 4
473            case SensorTypes.POWER_FACTOR:
474                value = (frc_value - 4) / 200.0
475            case SensorTypes.UV_INDEX:
476                value = (frc_value - 4) / 8.0
477            case SensorTypes.PH:
478                value = (frc_value - 4) / 16.0
479            case SensorTypes.RSSI:
480                value = (frc_value - 258) / 2.0
481            case SensorTypes.LATITUDE | SensorTypes.LONGITUDE:
482                aux = ((frc_value >> 24) & 0xFF) + (((frc_value >> 16) & 0x3F) + (frc_value & 0xFFFF) / 10000) / 60
483                value = -aux if frc_value & 0x800000 != 0 else aux
484            case SensorTypes.TEMPERATURE_FLOAT | SensorTypes.LENGTH:
485                frc_value -= 4
486                aux = [frc_value & 0xFF, (frc_value >> 8) & 0xFF, (frc_value >> 16) & 0xFF, (frc_value >> 24) & 0xFF]
487                value = struct.unpack('f', bytearray(aux))[0]
488        return value
489
490
def _data_len_from_type(sensor_type: Union[SensorTypes, int]):
    """Return number of value bytes a sensor of given type produces.

    The length is determined by the range of SensorDataSize bounds the sensor type falls into.

    Args:
        sensor_type (Union[SensorTypes, int]): Sensor type (represents a quantity)

    Returns:
        :obj:`int`: Expected data length per node (1, 2 or 4)

    Raises:
        UnknownSensorTypeError: Raised if sensor_type value does not fall into any of the known ranges
    """
    size_ranges = (
        (SensorDataSize.DATA_1BYTE_MIN, SensorDataSize.DATA_1BYTE_MAX, 1),
        (SensorDataSize.DATA_2BYTES_MIN, SensorDataSize.DATA_2BYTES_MAX, 2),
        (SensorDataSize.DATA_4BYTES_MIN, SensorDataSize.DATA_4BYTES_MAX, 4),
    )
    for lower, upper, size in size_ranges:
        if lower <= sensor_type <= upper:
            return size
    raise UnknownSensorTypeError('Unsupported sensor type.')
510
511
def _data_len_from_frc_command(frc_command: Union[SensorFrcCommands, int]):
    """Return amount of data a single node contributes to the response of given FRC command.

    Args:
        frc_command (Union[SensorFrcCommands, int]): Sensor FRC command
    Returns:
        :obj:`int` or :obj:`float`: Expected data length per node in bytes (1, 2 or 4),
        or 0.25 for the 2 bit FRC command
    Raises:
        ValueError: Raised if frc_command value is unknown or unsupported Sensor FRC command
    """
    lengths = (
        (SensorFrcCommands.FRC_2BITS, 0.25),
        (SensorFrcCommands.FRC_1BYTE, 1),
        (SensorFrcCommands.FRC_2BYTES, 2),
        (SensorFrcCommands.FRC_4BYTES, 4),
    )
    for command, length in lengths:
        if frc_command == command:
            return length
    raise ValueError('Unsupported frc command')
class SensorParser:
 17class SensorParser:
 18    """Class for parsing data from Sensor standard response data."""
 19
 20    @classmethod
 21    def enumerate_from_dpa(cls, dpa: List[int]) -> List[SensorData]:
 22        """Process data from Enumerate DPA response into a list of SensorData objects.
 23
 24        Note that Enumerate request only provides sensor types (quantities), as such, the SensorData objects
 25        produced by this method will not carry a value.
 26
 27        Args:
 28            dpa (List[int]): List of pdata bytes from DPA response
 29        Returns:
 30            :obj:`list` of :obj:`SensorData`: List of SensorData objects containing parsed data
 31        Raises:
 32            UnknownSensorTypeError: Raised if sensor type is passed as integer and the value is not recognized
 33        """
 34        sensor_data = []
 35        for sensor_type_value in dpa:
 36            if sensor_type_value not in SensorTypes:
 37                raise UnknownSensorTypeError('Unsupported sensor type.')
 38            sensor_type = SensorTypes(sensor_type_value)
 39            sensor_class = get_sensor_class(sensor_type)
 40            sensor_data.append(
 41                SensorData(
 42                    sensor_type=sensor_type,
 43                    index=len(sensor_data),
 44                    name=sensor_class.name,
 45                    short_name=sensor_class.short_name,
 46                    unit=sensor_class.unit,
 47                    decimal_places=sensor_class.decimal_places,
 48                    frc_commands=sensor_class.frc_commands
 49                )
 50            )
 51        return sensor_data
 52
 53    @classmethod
 54    def enumerate_from_json(cls, json_data: List[dict]) -> List[SensorData]:
 55        """Process data from Enumerate API response into a list of SensorData objects.
 56
 57        Note that Enumerate request only provides sensor types (quantities), as such, the SensorData objects
 58        produced by this method will not carry a value.
 59
 60        Args:
 61            json_data (List[dict]): List of json objects from JSON API response
 62        Returns:
 63            :obj:`list` of :obj:`SensorData`: List of SensorData objects containing parsed data
 64        Raises:
 65            UnknownSensorTypeError: Raised if sensor type is passed as integer and the value is not recognized
 66        """
 67        sensor_data = []
 68        for data in json_data:
 69            sensor_type_value = data['type']
 70            if sensor_type_value not in SensorTypes:
 71                raise UnknownSensorTypeError('Unsupported sensor type.')
 72            sensor_type = SensorTypes(sensor_type_value)
 73            sensor_class = get_sensor_class(sensor_type)
 74            sensor_data.append(
 75                SensorData(
 76                    sensor_type=sensor_type,
 77                    index=len(sensor_data),
 78                    name=sensor_class.name,
 79                    short_name=sensor_class.short_name,
 80                    unit=sensor_class.unit,
 81                    decimal_places=sensor_class.decimal_places,
 82                    frc_commands=sensor_class.frc_commands
 83                )
 84            )
 85        return sensor_data
 86
 87    @classmethod
 88    def read_sensors_dpa(cls, sensor_types: List[int], dpa: List[int]) -> List[SensorData]:
 89        """Process data from ReadSensor DPA response into a list of SensorData objects.
 90
 91        Because the ReadSensors DPA response does not carry information about sensor types,
 92        it is necessary to provide sensor types for response data.
 93
 94        Args:
 95            sensor_types (List[int]): List of sensor types
 96            dpa (List[int]): List of pdata bytes from DPA response
 97        Returns:
 98            :obj:`list` of :obj:`SensorData`: List of SensorData objects containing parsed data
 99        Raises:
100            UnknownSensorTypeError: Raised if sensor type is passed as integer and the value is not recognized
101            ValueError: Raised if passed data is shorter than required to process all sensors
102        """
103        sensor_data = []
104        data_index = 0
105        sensor_index = 0
106        while data_index < len(dpa):
107            if sensor_index >= len(sensor_types):
108                raise ValueError('Too little sensor types provided for the amount of sensor data.')
109            sensor_type_value = sensor_types[sensor_index]
110            if sensor_type_value not in SensorTypes:
111                raise UnknownSensorTypeError('Unsupported sensor type.')
112            sensor_type = SensorTypes(sensor_type_value)
113            if sensor_type == SensorTypes.DATA_BLOCK:
114                data_len = dpa[data_index] + 1
115                if data_index + data_len - 1 >= len(dpa):
116                    raise ValueError('Data length longer than actual data.')
117            else:
118                data_len = _data_len_from_type(sensor_type)
119                if data_index + data_len > len(dpa):
120                    raise ValueError('Data length longer than actual data.')
121            sensor_data.extend([sensor_type_value] + dpa[data_index:data_index + data_len])
122            data_index += data_len
123            sensor_index += 1
124        return cls.read_sensors_with_types_from_dpa(sensor_data)
125
126    @classmethod
127    def read_sensors_with_types_from_dpa(cls, dpa: List[int]) -> List[SensorData]:
128        """Process data from ReadSensorWithTypes DPA response into a list of SensorData objects.
129
130        Args:
131            dpa (List[int]): List of pdata bytes from DPA response
132        Returns:
133            :obj:`list` of :obj:`SensorData`: List of SensorData objects containing parsed data
134        Raises:
135            UnknownSensorTypeError: Raised if sensor type is passed as integer and the value is not recognized
136            ValueError: Raised if passed data is shorter than required to process all sensors
137        """
138        sensor_data = []
139        index = 0
140        while index < len(dpa):
141            sensor_type_value = dpa[index]
142            if sensor_type_value not in SensorTypes:
143                raise UnknownSensorTypeError(f'Unsupported sensor type: {sensor_type_value}.')
144            sensor_type = SensorTypes(sensor_type_value)
145            if sensor_type == SensorTypes.DATA_BLOCK:
146                data_length = dpa[index + 1] + 1
147                if index + data_length >= len(dpa):
148                    raise ValueError('Data length is less than expected to process all sensors.')
149                data = dpa[index + 2:index + 2 + data_length - 1]
150            else:
151                data_length = _data_len_from_type(sensor_type)
152                if index + data_length >= len(dpa):
153                    raise ValueError('Data length is less than expected to process all sensors.')
154                data = cls.convert(sensor_type, dpa[index + 1:index + 1 + data_length])
155            sensor_class = get_sensor_class(sensor_type)
156            sensor_data.append(
157                SensorData(
158                    sensor_type=sensor_type,
159                    index=len(sensor_data),
160                    name=sensor_class.name,
161                    short_name=sensor_class.short_name,
162                    unit=sensor_class.unit,
163                    decimal_places=sensor_class.decimal_places,
164                    frc_commands=sensor_class.frc_commands,
165                    value=(
166                        round(data, sensor_class.decimal_places)
167                        if data is not None and not isinstance(data, list)
168                        else data
169                    )
170                )
171            )
172            index += (data_length + 1)
173        return sensor_data
174
175    @classmethod
176    def read_sensors_with_types_from_json(cls, json_data: List[dict]) -> List[SensorData]:
177        """Process data from ReadSensorWithTypes API response into a list of SensorData objects.
178
179        Args:
180            json_data (List[dict]): List of json objects from JSON API response
181        Returns:
182            :obj:`list` of SensorData`: List of SensorData objects containing parsed data
183        Raises:
184            UnknownSensorTypeError: Raised if sensor type is passed as integer and the value is not recognized
185        """
186        sensor_data = []
187        for data in json_data:
188            sensor_type_value = data['type']
189            if sensor_type_value not in SensorTypes:
190                raise UnknownSensorTypeError('Unsupported sensor type.')
191            sensor_type = SensorTypes(sensor_type_value)
192            sensor_class = get_sensor_class(sensor_type)
193            sensor_data.append(
194                SensorData(
195                    sensor_type=sensor_type,
196                    index=len(sensor_data),
197                    name=sensor_class.name,
198                    short_name=sensor_class.short_name,
199                    unit=sensor_class.unit,
200                    decimal_places=sensor_class.decimal_places,
201                    frc_commands=sensor_class.frc_commands,
202                    value=data['value']
203                )
204            )
205        return sensor_data
206
207    @classmethod
208    def frc_dpa(cls, sensor_type: Union[SensorTypes, int], sensor_index: int,
209                frc_command: Union[SensorFrcCommands, int], data: List[int], extra_result: Optional[List[int]] = None,
210                count: Optional[int] = None) -> List[SensorData]:
211        """Process data from DPA FRC response into a list of SensorData.
212
213        SensorData object contains information about the measured quantity and converted value.
214        The data argument expects only FRC data bytes, without the status byte.
215        The extra_result argument can be omitted if the processed data fit into just the Send or SendSelective response.
216
217        If count is specified, only that number of node data is processed.
218        For example, if total length of passed data is 64 bytes (which includes extra result data) and data length
219        per node is 2 bytes, and only 3 nodes are to be processed, then only first 6 bytes of the passed data will be
220        processed and returned as SensorData objects.
221
222        If count is not specified and combined length of passed FRC and extra result data is not equal to the number
223        of bytes required to process as many nodes as a single Send (SendSelective) and ExtraResult request can carry,
224        a ValueError is raised.
225
226        Args:
227            sensor_type (Union[SensorTypes, int]): Sensor type (represents a quantity)
228            sensor_index (int): Index of sensor
229            frc_command (int): FRC command used to collect data
230            data (List[int]): Data collected from Send or SendSelective message
231            extra_result (List[int]): Data collected from ExtraResult message
232            count (Union[int, None]): Specifies number of nodes to process
233        Returns:
234            :obj:`list` of :obj:`SensorData`: List of SensorData objects containing parsed data
235        Raises:
236            UnknownSensorTypeError: Raised if sensor type is passed as integer and the value is not recognized
237            ValueError: Raised if combined length of frc data and extra result does not match the required data
238                        length to process nodes regardless of count argument value
239        """
240        if isinstance(sensor_type, int):
241            if sensor_type not in SensorTypes:
242                raise UnknownSensorTypeError('Unknown or unsupported sensor type.')
243            sensor_type = SensorTypes(sensor_type)
244        sensor_class = get_sensor_class(sensor_type)
245        dpa = data
246        if frc_command == SensorFrcCommands.FRC_1BYTE:
247            dpa = data[1:]
248        elif frc_command == SensorFrcCommands.FRC_2BYTES:
249            dpa = data[2:]
250        elif frc_command == SensorFrcCommands.FRC_4BYTES:
251            dpa = data[4:]
252        if extra_result is not None:
253            dpa.extend(extra_result)
254        data_len = _data_len_from_frc_command(frc_command=frc_command)
255        if count is None:
256            if frc_command != SensorFrcCommands.FRC_2BITS and len(dpa) % data_len != 0:
257                raise ValueError('Invalid length of combined frc data and extra result data.')
258        else:
259            if frc_command != SensorFrcCommands.FRC_2BITS:
260                if len(dpa) < count * data_len:
261                    raise ValueError(f'Combined length of frc data and extra result is less than length of data'
262                                     f'required to process {count} devices.')
263                dpa = dpa[:count * data_len]
264        if data_len == 0.25:
265            itr = count + 1 if count is not None else 240
266            frc_values = []
267            for i in range(1, itr):
268                mask = 1 << (i % 8)
269                idx = math.floor(i / 8)
270                if idx + 32 >= len(dpa):
271                    raise ValueError('Combined length of frc data and extra result is too short.')
272                val = 0
273                if (dpa[idx] & mask) != 0:
274                    val = 1
275                if (dpa[idx + 32] & mask) != 0:
276                    val |= 2
277                frc_values.append(val)
278        elif data_len == 1:
279            frc_values = dpa
280        elif data_len == 2:
281            frc_values = [(dpa[i + 1] << 8) + dpa[i] for i in range(0, len(dpa), 2)]
282        else:
283            frc_values = [(dpa[i + 3] << 24) + (dpa[i + 2] << 16) + (dpa[i + 1] << 8) + dpa[i] for i in
284                          range(0, len(dpa), 4)]
285        sensor_data = []
286        for frc_value in frc_values:
287            value = cls.frc_convert(sensor_type, frc_command, frc_value)
288            sensor_data.append(
289                SensorData(
290                    sensor_type=sensor_type,
291                    index=sensor_index,
292                    name=sensor_class.name,
293                    short_name=sensor_class.short_name,
294                    unit=sensor_class.unit,
295                    decimal_places=sensor_class.decimal_places,
296                    frc_commands=sensor_class.frc_commands,
297                    value=(
298                        round(value, sensor_class.decimal_places)
299                        if value is not None and not isinstance(value, SensorFrcErrors)
300                        else value
301                    )
302                )
303            )
304        return sensor_data
305
306    @staticmethod
307    def convert(sensor_type: Union[SensorTypes, int], values: List[int]) -> Union[int, float, List[int], None]:
308        """Convert sensor data to a value within the range of quantity specified by sensor type.
309
310        Args:
311            sensor_type (Union[SensorTypes, int]): Sensor type (represents a quantity)
312            values: (List[int]): Collected data to convert
313        Returns:
314            :obj:`int`, :obj:`float`, :obj:`list` of :obj:`int` or :obj:`None`: Converted value
315        """
316        match sensor_type:
317            case SensorTypes.TEMPERATURE | SensorTypes.LOW_VOLTAGE:
318                sensor_value = values[0] + (values[1] << 8)
319                return Common.word_complement(sensor_value) / 16.0 if sensor_value != 0x8000 else None
320            case SensorTypes.ATMOSPHERIC_PRESSURE:
321                sensor_value = values[0] + (values[1] << 8)
322                return sensor_value / 16.0 if sensor_value != 0xFFFF else None
323            case SensorTypes.CO2 | SensorTypes.VOC | SensorTypes.COLOR_TEMPERATURE:
324                sensor_value = values[0] + (values[1] << 8)
325                return sensor_value if sensor_value != 0x8000 else None
326            case SensorTypes.TIMESPAN | SensorTypes.ILLUMINANCE | SensorTypes.TVOC | \
327                    SensorTypes.NOX | SensorTypes.ACTIVITY_CONCENTRATION:
328                sensor_value = values[0] + (values[1] << 8)
329                return sensor_value if sensor_value != 0xFFFF else None
330            case SensorTypes.EXTRA_LOW_VOLTAGE | SensorTypes.CURRENT:
331                sensor_value = values[0] + (values[1] << 8)
332                return Common.word_complement(sensor_value) / 1000.0 if sensor_value != 0x8000 else None
333            case SensorTypes.MAINS_FREQUENCY | SensorTypes.NO2 | SensorTypes.SO2 | \
334                    SensorTypes.METHANE | SensorTypes.SHORT_LENGTH:
335                sensor_value = values[0] + (values[1] << 8)
336                return sensor_value / 1000.0 if sensor_value != 0xFFFF else None
337            case SensorTypes.EARTHS_MAGNETIC_FIELD:
338                sensor_value = values[0] + (values[1] << 8)
339                return Common.word_complement(sensor_value) / 10000000.0 if sensor_value != 0x8000 else None
340            case SensorTypes.POWER:
341                sensor_value = values[0] + (values[1] << 8)
342                return sensor_value / 4.0 if sensor_value != 0xFFFF else None
343            case SensorTypes.CO:
344                sensor_value = values[0] + (values[1] << 8)
345                return sensor_value / 100.0 if sensor_value != 0xFFFF else None
346            case SensorTypes.O3:
347                sensor_value = values[0] + (values[1] << 8)
348                return sensor_value / 10000.0 if sensor_value != 0xFFFF else None
349            case SensorTypes.PARTICULATES_PM2_5 | SensorTypes.PARTICULATES_PM1 | SensorTypes.PARTICULATES_PM4 | \
350                    SensorTypes.PARTICULATES_PM10 | SensorTypes.PARTICULATES_PM40:
351                sensor_value = values[0] + (values[1] << 8)
352                return sensor_value / 4.0 if sensor_value != 0x8000 else None
353            case SensorTypes.SOUND_PRESSURE_LEVEL:
354                sensor_value = values[0] + (values[1] << 8)
355                return sensor_value / 16.0 if sensor_value != 0x8000 else None
356            case SensorTypes.ALTITUDE:
357                sensor_value = values[0] + (values[1] << 8)
358                return (sensor_value / 4.0 - 1024) if sensor_value != 0xFFFF else None
359            case SensorTypes.ACCELERATION:
360                sensor_value = values[0] + (values[1] << 8)
361                return Common.word_complement(sensor_value) / 256.0 if sensor_value != 0x8000 else None
362            case SensorTypes.NH3:
363                sensor_value = values[0] + (values[1] << 8)
364                return sensor_value / 10.0 if sensor_value != 0xFFFF else None
365            case SensorTypes.RELATIVE_HUMIDITY:
366                return values[0] / 2.0 if values[0] != 0xEE else None
367            case SensorTypes.BINARYDATA7:
368                aux = values[0] & 0x80
369                return values[0] if aux == 0 else None
370            case SensorTypes.POWER_FACTOR:
371                return values[0] / 200.0 if values[0] != 0xEE else None
372            case SensorTypes.UV_INDEX:
373                return values[0] / 8.0 if values[0] != 0xFF else None
374            case SensorTypes.PH:
375                return values[0] / 16.0 if values[0] != 0xFF else None
376            case SensorTypes.RSSI:
377                return (values[0] - 254) / 2.0 if values[0] != 0xFF else None
378            case SensorTypes.ACTION:
379                return values[0] if values[0] != 0xFB else None
380            case SensorTypes.BINARYDATA30:
381                sensor_value = values[0] + (values[1] << 8) + (values[2] << 16) + (values[3] << 24)
382                return sensor_value if (values[3] & 0x80) == 0 else None
383            case SensorTypes.CONSUMPTION | SensorTypes.DATETIME | SensorTypes.FOUR_BYTES:
384                sensor_value = values[0] + (values[1] << 8) + (values[2] << 16) + (values[3] << 24)
385                return sensor_value if sensor_value != 0xFFFFFFFF else None
386            case SensorTypes.TIMESPAN_LONG:
387                sensor_value = values[0] + (values[1] << 8) + (values[2] << 16) + (values[3] << 24)
388                return sensor_value / 16.0 if sensor_value != 0xFFFFFFFF else None
389            case SensorTypes.LATITUDE | SensorTypes.LONGITUDE:
390                if values[0] == 0xFF or (values[2] & 0x40) == 0:
391                    return None
392                sensor_value = values[3] + ((values[2] & 0x3F) + (values[0] + (values[1] << 8)) / 10000) / 60
393                if (values[2] & 0x80) != 0:
394                    sensor_value = -sensor_value
395                return sensor_value
396            case SensorTypes.TEMPERATURE_FLOAT | SensorTypes.LENGTH:
397                sensor_value = struct.unpack('f', bytearray(values))[0]
398                if math.isnan(sensor_value):
399                    return None
400                return sensor_value
401            case SensorTypes.DATA_BLOCK:
402                length = values[0]
403                return values[1:1 + length]
404            case _:
405                return None
406
407    @staticmethod
408    def frc_convert(sensor_type: Union[SensorTypes, int], frc_command: int, frc_value: int) -> Union[
409            int, float, SensorFrcErrors, None]:
410        """Convert data collected from FRC to a value within the range of quantity specified by sensor type.
411
412        Args:
413            sensor_type (Union[SensorTypes, int]): Sensor type (represents a quantity)
414            frc_command (int): FRC command used when collecting data
415            frc_value (int): Raw data to convert
416
417        Returns:
418            :obj:`int`, :obj:`float` or :obj:`None`: Converted value
419        """
420        value = None
421        if frc_command == SensorFrcCommands.FRC_2BITS:
422            if 0 <= frc_value <= 1:
423                return SensorFrcErrors.from_int(frc_value)
424        else:
425            if 0 <= frc_value <= 3:
426                return SensorFrcErrors.from_int(frc_value)
427        match sensor_type:
428            case SensorTypes.TEMPERATURE:
429                if frc_command == SensorFrcCommands.FRC_1BYTE:
430                    value = frc_value / 2.0 - 22
431                elif frc_command == SensorFrcCommands.FRC_2BYTES:
432                    value = Common.word_complement(frc_value ^ 0x8000) / 16.0
433            case SensorTypes.LOW_VOLTAGE:
434                value = Common.word_complement(frc_value ^ 0x8000) / 16.0
435            case SensorTypes.ATMOSPHERIC_PRESSURE | SensorTypes.SOUND_PRESSURE_LEVEL | SensorTypes.TIMESPAN_LONG:
436                value = (frc_value - 4) / 16.0
437            case SensorTypes.CO2 | SensorTypes.VOC:
438                if frc_command == SensorFrcCommands.FRC_1BYTE:
439                    value = (frc_value - 4) * 16
440                elif frc_command == SensorFrcCommands.FRC_2BYTES:
441                    value = frc_value - 4
442            case SensorTypes.COLOR_TEMPERATURE | SensorTypes.TIMESPAN | SensorTypes.ILLUMINANCE | \
443                    SensorTypes.CONSUMPTION | SensorTypes.DATETIME | SensorTypes.TVOC | \
444                    SensorTypes.NOX | SensorTypes.ACTIVITY_CONCENTRATION | SensorTypes.ACTION | \
445                    SensorTypes.BINARYDATA30 | SensorTypes.FOUR_BYTES:
446                value = frc_value - 4
447            case SensorTypes.EXTRA_LOW_VOLTAGE | SensorTypes.CURRENT:
448                value = Common.word_complement(frc_value ^ 0x8000) / 1000.0
449            case SensorTypes.MAINS_FREQUENCY | SensorTypes.NO2 | SensorTypes.SO2 | \
450                    SensorTypes.METHANE | SensorTypes.SHORT_LENGTH:
451                value = (frc_value - 4) / 1000.0
452            case SensorTypes.EARTHS_MAGNETIC_FIELD:
453                value = Common.word_complement(frc_value ^ 0x8000) / 10000000.0
454            case SensorTypes.POWER | SensorTypes.PARTICULATES_PM1 | SensorTypes.PARTICULATES_PM2_5 | \
455                    SensorTypes.PARTICULATES_PM4 | SensorTypes.PARTICULATES_PM10 | SensorTypes.PARTICULATES_PM40:
456                value = (frc_value - 4) / 4.0
457            case SensorTypes.CO:
458                value = (frc_value - 4) / 100.0
459            case SensorTypes.O3:
460                value = (frc_value - 4) / 10000.0
461            case SensorTypes.ALTITUDE:
462                value = (Common.word_complement(frc_value - 4) / 4.0) - 1024
463            case SensorTypes.ACCELERATION:
464                value = (Common.word_complement(frc_value ^ 0x8000)) / 256.0
465            case SensorTypes.NH3:
466                value = (frc_value - 4) / 10.0
467            case SensorTypes.RELATIVE_HUMIDITY:
468                value = (frc_value - 4) / 2.0
469            case SensorTypes.BINARYDATA7:
470                if frc_command == SensorFrcCommands.FRC_2BITS:
471                    value = frc_value & 0x01
472                elif frc_command == SensorFrcCommands.FRC_1BYTE:
473                    value = frc_value - 4
474            case SensorTypes.POWER_FACTOR:
475                value = (frc_value - 4) / 200.0
476            case SensorTypes.UV_INDEX:
477                value = (frc_value - 4) / 8.0
478            case SensorTypes.PH:
479                value = (frc_value - 4) / 16.0
480            case SensorTypes.RSSI:
481                value = (frc_value - 258) / 2.0
482            case SensorTypes.LATITUDE | SensorTypes.LONGITUDE:
483                aux = ((frc_value >> 24) & 0xFF) + (((frc_value >> 16) & 0x3F) + (frc_value & 0xFFFF) / 10000) / 60
484                value = -aux if frc_value & 0x800000 != 0 else aux
485            case SensorTypes.TEMPERATURE_FLOAT | SensorTypes.LENGTH:
486                frc_value -= 4
487                aux = [frc_value & 0xFF, (frc_value >> 8) & 0xFF, (frc_value >> 16) & 0xFF, (frc_value >> 24) & 0xFF]
488                value = struct.unpack('f', bytearray(aux))[0]
489        return value

Class for parsing data from Sensor standard response data.

@classmethod
def enumerate_from_dpa(cls, dpa: List[int]) -> List[iqrfpy.objects.SensorData]:
20    @classmethod
21    def enumerate_from_dpa(cls, dpa: List[int]) -> List[SensorData]:
22        """Process data from Enumerate DPA response into a list of SensorData objects.
23
24        Note that Enumerate request only provides sensor types (quantities), as such, the SensorData objects
25        produced by this method will not carry a value.
26
27        Args:
28            dpa (List[int]): List of pdata bytes from DPA response
29        Returns:
30            :obj:`list` of :obj:`SensorData`: List of SensorData objects containing parsed data
31        Raises:
32            UnknownSensorTypeError: Raised if sensor type is passed as integer and the value is not recognized
33        """
34        sensor_data = []
35        for sensor_type_value in dpa:
36            if sensor_type_value not in SensorTypes:
37                raise UnknownSensorTypeError('Unsupported sensor type.')
38            sensor_type = SensorTypes(sensor_type_value)
39            sensor_class = get_sensor_class(sensor_type)
40            sensor_data.append(
41                SensorData(
42                    sensor_type=sensor_type,
43                    index=len(sensor_data),
44                    name=sensor_class.name,
45                    short_name=sensor_class.short_name,
46                    unit=sensor_class.unit,
47                    decimal_places=sensor_class.decimal_places,
48                    frc_commands=sensor_class.frc_commands
49                )
50            )
51        return sensor_data

Process data from Enumerate DPA response into a list of SensorData objects.

Note that Enumerate request only provides sensor types (quantities), as such, the SensorData objects produced by this method will not carry a value.

Arguments:
  • dpa (List[int]): List of pdata bytes from DPA response
Returns:

list of SensorData: List of SensorData objects containing parsed data

Raises:
  • UnknownSensorTypeError: Raised if sensor type is passed as integer and the value is not recognized
@classmethod
def enumerate_from_json( cls, json_data: List[dict]) -> List[iqrfpy.objects.SensorData]:
53    @classmethod
54    def enumerate_from_json(cls, json_data: List[dict]) -> List[SensorData]:
55        """Process data from Enumerate API response into a list of SensorData objects.
56
57        Note that Enumerate request only provides sensor types (quantities), as such, the SensorData objects
58        produced by this method will not carry a value.
59
60        Args:
61            json_data (List[dict]): List of json objects from JSON API response
62        Returns:
63            :obj:`list` of :obj:`SensorData`: List of SensorData objects containing parsed data
64        Raises:
65            UnknownSensorTypeError: Raised if sensor type is passed as integer and the value is not recognized
66        """
67        sensor_data = []
68        for data in json_data:
69            sensor_type_value = data['type']
70            if sensor_type_value not in SensorTypes:
71                raise UnknownSensorTypeError('Unsupported sensor type.')
72            sensor_type = SensorTypes(sensor_type_value)
73            sensor_class = get_sensor_class(sensor_type)
74            sensor_data.append(
75                SensorData(
76                    sensor_type=sensor_type,
77                    index=len(sensor_data),
78                    name=sensor_class.name,
79                    short_name=sensor_class.short_name,
80                    unit=sensor_class.unit,
81                    decimal_places=sensor_class.decimal_places,
82                    frc_commands=sensor_class.frc_commands
83                )
84            )
85        return sensor_data

Process data from Enumerate API response into a list of SensorData objects.

Note that Enumerate request only provides sensor types (quantities), as such, the SensorData objects produced by this method will not carry a value.

Arguments:
  • json_data (List[dict]): List of json objects from JSON API response
Returns:

list of SensorData: List of SensorData objects containing parsed data

Raises:
  • UnknownSensorTypeError: Raised if sensor type is passed as integer and the value is not recognized
@classmethod
def read_sensors_dpa( cls, sensor_types: List[int], dpa: List[int]) -> List[iqrfpy.objects.SensorData]:
 87    @classmethod
 88    def read_sensors_dpa(cls, sensor_types: List[int], dpa: List[int]) -> List[SensorData]:
 89        """Process data from ReadSensor DPA response into a list of SensorData objects.
 90
 91        Because the ReadSensors DPA response does not carry information about sensor types,
 92        it is necessary to provide sensor types for response data.
 93
 94        Args:
 95            sensor_types (List[int]): List of sensor types
 96            dpa (List[int]): List of pdata bytes from DPA response
 97        Returns:
 98            :obj:`list` of :obj:`SensorData`: List of SensorData objects containing parsed data
 99        Raises:
100            UnknownSensorTypeError: Raised if sensor type is passed as integer and the value is not recognized
101            ValueError: Raised if passed data is shorter than required to process all sensors
102        """
103        sensor_data = []
104        data_index = 0
105        sensor_index = 0
106        while data_index < len(dpa):
107            if sensor_index >= len(sensor_types):
108                raise ValueError('Too little sensor types provided for the amount of sensor data.')
109            sensor_type_value = sensor_types[sensor_index]
110            if sensor_type_value not in SensorTypes:
111                raise UnknownSensorTypeError('Unsupported sensor type.')
112            sensor_type = SensorTypes(sensor_type_value)
113            if sensor_type == SensorTypes.DATA_BLOCK:
114                data_len = dpa[data_index] + 1
115                if data_index + data_len - 1 >= len(dpa):
116                    raise ValueError('Data length longer than actual data.')
117            else:
118                data_len = _data_len_from_type(sensor_type)
119                if data_index + data_len > len(dpa):
120                    raise ValueError('Data length longer than actual data.')
121            sensor_data.extend([sensor_type_value] + dpa[data_index:data_index + data_len])
122            data_index += data_len
123            sensor_index += 1
124        return cls.read_sensors_with_types_from_dpa(sensor_data)

Process data from ReadSensor DPA response into a list of SensorData objects.

Because the ReadSensors DPA response does not carry information about sensor types, it is necessary to provide sensor types for response data.

Arguments:
  • sensor_types (List[int]): List of sensor types
  • dpa (List[int]): List of pdata bytes from DPA response
Returns:

list of SensorData: List of SensorData objects containing parsed data

Raises:
  • UnknownSensorTypeError: Raised if sensor type is passed as integer and the value is not recognized
  • ValueError: Raised if passed data is shorter than required to process all sensors
@classmethod
def read_sensors_with_types_from_dpa(cls, dpa: List[int]) -> List[iqrfpy.objects.SensorData]:
126    @classmethod
127    def read_sensors_with_types_from_dpa(cls, dpa: List[int]) -> List[SensorData]:
128        """Process data from ReadSensorWithTypes DPA response into a list of SensorData objects.
129
130        Args:
131            dpa (List[int]): List of pdata bytes from DPA response
132        Returns:
133            :obj:`list` of :obj:`SensorData`: List of SensorData objects containing parsed data
134        Raises:
135            UnknownSensorTypeError: Raised if sensor type is passed as integer and the value is not recognized
136            ValueError: Raised if passed data is shorter than required to process all sensors
137        """
138        sensor_data = []
139        index = 0
140        while index < len(dpa):
141            sensor_type_value = dpa[index]
142            if sensor_type_value not in SensorTypes:
143                raise UnknownSensorTypeError(f'Unsupported sensor type: {sensor_type_value}.')
144            sensor_type = SensorTypes(sensor_type_value)
145            if sensor_type == SensorTypes.DATA_BLOCK:
146                data_length = dpa[index + 1] + 1
147                if index + data_length >= len(dpa):
148                    raise ValueError('Data length is less than expected to process all sensors.')
149                data = dpa[index + 2:index + 2 + data_length - 1]
150            else:
151                data_length = _data_len_from_type(sensor_type)
152                if index + data_length >= len(dpa):
153                    raise ValueError('Data length is less than expected to process all sensors.')
154                data = cls.convert(sensor_type, dpa[index + 1:index + 1 + data_length])
155            sensor_class = get_sensor_class(sensor_type)
156            sensor_data.append(
157                SensorData(
158                    sensor_type=sensor_type,
159                    index=len(sensor_data),
160                    name=sensor_class.name,
161                    short_name=sensor_class.short_name,
162                    unit=sensor_class.unit,
163                    decimal_places=sensor_class.decimal_places,
164                    frc_commands=sensor_class.frc_commands,
165                    value=(
166                        round(data, sensor_class.decimal_places)
167                        if data is not None and not isinstance(data, list)
168                        else data
169                    )
170                )
171            )
172            index += (data_length + 1)
173        return sensor_data

Process data from ReadSensorWithTypes DPA response into a list of SensorData objects.

Arguments:
  • dpa (List[int]): List of pdata bytes from DPA response
Returns:

list of SensorData: List of SensorData objects containing parsed data

Raises:
  • UnknownSensorTypeError: Raised if sensor type is passed as integer and the value is not recognized
  • ValueError: Raised if passed data is shorter than required to process all sensors
@classmethod
def read_sensors_with_types_from_json( cls, json_data: List[dict]) -> List[iqrfpy.objects.SensorData]:
175    @classmethod
176    def read_sensors_with_types_from_json(cls, json_data: List[dict]) -> List[SensorData]:
177        """Process data from ReadSensorWithTypes API response into a list of SensorData objects.
178
179        Args:
180            json_data (List[dict]): List of json objects from JSON API response
181        Returns:
182            :obj:`list` of SensorData`: List of SensorData objects containing parsed data
183        Raises:
184            UnknownSensorTypeError: Raised if sensor type is passed as integer and the value is not recognized
185        """
186        sensor_data = []
187        for data in json_data:
188            sensor_type_value = data['type']
189            if sensor_type_value not in SensorTypes:
190                raise UnknownSensorTypeError('Unsupported sensor type.')
191            sensor_type = SensorTypes(sensor_type_value)
192            sensor_class = get_sensor_class(sensor_type)
193            sensor_data.append(
194                SensorData(
195                    sensor_type=sensor_type,
196                    index=len(sensor_data),
197                    name=sensor_class.name,
198                    short_name=sensor_class.short_name,
199                    unit=sensor_class.unit,
200                    decimal_places=sensor_class.decimal_places,
201                    frc_commands=sensor_class.frc_commands,
202                    value=data['value']
203                )
204            )
205        return sensor_data

Process data from ReadSensorWithTypes API response into a list of SensorData objects.

Arguments:
  • json_data (List[dict]): List of json objects from JSON API response
Returns:

list of SensorData: List of SensorData objects containing parsed data

Raises:
  • UnknownSensorTypeError: Raised if sensor type is passed as integer and the value is not recognized
@classmethod
def frc_dpa( cls, sensor_type: Union[iqrfpy.utils.sensor_constants.SensorTypes, int], sensor_index: int, frc_command: Union[iqrfpy.utils.sensor_constants.SensorFrcCommands, int], data: List[int], extra_result: Optional[List[int]] = None, count: Optional[int] = None) -> List[iqrfpy.objects.SensorData]:
207    @classmethod
208    def frc_dpa(cls, sensor_type: Union[SensorTypes, int], sensor_index: int,
209                frc_command: Union[SensorFrcCommands, int], data: List[int], extra_result: Optional[List[int]] = None,
210                count: Optional[int] = None) -> List[SensorData]:
211        """Process data from DPA FRC response into a list of SensorData.
212
213        SensorData object contains information about the measured quantity and converted value.
214        The data argument expects only FRC data bytes, without the status byte.
215        The extra_result argument can be omitted if the processed data fit into just the Send or SendSelective response.
216
217        If count is specified, only that number of node data is processed.
218        For example, if total length of passed data is 64 bytes (which includes extra result data) and data length
219        per node is 2 bytes, and only 3 nodes are to be processed, then only first 6 bytes of the passed data will be
220        processed and returned as SensorData objects.
221
222        If count is not specified and combined length of passed FRC and extra result data is not equal to the number
223        of bytes required to process as many nodes as a single Send (SendSelective) and ExtraResult request can carry,
224        a ValueError is raised.
225
226        Args:
227            sensor_type (Union[SensorTypes, int]): Sensor type (represents a quantity)
228            sensor_index (int): Index of sensor
229            frc_command (int): FRC command used to collect data
230            data (List[int]): Data collected from Send or SendSelective message
231            extra_result (List[int]): Data collected from ExtraResult message
232            count (Union[int, None]): Specifies number of nodes to process
233        Returns:
234            :obj:`list` of :obj:`SensorData`: List of SensorData objects containing parsed data
235        Raises:
236            UnknownSensorTypeError: Raised if sensor type is passed as integer and the value is not recognized
237            ValueError: Raised if combined length of frc data and extra result does not match the required data
238                        length to process nodes regardless of count argument value
239        """
240        if isinstance(sensor_type, int):
241            if sensor_type not in SensorTypes:
242                raise UnknownSensorTypeError('Unknown or unsupported sensor type.')
243            sensor_type = SensorTypes(sensor_type)
244        sensor_class = get_sensor_class(sensor_type)
245        dpa = data
246        if frc_command == SensorFrcCommands.FRC_1BYTE:
247            dpa = data[1:]
248        elif frc_command == SensorFrcCommands.FRC_2BYTES:
249            dpa = data[2:]
250        elif frc_command == SensorFrcCommands.FRC_4BYTES:
251            dpa = data[4:]
252        if extra_result is not None:
253            dpa.extend(extra_result)
254        data_len = _data_len_from_frc_command(frc_command=frc_command)
255        if count is None:
256            if frc_command != SensorFrcCommands.FRC_2BITS and len(dpa) % data_len != 0:
257                raise ValueError('Invalid length of combined frc data and extra result data.')
258        else:
259            if frc_command != SensorFrcCommands.FRC_2BITS:
260                if len(dpa) < count * data_len:
261                    raise ValueError(f'Combined length of frc data and extra result is less than length of data'
262                                     f'required to process {count} devices.')
263                dpa = dpa[:count * data_len]
264        if data_len == 0.25:
265            itr = count + 1 if count is not None else 240
266            frc_values = []
267            for i in range(1, itr):
268                mask = 1 << (i % 8)
269                idx = math.floor(i / 8)
270                if idx + 32 >= len(dpa):
271                    raise ValueError('Combined length of frc data and extra result is too short.')
272                val = 0
273                if (dpa[idx] & mask) != 0:
274                    val = 1
275                if (dpa[idx + 32] & mask) != 0:
276                    val |= 2
277                frc_values.append(val)
278        elif data_len == 1:
279            frc_values = dpa
280        elif data_len == 2:
281            frc_values = [(dpa[i + 1] << 8) + dpa[i] for i in range(0, len(dpa), 2)]
282        else:
283            frc_values = [(dpa[i + 3] << 24) + (dpa[i + 2] << 16) + (dpa[i + 1] << 8) + dpa[i] for i in
284                          range(0, len(dpa), 4)]
285        sensor_data = []
286        for frc_value in frc_values:
287            value = cls.frc_convert(sensor_type, frc_command, frc_value)
288            sensor_data.append(
289                SensorData(
290                    sensor_type=sensor_type,
291                    index=sensor_index,
292                    name=sensor_class.name,
293                    short_name=sensor_class.short_name,
294                    unit=sensor_class.unit,
295                    decimal_places=sensor_class.decimal_places,
296                    frc_commands=sensor_class.frc_commands,
297                    value=(
298                        round(value, sensor_class.decimal_places)
299                        if value is not None and not isinstance(value, SensorFrcErrors)
300                        else value
301                    )
302                )
303            )
304        return sensor_data

Process data from DPA FRC response into a list of SensorData.

SensorData object contains information about the measured quantity and converted value. The data argument expects only FRC data bytes, without the status byte. The extra_result argument can be omitted if the processed data fit into just the Send or SendSelective response.

If count is specified, only that number of node data is processed. For example, if total length of passed data is 64 bytes (which includes extra result data) and data length per node is 2 bytes, and only 3 nodes are to be processed, then only first 6 bytes of the passed data will be processed and returned as SensorData objects.

If count is not specified and combined length of passed FRC and extra result data is not equal to the number of bytes required to process as many nodes as a single Send (SendSelective) and ExtraResult request can carry, a ValueError is raised.

Arguments:
  • sensor_type (Union[SensorTypes, int]): Sensor type (represents a quantity)
  • sensor_index (int): Index of sensor
  • frc_command (int): FRC command used to collect data
  • data (List[int]): Data collected from Send or SendSelective message
  • extra_result (List[int]): Data collected from ExtraResult message
  • count (Union[int, None]): Specifies number of nodes to process
Returns:

list of SensorData: List of SensorData objects containing parsed data

Raises:
  • UnknownSensorTypeError: Raised if sensor type is passed as integer and the value is not recognized
  • ValueError: Raised if combined length of frc data and extra result does not match the required data length to process nodes regardless of count argument value
@staticmethod
def convert( sensor_type: Union[iqrfpy.utils.sensor_constants.SensorTypes, int], values: List[int]) -> Union[int, float, List[int], NoneType]:
306    @staticmethod
307    def convert(sensor_type: Union[SensorTypes, int], values: List[int]) -> Union[int, float, List[int], None]:
308        """Convert sensor data to a value within the range of quantity specified by sensor type.
309
310        Args:
311            sensor_type (Union[SensorTypes, int]): Sensor type (represents a quantity)
312            values: (List[int]): Collected data to convert
313        Returns:
314            :obj:`int`, :obj:`float`, :obj:`list` of :obj:`int` or :obj:`None`: Converted value
315        """
316        match sensor_type:
317            case SensorTypes.TEMPERATURE | SensorTypes.LOW_VOLTAGE:
318                sensor_value = values[0] + (values[1] << 8)
319                return Common.word_complement(sensor_value) / 16.0 if sensor_value != 0x8000 else None
320            case SensorTypes.ATMOSPHERIC_PRESSURE:
321                sensor_value = values[0] + (values[1] << 8)
322                return sensor_value / 16.0 if sensor_value != 0xFFFF else None
323            case SensorTypes.CO2 | SensorTypes.VOC | SensorTypes.COLOR_TEMPERATURE:
324                sensor_value = values[0] + (values[1] << 8)
325                return sensor_value if sensor_value != 0x8000 else None
326            case SensorTypes.TIMESPAN | SensorTypes.ILLUMINANCE | SensorTypes.TVOC | \
327                    SensorTypes.NOX | SensorTypes.ACTIVITY_CONCENTRATION:
328                sensor_value = values[0] + (values[1] << 8)
329                return sensor_value if sensor_value != 0xFFFF else None
330            case SensorTypes.EXTRA_LOW_VOLTAGE | SensorTypes.CURRENT:
331                sensor_value = values[0] + (values[1] << 8)
332                return Common.word_complement(sensor_value) / 1000.0 if sensor_value != 0x8000 else None
333            case SensorTypes.MAINS_FREQUENCY | SensorTypes.NO2 | SensorTypes.SO2 | \
334                    SensorTypes.METHANE | SensorTypes.SHORT_LENGTH:
335                sensor_value = values[0] + (values[1] << 8)
336                return sensor_value / 1000.0 if sensor_value != 0xFFFF else None
337            case SensorTypes.EARTHS_MAGNETIC_FIELD:
338                sensor_value = values[0] + (values[1] << 8)
339                return Common.word_complement(sensor_value) / 10000000.0 if sensor_value != 0x8000 else None
340            case SensorTypes.POWER:
341                sensor_value = values[0] + (values[1] << 8)
342                return sensor_value / 4.0 if sensor_value != 0xFFFF else None
343            case SensorTypes.CO:
344                sensor_value = values[0] + (values[1] << 8)
345                return sensor_value / 100.0 if sensor_value != 0xFFFF else None
346            case SensorTypes.O3:
347                sensor_value = values[0] + (values[1] << 8)
348                return sensor_value / 10000.0 if sensor_value != 0xFFFF else None
349            case SensorTypes.PARTICULATES_PM2_5 | SensorTypes.PARTICULATES_PM1 | SensorTypes.PARTICULATES_PM4 | \
350                    SensorTypes.PARTICULATES_PM10 | SensorTypes.PARTICULATES_PM40:
351                sensor_value = values[0] + (values[1] << 8)
352                return sensor_value / 4.0 if sensor_value != 0x8000 else None
353            case SensorTypes.SOUND_PRESSURE_LEVEL:
354                sensor_value = values[0] + (values[1] << 8)
355                return sensor_value / 16.0 if sensor_value != 0x8000 else None
356            case SensorTypes.ALTITUDE:
357                sensor_value = values[0] + (values[1] << 8)
358                return (sensor_value / 4.0 - 1024) if sensor_value != 0xFFFF else None
359            case SensorTypes.ACCELERATION:
360                sensor_value = values[0] + (values[1] << 8)
361                return Common.word_complement(sensor_value) / 256.0 if sensor_value != 0x8000 else None
362            case SensorTypes.NH3:
363                sensor_value = values[0] + (values[1] << 8)
364                return sensor_value / 10.0 if sensor_value != 0xFFFF else None
365            case SensorTypes.RELATIVE_HUMIDITY:
366                return values[0] / 2.0 if values[0] != 0xEE else None
367            case SensorTypes.BINARYDATA7:
368                aux = values[0] & 0x80
369                return values[0] if aux == 0 else None
370            case SensorTypes.POWER_FACTOR:
371                return values[0] / 200.0 if values[0] != 0xEE else None
372            case SensorTypes.UV_INDEX:
373                return values[0] / 8.0 if values[0] != 0xFF else None
374            case SensorTypes.PH:
375                return values[0] / 16.0 if values[0] != 0xFF else None
376            case SensorTypes.RSSI:
377                return (values[0] - 254) / 2.0 if values[0] != 0xFF else None
378            case SensorTypes.ACTION:
379                return values[0] if values[0] != 0xFB else None
380            case SensorTypes.BINARYDATA30:
381                sensor_value = values[0] + (values[1] << 8) + (values[2] << 16) + (values[3] << 24)
382                return sensor_value if (values[3] & 0x80) == 0 else None
383            case SensorTypes.CONSUMPTION | SensorTypes.DATETIME | SensorTypes.FOUR_BYTES:
384                sensor_value = values[0] + (values[1] << 8) + (values[2] << 16) + (values[3] << 24)
385                return sensor_value if sensor_value != 0xFFFFFFFF else None
386            case SensorTypes.TIMESPAN_LONG:
387                sensor_value = values[0] + (values[1] << 8) + (values[2] << 16) + (values[3] << 24)
388                return sensor_value / 16.0 if sensor_value != 0xFFFFFFFF else None
389            case SensorTypes.LATITUDE | SensorTypes.LONGITUDE:
390                if values[0] == 0xFF or (values[2] & 0x40) == 0:
391                    return None
392                sensor_value = values[3] + ((values[2] & 0x3F) + (values[0] + (values[1] << 8)) / 10000) / 60
393                if (values[2] & 0x80) != 0:
394                    sensor_value = -sensor_value
395                return sensor_value
396            case SensorTypes.TEMPERATURE_FLOAT | SensorTypes.LENGTH:
397                sensor_value = struct.unpack('f', bytearray(values))[0]
398                if math.isnan(sensor_value):
399                    return None
400                return sensor_value
401            case SensorTypes.DATA_BLOCK:
402                length = values[0]
403                return values[1:1 + length]
404            case _:
405                return None

Convert sensor data to a value within the range of quantity specified by sensor type.

Arguments:
  • sensor_type (Union[SensorTypes, int]): Sensor type (represents a quantity)
  • values (List[int]): Collected data to convert
Returns:

int, float, list of int or None: Converted value

@staticmethod
def frc_convert( sensor_type: Union[iqrfpy.utils.sensor_constants.SensorTypes, int], frc_command: int, frc_value: int) -> Union[int, float, iqrfpy.utils.sensor_constants.SensorFrcErrors, NoneType]:
407    @staticmethod
408    def frc_convert(sensor_type: Union[SensorTypes, int], frc_command: int, frc_value: int) -> Union[
409            int, float, SensorFrcErrors, None]:
410        """Convert data collected from FRC to a value within the range of quantity specified by sensor type.
411
412        Args:
413            sensor_type (Union[SensorTypes, int]): Sensor type (represents a quantity)
414            frc_command (int): FRC command used when collecting data
415            frc_value (int): Raw data to convert
416
417        Returns:
418            :obj:`int`, :obj:`float` or :obj:`None`: Converted value
419        """
420        value = None
421        if frc_command == SensorFrcCommands.FRC_2BITS:
422            if 0 <= frc_value <= 1:
423                return SensorFrcErrors.from_int(frc_value)
424        else:
425            if 0 <= frc_value <= 3:
426                return SensorFrcErrors.from_int(frc_value)
427        match sensor_type:
428            case SensorTypes.TEMPERATURE:
429                if frc_command == SensorFrcCommands.FRC_1BYTE:
430                    value = frc_value / 2.0 - 22
431                elif frc_command == SensorFrcCommands.FRC_2BYTES:
432                    value = Common.word_complement(frc_value ^ 0x8000) / 16.0
433            case SensorTypes.LOW_VOLTAGE:
434                value = Common.word_complement(frc_value ^ 0x8000) / 16.0
435            case SensorTypes.ATMOSPHERIC_PRESSURE | SensorTypes.SOUND_PRESSURE_LEVEL | SensorTypes.TIMESPAN_LONG:
436                value = (frc_value - 4) / 16.0
437            case SensorTypes.CO2 | SensorTypes.VOC:
438                if frc_command == SensorFrcCommands.FRC_1BYTE:
439                    value = (frc_value - 4) * 16
440                elif frc_command == SensorFrcCommands.FRC_2BYTES:
441                    value = frc_value - 4
442            case SensorTypes.COLOR_TEMPERATURE | SensorTypes.TIMESPAN | SensorTypes.ILLUMINANCE | \
443                    SensorTypes.CONSUMPTION | SensorTypes.DATETIME | SensorTypes.TVOC | \
444                    SensorTypes.NOX | SensorTypes.ACTIVITY_CONCENTRATION | SensorTypes.ACTION | \
445                    SensorTypes.BINARYDATA30 | SensorTypes.FOUR_BYTES:
446                value = frc_value - 4
447            case SensorTypes.EXTRA_LOW_VOLTAGE | SensorTypes.CURRENT:
448                value = Common.word_complement(frc_value ^ 0x8000) / 1000.0
449            case SensorTypes.MAINS_FREQUENCY | SensorTypes.NO2 | SensorTypes.SO2 | \
450                    SensorTypes.METHANE | SensorTypes.SHORT_LENGTH:
451                value = (frc_value - 4) / 1000.0
452            case SensorTypes.EARTHS_MAGNETIC_FIELD:
453                value = Common.word_complement(frc_value ^ 0x8000) / 10000000.0
454            case SensorTypes.POWER | SensorTypes.PARTICULATES_PM1 | SensorTypes.PARTICULATES_PM2_5 | \
455                    SensorTypes.PARTICULATES_PM4 | SensorTypes.PARTICULATES_PM10 | SensorTypes.PARTICULATES_PM40:
456                value = (frc_value - 4) / 4.0
457            case SensorTypes.CO:
458                value = (frc_value - 4) / 100.0
459            case SensorTypes.O3:
460                value = (frc_value - 4) / 10000.0
461            case SensorTypes.ALTITUDE:
462                value = (Common.word_complement(frc_value - 4) / 4.0) - 1024
463            case SensorTypes.ACCELERATION:
464                value = (Common.word_complement(frc_value ^ 0x8000)) / 256.0
465            case SensorTypes.NH3:
466                value = (frc_value - 4) / 10.0
467            case SensorTypes.RELATIVE_HUMIDITY:
468                value = (frc_value - 4) / 2.0
469            case SensorTypes.BINARYDATA7:
470                if frc_command == SensorFrcCommands.FRC_2BITS:
471                    value = frc_value & 0x01
472                elif frc_command == SensorFrcCommands.FRC_1BYTE:
473                    value = frc_value - 4
474            case SensorTypes.POWER_FACTOR:
475                value = (frc_value - 4) / 200.0
476            case SensorTypes.UV_INDEX:
477                value = (frc_value - 4) / 8.0
478            case SensorTypes.PH:
479                value = (frc_value - 4) / 16.0
480            case SensorTypes.RSSI:
481                value = (frc_value - 258) / 2.0
482            case SensorTypes.LATITUDE | SensorTypes.LONGITUDE:
483                aux = ((frc_value >> 24) & 0xFF) + (((frc_value >> 16) & 0x3F) + (frc_value & 0xFFFF) / 10000) / 60
484                value = -aux if frc_value & 0x800000 != 0 else aux
485            case SensorTypes.TEMPERATURE_FLOAT | SensorTypes.LENGTH:
486                frc_value -= 4
487                aux = [frc_value & 0xFF, (frc_value >> 8) & 0xFF, (frc_value >> 16) & 0xFF, (frc_value >> 24) & 0xFF]
488                value = struct.unpack('f', bytearray(aux))[0]
489        return value

Convert data collected from FRC to a value within the range of quantity specified by sensor type.

Arguments:
  • sensor_type (Union[SensorTypes, int]): Sensor type (represents a quantity)
  • frc_command (int): FRC command used when collecting data
  • frc_value (int): Raw data to convert
Returns:

int, float, SensorFrcErrors or None: Converted value