iqrfpy.utils.sensor_parser
Sensor parser module.
Provides methods for parsing of sensor data collected by DPA and JSON API requests.
1"""Sensor parser module. 2 3Provides methods for parsing of sensor data collected by DPA and JSON API requests. 4""" 5 6import math 7import struct 8from typing import List, Union, Optional 9from iqrfpy.exceptions import UnknownSensorTypeError 10from iqrfpy.objects.sensor_data import SensorData 11from iqrfpy.utils.sensor_constants import SensorDataSize, SensorTypes, SensorFrcCommands, SensorFrcErrors 12from iqrfpy.utils.common import Common 13from iqrfpy.utils.quantity_data import get_sensor_class 14 15 16class SensorParser: 17 """Class for parsing data from Sensor standard response data.""" 18 19 @classmethod 20 def enumerate_from_dpa(cls, dpa: List[int]) -> List[SensorData]: 21 """Process data from Enumerate DPA response into a list of SensorData objects. 22 23 Note that Enumerate request only provides sensor types (quantities), as such, the SensorData objects 24 produced by this method will not carry a value. 25 26 Args: 27 dpa (List[int]): List of pdata bytes from DPA response 28 Returns: 29 :obj:`list` of :obj:`SensorData`: List of SensorData objects containing parsed data 30 Raises: 31 UnknownSensorTypeError: Raised if sensor type is passed as integer and the value is not recognized 32 """ 33 sensor_data = [] 34 for sensor_type_value in dpa: 35 if sensor_type_value not in SensorTypes: 36 raise UnknownSensorTypeError('Unsupported sensor type.') 37 sensor_type = SensorTypes(sensor_type_value) 38 sensor_class = get_sensor_class(sensor_type) 39 sensor_data.append( 40 SensorData( 41 sensor_type=sensor_type, 42 index=len(sensor_data), 43 name=sensor_class.name, 44 short_name=sensor_class.short_name, 45 unit=sensor_class.unit, 46 decimal_places=sensor_class.decimal_places, 47 frc_commands=sensor_class.frc_commands 48 ) 49 ) 50 return sensor_data 51 52 @classmethod 53 def enumerate_from_json(cls, json_data: List[dict]) -> List[SensorData]: 54 """Process data from Enumerate API response into a list of SensorData objects. 
55 56 Note that Enumerate request only provides sensor types (quantities), as such, the SensorData objects 57 produced by this method will not carry a value. 58 59 Args: 60 json_data (List[dict]): List of json objects from JSON API response 61 Returns: 62 :obj:`list` of :obj:`SensorData`: List of SensorData objects containing parsed data 63 Raises: 64 UnknownSensorTypeError: Raised if sensor type is passed as integer and the value is not recognized 65 """ 66 sensor_data = [] 67 for data in json_data: 68 sensor_type_value = data['type'] 69 if sensor_type_value not in SensorTypes: 70 raise UnknownSensorTypeError('Unsupported sensor type.') 71 sensor_type = SensorTypes(sensor_type_value) 72 sensor_class = get_sensor_class(sensor_type) 73 sensor_data.append( 74 SensorData( 75 sensor_type=sensor_type, 76 index=len(sensor_data), 77 name=sensor_class.name, 78 short_name=sensor_class.short_name, 79 unit=sensor_class.unit, 80 decimal_places=sensor_class.decimal_places, 81 frc_commands=sensor_class.frc_commands 82 ) 83 ) 84 return sensor_data 85 86 @classmethod 87 def read_sensors_dpa(cls, sensor_types: List[int], dpa: List[int]) -> List[SensorData]: 88 """Process data from ReadSensor DPA response into a list of SensorData objects. 89 90 Because the ReadSensors DPA response does not carry information about sensor types, 91 it is necessary to provide sensor types for response data. 
92 93 Args: 94 sensor_types (List[int]): List of sensor types 95 dpa (List[int]): List of pdata bytes from DPA response 96 Returns: 97 :obj:`list` of :obj:`SensorData`: List of SensorData objects containing parsed data 98 Raises: 99 UnknownSensorTypeError: Raised if sensor type is passed as integer and the value is not recognized 100 ValueError: Raised if passed data is shorter than required to process all sensors 101 """ 102 sensor_data = [] 103 data_index = 0 104 sensor_index = 0 105 while data_index < len(dpa): 106 if sensor_index >= len(sensor_types): 107 raise ValueError('Too little sensor types provided for the amount of sensor data.') 108 sensor_type_value = sensor_types[sensor_index] 109 if sensor_type_value not in SensorTypes: 110 raise UnknownSensorTypeError('Unsupported sensor type.') 111 sensor_type = SensorTypes(sensor_type_value) 112 if sensor_type == SensorTypes.DATA_BLOCK: 113 data_len = dpa[data_index] + 1 114 if data_index + data_len - 1 >= len(dpa): 115 raise ValueError('Data length longer than actual data.') 116 else: 117 data_len = _data_len_from_type(sensor_type) 118 if data_index + data_len > len(dpa): 119 raise ValueError('Data length longer than actual data.') 120 sensor_data.extend([sensor_type_value] + dpa[data_index:data_index + data_len]) 121 data_index += data_len 122 sensor_index += 1 123 return cls.read_sensors_with_types_from_dpa(sensor_data) 124 125 @classmethod 126 def read_sensors_with_types_from_dpa(cls, dpa: List[int]) -> List[SensorData]: 127 """Process data from ReadSensorWithTypes DPA response into a list of SensorData objects. 
128 129 Args: 130 dpa (List[int]): List of pdata bytes from DPA response 131 Returns: 132 :obj:`list` of :obj:`SensorData`: List of SensorData objects containing parsed data 133 Raises: 134 UnknownSensorTypeError: Raised if sensor type is passed as integer and the value is not recognized 135 ValueError: Raised if passed data is shorter than required to process all sensors 136 """ 137 sensor_data = [] 138 index = 0 139 while index < len(dpa): 140 sensor_type_value = dpa[index] 141 if sensor_type_value not in SensorTypes: 142 raise UnknownSensorTypeError(f'Unsupported sensor type: {sensor_type_value}.') 143 sensor_type = SensorTypes(sensor_type_value) 144 if sensor_type == SensorTypes.DATA_BLOCK: 145 data_length = dpa[index + 1] + 1 146 if index + data_length >= len(dpa): 147 raise ValueError('Data length is less than expected to process all sensors.') 148 data = dpa[index + 2:index + 2 + data_length - 1] 149 else: 150 data_length = _data_len_from_type(sensor_type) 151 if index + data_length >= len(dpa): 152 raise ValueError('Data length is less than expected to process all sensors.') 153 data = cls.convert(sensor_type, dpa[index + 1:index + 1 + data_length]) 154 sensor_class = get_sensor_class(sensor_type) 155 sensor_data.append( 156 SensorData( 157 sensor_type=sensor_type, 158 index=len(sensor_data), 159 name=sensor_class.name, 160 short_name=sensor_class.short_name, 161 unit=sensor_class.unit, 162 decimal_places=sensor_class.decimal_places, 163 frc_commands=sensor_class.frc_commands, 164 value=( 165 round(data, sensor_class.decimal_places) 166 if data is not None and not isinstance(data, list) 167 else data 168 ) 169 ) 170 ) 171 index += (data_length + 1) 172 return sensor_data 173 174 @classmethod 175 def read_sensors_with_types_from_json(cls, json_data: List[dict]) -> List[SensorData]: 176 """Process data from ReadSensorWithTypes API response into a list of SensorData objects. 
177 178 Args: 179 json_data (List[dict]): List of json objects from JSON API response 180 Returns: 181 :obj:`list` of SensorData`: List of SensorData objects containing parsed data 182 Raises: 183 UnknownSensorTypeError: Raised if sensor type is passed as integer and the value is not recognized 184 """ 185 sensor_data = [] 186 for data in json_data: 187 sensor_type_value = data['type'] 188 if sensor_type_value not in SensorTypes: 189 raise UnknownSensorTypeError('Unsupported sensor type.') 190 sensor_type = SensorTypes(sensor_type_value) 191 sensor_class = get_sensor_class(sensor_type) 192 sensor_data.append( 193 SensorData( 194 sensor_type=sensor_type, 195 index=len(sensor_data), 196 name=sensor_class.name, 197 short_name=sensor_class.short_name, 198 unit=sensor_class.unit, 199 decimal_places=sensor_class.decimal_places, 200 frc_commands=sensor_class.frc_commands, 201 value=data['value'] 202 ) 203 ) 204 return sensor_data 205 206 @classmethod 207 def frc_dpa(cls, sensor_type: Union[SensorTypes, int], sensor_index: int, 208 frc_command: Union[SensorFrcCommands, int], data: List[int], extra_result: Optional[List[int]] = None, 209 count: Optional[int] = None) -> List[SensorData]: 210 """Process data from DPA FRC response into a list of SensorData. 211 212 SensorData object contains information about the measured quantity and converted value. 213 The data argument expects only FRC data bytes, without the status byte. 214 The extra_result argument can be omitted if the processed data fit into just the Send or SendSelective response. 215 216 If count is specified, only that number of node data is processed. 217 For example, if total length of passed data is 64 bytes (which includes extra result data) and data length 218 per node is 2 bytes, and only 3 nodes are to be processed, then only first 6 bytes of the passed data will be 219 processed and returned as SensorData objects. 
220 221 If count is not specified and combined length of passed FRC and extra result data is not equal to the number 222 of bytes required to process as many nodes as a single Send (SendSelective) and ExtraResult request can carry, 223 a ValueError is raised. 224 225 Args: 226 sensor_type (Union[SensorTypes, int]): Sensor type (represents a quantity) 227 sensor_index (int): Index of sensor 228 frc_command (int): FRC command used to collect data 229 data (List[int]): Data collected from Send or SendSelective message 230 extra_result (List[int]): Data collected from ExtraResult message 231 count (Union[int, None]): Specifies number of nodes to process 232 Returns: 233 :obj:`list` of :obj:`SensorData`: List of SensorData objects containing parsed data 234 Raises: 235 UnknownSensorTypeError: Raised if sensor type is passed as integer and the value is not recognized 236 ValueError: Raised if combined length of frc data and extra result does not match the required data 237 length to process nodes regardless of count argument value 238 """ 239 if isinstance(sensor_type, int): 240 if sensor_type not in SensorTypes: 241 raise UnknownSensorTypeError('Unknown or unsupported sensor type.') 242 sensor_type = SensorTypes(sensor_type) 243 sensor_class = get_sensor_class(sensor_type) 244 dpa = data 245 if frc_command == SensorFrcCommands.FRC_1BYTE: 246 dpa = data[1:] 247 elif frc_command == SensorFrcCommands.FRC_2BYTES: 248 dpa = data[2:] 249 elif frc_command == SensorFrcCommands.FRC_4BYTES: 250 dpa = data[4:] 251 if extra_result is not None: 252 dpa.extend(extra_result) 253 data_len = _data_len_from_frc_command(frc_command=frc_command) 254 if count is None: 255 if frc_command != SensorFrcCommands.FRC_2BITS and len(dpa) % data_len != 0: 256 raise ValueError('Invalid length of combined frc data and extra result data.') 257 else: 258 if frc_command != SensorFrcCommands.FRC_2BITS: 259 if len(dpa) < count * data_len: 260 raise ValueError(f'Combined length of frc data and extra result 
is less than length of data' 261 f'required to process {count} devices.') 262 dpa = dpa[:count * data_len] 263 if data_len == 0.25: 264 itr = count + 1 if count is not None else 240 265 frc_values = [] 266 for i in range(1, itr): 267 mask = 1 << (i % 8) 268 idx = math.floor(i / 8) 269 if idx + 32 >= len(dpa): 270 raise ValueError('Combined length of frc data and extra result is too short.') 271 val = 0 272 if (dpa[idx] & mask) != 0: 273 val = 1 274 if (dpa[idx + 32] & mask) != 0: 275 val |= 2 276 frc_values.append(val) 277 elif data_len == 1: 278 frc_values = dpa 279 elif data_len == 2: 280 frc_values = [(dpa[i + 1] << 8) + dpa[i] for i in range(0, len(dpa), 2)] 281 else: 282 frc_values = [(dpa[i + 3] << 24) + (dpa[i + 2] << 16) + (dpa[i + 1] << 8) + dpa[i] for i in 283 range(0, len(dpa), 4)] 284 sensor_data = [] 285 for frc_value in frc_values: 286 value = cls.frc_convert(sensor_type, frc_command, frc_value) 287 sensor_data.append( 288 SensorData( 289 sensor_type=sensor_type, 290 index=sensor_index, 291 name=sensor_class.name, 292 short_name=sensor_class.short_name, 293 unit=sensor_class.unit, 294 decimal_places=sensor_class.decimal_places, 295 frc_commands=sensor_class.frc_commands, 296 value=( 297 round(value, sensor_class.decimal_places) 298 if value is not None and not isinstance(value, SensorFrcErrors) 299 else value 300 ) 301 ) 302 ) 303 return sensor_data 304 305 @staticmethod 306 def convert(sensor_type: Union[SensorTypes, int], values: List[int]) -> Union[int, float, List[int], None]: 307 """Convert sensor data to a value within the range of quantity specified by sensor type. 
308 309 Args: 310 sensor_type (Union[SensorTypes, int]): Sensor type (represents a quantity) 311 values: (List[int]): Collected data to convert 312 Returns: 313 :obj:`int`, :obj:`float`, :obj:`list` of :obj:`int` or :obj:`None`: Converted value 314 """ 315 match sensor_type: 316 case SensorTypes.TEMPERATURE | SensorTypes.LOW_VOLTAGE: 317 sensor_value = values[0] + (values[1] << 8) 318 return Common.word_complement(sensor_value) / 16.0 if sensor_value != 0x8000 else None 319 case SensorTypes.ATMOSPHERIC_PRESSURE: 320 sensor_value = values[0] + (values[1] << 8) 321 return sensor_value / 16.0 if sensor_value != 0xFFFF else None 322 case SensorTypes.CO2 | SensorTypes.VOC | SensorTypes.COLOR_TEMPERATURE: 323 sensor_value = values[0] + (values[1] << 8) 324 return sensor_value if sensor_value != 0x8000 else None 325 case SensorTypes.TIMESPAN | SensorTypes.ILLUMINANCE | SensorTypes.TVOC | \ 326 SensorTypes.NOX | SensorTypes.ACTIVITY_CONCENTRATION: 327 sensor_value = values[0] + (values[1] << 8) 328 return sensor_value if sensor_value != 0xFFFF else None 329 case SensorTypes.EXTRA_LOW_VOLTAGE | SensorTypes.CURRENT: 330 sensor_value = values[0] + (values[1] << 8) 331 return Common.word_complement(sensor_value) / 1000.0 if sensor_value != 0x8000 else None 332 case SensorTypes.MAINS_FREQUENCY | SensorTypes.NO2 | SensorTypes.SO2 | \ 333 SensorTypes.METHANE | SensorTypes.SHORT_LENGTH: 334 sensor_value = values[0] + (values[1] << 8) 335 return sensor_value / 1000.0 if sensor_value != 0xFFFF else None 336 case SensorTypes.EARTHS_MAGNETIC_FIELD: 337 sensor_value = values[0] + (values[1] << 8) 338 return Common.word_complement(sensor_value) / 10000000.0 if sensor_value != 0x8000 else None 339 case SensorTypes.POWER: 340 sensor_value = values[0] + (values[1] << 8) 341 return sensor_value / 4.0 if sensor_value != 0xFFFF else None 342 case SensorTypes.CO: 343 sensor_value = values[0] + (values[1] << 8) 344 return sensor_value / 100.0 if sensor_value != 0xFFFF else None 345 case 
SensorTypes.O3: 346 sensor_value = values[0] + (values[1] << 8) 347 return sensor_value / 10000.0 if sensor_value != 0xFFFF else None 348 case SensorTypes.PARTICULATES_PM2_5 | SensorTypes.PARTICULATES_PM1 | SensorTypes.PARTICULATES_PM4 | \ 349 SensorTypes.PARTICULATES_PM10 | SensorTypes.PARTICULATES_PM40: 350 sensor_value = values[0] + (values[1] << 8) 351 return sensor_value / 4.0 if sensor_value != 0x8000 else None 352 case SensorTypes.SOUND_PRESSURE_LEVEL: 353 sensor_value = values[0] + (values[1] << 8) 354 return sensor_value / 16.0 if sensor_value != 0x8000 else None 355 case SensorTypes.ALTITUDE: 356 sensor_value = values[0] + (values[1] << 8) 357 return (sensor_value / 4.0 - 1024) if sensor_value != 0xFFFF else None 358 case SensorTypes.ACCELERATION: 359 sensor_value = values[0] + (values[1] << 8) 360 return Common.word_complement(sensor_value) / 256.0 if sensor_value != 0x8000 else None 361 case SensorTypes.NH3: 362 sensor_value = values[0] + (values[1] << 8) 363 return sensor_value / 10.0 if sensor_value != 0xFFFF else None 364 case SensorTypes.RELATIVE_HUMIDITY: 365 return values[0] / 2.0 if values[0] != 0xEE else None 366 case SensorTypes.BINARYDATA7: 367 aux = values[0] & 0x80 368 return values[0] if aux == 0 else None 369 case SensorTypes.POWER_FACTOR: 370 return values[0] / 200.0 if values[0] != 0xEE else None 371 case SensorTypes.UV_INDEX: 372 return values[0] / 8.0 if values[0] != 0xFF else None 373 case SensorTypes.PH: 374 return values[0] / 16.0 if values[0] != 0xFF else None 375 case SensorTypes.RSSI: 376 return (values[0] - 254) / 2.0 if values[0] != 0xFF else None 377 case SensorTypes.ACTION: 378 return values[0] if values[0] != 0xFB else None 379 case SensorTypes.BINARYDATA30: 380 sensor_value = values[0] + (values[1] << 8) + (values[2] << 16) + (values[3] << 24) 381 return sensor_value if (values[3] & 0x80) == 0 else None 382 case SensorTypes.CONSUMPTION | SensorTypes.DATETIME | SensorTypes.FOUR_BYTES: 383 sensor_value = values[0] + 
(values[1] << 8) + (values[2] << 16) + (values[3] << 24) 384 return sensor_value if sensor_value != 0xFFFFFFFF else None 385 case SensorTypes.TIMESPAN_LONG: 386 sensor_value = values[0] + (values[1] << 8) + (values[2] << 16) + (values[3] << 24) 387 return sensor_value / 16.0 if sensor_value != 0xFFFFFFFF else None 388 case SensorTypes.LATITUDE | SensorTypes.LONGITUDE: 389 if values[0] == 0xFF or (values[2] & 0x40) == 0: 390 return None 391 sensor_value = values[3] + ((values[2] & 0x3F) + (values[0] + (values[1] << 8)) / 10000) / 60 392 if (values[2] & 0x80) != 0: 393 sensor_value = -sensor_value 394 return sensor_value 395 case SensorTypes.TEMPERATURE_FLOAT | SensorTypes.LENGTH: 396 sensor_value = struct.unpack('f', bytearray(values))[0] 397 if math.isnan(sensor_value): 398 return None 399 return sensor_value 400 case SensorTypes.DATA_BLOCK: 401 length = values[0] 402 return values[1:1 + length] 403 case _: 404 return None 405 406 @staticmethod 407 def frc_convert(sensor_type: Union[SensorTypes, int], frc_command: int, frc_value: int) -> Union[ 408 int, float, SensorFrcErrors, None]: 409 """Convert data collected from FRC to a value within the range of quantity specified by sensor type. 
410 411 Args: 412 sensor_type (Union[SensorTypes, int]): Sensor type (represents a quantity) 413 frc_command (int): FRC command used when collecting data 414 frc_value (int): Raw data to convert 415 416 Returns: 417 :obj:`int`, :obj:`float` or :obj:`None`: Converted value 418 """ 419 value = None 420 if frc_command == SensorFrcCommands.FRC_2BITS: 421 if 0 <= frc_value <= 1: 422 return SensorFrcErrors.from_int(frc_value) 423 else: 424 if 0 <= frc_value <= 3: 425 return SensorFrcErrors.from_int(frc_value) 426 match sensor_type: 427 case SensorTypes.TEMPERATURE: 428 if frc_command == SensorFrcCommands.FRC_1BYTE: 429 value = frc_value / 2.0 - 22 430 elif frc_command == SensorFrcCommands.FRC_2BYTES: 431 value = Common.word_complement(frc_value ^ 0x8000) / 16.0 432 case SensorTypes.LOW_VOLTAGE: 433 value = Common.word_complement(frc_value ^ 0x8000) / 16.0 434 case SensorTypes.ATMOSPHERIC_PRESSURE | SensorTypes.SOUND_PRESSURE_LEVEL | SensorTypes.TIMESPAN_LONG: 435 value = (frc_value - 4) / 16.0 436 case SensorTypes.CO2 | SensorTypes.VOC: 437 if frc_command == SensorFrcCommands.FRC_1BYTE: 438 value = (frc_value - 4) * 16 439 elif frc_command == SensorFrcCommands.FRC_2BYTES: 440 value = frc_value - 4 441 case SensorTypes.COLOR_TEMPERATURE | SensorTypes.TIMESPAN | SensorTypes.ILLUMINANCE | \ 442 SensorTypes.CONSUMPTION | SensorTypes.DATETIME | SensorTypes.TVOC | \ 443 SensorTypes.NOX | SensorTypes.ACTIVITY_CONCENTRATION | SensorTypes.ACTION | \ 444 SensorTypes.BINARYDATA30 | SensorTypes.FOUR_BYTES: 445 value = frc_value - 4 446 case SensorTypes.EXTRA_LOW_VOLTAGE | SensorTypes.CURRENT: 447 value = Common.word_complement(frc_value ^ 0x8000) / 1000.0 448 case SensorTypes.MAINS_FREQUENCY | SensorTypes.NO2 | SensorTypes.SO2 | \ 449 SensorTypes.METHANE | SensorTypes.SHORT_LENGTH: 450 value = (frc_value - 4) / 1000.0 451 case SensorTypes.EARTHS_MAGNETIC_FIELD: 452 value = Common.word_complement(frc_value ^ 0x8000) / 10000000.0 453 case SensorTypes.POWER | 
SensorTypes.PARTICULATES_PM1 | SensorTypes.PARTICULATES_PM2_5 | \ 454 SensorTypes.PARTICULATES_PM4 | SensorTypes.PARTICULATES_PM10 | SensorTypes.PARTICULATES_PM40: 455 value = (frc_value - 4) / 4.0 456 case SensorTypes.CO: 457 value = (frc_value - 4) / 100.0 458 case SensorTypes.O3: 459 value = (frc_value - 4) / 10000.0 460 case SensorTypes.ALTITUDE: 461 value = (Common.word_complement(frc_value - 4) / 4.0) - 1024 462 case SensorTypes.ACCELERATION: 463 value = (Common.word_complement(frc_value ^ 0x8000)) / 256.0 464 case SensorTypes.NH3: 465 value = (frc_value - 4) / 10.0 466 case SensorTypes.RELATIVE_HUMIDITY: 467 value = (frc_value - 4) / 2.0 468 case SensorTypes.BINARYDATA7: 469 if frc_command == SensorFrcCommands.FRC_2BITS: 470 value = frc_value & 0x01 471 elif frc_command == SensorFrcCommands.FRC_1BYTE: 472 value = frc_value - 4 473 case SensorTypes.POWER_FACTOR: 474 value = (frc_value - 4) / 200.0 475 case SensorTypes.UV_INDEX: 476 value = (frc_value - 4) / 8.0 477 case SensorTypes.PH: 478 value = (frc_value - 4) / 16.0 479 case SensorTypes.RSSI: 480 value = (frc_value - 258) / 2.0 481 case SensorTypes.LATITUDE | SensorTypes.LONGITUDE: 482 aux = ((frc_value >> 24) & 0xFF) + (((frc_value >> 16) & 0x3F) + (frc_value & 0xFFFF) / 10000) / 60 483 value = -aux if frc_value & 0x800000 != 0 else aux 484 case SensorTypes.TEMPERATURE_FLOAT | SensorTypes.LENGTH: 485 frc_value -= 4 486 aux = [frc_value & 0xFF, (frc_value >> 8) & 0xFF, (frc_value >> 16) & 0xFF, (frc_value >> 24) & 0xFF] 487 value = struct.unpack('f', bytearray(aux))[0] 488 return value 489 490 491def _data_len_from_type(sensor_type: Union[SensorTypes, int]): 492 """Return expected data length per node for sensor type. 
493 494 Args: 495 sensor_type (Union[SensorTypes, int]): Sensor type (represents a quantity) 496 497 Returns: 498 :obj:`int`: Expected data length per node 499 500 Raises: 501 UnknownSensorTypeError: Raised if sensor_type value is an unknown or unsupported sensor type 502 """ 503 if SensorDataSize.DATA_1BYTE_MIN <= sensor_type <= SensorDataSize.DATA_1BYTE_MAX: 504 return 1 505 if SensorDataSize.DATA_2BYTES_MIN <= sensor_type <= SensorDataSize.DATA_2BYTES_MAX: 506 return 2 507 if SensorDataSize.DATA_4BYTES_MIN <= sensor_type <= SensorDataSize.DATA_4BYTES_MAX: 508 return 4 509 raise UnknownSensorTypeError('Unsupported sensor type.') 510 511 512def _data_len_from_frc_command(frc_command: Union[SensorFrcCommands, int]): 513 """Return expected data length per node collected by FRC command. 514 515 Args: 516 frc_command (Union[SensorFrcCommands, int]): Sensor FRC command 517 Returns: 518 :obj:`int`: Expected data length per node 519 Raises: 520 ValueError: Raised if frc_command value is unknown or unsupported Sensor FRC command 521 """ 522 if frc_command == SensorFrcCommands.FRC_2BITS: 523 return 0.25 524 if frc_command == SensorFrcCommands.FRC_1BYTE: 525 return 1 526 if frc_command == SensorFrcCommands.FRC_2BYTES: 527 return 2 528 if frc_command == SensorFrcCommands.FRC_4BYTES: 529 return 4 530 raise ValueError('Unsupported frc command')
17class SensorParser: 18 """Class for parsing data from Sensor standard response data.""" 19 20 @classmethod 21 def enumerate_from_dpa(cls, dpa: List[int]) -> List[SensorData]: 22 """Process data from Enumerate DPA response into a list of SensorData objects. 23 24 Note that Enumerate request only provides sensor types (quantities), as such, the SensorData objects 25 produced by this method will not carry a value. 26 27 Args: 28 dpa (List[int]): List of pdata bytes from DPA response 29 Returns: 30 :obj:`list` of :obj:`SensorData`: List of SensorData objects containing parsed data 31 Raises: 32 UnknownSensorTypeError: Raised if sensor type is passed as integer and the value is not recognized 33 """ 34 sensor_data = [] 35 for sensor_type_value in dpa: 36 if sensor_type_value not in SensorTypes: 37 raise UnknownSensorTypeError('Unsupported sensor type.') 38 sensor_type = SensorTypes(sensor_type_value) 39 sensor_class = get_sensor_class(sensor_type) 40 sensor_data.append( 41 SensorData( 42 sensor_type=sensor_type, 43 index=len(sensor_data), 44 name=sensor_class.name, 45 short_name=sensor_class.short_name, 46 unit=sensor_class.unit, 47 decimal_places=sensor_class.decimal_places, 48 frc_commands=sensor_class.frc_commands 49 ) 50 ) 51 return sensor_data 52 53 @classmethod 54 def enumerate_from_json(cls, json_data: List[dict]) -> List[SensorData]: 55 """Process data from Enumerate API response into a list of SensorData objects. 56 57 Note that Enumerate request only provides sensor types (quantities), as such, the SensorData objects 58 produced by this method will not carry a value. 
59 60 Args: 61 json_data (List[dict]): List of json objects from JSON API response 62 Returns: 63 :obj:`list` of :obj:`SensorData`: List of SensorData objects containing parsed data 64 Raises: 65 UnknownSensorTypeError: Raised if sensor type is passed as integer and the value is not recognized 66 """ 67 sensor_data = [] 68 for data in json_data: 69 sensor_type_value = data['type'] 70 if sensor_type_value not in SensorTypes: 71 raise UnknownSensorTypeError('Unsupported sensor type.') 72 sensor_type = SensorTypes(sensor_type_value) 73 sensor_class = get_sensor_class(sensor_type) 74 sensor_data.append( 75 SensorData( 76 sensor_type=sensor_type, 77 index=len(sensor_data), 78 name=sensor_class.name, 79 short_name=sensor_class.short_name, 80 unit=sensor_class.unit, 81 decimal_places=sensor_class.decimal_places, 82 frc_commands=sensor_class.frc_commands 83 ) 84 ) 85 return sensor_data 86 87 @classmethod 88 def read_sensors_dpa(cls, sensor_types: List[int], dpa: List[int]) -> List[SensorData]: 89 """Process data from ReadSensor DPA response into a list of SensorData objects. 90 91 Because the ReadSensors DPA response does not carry information about sensor types, 92 it is necessary to provide sensor types for response data. 
93 94 Args: 95 sensor_types (List[int]): List of sensor types 96 dpa (List[int]): List of pdata bytes from DPA response 97 Returns: 98 :obj:`list` of :obj:`SensorData`: List of SensorData objects containing parsed data 99 Raises: 100 UnknownSensorTypeError: Raised if sensor type is passed as integer and the value is not recognized 101 ValueError: Raised if passed data is shorter than required to process all sensors 102 """ 103 sensor_data = [] 104 data_index = 0 105 sensor_index = 0 106 while data_index < len(dpa): 107 if sensor_index >= len(sensor_types): 108 raise ValueError('Too little sensor types provided for the amount of sensor data.') 109 sensor_type_value = sensor_types[sensor_index] 110 if sensor_type_value not in SensorTypes: 111 raise UnknownSensorTypeError('Unsupported sensor type.') 112 sensor_type = SensorTypes(sensor_type_value) 113 if sensor_type == SensorTypes.DATA_BLOCK: 114 data_len = dpa[data_index] + 1 115 if data_index + data_len - 1 >= len(dpa): 116 raise ValueError('Data length longer than actual data.') 117 else: 118 data_len = _data_len_from_type(sensor_type) 119 if data_index + data_len > len(dpa): 120 raise ValueError('Data length longer than actual data.') 121 sensor_data.extend([sensor_type_value] + dpa[data_index:data_index + data_len]) 122 data_index += data_len 123 sensor_index += 1 124 return cls.read_sensors_with_types_from_dpa(sensor_data) 125 126 @classmethod 127 def read_sensors_with_types_from_dpa(cls, dpa: List[int]) -> List[SensorData]: 128 """Process data from ReadSensorWithTypes DPA response into a list of SensorData objects. 
129 130 Args: 131 dpa (List[int]): List of pdata bytes from DPA response 132 Returns: 133 :obj:`list` of :obj:`SensorData`: List of SensorData objects containing parsed data 134 Raises: 135 UnknownSensorTypeError: Raised if sensor type is passed as integer and the value is not recognized 136 ValueError: Raised if passed data is shorter than required to process all sensors 137 """ 138 sensor_data = [] 139 index = 0 140 while index < len(dpa): 141 sensor_type_value = dpa[index] 142 if sensor_type_value not in SensorTypes: 143 raise UnknownSensorTypeError(f'Unsupported sensor type: {sensor_type_value}.') 144 sensor_type = SensorTypes(sensor_type_value) 145 if sensor_type == SensorTypes.DATA_BLOCK: 146 data_length = dpa[index + 1] + 1 147 if index + data_length >= len(dpa): 148 raise ValueError('Data length is less than expected to process all sensors.') 149 data = dpa[index + 2:index + 2 + data_length - 1] 150 else: 151 data_length = _data_len_from_type(sensor_type) 152 if index + data_length >= len(dpa): 153 raise ValueError('Data length is less than expected to process all sensors.') 154 data = cls.convert(sensor_type, dpa[index + 1:index + 1 + data_length]) 155 sensor_class = get_sensor_class(sensor_type) 156 sensor_data.append( 157 SensorData( 158 sensor_type=sensor_type, 159 index=len(sensor_data), 160 name=sensor_class.name, 161 short_name=sensor_class.short_name, 162 unit=sensor_class.unit, 163 decimal_places=sensor_class.decimal_places, 164 frc_commands=sensor_class.frc_commands, 165 value=( 166 round(data, sensor_class.decimal_places) 167 if data is not None and not isinstance(data, list) 168 else data 169 ) 170 ) 171 ) 172 index += (data_length + 1) 173 return sensor_data 174 175 @classmethod 176 def read_sensors_with_types_from_json(cls, json_data: List[dict]) -> List[SensorData]: 177 """Process data from ReadSensorWithTypes API response into a list of SensorData objects. 
178 179 Args: 180 json_data (List[dict]): List of json objects from JSON API response 181 Returns: 182 :obj:`list` of SensorData`: List of SensorData objects containing parsed data 183 Raises: 184 UnknownSensorTypeError: Raised if sensor type is passed as integer and the value is not recognized 185 """ 186 sensor_data = [] 187 for data in json_data: 188 sensor_type_value = data['type'] 189 if sensor_type_value not in SensorTypes: 190 raise UnknownSensorTypeError('Unsupported sensor type.') 191 sensor_type = SensorTypes(sensor_type_value) 192 sensor_class = get_sensor_class(sensor_type) 193 sensor_data.append( 194 SensorData( 195 sensor_type=sensor_type, 196 index=len(sensor_data), 197 name=sensor_class.name, 198 short_name=sensor_class.short_name, 199 unit=sensor_class.unit, 200 decimal_places=sensor_class.decimal_places, 201 frc_commands=sensor_class.frc_commands, 202 value=data['value'] 203 ) 204 ) 205 return sensor_data 206 207 @classmethod 208 def frc_dpa(cls, sensor_type: Union[SensorTypes, int], sensor_index: int, 209 frc_command: Union[SensorFrcCommands, int], data: List[int], extra_result: Optional[List[int]] = None, 210 count: Optional[int] = None) -> List[SensorData]: 211 """Process data from DPA FRC response into a list of SensorData. 212 213 SensorData object contains information about the measured quantity and converted value. 214 The data argument expects only FRC data bytes, without the status byte. 215 The extra_result argument can be omitted if the processed data fit into just the Send or SendSelective response. 216 217 If count is specified, only that number of node data is processed. 218 For example, if total length of passed data is 64 bytes (which includes extra result data) and data length 219 per node is 2 bytes, and only 3 nodes are to be processed, then only first 6 bytes of the passed data will be 220 processed and returned as SensorData objects. 
221 222 If count is not specified and combined length of passed FRC and extra result data is not equal to the number 223 of bytes required to process as many nodes as a single Send (SendSelective) and ExtraResult request can carry, 224 a ValueError is raised. 225 226 Args: 227 sensor_type (Union[SensorTypes, int]): Sensor type (represents a quantity) 228 sensor_index (int): Index of sensor 229 frc_command (int): FRC command used to collect data 230 data (List[int]): Data collected from Send or SendSelective message 231 extra_result (List[int]): Data collected from ExtraResult message 232 count (Union[int, None]): Specifies number of nodes to process 233 Returns: 234 :obj:`list` of :obj:`SensorData`: List of SensorData objects containing parsed data 235 Raises: 236 UnknownSensorTypeError: Raised if sensor type is passed as integer and the value is not recognized 237 ValueError: Raised if combined length of frc data and extra result does not match the required data 238 length to process nodes regardless of count argument value 239 """ 240 if isinstance(sensor_type, int): 241 if sensor_type not in SensorTypes: 242 raise UnknownSensorTypeError('Unknown or unsupported sensor type.') 243 sensor_type = SensorTypes(sensor_type) 244 sensor_class = get_sensor_class(sensor_type) 245 dpa = data 246 if frc_command == SensorFrcCommands.FRC_1BYTE: 247 dpa = data[1:] 248 elif frc_command == SensorFrcCommands.FRC_2BYTES: 249 dpa = data[2:] 250 elif frc_command == SensorFrcCommands.FRC_4BYTES: 251 dpa = data[4:] 252 if extra_result is not None: 253 dpa.extend(extra_result) 254 data_len = _data_len_from_frc_command(frc_command=frc_command) 255 if count is None: 256 if frc_command != SensorFrcCommands.FRC_2BITS and len(dpa) % data_len != 0: 257 raise ValueError('Invalid length of combined frc data and extra result data.') 258 else: 259 if frc_command != SensorFrcCommands.FRC_2BITS: 260 if len(dpa) < count * data_len: 261 raise ValueError(f'Combined length of frc data and extra result 
is less than length of data' 262 f'required to process {count} devices.') 263 dpa = dpa[:count * data_len] 264 if data_len == 0.25: 265 itr = count + 1 if count is not None else 240 266 frc_values = [] 267 for i in range(1, itr): 268 mask = 1 << (i % 8) 269 idx = math.floor(i / 8) 270 if idx + 32 >= len(dpa): 271 raise ValueError('Combined length of frc data and extra result is too short.') 272 val = 0 273 if (dpa[idx] & mask) != 0: 274 val = 1 275 if (dpa[idx + 32] & mask) != 0: 276 val |= 2 277 frc_values.append(val) 278 elif data_len == 1: 279 frc_values = dpa 280 elif data_len == 2: 281 frc_values = [(dpa[i + 1] << 8) + dpa[i] for i in range(0, len(dpa), 2)] 282 else: 283 frc_values = [(dpa[i + 3] << 24) + (dpa[i + 2] << 16) + (dpa[i + 1] << 8) + dpa[i] for i in 284 range(0, len(dpa), 4)] 285 sensor_data = [] 286 for frc_value in frc_values: 287 value = cls.frc_convert(sensor_type, frc_command, frc_value) 288 sensor_data.append( 289 SensorData( 290 sensor_type=sensor_type, 291 index=sensor_index, 292 name=sensor_class.name, 293 short_name=sensor_class.short_name, 294 unit=sensor_class.unit, 295 decimal_places=sensor_class.decimal_places, 296 frc_commands=sensor_class.frc_commands, 297 value=( 298 round(value, sensor_class.decimal_places) 299 if value is not None and not isinstance(value, SensorFrcErrors) 300 else value 301 ) 302 ) 303 ) 304 return sensor_data 305 306 @staticmethod 307 def convert(sensor_type: Union[SensorTypes, int], values: List[int]) -> Union[int, float, List[int], None]: 308 """Convert sensor data to a value within the range of quantity specified by sensor type. 
309 310 Args: 311 sensor_type (Union[SensorTypes, int]): Sensor type (represents a quantity) 312 values: (List[int]): Collected data to convert 313 Returns: 314 :obj:`int`, :obj:`float`, :obj:`list` of :obj:`int` or :obj:`None`: Converted value 315 """ 316 match sensor_type: 317 case SensorTypes.TEMPERATURE | SensorTypes.LOW_VOLTAGE: 318 sensor_value = values[0] + (values[1] << 8) 319 return Common.word_complement(sensor_value) / 16.0 if sensor_value != 0x8000 else None 320 case SensorTypes.ATMOSPHERIC_PRESSURE: 321 sensor_value = values[0] + (values[1] << 8) 322 return sensor_value / 16.0 if sensor_value != 0xFFFF else None 323 case SensorTypes.CO2 | SensorTypes.VOC | SensorTypes.COLOR_TEMPERATURE: 324 sensor_value = values[0] + (values[1] << 8) 325 return sensor_value if sensor_value != 0x8000 else None 326 case SensorTypes.TIMESPAN | SensorTypes.ILLUMINANCE | SensorTypes.TVOC | \ 327 SensorTypes.NOX | SensorTypes.ACTIVITY_CONCENTRATION: 328 sensor_value = values[0] + (values[1] << 8) 329 return sensor_value if sensor_value != 0xFFFF else None 330 case SensorTypes.EXTRA_LOW_VOLTAGE | SensorTypes.CURRENT: 331 sensor_value = values[0] + (values[1] << 8) 332 return Common.word_complement(sensor_value) / 1000.0 if sensor_value != 0x8000 else None 333 case SensorTypes.MAINS_FREQUENCY | SensorTypes.NO2 | SensorTypes.SO2 | \ 334 SensorTypes.METHANE | SensorTypes.SHORT_LENGTH: 335 sensor_value = values[0] + (values[1] << 8) 336 return sensor_value / 1000.0 if sensor_value != 0xFFFF else None 337 case SensorTypes.EARTHS_MAGNETIC_FIELD: 338 sensor_value = values[0] + (values[1] << 8) 339 return Common.word_complement(sensor_value) / 10000000.0 if sensor_value != 0x8000 else None 340 case SensorTypes.POWER: 341 sensor_value = values[0] + (values[1] << 8) 342 return sensor_value / 4.0 if sensor_value != 0xFFFF else None 343 case SensorTypes.CO: 344 sensor_value = values[0] + (values[1] << 8) 345 return sensor_value / 100.0 if sensor_value != 0xFFFF else None 346 case 
SensorTypes.O3: 347 sensor_value = values[0] + (values[1] << 8) 348 return sensor_value / 10000.0 if sensor_value != 0xFFFF else None 349 case SensorTypes.PARTICULATES_PM2_5 | SensorTypes.PARTICULATES_PM1 | SensorTypes.PARTICULATES_PM4 | \ 350 SensorTypes.PARTICULATES_PM10 | SensorTypes.PARTICULATES_PM40: 351 sensor_value = values[0] + (values[1] << 8) 352 return sensor_value / 4.0 if sensor_value != 0x8000 else None 353 case SensorTypes.SOUND_PRESSURE_LEVEL: 354 sensor_value = values[0] + (values[1] << 8) 355 return sensor_value / 16.0 if sensor_value != 0x8000 else None 356 case SensorTypes.ALTITUDE: 357 sensor_value = values[0] + (values[1] << 8) 358 return (sensor_value / 4.0 - 1024) if sensor_value != 0xFFFF else None 359 case SensorTypes.ACCELERATION: 360 sensor_value = values[0] + (values[1] << 8) 361 return Common.word_complement(sensor_value) / 256.0 if sensor_value != 0x8000 else None 362 case SensorTypes.NH3: 363 sensor_value = values[0] + (values[1] << 8) 364 return sensor_value / 10.0 if sensor_value != 0xFFFF else None 365 case SensorTypes.RELATIVE_HUMIDITY: 366 return values[0] / 2.0 if values[0] != 0xEE else None 367 case SensorTypes.BINARYDATA7: 368 aux = values[0] & 0x80 369 return values[0] if aux == 0 else None 370 case SensorTypes.POWER_FACTOR: 371 return values[0] / 200.0 if values[0] != 0xEE else None 372 case SensorTypes.UV_INDEX: 373 return values[0] / 8.0 if values[0] != 0xFF else None 374 case SensorTypes.PH: 375 return values[0] / 16.0 if values[0] != 0xFF else None 376 case SensorTypes.RSSI: 377 return (values[0] - 254) / 2.0 if values[0] != 0xFF else None 378 case SensorTypes.ACTION: 379 return values[0] if values[0] != 0xFB else None 380 case SensorTypes.BINARYDATA30: 381 sensor_value = values[0] + (values[1] << 8) + (values[2] << 16) + (values[3] << 24) 382 return sensor_value if (values[3] & 0x80) == 0 else None 383 case SensorTypes.CONSUMPTION | SensorTypes.DATETIME | SensorTypes.FOUR_BYTES: 384 sensor_value = values[0] + 
(values[1] << 8) + (values[2] << 16) + (values[3] << 24) 385 return sensor_value if sensor_value != 0xFFFFFFFF else None 386 case SensorTypes.TIMESPAN_LONG: 387 sensor_value = values[0] + (values[1] << 8) + (values[2] << 16) + (values[3] << 24) 388 return sensor_value / 16.0 if sensor_value != 0xFFFFFFFF else None 389 case SensorTypes.LATITUDE | SensorTypes.LONGITUDE: 390 if values[0] == 0xFF or (values[2] & 0x40) == 0: 391 return None 392 sensor_value = values[3] + ((values[2] & 0x3F) + (values[0] + (values[1] << 8)) / 10000) / 60 393 if (values[2] & 0x80) != 0: 394 sensor_value = -sensor_value 395 return sensor_value 396 case SensorTypes.TEMPERATURE_FLOAT | SensorTypes.LENGTH: 397 sensor_value = struct.unpack('f', bytearray(values))[0] 398 if math.isnan(sensor_value): 399 return None 400 return sensor_value 401 case SensorTypes.DATA_BLOCK: 402 length = values[0] 403 return values[1:1 + length] 404 case _: 405 return None 406 407 @staticmethod 408 def frc_convert(sensor_type: Union[SensorTypes, int], frc_command: int, frc_value: int) -> Union[ 409 int, float, SensorFrcErrors, None]: 410 """Convert data collected from FRC to a value within the range of quantity specified by sensor type. 
411 412 Args: 413 sensor_type (Union[SensorTypes, int]): Sensor type (represents a quantity) 414 frc_command (int): FRC command used when collecting data 415 frc_value (int): Raw data to convert 416 417 Returns: 418 :obj:`int`, :obj:`float` or :obj:`None`: Converted value 419 """ 420 value = None 421 if frc_command == SensorFrcCommands.FRC_2BITS: 422 if 0 <= frc_value <= 1: 423 return SensorFrcErrors.from_int(frc_value) 424 else: 425 if 0 <= frc_value <= 3: 426 return SensorFrcErrors.from_int(frc_value) 427 match sensor_type: 428 case SensorTypes.TEMPERATURE: 429 if frc_command == SensorFrcCommands.FRC_1BYTE: 430 value = frc_value / 2.0 - 22 431 elif frc_command == SensorFrcCommands.FRC_2BYTES: 432 value = Common.word_complement(frc_value ^ 0x8000) / 16.0 433 case SensorTypes.LOW_VOLTAGE: 434 value = Common.word_complement(frc_value ^ 0x8000) / 16.0 435 case SensorTypes.ATMOSPHERIC_PRESSURE | SensorTypes.SOUND_PRESSURE_LEVEL | SensorTypes.TIMESPAN_LONG: 436 value = (frc_value - 4) / 16.0 437 case SensorTypes.CO2 | SensorTypes.VOC: 438 if frc_command == SensorFrcCommands.FRC_1BYTE: 439 value = (frc_value - 4) * 16 440 elif frc_command == SensorFrcCommands.FRC_2BYTES: 441 value = frc_value - 4 442 case SensorTypes.COLOR_TEMPERATURE | SensorTypes.TIMESPAN | SensorTypes.ILLUMINANCE | \ 443 SensorTypes.CONSUMPTION | SensorTypes.DATETIME | SensorTypes.TVOC | \ 444 SensorTypes.NOX | SensorTypes.ACTIVITY_CONCENTRATION | SensorTypes.ACTION | \ 445 SensorTypes.BINARYDATA30 | SensorTypes.FOUR_BYTES: 446 value = frc_value - 4 447 case SensorTypes.EXTRA_LOW_VOLTAGE | SensorTypes.CURRENT: 448 value = Common.word_complement(frc_value ^ 0x8000) / 1000.0 449 case SensorTypes.MAINS_FREQUENCY | SensorTypes.NO2 | SensorTypes.SO2 | \ 450 SensorTypes.METHANE | SensorTypes.SHORT_LENGTH: 451 value = (frc_value - 4) / 1000.0 452 case SensorTypes.EARTHS_MAGNETIC_FIELD: 453 value = Common.word_complement(frc_value ^ 0x8000) / 10000000.0 454 case SensorTypes.POWER | 
SensorTypes.PARTICULATES_PM1 | SensorTypes.PARTICULATES_PM2_5 | \ 455 SensorTypes.PARTICULATES_PM4 | SensorTypes.PARTICULATES_PM10 | SensorTypes.PARTICULATES_PM40: 456 value = (frc_value - 4) / 4.0 457 case SensorTypes.CO: 458 value = (frc_value - 4) / 100.0 459 case SensorTypes.O3: 460 value = (frc_value - 4) / 10000.0 461 case SensorTypes.ALTITUDE: 462 value = (Common.word_complement(frc_value - 4) / 4.0) - 1024 463 case SensorTypes.ACCELERATION: 464 value = (Common.word_complement(frc_value ^ 0x8000)) / 256.0 465 case SensorTypes.NH3: 466 value = (frc_value - 4) / 10.0 467 case SensorTypes.RELATIVE_HUMIDITY: 468 value = (frc_value - 4) / 2.0 469 case SensorTypes.BINARYDATA7: 470 if frc_command == SensorFrcCommands.FRC_2BITS: 471 value = frc_value & 0x01 472 elif frc_command == SensorFrcCommands.FRC_1BYTE: 473 value = frc_value - 4 474 case SensorTypes.POWER_FACTOR: 475 value = (frc_value - 4) / 200.0 476 case SensorTypes.UV_INDEX: 477 value = (frc_value - 4) / 8.0 478 case SensorTypes.PH: 479 value = (frc_value - 4) / 16.0 480 case SensorTypes.RSSI: 481 value = (frc_value - 258) / 2.0 482 case SensorTypes.LATITUDE | SensorTypes.LONGITUDE: 483 aux = ((frc_value >> 24) & 0xFF) + (((frc_value >> 16) & 0x3F) + (frc_value & 0xFFFF) / 10000) / 60 484 value = -aux if frc_value & 0x800000 != 0 else aux 485 case SensorTypes.TEMPERATURE_FLOAT | SensorTypes.LENGTH: 486 frc_value -= 4 487 aux = [frc_value & 0xFF, (frc_value >> 8) & 0xFF, (frc_value >> 16) & 0xFF, (frc_value >> 24) & 0xFF] 488 value = struct.unpack('f', bytearray(aux))[0] 489 return value
Class for parsing data from Sensor standard response data.
@classmethod
def enumerate_from_dpa(cls, dpa: List[int]) -> List[SensorData]:
    """Turn the pdata of an Enumerate DPA response into SensorData descriptors.

    Every byte of the response is interpreted as one sensor type (quantity). No value is passed
    to the created SensorData objects, they only describe the quantity.

    Args:
        dpa (List[int]): List of pdata bytes from DPA response
    Returns:
        :obj:`list` of :obj:`SensorData`: One SensorData object per byte, indexed by its position
    Raises:
        UnknownSensorTypeError: Raised if a byte is not a recognized sensor type
    """
    parsed = []
    for position, raw_type in enumerate(dpa):
        if raw_type not in SensorTypes:
            raise UnknownSensorTypeError('Unsupported sensor type.')
        quantity = SensorTypes(raw_type)
        descriptor = get_sensor_class(quantity)
        entry = SensorData(
            sensor_type=quantity,
            index=position,
            name=descriptor.name,
            short_name=descriptor.short_name,
            unit=descriptor.unit,
            decimal_places=descriptor.decimal_places,
            frc_commands=descriptor.frc_commands
        )
        parsed.append(entry)
    return parsed
Process data from Enumerate DPA response into a list of SensorData objects.
Note that Enumerate request only provides sensor types (quantities), as such, the SensorData objects produced by this method will not carry a value.
Arguments:
- dpa (List[int]): List of pdata bytes from DPA response
Returns:
list of SensorData: List of SensorData objects containing parsed data
Raises:
- UnknownSensorTypeError: Raised if sensor type is passed as integer and the value is not recognized
@classmethod
def enumerate_from_json(cls, json_data: List[dict]) -> List[SensorData]:
    """Turn the sensor list of an Enumerate JSON API response into SensorData descriptors.

    Only the ``type`` key of each object is read. No value is passed to the created SensorData
    objects, they only describe the quantity.

    Args:
        json_data (List[dict]): List of json objects from JSON API response
    Returns:
        :obj:`list` of :obj:`SensorData`: One SensorData object per json object, indexed by its position
    Raises:
        UnknownSensorTypeError: Raised if a ``type`` value is not a recognized sensor type
    """
    parsed = []
    for position, entry in enumerate(json_data):
        raw_type = entry['type']
        if raw_type not in SensorTypes:
            raise UnknownSensorTypeError('Unsupported sensor type.')
        quantity = SensorTypes(raw_type)
        descriptor = get_sensor_class(quantity)
        sensor = SensorData(
            sensor_type=quantity,
            index=position,
            name=descriptor.name,
            short_name=descriptor.short_name,
            unit=descriptor.unit,
            decimal_places=descriptor.decimal_places,
            frc_commands=descriptor.frc_commands
        )
        parsed.append(sensor)
    return parsed
Process data from Enumerate API response into a list of SensorData objects.
Note that Enumerate request only provides sensor types (quantities), as such, the SensorData objects produced by this method will not carry a value.
Arguments:
- json_data (List[dict]): List of json objects from JSON API response
Returns:
list of SensorData: List of SensorData objects containing parsed data
Raises:
- UnknownSensorTypeError: Raised if sensor type is passed as integer and the value is not recognized
@classmethod
def read_sensors_dpa(cls, sensor_types: List[int], dpa: List[int]) -> List[SensorData]:
    """Parse a ReadSensors DPA response using externally supplied sensor types.

    The response data is walked sensor by sensor; each sensor's bytes are prefixed with the
    matching entry of ``sensor_types`` and the resulting typed byte stream is handed over to
    :meth:`read_sensors_with_types_from_dpa`.

    Args:
        sensor_types (List[int]): List of sensor types, in the order of the sensors in the data
        dpa (List[int]): List of pdata bytes from DPA response
    Returns:
        :obj:`list` of :obj:`SensorData`: List of SensorData objects containing parsed data
    Raises:
        UnknownSensorTypeError: Raised if a sensor type value is not recognized
        ValueError: Raised if there are fewer sensor types than sensors in the data, or if the data
            ends before the current sensor's bytes are complete
    """
    typed_stream = []
    type_count = len(sensor_types)
    data_size = len(dpa)
    type_pos = 0
    cursor = 0
    while cursor < data_size:
        if type_pos >= type_count:
            raise ValueError('Too little sensor types provided for the amount of sensor data.')
        raw_type = sensor_types[type_pos]
        if raw_type not in SensorTypes:
            raise UnknownSensorTypeError('Unsupported sensor type.')
        quantity = SensorTypes(raw_type)
        if quantity == SensorTypes.DATA_BLOCK:
            # The first byte of a data block is its payload length; the length byte itself is kept.
            span = dpa[cursor] + 1
        else:
            span = _data_len_from_type(quantity)
        end = cursor + span
        if end > data_size:
            raise ValueError('Data length longer than actual data.')
        typed_stream += [raw_type] + dpa[cursor:end]
        cursor = end
        type_pos += 1
    return cls.read_sensors_with_types_from_dpa(typed_stream)
Process data from ReadSensor DPA response into a list of SensorData objects.
Because the ReadSensors DPA response does not carry information about sensor types, it is necessary to provide sensor types for response data.
Arguments:
- sensor_types (List[int]): List of sensor types
- dpa (List[int]): List of pdata bytes from DPA response
Returns:
list of SensorData: List of SensorData objects containing parsed data
Raises:
- UnknownSensorTypeError: Raised if sensor type is passed as integer and the value is not recognized
- ValueError: Raised if passed data is shorter than required to process all sensors
@classmethod
def read_sensors_with_types_from_dpa(cls, dpa: List[int]) -> List[SensorData]:
    """Process data from ReadSensorWithTypes DPA response into a list of SensorData objects.

    The data is a sequence of records, each consisting of one sensor type byte followed by that
    sensor's data bytes. For data blocks the first data byte is the payload length and the payload
    is returned as a list of bytes; all other types are converted using :meth:`convert` and rounded
    to the number of decimal places of the quantity.

    Args:
        dpa (List[int]): List of pdata bytes from DPA response
    Returns:
        :obj:`list` of :obj:`SensorData`: List of SensorData objects containing parsed data
    Raises:
        UnknownSensorTypeError: Raised if a sensor type byte is not recognized
        ValueError: Raised if passed data is shorter than required to process all sensors
    """
    sensor_data = []
    total = len(dpa)
    index = 0
    while index < total:
        sensor_type_value = dpa[index]
        if sensor_type_value not in SensorTypes:
            raise UnknownSensorTypeError(f'Unsupported sensor type: {sensor_type_value}.')
        sensor_type = SensorTypes(sensor_type_value)
        is_data_block = sensor_type == SensorTypes.DATA_BLOCK
        if is_data_block:
            # The length byte itself may be missing when the type byte is the last byte of the
            # data; report it as truncated data rather than letting an IndexError escape.
            if index + 1 >= total:
                raise ValueError('Data length is less than expected to process all sensors.')
            data_length = dpa[index + 1] + 1
        else:
            data_length = _data_len_from_type(sensor_type)
        # The record's last byte sits at index + data_length (type byte + data bytes).
        if index + data_length >= total:
            raise ValueError('Data length is less than expected to process all sensors.')
        payload = dpa[index + 1:index + 1 + data_length]
        if is_data_block:
            # Drop the length byte, keep the raw block bytes.
            data = payload[1:]
        else:
            data = cls.convert(sensor_type, payload)
        sensor_class = get_sensor_class(sensor_type)
        sensor_data.append(
            SensorData(
                sensor_type=sensor_type,
                index=len(sensor_data),
                name=sensor_class.name,
                short_name=sensor_class.short_name,
                unit=sensor_class.unit,
                decimal_places=sensor_class.decimal_places,
                frc_commands=sensor_class.frc_commands,
                # Lists (data blocks) and None (no value) are passed through unrounded.
                value=(
                    round(data, sensor_class.decimal_places)
                    if data is not None and not isinstance(data, list)
                    else data
                )
            )
        )
        index += data_length + 1
    return sensor_data
Process data from ReadSensorWithTypes DPA response into a list of SensorData objects.
Arguments:
- dpa (List[int]): List of pdata bytes from DPA response
Returns:
list of SensorData: List of SensorData objects containing parsed data
Raises:
- UnknownSensorTypeError: Raised if sensor type is passed as integer and the value is not recognized
- ValueError: Raised if passed data is shorter than required to process all sensors
@classmethod
def read_sensors_with_types_from_json(cls, json_data: List[dict]) -> List[SensorData]:
    """Turn the sensor list of a ReadSensorWithTypes JSON API response into SensorData objects.

    The ``type`` key of each object selects the quantity and the ``value`` key is passed to the
    SensorData object unchanged.

    Args:
        json_data (List[dict]): List of json objects from JSON API response
    Returns:
        :obj:`list` of :obj:`SensorData`: One SensorData object per json object, indexed by its position
    Raises:
        UnknownSensorTypeError: Raised if a ``type`` value is not a recognized sensor type
    """
    parsed = []
    for position, entry in enumerate(json_data):
        raw_type = entry['type']
        if raw_type not in SensorTypes:
            raise UnknownSensorTypeError('Unsupported sensor type.')
        quantity = SensorTypes(raw_type)
        descriptor = get_sensor_class(quantity)
        sensor = SensorData(
            sensor_type=quantity,
            index=position,
            name=descriptor.name,
            short_name=descriptor.short_name,
            unit=descriptor.unit,
            decimal_places=descriptor.decimal_places,
            frc_commands=descriptor.frc_commands,
            value=entry['value']
        )
        parsed.append(sensor)
    return parsed
Process data from ReadSensorWithTypes API response into a list of SensorData objects.
Arguments:
- json_data (List[dict]): List of json objects from JSON API response
Returns:
list of SensorData: List of SensorData objects containing parsed data
Raises:
- UnknownSensorTypeError: Raised if sensor type is passed as integer and the value is not recognized
@classmethod
def frc_dpa(cls, sensor_type: Union[SensorTypes, int], sensor_index: int,
            frc_command: Union[SensorFrcCommands, int], data: List[int], extra_result: Optional[List[int]] = None,
            count: Optional[int] = None) -> List[SensorData]:
    """Process data from DPA FRC response into a list of SensorData.

    SensorData object contains information about the measured quantity and converted value.
    The data argument expects only FRC data bytes, without the status byte.
    The extra_result argument can be omitted if the processed data fit into just the Send or SendSelective response.
    Neither ``data`` nor ``extra_result`` is modified; the method works on its own copy.

    If count is specified, only that number of node data is processed.
    For example, if total length of passed data is 64 bytes (which includes extra result data) and data length
    per node is 2 bytes, and only 3 nodes are to be processed, then only first 6 bytes of the passed data will be
    processed and returned as SensorData objects.

    If count is not specified, the combined length of FRC and extra result data (after the leading slot
    is dropped) has to be a multiple of the data length per node, otherwise a ValueError is raised.
    This check does not apply to 2-bit FRC, where up to 239 nodes are processed instead.

    Args:
        sensor_type (Union[SensorTypes, int]): Sensor type (represents a quantity)
        sensor_index (int): Index of sensor
        frc_command (int): FRC command used to collect data
        data (List[int]): Data collected from Send or SendSelective message
        extra_result (List[int]): Data collected from ExtraResult message
        count (Union[int, None]): Specifies number of nodes to process
    Returns:
        :obj:`list` of :obj:`SensorData`: List of SensorData objects containing parsed data
    Raises:
        UnknownSensorTypeError: Raised if sensor type is passed as integer and the value is not recognized
        ValueError: Raised if combined length of frc data and extra result does not match the required data
            length to process nodes regardless of count argument value
    """
    if isinstance(sensor_type, int):
        if sensor_type not in SensorTypes:
            raise UnknownSensorTypeError('Unknown or unsupported sensor type.')
        sensor_type = SensorTypes(sensor_type)
    sensor_class = get_sensor_class(sensor_type)
    # Byte-oriented commands skip the first value slot of the FRC data
    # (presumably the slot of node 0, the coordinator - confirm against the FRC specification).
    if frc_command == SensorFrcCommands.FRC_1BYTE:
        skip = 1
    elif frc_command == SensorFrcCommands.FRC_2BYTES:
        skip = 2
    elif frc_command == SensorFrcCommands.FRC_4BYTES:
        skip = 4
    else:
        skip = 0
    # Always work on a private copy so that appending extra_result never mutates the caller's list.
    dpa = list(data[skip:])
    if extra_result is not None:
        dpa.extend(extra_result)
    data_len = _data_len_from_frc_command(frc_command=frc_command)
    is_2bits = frc_command == SensorFrcCommands.FRC_2BITS
    if count is None:
        if not is_2bits and len(dpa) % data_len != 0:
            raise ValueError('Invalid length of combined frc data and extra result data.')
    elif not is_2bits:
        required = count * data_len
        if len(dpa) < required:
            raise ValueError('Combined length of frc data and extra result is less than length of data '
                             f'required to process {count} devices.')
        dpa = dpa[:required]
    if data_len == 0.25:
        # 2-bit FRC: bit 0 of node N is bit (N % 8) of byte N // 8, bit 1 lives 32 bytes further.
        node_limit = count + 1 if count is not None else 240
        frc_values = []
        for node in range(1, node_limit):
            byte_idx, bit = divmod(node, 8)
            mask = 1 << bit
            if byte_idx + 32 >= len(dpa):
                raise ValueError('Combined length of frc data and extra result is too short.')
            val = 0
            if (dpa[byte_idx] & mask) != 0:
                val = 1
            if (dpa[byte_idx + 32] & mask) != 0:
                val |= 2
            frc_values.append(val)
    elif data_len == 1:
        frc_values = dpa
    elif data_len == 2:
        # Little-endian 16-bit values.
        frc_values = [(dpa[i + 1] << 8) + dpa[i] for i in range(0, len(dpa), 2)]
    else:
        # Little-endian 32-bit values.
        frc_values = [(dpa[i + 3] << 24) + (dpa[i + 2] << 16) + (dpa[i + 1] << 8) + dpa[i]
                      for i in range(0, len(dpa), 4)]
    sensor_data = []
    for frc_value in frc_values:
        value = cls.frc_convert(sensor_type, frc_command, frc_value)
        sensor_data.append(
            SensorData(
                sensor_type=sensor_type,
                index=sensor_index,
                name=sensor_class.name,
                short_name=sensor_class.short_name,
                unit=sensor_class.unit,
                decimal_places=sensor_class.decimal_places,
                frc_commands=sensor_class.frc_commands,
                # FRC error codes and missing values are passed through unrounded.
                value=(
                    round(value, sensor_class.decimal_places)
                    if value is not None and not isinstance(value, SensorFrcErrors)
                    else value
                )
            )
        )
    return sensor_data
Process data from DPA FRC response into a list of SensorData.
SensorData object contains information about the measured quantity and converted value. The data argument expects only FRC data bytes, without the status byte. The extra_result argument can be omitted if the processed data fit into just the Send or SendSelective response.
If count is specified, only that number of node data is processed. For example, if total length of passed data is 64 bytes (which includes extra result data) and data length per node is 2 bytes, and only 3 nodes are to be processed, then only first 6 bytes of the passed data will be processed and returned as SensorData objects.
If count is not specified and combined length of passed FRC and extra result data is not equal to the number of bytes required to process as many nodes as a single Send (SendSelective) and ExtraResult request can carry, a ValueError is raised.
Arguments:
- sensor_type (Union[SensorTypes, int]): Sensor type (represents a quantity)
- sensor_index (int): Index of sensor
- frc_command (int): FRC command used to collect data
- data (List[int]): Data collected from Send or SendSelective message
- extra_result (List[int]): Data collected from ExtraResult message
- count (Union[int, None]): Specifies number of nodes to process
Returns:
list of SensorData: List of SensorData objects containing parsed data
Raises:
- UnknownSensorTypeError: Raised if sensor type is passed as integer and the value is not recognized
- ValueError: Raised if combined length of frc data and extra result does not match the required data length to process nodes regardless of count argument value
306 @staticmethod 307 def convert(sensor_type: Union[SensorTypes, int], values: List[int]) -> Union[int, float, List[int], None]: 308 """Convert sensor data to a value within the range of quantity specified by sensor type. 309 310 Args: 311 sensor_type (Union[SensorTypes, int]): Sensor type (represents a quantity) 312 values: (List[int]): Collected data to convert 313 Returns: 314 :obj:`int`, :obj:`float`, :obj:`list` of :obj:`int` or :obj:`None`: Converted value 315 """ 316 match sensor_type: 317 case SensorTypes.TEMPERATURE | SensorTypes.LOW_VOLTAGE: 318 sensor_value = values[0] + (values[1] << 8) 319 return Common.word_complement(sensor_value) / 16.0 if sensor_value != 0x8000 else None 320 case SensorTypes.ATMOSPHERIC_PRESSURE: 321 sensor_value = values[0] + (values[1] << 8) 322 return sensor_value / 16.0 if sensor_value != 0xFFFF else None 323 case SensorTypes.CO2 | SensorTypes.VOC | SensorTypes.COLOR_TEMPERATURE: 324 sensor_value = values[0] + (values[1] << 8) 325 return sensor_value if sensor_value != 0x8000 else None 326 case SensorTypes.TIMESPAN | SensorTypes.ILLUMINANCE | SensorTypes.TVOC | \ 327 SensorTypes.NOX | SensorTypes.ACTIVITY_CONCENTRATION: 328 sensor_value = values[0] + (values[1] << 8) 329 return sensor_value if sensor_value != 0xFFFF else None 330 case SensorTypes.EXTRA_LOW_VOLTAGE | SensorTypes.CURRENT: 331 sensor_value = values[0] + (values[1] << 8) 332 return Common.word_complement(sensor_value) / 1000.0 if sensor_value != 0x8000 else None 333 case SensorTypes.MAINS_FREQUENCY | SensorTypes.NO2 | SensorTypes.SO2 | \ 334 SensorTypes.METHANE | SensorTypes.SHORT_LENGTH: 335 sensor_value = values[0] + (values[1] << 8) 336 return sensor_value / 1000.0 if sensor_value != 0xFFFF else None 337 case SensorTypes.EARTHS_MAGNETIC_FIELD: 338 sensor_value = values[0] + (values[1] << 8) 339 return Common.word_complement(sensor_value) / 10000000.0 if sensor_value != 0x8000 else None 340 case SensorTypes.POWER: 341 sensor_value = values[0] + (values[1] << 8) 
342 return sensor_value / 4.0 if sensor_value != 0xFFFF else None 343 case SensorTypes.CO: 344 sensor_value = values[0] + (values[1] << 8) 345 return sensor_value / 100.0 if sensor_value != 0xFFFF else None 346 case SensorTypes.O3: 347 sensor_value = values[0] + (values[1] << 8) 348 return sensor_value / 10000.0 if sensor_value != 0xFFFF else None 349 case SensorTypes.PARTICULATES_PM2_5 | SensorTypes.PARTICULATES_PM1 | SensorTypes.PARTICULATES_PM4 | \ 350 SensorTypes.PARTICULATES_PM10 | SensorTypes.PARTICULATES_PM40: 351 sensor_value = values[0] + (values[1] << 8) 352 return sensor_value / 4.0 if sensor_value != 0x8000 else None 353 case SensorTypes.SOUND_PRESSURE_LEVEL: 354 sensor_value = values[0] + (values[1] << 8) 355 return sensor_value / 16.0 if sensor_value != 0x8000 else None 356 case SensorTypes.ALTITUDE: 357 sensor_value = values[0] + (values[1] << 8) 358 return (sensor_value / 4.0 - 1024) if sensor_value != 0xFFFF else None 359 case SensorTypes.ACCELERATION: 360 sensor_value = values[0] + (values[1] << 8) 361 return Common.word_complement(sensor_value) / 256.0 if sensor_value != 0x8000 else None 362 case SensorTypes.NH3: 363 sensor_value = values[0] + (values[1] << 8) 364 return sensor_value / 10.0 if sensor_value != 0xFFFF else None 365 case SensorTypes.RELATIVE_HUMIDITY: 366 return values[0] / 2.0 if values[0] != 0xEE else None 367 case SensorTypes.BINARYDATA7: 368 aux = values[0] & 0x80 369 return values[0] if aux == 0 else None 370 case SensorTypes.POWER_FACTOR: 371 return values[0] / 200.0 if values[0] != 0xEE else None 372 case SensorTypes.UV_INDEX: 373 return values[0] / 8.0 if values[0] != 0xFF else None 374 case SensorTypes.PH: 375 return values[0] / 16.0 if values[0] != 0xFF else None 376 case SensorTypes.RSSI: 377 return (values[0] - 254) / 2.0 if values[0] != 0xFF else None 378 case SensorTypes.ACTION: 379 return values[0] if values[0] != 0xFB else None 380 case SensorTypes.BINARYDATA30: 381 sensor_value = values[0] + (values[1] << 8) + 
(values[2] << 16) + (values[3] << 24) 382 return sensor_value if (values[3] & 0x80) == 0 else None 383 case SensorTypes.CONSUMPTION | SensorTypes.DATETIME | SensorTypes.FOUR_BYTES: 384 sensor_value = values[0] + (values[1] << 8) + (values[2] << 16) + (values[3] << 24) 385 return sensor_value if sensor_value != 0xFFFFFFFF else None 386 case SensorTypes.TIMESPAN_LONG: 387 sensor_value = values[0] + (values[1] << 8) + (values[2] << 16) + (values[3] << 24) 388 return sensor_value / 16.0 if sensor_value != 0xFFFFFFFF else None 389 case SensorTypes.LATITUDE | SensorTypes.LONGITUDE: 390 if values[0] == 0xFF or (values[2] & 0x40) == 0: 391 return None 392 sensor_value = values[3] + ((values[2] & 0x3F) + (values[0] + (values[1] << 8)) / 10000) / 60 393 if (values[2] & 0x80) != 0: 394 sensor_value = -sensor_value 395 return sensor_value 396 case SensorTypes.TEMPERATURE_FLOAT | SensorTypes.LENGTH: 397 sensor_value = struct.unpack('f', bytearray(values))[0] 398 if math.isnan(sensor_value): 399 return None 400 return sensor_value 401 case SensorTypes.DATA_BLOCK: 402 length = values[0] 403 return values[1:1 + length] 404 case _: 405 return None
Convert sensor data to a value within the range of quantity specified by sensor type.
Arguments:
- sensor_type (Union[SensorTypes, int]): Sensor type (represents a quantity)
- values (List[int]): Collected data to convert
Returns:
int, float, list of int or None: Converted value
407 @staticmethod 408 def frc_convert(sensor_type: Union[SensorTypes, int], frc_command: int, frc_value: int) -> Union[ 409 int, float, SensorFrcErrors, None]: 410 """Convert data collected from FRC to a value within the range of quantity specified by sensor type. 411 412 Args: 413 sensor_type (Union[SensorTypes, int]): Sensor type (represents a quantity) 414 frc_command (int): FRC command used when collecting data 415 frc_value (int): Raw data to convert 416 417 Returns: 418 :obj:`int`, :obj:`float` or :obj:`None`: Converted value 419 """ 420 value = None 421 if frc_command == SensorFrcCommands.FRC_2BITS: 422 if 0 <= frc_value <= 1: 423 return SensorFrcErrors.from_int(frc_value) 424 else: 425 if 0 <= frc_value <= 3: 426 return SensorFrcErrors.from_int(frc_value) 427 match sensor_type: 428 case SensorTypes.TEMPERATURE: 429 if frc_command == SensorFrcCommands.FRC_1BYTE: 430 value = frc_value / 2.0 - 22 431 elif frc_command == SensorFrcCommands.FRC_2BYTES: 432 value = Common.word_complement(frc_value ^ 0x8000) / 16.0 433 case SensorTypes.LOW_VOLTAGE: 434 value = Common.word_complement(frc_value ^ 0x8000) / 16.0 435 case SensorTypes.ATMOSPHERIC_PRESSURE | SensorTypes.SOUND_PRESSURE_LEVEL | SensorTypes.TIMESPAN_LONG: 436 value = (frc_value - 4) / 16.0 437 case SensorTypes.CO2 | SensorTypes.VOC: 438 if frc_command == SensorFrcCommands.FRC_1BYTE: 439 value = (frc_value - 4) * 16 440 elif frc_command == SensorFrcCommands.FRC_2BYTES: 441 value = frc_value - 4 442 case SensorTypes.COLOR_TEMPERATURE | SensorTypes.TIMESPAN | SensorTypes.ILLUMINANCE | \ 443 SensorTypes.CONSUMPTION | SensorTypes.DATETIME | SensorTypes.TVOC | \ 444 SensorTypes.NOX | SensorTypes.ACTIVITY_CONCENTRATION | SensorTypes.ACTION | \ 445 SensorTypes.BINARYDATA30 | SensorTypes.FOUR_BYTES: 446 value = frc_value - 4 447 case SensorTypes.EXTRA_LOW_VOLTAGE | SensorTypes.CURRENT: 448 value = Common.word_complement(frc_value ^ 0x8000) / 1000.0 449 case SensorTypes.MAINS_FREQUENCY | SensorTypes.NO2 | 
SensorTypes.SO2 | \ 450 SensorTypes.METHANE | SensorTypes.SHORT_LENGTH: 451 value = (frc_value - 4) / 1000.0 452 case SensorTypes.EARTHS_MAGNETIC_FIELD: 453 value = Common.word_complement(frc_value ^ 0x8000) / 10000000.0 454 case SensorTypes.POWER | SensorTypes.PARTICULATES_PM1 | SensorTypes.PARTICULATES_PM2_5 | \ 455 SensorTypes.PARTICULATES_PM4 | SensorTypes.PARTICULATES_PM10 | SensorTypes.PARTICULATES_PM40: 456 value = (frc_value - 4) / 4.0 457 case SensorTypes.CO: 458 value = (frc_value - 4) / 100.0 459 case SensorTypes.O3: 460 value = (frc_value - 4) / 10000.0 461 case SensorTypes.ALTITUDE: 462 value = (Common.word_complement(frc_value - 4) / 4.0) - 1024 463 case SensorTypes.ACCELERATION: 464 value = (Common.word_complement(frc_value ^ 0x8000)) / 256.0 465 case SensorTypes.NH3: 466 value = (frc_value - 4) / 10.0 467 case SensorTypes.RELATIVE_HUMIDITY: 468 value = (frc_value - 4) / 2.0 469 case SensorTypes.BINARYDATA7: 470 if frc_command == SensorFrcCommands.FRC_2BITS: 471 value = frc_value & 0x01 472 elif frc_command == SensorFrcCommands.FRC_1BYTE: 473 value = frc_value - 4 474 case SensorTypes.POWER_FACTOR: 475 value = (frc_value - 4) / 200.0 476 case SensorTypes.UV_INDEX: 477 value = (frc_value - 4) / 8.0 478 case SensorTypes.PH: 479 value = (frc_value - 4) / 16.0 480 case SensorTypes.RSSI: 481 value = (frc_value - 258) / 2.0 482 case SensorTypes.LATITUDE | SensorTypes.LONGITUDE: 483 aux = ((frc_value >> 24) & 0xFF) + (((frc_value >> 16) & 0x3F) + (frc_value & 0xFFFF) / 10000) / 60 484 value = -aux if frc_value & 0x800000 != 0 else aux 485 case SensorTypes.TEMPERATURE_FLOAT | SensorTypes.LENGTH: 486 frc_value -= 4 487 aux = [frc_value & 0xFF, (frc_value >> 8) & 0xFF, (frc_value >> 16) & 0xFF, (frc_value >> 24) & 0xFF] 488 value = struct.unpack('f', bytearray(aux))[0] 489 return value
Convert data collected from FRC to a value within the range of quantity specified by sensor type.
Arguments:
- sensor_type (Union[SensorTypes, int]): Sensor type (represents a quantity)
- frc_command (int): FRC command used when collecting data
- frc_value (int): Raw data to convert
Returns:
int, float, SensorFrcErrors or None: Converted value