iqrfpy.utils.common
Common utility module.
The common module provides utilities and auxiliary methods for extracting data from DPA bytes and Daemon API JSON messages.
1"""Common utility module. 2 3common module provides utilities and auxiliary methods for 4extraction of data from DPA bytes and Daemon API JSON messages. 5""" 6 7__all__ = ['Common'] 8 9import math 10from typing import List, Optional, Union 11from iqrfpy.enums.commands import * 12from iqrfpy.enums.message_types import * 13from iqrfpy.enums.peripherals import * 14from iqrfpy.exceptions import InvalidPeripheralValueError, InvalidPeripheralCommandValueError, \ 15 JsonMsgidMissingError, JsonMTypeMissingError, JsonNadrMissingError, JsonHwpidMissingError, JsonRCodeMissingError, \ 16 JsonDpaValueMissingError, JsonResultMissingError, JsonStatusMissingError, JsonGenericDataMissingError, \ 17 UnsupportedMessageTypeError, UnsupportedPeripheralError, UnsupportedPeripheralCommandError 18import iqrfpy.utils.dpa as dpa_constants 19 20 21class Common: 22 """Common class provides auxiliary methods for handling DPA and Daemon API JSON messages.""" 23 24 # DPA 25 26 @staticmethod 27 def serialize_to_dpa(nadr: int, pnum: Union[Peripheral, int], pcmd: Union[Command, int], hwpid: int = 0xFFFF, 28 pdata: Optional[List[int]] = None, mutable: bool = False): 29 """Serialize request parameters into DPA packet. 30 31 Args: 32 nadr (int): Device address 33 pnum (Union[Peripheral, int]): Peripheral. 34 pcmd (Union[Command, int]): Peripheral command. 35 hwpid (int): Hardware profile ID. Defaults to 65535 36 pdata (List[int], optional): Request data. 37 mutable (bool, optional): Serialize into mutable byte representation of DPA request packet. 38 Defaults to False. 39 40 Returns: 41 :obj:`bytes`: Immutable byte representation of DPA request packet.\n 42 :obj:`bytearray`: Mutable byte representation of DPA request packet (if argument mutable is True). 
43 """ 44 dpa: List[int] = [nadr, 0, pnum, pcmd, hwpid & 0xFF, (hwpid >> 8) & 0xFF] 45 if pdata is not None: 46 dpa.extend(pdata) 47 if mutable: 48 return bytearray(dpa) 49 return bytes(dpa) 50 51 @staticmethod 52 def hwpid_from_dpa(high: int, low: int) -> int: 53 """Convert DPA HWPID bytes to a single 16bit unsigned integer. 54 55 Args: 56 high (int): HWPID high byte 57 low (int): HWPID low byte 58 Returns: 59 :obj:`int`: 16bit unsigned integer HWPID value 60 Raises: 61 ValueError: Raised if input values are not between 0 and 255 62 """ 63 if high > dpa_constants.BYTE_MAX or low > dpa_constants.BYTE_MAX: 64 raise ValueError('Argument value exceeds maximum allowed value of 255.') 65 if high < dpa_constants.BYTE_MIN or low < dpa_constants.BYTE_MIN: 66 raise ValueError('Negative argument values are not allowed.') 67 return (high << 8) + low 68 69 @staticmethod 70 def pnum_from_dpa(pnum: int) -> Union[Peripheral, int]: 71 """Return peripheral enum value based on DPA peripheral data byte. 72 73 Args: 74 pnum (int): Peripheral number data byte 75 Returns: 76 :obj:`Peripheral` or :obj:`int`: Peripheral enum member 77 Raises: 78 InvalidPeripheralValueError: Raised if pnum value is not between 0 and 255 79 UnsupportedPeripheralError: Raised if pnum parameter value is not recognized as a member of any 80 peripheral enum 81 """ 82 if pnum < 0 or pnum > 255: 83 raise InvalidPeripheralValueError('Peripheral value out of range 0-255.') 84 if pnum in EmbedPeripherals: 85 return EmbedPeripherals(pnum) 86 if pnum in Standards: 87 return Standards(pnum) 88 if dpa_constants.PNUM_USER_MIN <= pnum <= dpa_constants.PNUM_USER_MAX: 89 return pnum 90 raise UnsupportedPeripheralError('Unknown or unsupported peripheral.') 91 92 @staticmethod 93 def request_pcmd_from_dpa(pnum: Union[Peripheral, int], pcmd: int) -> Union[Command, int]: 94 """Return request command based on DPA peripheral and command data byte. 
95 96 Args: 97 pnum (Union[Peripheral, int]): Peripheral enum member 98 pcmd (int): Command data byte value 99 Returns: 100 :obj:`Command` or :obj:`int`: Request command enum value 101 Raises: 102 InvalidPeripheralCommandValueError: Raised if pcmd is a negative value, or if pcmd is not a value 103 between 0 and 127 104 UnsupportedPeripheralError: Raised if pnum parameter value is not recognized as a member of any 105 peripheral enum 106 UnsupportedPeripheralCommandError: Raised if pcmd parameter value is not recognized as a member of any 107 peripheral command enum 108 """ 109 if pcmd < dpa_constants.REQUEST_PCMD_MIN: 110 raise InvalidPeripheralCommandValueError('Negative peripheral command values are not allowed.') 111 if pcmd > dpa_constants.REQUEST_PCMD_MAX: 112 raise InvalidPeripheralCommandValueError('Peripheral command value exceeds maximum allowed value of 127.') 113 if dpa_constants.PNUM_USER_MIN <= pnum <= dpa_constants.PNUM_USER_MAX: 114 return pcmd 115 commands = None 116 match pnum: 117 case EmbedPeripherals.COORDINATOR: 118 commands = CoordinatorRequestCommands 119 case EmbedPeripherals.NODE: 120 commands = NodeRequestCommands 121 case EmbedPeripherals.OS: 122 commands = OSRequestCommands 123 case EmbedPeripherals.EEPROM: 124 commands = EEPROMRequestCommands 125 case EmbedPeripherals.EEEPROM: 126 commands = EEEPROMRequestCommands 127 case EmbedPeripherals.RAM: 128 commands = RAMRequestCommands 129 case EmbedPeripherals.LEDR | EmbedPeripherals.LEDG: 130 commands = LEDRequestCommands 131 case EmbedPeripherals.IO: 132 commands = IORequestCommands 133 case EmbedPeripherals.THERMOMETER: 134 commands = ThermometerRequestCommands 135 case EmbedPeripherals.UART: 136 commands = UartRequestCommands 137 case EmbedPeripherals.FRC: 138 commands = FrcRequestCommands 139 case EmbedPeripherals.EXPLORATION: 140 commands = ExplorationRequestCommands 141 case Standards.DALI: 142 commands = DALIRequestCommands 143 case Standards.BINARY_OUTPUT: 144 commands = 
BinaryOutputRequestCommands 145 case Standards.SENSOR: 146 commands = SensorRequestCommands 147 case Standards.LIGHT: 148 commands = LightRequestCommands 149 case _: 150 raise UnsupportedPeripheralError('Unknown or unsupported peripheral.') 151 152 if commands is not None and pcmd in commands: 153 return commands(pcmd) 154 raise UnsupportedPeripheralCommandError('Unknown or unsupported peripheral command.') 155 156 @staticmethod 157 def response_pcmd_from_dpa(pnum: Union[Peripheral, int], pcmd: int) -> Union[Command, int]: 158 """Return response command based on DPA peripheral and command data byte. 159 160 Args: 161 pnum (Union[Peripheral, int]): Peripheral enum member 162 pcmd (int): Command data byte value 163 Returns: 164 :obj:`Command` or :obj:`int`: Response command enum member 165 Raises: 166 InvalidPeripheralCommandValueError: Raised if pcmd is a negative value, or if pcmd is not a value 167 between 128 and 255 168 UnsupportedPeripheralError: Raised if pnum parameter value is not recognized as a member of any 169 peripheral enum 170 UnsupportedPeripheralCommandError: Raised if pcmd parameter value is not recognized as a member of any 171 peripheral command enum 172 """ 173 if pcmd < dpa_constants.REQUEST_PCMD_MIN: 174 raise InvalidPeripheralCommandValueError('Negative peripheral command values are not allowed.') 175 if pcmd <= dpa_constants.REQUEST_PCMD_MAX or pcmd > dpa_constants.RESPONSE_PCMD_MAX: 176 raise InvalidPeripheralCommandValueError('Response peripheral command should be value between 128 and 255.') 177 if dpa_constants.PNUM_USER_MIN <= pnum <= dpa_constants.PNUM_USER_MAX: 178 return pcmd 179 commands = None 180 match pnum: 181 case EmbedPeripherals.COORDINATOR: 182 commands = CoordinatorResponseCommands 183 case EmbedPeripherals.NODE: 184 commands = NodeResponseCommands 185 case EmbedPeripherals.OS: 186 commands = OSResponseCommands 187 case EmbedPeripherals.EEPROM: 188 commands = EEPROMResponseCommands 189 case EmbedPeripherals.EEEPROM: 190 
commands = EEEPROMResponseCommands 191 case EmbedPeripherals.RAM: 192 commands = RAMResponseCommands 193 case EmbedPeripherals.LEDR | EmbedPeripherals.LEDG: 194 commands = LEDResponseCommands 195 case EmbedPeripherals.IO: 196 commands = IOResponseCommands 197 case EmbedPeripherals.THERMOMETER: 198 commands = ThermometerResponseCommands 199 case EmbedPeripherals.UART: 200 commands = UartResponseCommands 201 case EmbedPeripherals.FRC: 202 commands = FrcResponseCommands 203 case EmbedPeripherals.EXPLORATION: 204 commands = ExplorationResponseCommands 205 case Standards.DALI: 206 commands = DALIResponseCommands 207 case Standards.BINARY_OUTPUT: 208 commands = BinaryOutputResponseCommands 209 case Standards.SENSOR: 210 commands = SensorResponseCommands 211 case Standards.LIGHT: 212 commands = LightResponseCommands 213 case _: 214 raise UnsupportedPeripheralError('Unknown or unsupported peripheral.') 215 216 if commands is not None and pcmd in commands: 217 return commands(pcmd) 218 raise UnsupportedPeripheralCommandError('Unknown or unsupported peripheral command.') 219 220 @staticmethod 221 def pdata_from_dpa(dpa: bytes) -> Union[List[int], None]: 222 """Return PDATA from DPA response bytes. 223 224 Args: 225 dpa (bytes): DPA response message 226 Returns: 227 :obj:`list` of :obj:`int` or :obj:`None`: PDATA integer list or None of there are no PDATA 228 """ 229 if len(dpa) > 8: 230 return list(dpa[8:]) 231 return None 232 233 @staticmethod 234 def parse_dpa_into_members(dpa: bytes): 235 """Parse DPA response into response members. 236 237 Args: 238 dpa (bytes): DPA response. 
239 240 Returns: 241 :obj:`(int, int, int, int, int, int, Optional[List[int]])`: Response members 242 """ 243 nadr = dpa[dpa_constants.ResponsePacketMembers.NADR] 244 pnum = dpa[dpa_constants.ResponsePacketMembers.PNUM] 245 pcmd = dpa[dpa_constants.ResponsePacketMembers.PCMD] 246 hwpid = Common.hwpid_from_dpa( 247 dpa[dpa_constants.ResponsePacketMembers.HWPID_HI], 248 dpa[dpa_constants.ResponsePacketMembers.HWPID_LO] 249 ) 250 rcode = dpa[dpa_constants.ResponsePacketMembers.RCODE] 251 dpa_value = dpa[dpa_constants.ResponsePacketMembers.DPA_VALUE] 252 pdata = Common.pdata_from_dpa(dpa) 253 return nadr, pnum, pcmd, hwpid, rcode, dpa_value, pdata 254 255 # json 256 257 @staticmethod 258 def serialize_to_json(mtype: MessageType, msgid: str, nadr: int, hwpid: int = 0xFFFF, params: Optional[dict] = None, 259 dpa_rsp_time: Optional[float] = None): 260 """Serialize request parameters into JSON API request object. 261 262 Args: 263 mtype (MessageType): Message type. 264 msgid (str): Message ID. 265 nadr (int): Device address. 266 hwpid (int): Hardware profile ID. Defaults to 65535. 267 params (dict, optional): Request parameters. 268 dpa_rsp_time (float, optional): Request timeout. 269 270 Returns: 271 :obj:`dict`: JSON-serialized request 272 """ 273 params = params if params is not None else {} 274 json: dict = { 275 'mType': mtype.value, 276 'data': { 277 'msgId': msgid, 278 'req': { 279 'nAdr': nadr, 280 'hwpId': hwpid, 281 'param': params, 282 }, 283 'returnVerbose': True, 284 }, 285 } 286 if dpa_rsp_time is not None: 287 json['data']['timeout'] = math.ceil(dpa_rsp_time * 1000) 288 return json 289 290 @staticmethod 291 def msgid_from_json(json: dict) -> str: 292 """Return response msgid from Daemon API JSON response. 
293 294 Args: 295 json (dict): JSON API response 296 Returns: 297 :obj:`str`: JSON API response message ID 298 Raises: 299 JsonMsgidMissingError: Raised if Daemon API response does not contain the msgId key 300 """ 301 try: 302 return json['data']['msgId'] 303 except KeyError as err: 304 raise JsonMsgidMissingError(f'Object does not contain property {str(err)}') from err 305 306 @staticmethod 307 def mtype_str_from_json(json: dict) -> str: 308 """Return message type from Daemon API JSON response. 309 310 Args: 311 json (dict): JSON API response 312 Returns: 313 :obj:`str`: JSON API response message type string 314 Raises: 315 JsonMTypeMissingError: Raised if Daemon API response does not contain the mType key 316 """ 317 try: 318 return json['mType'] 319 except KeyError as err: 320 raise JsonMTypeMissingError(f'Object does not contain property {str(err)}') from err 321 322 @staticmethod 323 def nadr_from_json(json: dict) -> int: 324 """Return response nadr from Daemon API JSON response. 325 326 Args: 327 json (dict): JSON API response 328 Returns: 329 :obj:`int`: JSON API response device address 330 Raises: 331 JsonNadrMissingError: Raised if Daemon API response does not contain the nAdr key 332 """ 333 try: 334 return json['data']['rsp']['nAdr'] 335 except KeyError as err: 336 raise JsonNadrMissingError(f'Object does not contain property {str(err)}') from err 337 338 @staticmethod 339 def pnum_from_json(json: dict) -> int: 340 """Return response pnum from Daemon API JSON response. 
341 342 Args: 343 json (dict): JSON API response 344 Returns: 345 :obj:`int`: JSON API response peripheral number 346 Raises: 347 JsonNadrMissingError: Raised if Daemon API response does not contain the pnum key 348 """ 349 try: 350 return json['data']['rsp']['pnum'] 351 except KeyError as err: 352 raise JsonNadrMissingError(f'Object does not contain property {str(err)}') from err 353 354 @staticmethod 355 def pcmd_from_json(json: dict) -> int: 356 """Return response pcmd from Daemon API JSON response. 357 358 Args: 359 json (dict): JSON API response 360 Returns: 361 :obj:`int`: JSON API response peripheral command 362 Raises: 363 JsonNadrMissingError: Raised if Daemon API response does not contain the pcmd key 364 """ 365 try: 366 return json['data']['rsp']['pcmd'] 367 except KeyError as err: 368 raise JsonNadrMissingError(f'Object does not contain property {str(err)}') from err 369 370 @staticmethod 371 def hwpid_from_json(json: dict) -> int: 372 """Return response hwpid from Daemon API JSON response. 373 374 Args: 375 json (dict): JSON API response 376 Returns: 377 :obj:`int`: JSON API response hardware profile ID 378 Raises: 379 JsonHwpidMissingError: Raised if Daemon API response does not contain the hwpId key 380 """ 381 try: 382 return json['data']['rsp']['hwpId'] 383 except KeyError as err: 384 raise JsonHwpidMissingError(f'Object does not contain property {str(err)}') from err 385 386 @staticmethod 387 def rcode_from_json(json: dict) -> int: 388 """Return response rcode from Daemon API JSON response. 
389 390 Args: 391 json (dict): JSON API response 392 Returns: 393 :obj:`int`: JSON API response DPA rcode 394 Raises: 395 JsonRCodeMissingError: Raised if Daemon API response does not contain the rcode key 396 """ 397 try: 398 return json['data']['rsp']['rCode'] 399 except KeyError as err: 400 raise JsonRCodeMissingError(f'Object does not contain property {str(err)}') from err 401 402 @staticmethod 403 def dpa_value_from_json(json: dict) -> int: 404 """Return response DPA value from Daemon API JSON response. 405 406 Args: 407 json (dict): JSON API response 408 Returns: 409 :obj:`int`: JSON API response dpa value 410 Raises: 411 JsonDpaValueMissingError: Raised if Daemon API response does not contain the dpaVal key 412 """ 413 try: 414 return json['data']['rsp']['dpaVal'] 415 except KeyError as err: 416 raise JsonDpaValueMissingError(f'Object does not contain property {str(err)}') from err 417 418 @staticmethod 419 def pdata_from_json(json: dict) -> Union[List[int], None]: 420 """Return pdata from Daemon API JSON response if available. 421 422 Args: 423 json (dict): JSON API response 424 Returns: 425 :obj:`Union[List[int], None]`: JSON API response pdata or None if there are no PDATA 426 """ 427 pdata = None 428 try: 429 raw = json['data']['raw'] 430 response = None 431 if isinstance(raw, list): 432 response = raw[0]['response'] 433 elif isinstance(raw, dict): 434 response = raw['response'] 435 response = response.split('.') 436 if len(response) > 8: 437 pdata = [int(x, 16) for x in response[8:]] 438 except KeyError: 439 pdata = None 440 finally: 441 return pdata 442 443 @staticmethod 444 def result_from_json(json: dict) -> dict: 445 """Return response result from Daemon API JSON response. 
446 447 Args: 448 json (dict): JSON API response 449 Returns: 450 :obj:`dict`: JSON API response result object 451 Raises: 452 JsonResultMissingError: Raised if JSON API response does not contain the result key 453 """ 454 try: 455 return json['data']['rsp']['result'] 456 except KeyError as err: 457 raise JsonResultMissingError(f'Object does not contain property {str(err)}') from err 458 459 @staticmethod 460 def status_from_json(json: dict) -> int: 461 """Return response status from Daemon API JSON response. 462 463 Args: 464 json (dict): JSON API response 465 Returns: 466 :obj:`int`: JSON API response status code 467 Raises: 468 JsonStatusMissingError: Raised if JSON API response does not contain the status key 469 """ 470 try: 471 return json['data']['status'] 472 except KeyError as err: 473 raise JsonStatusMissingError(f'Object does not contain property {str(err)}') from err 474 475 @staticmethod 476 def generic_rdata_from_json(json: dict) -> str: 477 """Return generic response data from Daemon API JSON response. 478 479 Args: 480 json (dict): JSON API response 481 Returns: 482 :obj:`int`: JSON API response status code 483 Raises: 484 JsonStatusMissingError: Raised if JSON API response does not contain path to rData. 485 """ 486 try: 487 return json['data']['rsp']['rData'] 488 except KeyError as err: 489 raise JsonGenericDataMissingError(f'Object does not contain property {str(err)}') from err 490 491 @staticmethod 492 def parse_json_into_members(json: dict, omit_result: bool = False): 493 """Parse JSON response into response members. 
494 495 Args: 496 json (dict): JSON API response 497 omit_result (bool): Do not parse result 498 Returns: 499 :obj:`(str, int, int, int, int, List[int], dict)`: Response members 500 """ 501 msgid = Common.msgid_from_json(json=json) 502 nadr = Common.nadr_from_json(json=json) 503 hwpid = Common.hwpid_from_json(json=json) 504 rcode = Common.rcode_from_json(json=json) 505 dpa_value = Common.dpa_value_from_json(json=json) 506 pdata = Common.pdata_from_json(json=json) 507 if omit_result: 508 result = None 509 else: 510 result = Common.result_from_json(json=json) if rcode == dpa_constants.ResponseCodes.OK else None 511 return msgid, nadr, hwpid, rcode, dpa_value, pdata, result 512 513 @staticmethod 514 def string_to_mtype(string: str) -> MessageType: 515 """Convert message type string to message type enum member value. 516 517 Args: 518 string (str): Message type string 519 Returns: 520 :obj:`MessageType`: Message type enum member 521 Raises: 522 UnsupportedMessageTypeError: Raised if message type is not recognized as a member of any message type enum 523 """ 524 messages = [GenericMessages, ExplorationMessages, CoordinatorMessages, NodeMessages, OSMessages, EEPROMMessages, 525 EEEPROMMessages, RAMMessages, LEDRMessages, LEDGMessages, IOMessages, ThermometerMessages, 526 UartMessages, FrcMessages, DALIMessages, BinaryOutputMessages, SensorMessages, LightMessages] 527 for item in messages: 528 if string in item: 529 return item(string) 530 raise UnsupportedMessageTypeError(f'Unknown or unsupported message type: {string}.') 531 532 # general 533 534 @staticmethod 535 def bitmap_to_nodes(bitmap: List[int], coordinator_shift: bool = False) -> List[int]: 536 """Convert node bitmap to list of nodes. 
537 538 Args: 539 bitmap (List[int]): Node bitmap represented by list of integers 540 coordinator_shift (bool): Bitmap contains dummy coordinator value 541 Returns: 542 :obj:`list` of :obj:`int`: List of node addresses from bitmap 543 """ 544 nodes = [] 545 start = 0 if not coordinator_shift else 1 546 for i in range(start, len(bitmap * 8)): 547 if bitmap[int(i / 8)] & (1 << (i % 8)): 548 nodes.append(i) 549 return nodes 550 551 @staticmethod 552 def nodes_to_bitmap(nodes: List[int]) -> List[int]: 553 """Convert list of nodes to node bitmap. 554 555 Args: 556 nodes (List[int]): List of node addresses 557 Returns: 558 :obj:`list` of :obj:`int`: Nodes bitmap represented by list of 30 integers 559 """ 560 bitmap = [0] * 30 561 for node in nodes: 562 bitmap[math.floor(node / 8)] |= (1 << (node % 8)) 563 return bitmap 564 565 @staticmethod 566 def bitmap_4byte_to_indexes(bitmap: List[int]) -> List[int]: 567 """Convert 4 byte bitmap to list of indexes. 568 569 Args: 570 bitmap (List[int]): Index bitmap. 571 572 Returns: 573 :obj:`List` of :obj:`int`: List of indexes. 574 """ 575 indexes = [0] * 32 576 for i in range(0, len(bitmap * 8)): 577 if bitmap[int(i / 8)] & (1 << (i % 8)): 578 indexes[i] = 1 579 return indexes 580 581 @staticmethod 582 def indexes_to_4byte_bitmap(values: List[int]) -> List[int]: 583 """Convert list of indexes to a 4 byte bitmap. 584 585 Args: 586 values (List[int]): List of indexes 587 Returns: 588 :obj:`list` of :obj:`int`: Index bitmap represented by list of 4 integers 589 """ 590 bitmap = [0] * 4 591 for value in values: 592 bitmap[math.floor(value / 8)] |= (1 << (value % 8)) 593 return bitmap 594 595 @staticmethod 596 def is_hex_string(string: str) -> bool: 597 """Check if string contains only hexadecimal characters. 
598 599 Args: 600 string (str): Input string 601 Returns: 602 :obj:`bool`: True if string contains only hexadecimal characters, False otherwise 603 """ 604 if len(string) == 0: 605 return False 606 return not set(string) - set('0123456789abcdefABCDEF') 607 608 @staticmethod 609 def hex_string_to_list(string: str) -> List[int]: 610 """Convert hexadecimal string to list of unsigned integers. 611 612 Args: 613 string (str): Hexadecimal string 614 Returns: 615 :obj:`list` of :obj:`int`: List of integers from hexadecimal string 616 Raises: 617 ValueError: Raised if string is of uneven length or contains non-hexadecimal characters 618 """ 619 if not len(string) % 2 == 0: 620 raise ValueError('Argument should be even length.') 621 if not Common.is_hex_string(string): 622 raise ValueError('Argument is not a hexadecimal string.') 623 return [int(string[i:i + 2], base=16) for i in range(0, len(string), 2)] 624 625 @staticmethod 626 def list_to_hex_string(values: List[int], separator: str = ' ', uppercase: bool = True) -> str: 627 """Convert list of unsigned integers to hexadecimal string. 628 629 Args: 630 values (List[int]): List of unsigned integers 631 separator (str, optional): Separator for byte string. Defaults to empty string. 632 uppercase (bool, optional): Output hex string uppercase. Defaults to False. 633 634 Returns: 635 :obj:`string`: Hexadecimal string representation of the list 636 """ 637 hex_format = 'X' if uppercase else 'x' 638 return separator.join(f'{value:02{hex_format}}' for value in values) 639 640 @staticmethod 641 def peripheral_list_to_bitmap(values: List[int]) -> List[int]: 642 """Convert list of peripheral numbers into a bitmap of peripherals. 643 644 For example, the coordinator peripheral would be represented by the 1st bit in bitmap. 
645 646 Args: 647 values (List[int]): List of peripheral numbers 648 Returns: 649 :obj:`list` of :obj:`int`: Peripheral bitmap represented by list of 4 integers 650 """ 651 bitmap = [0 for _ in range(32)] 652 for value in values: 653 bitmap[value] = 1 654 byte_list = [] 655 for bits in [bitmap[i:i + 8] for i in range(0, len(bitmap), 8)]: 656 bits.reverse() 657 byte = 0 658 for bit in bits: 659 byte = (byte << 1) | bit 660 byte_list.append(byte) 661 return byte_list 662 663 @staticmethod 664 def values_in_byte_range(values: List[int]) -> bool: 665 """Check if list elements are within unsigned integer byte range. 666 667 Args: 668 values (List[int]): Input data 669 Returns: 670 :obj:`bool`: True if values are in range, False otherwise 671 """ 672 return len([value for value in values if value < 0 or value > 255]) == 0 673 674 @staticmethod 675 def byte_complement(value: int): 676 """Convert unsigned 1B value into a signed 1B value. 677 678 Args: 679 value (int): Input unsigned 1B value 680 Returns: 681 :obj:`int`: Signed 1B value 682 Raises: 683 ValueError: Raised when value is not in unsigned 8bit range. 684 """ 685 if not 0 <= value <= 0xFF: 686 raise ValueError('Not an unsigned 1B value.') 687 if value < 0x80: 688 return value 689 return value - 0x100 690 691 @staticmethod 692 def word_complement(value: int) -> int: 693 """Convert unsigned 2B value into a signed 2B value. 694 695 Args: 696 value (int): Input unsigned 2B value 697 Returns: 698 :obj:`int`: Signed 2B value 699 Raises: 700 ValueError: Raised when value is not in unsigned 16bit range. 701 """ 702 if not 0 <= value <= 0xFFFF: 703 raise ValueError('Not an unsigned 2B value.') 704 if value < 0x8000: 705 return value 706 return value - 0x10000 707 708 @staticmethod 709 def bcd_to_decimal(byte: int) -> int: 710 """Convert a Binary-Coded Decimal (BCD) value to decimal integer. 711 712 Args: 713 byte (int): BCD value to convert. 714 715 Returns: 716 :obj:`int`: Decimal value encoded in BCD. 
717 718 Raises: 719 ValueError: Raised when byte argument is not a valid BCD value. 720 721 Examples: 722 >>> Common.bcd_to_decimal(0x42) 723 42 724 """ 725 high = (byte >> 4) & 0x0F 726 low = byte & 0x0F 727 if high > 9 or low > 9: 728 raise ValueError('Not a BCD value.') 729 return high * 10 + low 730 731 @staticmethod 732 def dpa_build_date_to_str(par1: int, par2: int) -> str: 733 """Convert DPA build date in BCD format to human-readable string. 734 735 Outputs date string in the following format: DD.MM.YYYY. 736 737 Args: 738 par1 (int): BCD-encoded day of DPA build date. 739 par2 (int): BCD-encoded month and year of DPA build date. 740 741 Returns: 742 str: Human-readable DPA build date string. 743 """ 744 try: 745 day = Common.bcd_to_decimal(par1) 746 month = par2 & 0x0F 747 year = 2010 + ((par2 >> 4) & 0x0F) 748 return f'{day}.{month}.{year}' 749 except ValueError as e: 750 raise ValueError(f'Failed to convert DPA build date to string: {str(e)}') from e 751 752 @staticmethod 753 def dpa_version_to_str(dpa_version: int) -> str: 754 """Convert DPA version in BCD format to human-readable version string. 755 756 Args: 757 dpa_version (int): DPA version encoded in BCD format. 758 759 Returns: 760 str: Human-readable DPA version string. 761 762 Raises: 763 ValueError: Raised when dpa_version is not a valid BCD value. 764 """ 765 try: 766 major = Common.bcd_to_decimal(int(dpa_version / 256)) 767 minor = dpa_version & 0x00FF 768 return f'{major}.{minor:02X}' 769 except ValueError as e: 770 raise ValueError(f'Failed to convert version to string: {str(e)}') from e 771 772 @staticmethod 773 def fletcher_checksum(init: int, data: List[int]) -> int: 774 """Calculate one's complement Fletcher checksum from list of bytes. 775 776 See https://doc.iqrf.org/DpaTechGuide/pages/FletcherCSharp.html 777 778 Args: 779 init (int): Initial value. 780 data (List[int]): List of bytes to calculate checksum from. 781 782 Returns: 783 :obj:`int`: 16bit Fletcher checksum integer value. 
784 """ 785 checksum = init 786 for byte in data: 787 low = checksum & 0xFF 788 low += byte 789 if (low & 0x100) != 0: 790 low += 1 791 high = checksum >> 8 792 high += low & 0xFF 793 if (high & 0x100) != 0: 794 high += 1 795 checksum = (low & 0xFF) | (high & 0xFF) << 8 796 return checksum
22class Common: 23 """Common class provides auxiliary methods for handling DPA and Daemon API JSON messages.""" 24 25 # DPA 26 27 @staticmethod 28 def serialize_to_dpa(nadr: int, pnum: Union[Peripheral, int], pcmd: Union[Command, int], hwpid: int = 0xFFFF, 29 pdata: Optional[List[int]] = None, mutable: bool = False): 30 """Serialize request parameters into DPA packet. 31 32 Args: 33 nadr (int): Device address 34 pnum (Union[Peripheral, int]): Peripheral. 35 pcmd (Union[Command, int]): Peripheral command. 36 hwpid (int): Hardware profile ID. Defaults to 65535 37 pdata (List[int], optional): Request data. 38 mutable (bool, optional): Serialize into mutable byte representation of DPA request packet. 39 Defaults to False. 40 41 Returns: 42 :obj:`bytes`: Immutable byte representation of DPA request packet.\n 43 :obj:`bytearray`: Mutable byte representation of DPA request packet (if argument mutable is True). 44 """ 45 dpa: List[int] = [nadr, 0, pnum, pcmd, hwpid & 0xFF, (hwpid >> 8) & 0xFF] 46 if pdata is not None: 47 dpa.extend(pdata) 48 if mutable: 49 return bytearray(dpa) 50 return bytes(dpa) 51 52 @staticmethod 53 def hwpid_from_dpa(high: int, low: int) -> int: 54 """Convert DPA HWPID bytes to a single 16bit unsigned integer. 55 56 Args: 57 high (int): HWPID high byte 58 low (int): HWPID low byte 59 Returns: 60 :obj:`int`: 16bit unsigned integer HWPID value 61 Raises: 62 ValueError: Raised if input values are not between 0 and 255 63 """ 64 if high > dpa_constants.BYTE_MAX or low > dpa_constants.BYTE_MAX: 65 raise ValueError('Argument value exceeds maximum allowed value of 255.') 66 if high < dpa_constants.BYTE_MIN or low < dpa_constants.BYTE_MIN: 67 raise ValueError('Negative argument values are not allowed.') 68 return (high << 8) + low 69 70 @staticmethod 71 def pnum_from_dpa(pnum: int) -> Union[Peripheral, int]: 72 """Return peripheral enum value based on DPA peripheral data byte. 
73 74 Args: 75 pnum (int): Peripheral number data byte 76 Returns: 77 :obj:`Peripheral` or :obj:`int`: Peripheral enum member 78 Raises: 79 InvalidPeripheralValueError: Raised if pnum value is not between 0 and 255 80 UnsupportedPeripheralError: Raised if pnum parameter value is not recognized as a member of any 81 peripheral enum 82 """ 83 if pnum < 0 or pnum > 255: 84 raise InvalidPeripheralValueError('Peripheral value out of range 0-255.') 85 if pnum in EmbedPeripherals: 86 return EmbedPeripherals(pnum) 87 if pnum in Standards: 88 return Standards(pnum) 89 if dpa_constants.PNUM_USER_MIN <= pnum <= dpa_constants.PNUM_USER_MAX: 90 return pnum 91 raise UnsupportedPeripheralError('Unknown or unsupported peripheral.') 92 93 @staticmethod 94 def request_pcmd_from_dpa(pnum: Union[Peripheral, int], pcmd: int) -> Union[Command, int]: 95 """Return request command based on DPA peripheral and command data byte. 96 97 Args: 98 pnum (Union[Peripheral, int]): Peripheral enum member 99 pcmd (int): Command data byte value 100 Returns: 101 :obj:`Command` or :obj:`int`: Request command enum value 102 Raises: 103 InvalidPeripheralCommandValueError: Raised if pcmd is a negative value, or if pcmd is not a value 104 between 0 and 127 105 UnsupportedPeripheralError: Raised if pnum parameter value is not recognized as a member of any 106 peripheral enum 107 UnsupportedPeripheralCommandError: Raised if pcmd parameter value is not recognized as a member of any 108 peripheral command enum 109 """ 110 if pcmd < dpa_constants.REQUEST_PCMD_MIN: 111 raise InvalidPeripheralCommandValueError('Negative peripheral command values are not allowed.') 112 if pcmd > dpa_constants.REQUEST_PCMD_MAX: 113 raise InvalidPeripheralCommandValueError('Peripheral command value exceeds maximum allowed value of 127.') 114 if dpa_constants.PNUM_USER_MIN <= pnum <= dpa_constants.PNUM_USER_MAX: 115 return pcmd 116 commands = None 117 match pnum: 118 case EmbedPeripherals.COORDINATOR: 119 commands = 
CoordinatorRequestCommands 120 case EmbedPeripherals.NODE: 121 commands = NodeRequestCommands 122 case EmbedPeripherals.OS: 123 commands = OSRequestCommands 124 case EmbedPeripherals.EEPROM: 125 commands = EEPROMRequestCommands 126 case EmbedPeripherals.EEEPROM: 127 commands = EEEPROMRequestCommands 128 case EmbedPeripherals.RAM: 129 commands = RAMRequestCommands 130 case EmbedPeripherals.LEDR | EmbedPeripherals.LEDG: 131 commands = LEDRequestCommands 132 case EmbedPeripherals.IO: 133 commands = IORequestCommands 134 case EmbedPeripherals.THERMOMETER: 135 commands = ThermometerRequestCommands 136 case EmbedPeripherals.UART: 137 commands = UartRequestCommands 138 case EmbedPeripherals.FRC: 139 commands = FrcRequestCommands 140 case EmbedPeripherals.EXPLORATION: 141 commands = ExplorationRequestCommands 142 case Standards.DALI: 143 commands = DALIRequestCommands 144 case Standards.BINARY_OUTPUT: 145 commands = BinaryOutputRequestCommands 146 case Standards.SENSOR: 147 commands = SensorRequestCommands 148 case Standards.LIGHT: 149 commands = LightRequestCommands 150 case _: 151 raise UnsupportedPeripheralError('Unknown or unsupported peripheral.') 152 153 if commands is not None and pcmd in commands: 154 return commands(pcmd) 155 raise UnsupportedPeripheralCommandError('Unknown or unsupported peripheral command.') 156 157 @staticmethod 158 def response_pcmd_from_dpa(pnum: Union[Peripheral, int], pcmd: int) -> Union[Command, int]: 159 """Return response command based on DPA peripheral and command data byte. 
160 161 Args: 162 pnum (Union[Peripheral, int]): Peripheral enum member 163 pcmd (int): Command data byte value 164 Returns: 165 :obj:`Command` or :obj:`int`: Response command enum member 166 Raises: 167 InvalidPeripheralCommandValueError: Raised if pcmd is a negative value, or if pcmd is not a value 168 between 128 and 255 169 UnsupportedPeripheralError: Raised if pnum parameter value is not recognized as a member of any 170 peripheral enum 171 UnsupportedPeripheralCommandError: Raised if pcmd parameter value is not recognized as a member of any 172 peripheral command enum 173 """ 174 if pcmd < dpa_constants.REQUEST_PCMD_MIN: 175 raise InvalidPeripheralCommandValueError('Negative peripheral command values are not allowed.') 176 if pcmd <= dpa_constants.REQUEST_PCMD_MAX or pcmd > dpa_constants.RESPONSE_PCMD_MAX: 177 raise InvalidPeripheralCommandValueError('Response peripheral command should be value between 128 and 255.') 178 if dpa_constants.PNUM_USER_MIN <= pnum <= dpa_constants.PNUM_USER_MAX: 179 return pcmd 180 commands = None 181 match pnum: 182 case EmbedPeripherals.COORDINATOR: 183 commands = CoordinatorResponseCommands 184 case EmbedPeripherals.NODE: 185 commands = NodeResponseCommands 186 case EmbedPeripherals.OS: 187 commands = OSResponseCommands 188 case EmbedPeripherals.EEPROM: 189 commands = EEPROMResponseCommands 190 case EmbedPeripherals.EEEPROM: 191 commands = EEEPROMResponseCommands 192 case EmbedPeripherals.RAM: 193 commands = RAMResponseCommands 194 case EmbedPeripherals.LEDR | EmbedPeripherals.LEDG: 195 commands = LEDResponseCommands 196 case EmbedPeripherals.IO: 197 commands = IOResponseCommands 198 case EmbedPeripherals.THERMOMETER: 199 commands = ThermometerResponseCommands 200 case EmbedPeripherals.UART: 201 commands = UartResponseCommands 202 case EmbedPeripherals.FRC: 203 commands = FrcResponseCommands 204 case EmbedPeripherals.EXPLORATION: 205 commands = ExplorationResponseCommands 206 case Standards.DALI: 207 commands = 
DALIResponseCommands 208 case Standards.BINARY_OUTPUT: 209 commands = BinaryOutputResponseCommands 210 case Standards.SENSOR: 211 commands = SensorResponseCommands 212 case Standards.LIGHT: 213 commands = LightResponseCommands 214 case _: 215 raise UnsupportedPeripheralError('Unknown or unsupported peripheral.') 216 217 if commands is not None and pcmd in commands: 218 return commands(pcmd) 219 raise UnsupportedPeripheralCommandError('Unknown or unsupported peripheral command.') 220 221 @staticmethod 222 def pdata_from_dpa(dpa: bytes) -> Union[List[int], None]: 223 """Return PDATA from DPA response bytes. 224 225 Args: 226 dpa (bytes): DPA response message 227 Returns: 228 :obj:`list` of :obj:`int` or :obj:`None`: PDATA integer list or None of there are no PDATA 229 """ 230 if len(dpa) > 8: 231 return list(dpa[8:]) 232 return None 233 234 @staticmethod 235 def parse_dpa_into_members(dpa: bytes): 236 """Parse DPA response into response members. 237 238 Args: 239 dpa (bytes): DPA response. 240 241 Returns: 242 :obj:`(int, int, int, int, int, int, Optional[List[int]])`: Response members 243 """ 244 nadr = dpa[dpa_constants.ResponsePacketMembers.NADR] 245 pnum = dpa[dpa_constants.ResponsePacketMembers.PNUM] 246 pcmd = dpa[dpa_constants.ResponsePacketMembers.PCMD] 247 hwpid = Common.hwpid_from_dpa( 248 dpa[dpa_constants.ResponsePacketMembers.HWPID_HI], 249 dpa[dpa_constants.ResponsePacketMembers.HWPID_LO] 250 ) 251 rcode = dpa[dpa_constants.ResponsePacketMembers.RCODE] 252 dpa_value = dpa[dpa_constants.ResponsePacketMembers.DPA_VALUE] 253 pdata = Common.pdata_from_dpa(dpa) 254 return nadr, pnum, pcmd, hwpid, rcode, dpa_value, pdata 255 256 # json 257 258 @staticmethod 259 def serialize_to_json(mtype: MessageType, msgid: str, nadr: int, hwpid: int = 0xFFFF, params: Optional[dict] = None, 260 dpa_rsp_time: Optional[float] = None): 261 """Serialize request parameters into JSON API request object. 262 263 Args: 264 mtype (MessageType): Message type. 
265 msgid (str): Message ID. 266 nadr (int): Device address. 267 hwpid (int): Hardware profile ID. Defaults to 65535. 268 params (dict, optional): Request parameters. 269 dpa_rsp_time (float, optional): Request timeout. 270 271 Returns: 272 :obj:`dict`: JSON-serialized request 273 """ 274 params = params if params is not None else {} 275 json: dict = { 276 'mType': mtype.value, 277 'data': { 278 'msgId': msgid, 279 'req': { 280 'nAdr': nadr, 281 'hwpId': hwpid, 282 'param': params, 283 }, 284 'returnVerbose': True, 285 }, 286 } 287 if dpa_rsp_time is not None: 288 json['data']['timeout'] = math.ceil(dpa_rsp_time * 1000) 289 return json 290 291 @staticmethod 292 def msgid_from_json(json: dict) -> str: 293 """Return response msgid from Daemon API JSON response. 294 295 Args: 296 json (dict): JSON API response 297 Returns: 298 :obj:`str`: JSON API response message ID 299 Raises: 300 JsonMsgidMissingError: Raised if Daemon API response does not contain the msgId key 301 """ 302 try: 303 return json['data']['msgId'] 304 except KeyError as err: 305 raise JsonMsgidMissingError(f'Object does not contain property {str(err)}') from err 306 307 @staticmethod 308 def mtype_str_from_json(json: dict) -> str: 309 """Return message type from Daemon API JSON response. 310 311 Args: 312 json (dict): JSON API response 313 Returns: 314 :obj:`str`: JSON API response message type string 315 Raises: 316 JsonMTypeMissingError: Raised if Daemon API response does not contain the mType key 317 """ 318 try: 319 return json['mType'] 320 except KeyError as err: 321 raise JsonMTypeMissingError(f'Object does not contain property {str(err)}') from err 322 323 @staticmethod 324 def nadr_from_json(json: dict) -> int: 325 """Return response nadr from Daemon API JSON response. 
326 327 Args: 328 json (dict): JSON API response 329 Returns: 330 :obj:`int`: JSON API response device address 331 Raises: 332 JsonNadrMissingError: Raised if Daemon API response does not contain the nAdr key 333 """ 334 try: 335 return json['data']['rsp']['nAdr'] 336 except KeyError as err: 337 raise JsonNadrMissingError(f'Object does not contain property {str(err)}') from err 338 339 @staticmethod 340 def pnum_from_json(json: dict) -> int: 341 """Return response pnum from Daemon API JSON response. 342 343 Args: 344 json (dict): JSON API response 345 Returns: 346 :obj:`int`: JSON API response peripheral number 347 Raises: 348 JsonNadrMissingError: Raised if Daemon API response does not contain the pnum key 349 """ 350 try: 351 return json['data']['rsp']['pnum'] 352 except KeyError as err: 353 raise JsonNadrMissingError(f'Object does not contain property {str(err)}') from err 354 355 @staticmethod 356 def pcmd_from_json(json: dict) -> int: 357 """Return response pcmd from Daemon API JSON response. 358 359 Args: 360 json (dict): JSON API response 361 Returns: 362 :obj:`int`: JSON API response peripheral command 363 Raises: 364 JsonNadrMissingError: Raised if Daemon API response does not contain the pcmd key 365 """ 366 try: 367 return json['data']['rsp']['pcmd'] 368 except KeyError as err: 369 raise JsonNadrMissingError(f'Object does not contain property {str(err)}') from err 370 371 @staticmethod 372 def hwpid_from_json(json: dict) -> int: 373 """Return response hwpid from Daemon API JSON response. 
374 375 Args: 376 json (dict): JSON API response 377 Returns: 378 :obj:`int`: JSON API response hardware profile ID 379 Raises: 380 JsonHwpidMissingError: Raised if Daemon API response does not contain the hwpId key 381 """ 382 try: 383 return json['data']['rsp']['hwpId'] 384 except KeyError as err: 385 raise JsonHwpidMissingError(f'Object does not contain property {str(err)}') from err 386 387 @staticmethod 388 def rcode_from_json(json: dict) -> int: 389 """Return response rcode from Daemon API JSON response. 390 391 Args: 392 json (dict): JSON API response 393 Returns: 394 :obj:`int`: JSON API response DPA rcode 395 Raises: 396 JsonRCodeMissingError: Raised if Daemon API response does not contain the rcode key 397 """ 398 try: 399 return json['data']['rsp']['rCode'] 400 except KeyError as err: 401 raise JsonRCodeMissingError(f'Object does not contain property {str(err)}') from err 402 403 @staticmethod 404 def dpa_value_from_json(json: dict) -> int: 405 """Return response DPA value from Daemon API JSON response. 406 407 Args: 408 json (dict): JSON API response 409 Returns: 410 :obj:`int`: JSON API response dpa value 411 Raises: 412 JsonDpaValueMissingError: Raised if Daemon API response does not contain the dpaVal key 413 """ 414 try: 415 return json['data']['rsp']['dpaVal'] 416 except KeyError as err: 417 raise JsonDpaValueMissingError(f'Object does not contain property {str(err)}') from err 418 419 @staticmethod 420 def pdata_from_json(json: dict) -> Union[List[int], None]: 421 """Return pdata from Daemon API JSON response if available. 
422 423 Args: 424 json (dict): JSON API response 425 Returns: 426 :obj:`Union[List[int], None]`: JSON API response pdata or None if there are no PDATA 427 """ 428 pdata = None 429 try: 430 raw = json['data']['raw'] 431 response = None 432 if isinstance(raw, list): 433 response = raw[0]['response'] 434 elif isinstance(raw, dict): 435 response = raw['response'] 436 response = response.split('.') 437 if len(response) > 8: 438 pdata = [int(x, 16) for x in response[8:]] 439 except KeyError: 440 pdata = None 441 finally: 442 return pdata 443 444 @staticmethod 445 def result_from_json(json: dict) -> dict: 446 """Return response result from Daemon API JSON response. 447 448 Args: 449 json (dict): JSON API response 450 Returns: 451 :obj:`dict`: JSON API response result object 452 Raises: 453 JsonResultMissingError: Raised if JSON API response does not contain the result key 454 """ 455 try: 456 return json['data']['rsp']['result'] 457 except KeyError as err: 458 raise JsonResultMissingError(f'Object does not contain property {str(err)}') from err 459 460 @staticmethod 461 def status_from_json(json: dict) -> int: 462 """Return response status from Daemon API JSON response. 463 464 Args: 465 json (dict): JSON API response 466 Returns: 467 :obj:`int`: JSON API response status code 468 Raises: 469 JsonStatusMissingError: Raised if JSON API response does not contain the status key 470 """ 471 try: 472 return json['data']['status'] 473 except KeyError as err: 474 raise JsonStatusMissingError(f'Object does not contain property {str(err)}') from err 475 476 @staticmethod 477 def generic_rdata_from_json(json: dict) -> str: 478 """Return generic response data from Daemon API JSON response. 479 480 Args: 481 json (dict): JSON API response 482 Returns: 483 :obj:`int`: JSON API response status code 484 Raises: 485 JsonStatusMissingError: Raised if JSON API response does not contain path to rData. 
486 """ 487 try: 488 return json['data']['rsp']['rData'] 489 except KeyError as err: 490 raise JsonGenericDataMissingError(f'Object does not contain property {str(err)}') from err 491 492 @staticmethod 493 def parse_json_into_members(json: dict, omit_result: bool = False): 494 """Parse JSON response into response members. 495 496 Args: 497 json (dict): JSON API response 498 omit_result (bool): Do not parse result 499 Returns: 500 :obj:`(str, int, int, int, int, List[int], dict)`: Response members 501 """ 502 msgid = Common.msgid_from_json(json=json) 503 nadr = Common.nadr_from_json(json=json) 504 hwpid = Common.hwpid_from_json(json=json) 505 rcode = Common.rcode_from_json(json=json) 506 dpa_value = Common.dpa_value_from_json(json=json) 507 pdata = Common.pdata_from_json(json=json) 508 if omit_result: 509 result = None 510 else: 511 result = Common.result_from_json(json=json) if rcode == dpa_constants.ResponseCodes.OK else None 512 return msgid, nadr, hwpid, rcode, dpa_value, pdata, result 513 514 @staticmethod 515 def string_to_mtype(string: str) -> MessageType: 516 """Convert message type string to message type enum member value. 
517 518 Args: 519 string (str): Message type string 520 Returns: 521 :obj:`MessageType`: Message type enum member 522 Raises: 523 UnsupportedMessageTypeError: Raised if message type is not recognized as a member of any message type enum 524 """ 525 messages = [GenericMessages, ExplorationMessages, CoordinatorMessages, NodeMessages, OSMessages, EEPROMMessages, 526 EEEPROMMessages, RAMMessages, LEDRMessages, LEDGMessages, IOMessages, ThermometerMessages, 527 UartMessages, FrcMessages, DALIMessages, BinaryOutputMessages, SensorMessages, LightMessages] 528 for item in messages: 529 if string in item: 530 return item(string) 531 raise UnsupportedMessageTypeError(f'Unknown or unsupported message type: {string}.') 532 533 # general 534 535 @staticmethod 536 def bitmap_to_nodes(bitmap: List[int], coordinator_shift: bool = False) -> List[int]: 537 """Convert node bitmap to list of nodes. 538 539 Args: 540 bitmap (List[int]): Node bitmap represented by list of integers 541 coordinator_shift (bool): Bitmap contains dummy coordinator value 542 Returns: 543 :obj:`list` of :obj:`int`: List of node addresses from bitmap 544 """ 545 nodes = [] 546 start = 0 if not coordinator_shift else 1 547 for i in range(start, len(bitmap * 8)): 548 if bitmap[int(i / 8)] & (1 << (i % 8)): 549 nodes.append(i) 550 return nodes 551 552 @staticmethod 553 def nodes_to_bitmap(nodes: List[int]) -> List[int]: 554 """Convert list of nodes to node bitmap. 555 556 Args: 557 nodes (List[int]): List of node addresses 558 Returns: 559 :obj:`list` of :obj:`int`: Nodes bitmap represented by list of 30 integers 560 """ 561 bitmap = [0] * 30 562 for node in nodes: 563 bitmap[math.floor(node / 8)] |= (1 << (node % 8)) 564 return bitmap 565 566 @staticmethod 567 def bitmap_4byte_to_indexes(bitmap: List[int]) -> List[int]: 568 """Convert 4 byte bitmap to list of indexes. 569 570 Args: 571 bitmap (List[int]): Index bitmap. 572 573 Returns: 574 :obj:`List` of :obj:`int`: List of indexes. 
575 """ 576 indexes = [0] * 32 577 for i in range(0, len(bitmap * 8)): 578 if bitmap[int(i / 8)] & (1 << (i % 8)): 579 indexes[i] = 1 580 return indexes 581 582 @staticmethod 583 def indexes_to_4byte_bitmap(values: List[int]) -> List[int]: 584 """Convert list of indexes to a 4 byte bitmap. 585 586 Args: 587 values (List[int]): List of indexes 588 Returns: 589 :obj:`list` of :obj:`int`: Index bitmap represented by list of 4 integers 590 """ 591 bitmap = [0] * 4 592 for value in values: 593 bitmap[math.floor(value / 8)] |= (1 << (value % 8)) 594 return bitmap 595 596 @staticmethod 597 def is_hex_string(string: str) -> bool: 598 """Check if string contains only hexadecimal characters. 599 600 Args: 601 string (str): Input string 602 Returns: 603 :obj:`bool`: True if string contains only hexadecimal characters, False otherwise 604 """ 605 if len(string) == 0: 606 return False 607 return not set(string) - set('0123456789abcdefABCDEF') 608 609 @staticmethod 610 def hex_string_to_list(string: str) -> List[int]: 611 """Convert hexadecimal string to list of unsigned integers. 612 613 Args: 614 string (str): Hexadecimal string 615 Returns: 616 :obj:`list` of :obj:`int`: List of integers from hexadecimal string 617 Raises: 618 ValueError: Raised if string is of uneven length or contains non-hexadecimal characters 619 """ 620 if not len(string) % 2 == 0: 621 raise ValueError('Argument should be even length.') 622 if not Common.is_hex_string(string): 623 raise ValueError('Argument is not a hexadecimal string.') 624 return [int(string[i:i + 2], base=16) for i in range(0, len(string), 2)] 625 626 @staticmethod 627 def list_to_hex_string(values: List[int], separator: str = ' ', uppercase: bool = True) -> str: 628 """Convert list of unsigned integers to hexadecimal string. 629 630 Args: 631 values (List[int]): List of unsigned integers 632 separator (str, optional): Separator for byte string. Defaults to empty string. 633 uppercase (bool, optional): Output hex string uppercase. 
Defaults to False. 634 635 Returns: 636 :obj:`string`: Hexadecimal string representation of the list 637 """ 638 hex_format = 'X' if uppercase else 'x' 639 return separator.join(f'{value:02{hex_format}}' for value in values) 640 641 @staticmethod 642 def peripheral_list_to_bitmap(values: List[int]) -> List[int]: 643 """Convert list of peripheral numbers into a bitmap of peripherals. 644 645 For example, the coordinator peripheral would be represented by the 1st bit in bitmap. 646 647 Args: 648 values (List[int]): List of peripheral numbers 649 Returns: 650 :obj:`list` of :obj:`int`: Peripheral bitmap represented by list of 4 integers 651 """ 652 bitmap = [0 for _ in range(32)] 653 for value in values: 654 bitmap[value] = 1 655 byte_list = [] 656 for bits in [bitmap[i:i + 8] for i in range(0, len(bitmap), 8)]: 657 bits.reverse() 658 byte = 0 659 for bit in bits: 660 byte = (byte << 1) | bit 661 byte_list.append(byte) 662 return byte_list 663 664 @staticmethod 665 def values_in_byte_range(values: List[int]) -> bool: 666 """Check if list elements are within unsigned integer byte range. 667 668 Args: 669 values (List[int]): Input data 670 Returns: 671 :obj:`bool`: True if values are in range, False otherwise 672 """ 673 return len([value for value in values if value < 0 or value > 255]) == 0 674 675 @staticmethod 676 def byte_complement(value: int): 677 """Convert unsigned 1B value into a signed 1B value. 678 679 Args: 680 value (int): Input unsigned 1B value 681 Returns: 682 :obj:`int`: Signed 1B value 683 Raises: 684 ValueError: Raised when value is not in unsigned 8bit range. 685 """ 686 if not 0 <= value <= 0xFF: 687 raise ValueError('Not an unsigned 1B value.') 688 if value < 0x80: 689 return value 690 return value - 0x100 691 692 @staticmethod 693 def word_complement(value: int) -> int: 694 """Convert unsigned 2B value into a signed 2B value. 
695 696 Args: 697 value (int): Input unsigned 2B value 698 Returns: 699 :obj:`int`: Signed 2B value 700 Raises: 701 ValueError: Raised when value is not in unsigned 16bit range. 702 """ 703 if not 0 <= value <= 0xFFFF: 704 raise ValueError('Not an unsigned 2B value.') 705 if value < 0x8000: 706 return value 707 return value - 0x10000 708 709 @staticmethod 710 def bcd_to_decimal(byte: int) -> int: 711 """Convert a Binary-Coded Decimal (BCD) value to decimal integer. 712 713 Args: 714 byte (int): BCD value to convert. 715 716 Returns: 717 :obj:`int`: Decimal value encoded in BCD. 718 719 Raises: 720 ValueError: Raised when byte argument is not a valid BCD value. 721 722 Examples: 723 >>> Common.bcd_to_decimal(0x42) 724 42 725 """ 726 high = (byte >> 4) & 0x0F 727 low = byte & 0x0F 728 if high > 9 or low > 9: 729 raise ValueError('Not a BCD value.') 730 return high * 10 + low 731 732 @staticmethod 733 def dpa_build_date_to_str(par1: int, par2: int) -> str: 734 """Convert DPA build date in BCD format to human-readable string. 735 736 Outputs date string in the following format: DD.MM.YYYY. 737 738 Args: 739 par1 (int): BCD-encoded day of DPA build date. 740 par2 (int): BCD-encoded month and year of DPA build date. 741 742 Returns: 743 str: Human-readable DPA build date string. 744 """ 745 try: 746 day = Common.bcd_to_decimal(par1) 747 month = par2 & 0x0F 748 year = 2010 + ((par2 >> 4) & 0x0F) 749 return f'{day}.{month}.{year}' 750 except ValueError as e: 751 raise ValueError(f'Failed to convert DPA build date to string: {str(e)}') from e 752 753 @staticmethod 754 def dpa_version_to_str(dpa_version: int) -> str: 755 """Convert DPA version in BCD format to human-readable version string. 756 757 Args: 758 dpa_version (int): DPA version encoded in BCD format. 759 760 Returns: 761 str: Human-readable DPA version string. 762 763 Raises: 764 ValueError: Raised when dpa_version is not a valid BCD value. 
765 """ 766 try: 767 major = Common.bcd_to_decimal(int(dpa_version / 256)) 768 minor = dpa_version & 0x00FF 769 return f'{major}.{minor:02X}' 770 except ValueError as e: 771 raise ValueError(f'Failed to convert version to string: {str(e)}') from e 772 773 @staticmethod 774 def fletcher_checksum(init: int, data: List[int]) -> int: 775 """Calculate one's complement Fletcher checksum from list of bytes. 776 777 See https://doc.iqrf.org/DpaTechGuide/pages/FletcherCSharp.html 778 779 Args: 780 init (int): Initial value. 781 data (List[int]): List of bytes to calculate checksum from. 782 783 Returns: 784 :obj:`int`: 16bit Fletcher checksum integer value. 785 """ 786 checksum = init 787 for byte in data: 788 low = checksum & 0xFF 789 low += byte 790 if (low & 0x100) != 0: 791 low += 1 792 high = checksum >> 8 793 high += low & 0xFF 794 if (high & 0x100) != 0: 795 high += 1 796 checksum = (low & 0xFF) | (high & 0xFF) << 8 797 return checksum
Common class provides auxiliary methods for handling DPA and Daemon API JSON messages.
27 @staticmethod 28 def serialize_to_dpa(nadr: int, pnum: Union[Peripheral, int], pcmd: Union[Command, int], hwpid: int = 0xFFFF, 29 pdata: Optional[List[int]] = None, mutable: bool = False): 30 """Serialize request parameters into DPA packet. 31 32 Args: 33 nadr (int): Device address 34 pnum (Union[Peripheral, int]): Peripheral. 35 pcmd (Union[Command, int]): Peripheral command. 36 hwpid (int): Hardware profile ID. Defaults to 65535 37 pdata (List[int], optional): Request data. 38 mutable (bool, optional): Serialize into mutable byte representation of DPA request packet. 39 Defaults to False. 40 41 Returns: 42 :obj:`bytes`: Immutable byte representation of DPA request packet.\n 43 :obj:`bytearray`: Mutable byte representation of DPA request packet (if argument mutable is True). 44 """ 45 dpa: List[int] = [nadr, 0, pnum, pcmd, hwpid & 0xFF, (hwpid >> 8) & 0xFF] 46 if pdata is not None: 47 dpa.extend(pdata) 48 if mutable: 49 return bytearray(dpa) 50 return bytes(dpa)
Serialize request parameters into DPA packet.
Arguments:
- nadr (int): Device address
- pnum (Union[Peripheral, int]): Peripheral.
- pcmd (Union[Command, int]): Peripheral command.
- hwpid (int): Hardware profile ID. Defaults to 65535
- pdata (List[int], optional): Request data.
- mutable (bool, optional): Serialize into mutable byte representation of DPA request packet. Defaults to False.
Returns:
bytes: Immutable byte representation of DPA request packet.
bytearray: Mutable byte representation of DPA request packet (if argument mutable is True).
@staticmethod
def hwpid_from_dpa(high: int, low: int) -> int:
    """Combine the two HWPID bytes of a DPA packet into one 16bit unsigned integer.

    Args:
        high (int): HWPID high byte
        low (int): HWPID low byte
    Returns:
        :obj:`int`: 16bit unsigned integer HWPID value
    Raises:
        ValueError: Raised if either byte lies outside the range given by ``dpa_constants.BYTE_MIN``
            and ``dpa_constants.BYTE_MAX``
    """
    hwpid_bytes = (high, low)
    # Upper bound is checked first, high byte before low byte.
    if any(byte > dpa_constants.BYTE_MAX for byte in hwpid_bytes):
        raise ValueError('Argument value exceeds maximum allowed value of 255.')
    if any(byte < dpa_constants.BYTE_MIN for byte in hwpid_bytes):
        raise ValueError('Negative argument values are not allowed.')
    return (high << 8) + low
Convert DPA HWPID bytes to a single 16bit unsigned integer.
Arguments:
- high (int): HWPID high byte
- low (int): HWPID low byte
Returns:
int: 16bit unsigned integer HWPID value
Raises:
- ValueError: Raised if input values are not between 0 and 255
@staticmethod
def pnum_from_dpa(pnum: int) -> Union[Peripheral, int]:
    """Translate a DPA peripheral number byte into a peripheral enum member.

    Embedded peripherals are looked up first, then standards. Values inside the user peripheral
    range are returned unchanged as integers.

    Args:
        pnum (int): Peripheral number data byte
    Returns:
        :obj:`Peripheral` or :obj:`int`: Peripheral enum member, or the integer itself for user peripherals
    Raises:
        InvalidPeripheralValueError: Raised if pnum value is not between 0 and 255
        UnsupportedPeripheralError: Raised if pnum is neither a member of a peripheral enum nor within
            the user peripheral range
    """
    if pnum < 0 or pnum > 255:
        raise InvalidPeripheralValueError('Peripheral value out of range 0-255.')
    for peripheral_enum in (EmbedPeripherals, Standards):
        if pnum in peripheral_enum:
            return peripheral_enum(pnum)
    is_user_peripheral = dpa_constants.PNUM_USER_MIN <= pnum <= dpa_constants.PNUM_USER_MAX
    if is_user_peripheral:
        return pnum
    raise UnsupportedPeripheralError('Unknown or unsupported peripheral.')
Return peripheral enum value based on DPA peripheral data byte.
Arguments:
- pnum (int): Peripheral number data byte
Returns:
Peripheral or int: Peripheral enum member
Raises:
- InvalidPeripheralValueError: Raised if pnum value is not between 0 and 255
- UnsupportedPeripheralError: Raised if pnum parameter value is not recognized as a member of any peripheral enum
@staticmethod
def request_pcmd_from_dpa(pnum: Union[Peripheral, int], pcmd: int) -> Union[Command, int]:
    """Translate a DPA request command byte into the command enum member of the given peripheral.

    For peripheral numbers inside the user peripheral range the command byte is returned unchanged.

    Args:
        pnum (Union[Peripheral, int]): Peripheral enum member
        pcmd (int): Command data byte value
    Returns:
        :obj:`Command` or :obj:`int`: Request command enum value
    Raises:
        InvalidPeripheralCommandValueError: Raised if pcmd is below ``dpa_constants.REQUEST_PCMD_MIN``
            or above ``dpa_constants.REQUEST_PCMD_MAX``
        UnsupportedPeripheralError: Raised if pnum parameter value is not recognized as a member of any
            peripheral enum
        UnsupportedPeripheralCommandError: Raised if pcmd parameter value is not recognized as a member of
            the request command enum of the peripheral
    """
    if pcmd < dpa_constants.REQUEST_PCMD_MIN:
        raise InvalidPeripheralCommandValueError('Negative peripheral command values are not allowed.')
    if pcmd > dpa_constants.REQUEST_PCMD_MAX:
        raise InvalidPeripheralCommandValueError('Peripheral command value exceeds maximum allowed value of 127.')
    if dpa_constants.PNUM_USER_MIN <= pnum <= dpa_constants.PNUM_USER_MAX:
        return pcmd

    # Select the request command enum that belongs to the peripheral.
    if pnum == EmbedPeripherals.COORDINATOR:
        command_enum = CoordinatorRequestCommands
    elif pnum == EmbedPeripherals.NODE:
        command_enum = NodeRequestCommands
    elif pnum == EmbedPeripherals.OS:
        command_enum = OSRequestCommands
    elif pnum == EmbedPeripherals.EEPROM:
        command_enum = EEPROMRequestCommands
    elif pnum == EmbedPeripherals.EEEPROM:
        command_enum = EEEPROMRequestCommands
    elif pnum == EmbedPeripherals.RAM:
        command_enum = RAMRequestCommands
    elif pnum == EmbedPeripherals.LEDR or pnum == EmbedPeripherals.LEDG:
        # Both LED peripherals share one command enum.
        command_enum = LEDRequestCommands
    elif pnum == EmbedPeripherals.IO:
        command_enum = IORequestCommands
    elif pnum == EmbedPeripherals.THERMOMETER:
        command_enum = ThermometerRequestCommands
    elif pnum == EmbedPeripherals.UART:
        command_enum = UartRequestCommands
    elif pnum == EmbedPeripherals.FRC:
        command_enum = FrcRequestCommands
    elif pnum == EmbedPeripherals.EXPLORATION:
        command_enum = ExplorationRequestCommands
    elif pnum == Standards.DALI:
        command_enum = DALIRequestCommands
    elif pnum == Standards.BINARY_OUTPUT:
        command_enum = BinaryOutputRequestCommands
    elif pnum == Standards.SENSOR:
        command_enum = SensorRequestCommands
    elif pnum == Standards.LIGHT:
        command_enum = LightRequestCommands
    else:
        raise UnsupportedPeripheralError('Unknown or unsupported peripheral.')

    if pcmd not in command_enum:
        raise UnsupportedPeripheralCommandError('Unknown or unsupported peripheral command.')
    return command_enum(pcmd)
Return request command based on DPA peripheral and command data byte.
Arguments:
- pnum (Union[Peripheral, int]): Peripheral enum member
- pcmd (int): Command data byte value
Returns:
Command or int: Request command enum value
Raises:
- InvalidPeripheralCommandValueError: Raised if pcmd is a negative value, or if pcmd is not a value between 0 and 127
- UnsupportedPeripheralError: Raised if pnum parameter value is not recognized as a member of any peripheral enum
- UnsupportedPeripheralCommandError: Raised if pcmd parameter value is not recognized as a member of any peripheral command enum
@staticmethod
def response_pcmd_from_dpa(pnum: Union[Peripheral, int], pcmd: int) -> Union[Command, int]:
    """Translate a DPA response command byte into the command enum member of the given peripheral.

    For peripheral numbers inside the user peripheral range the command byte is returned unchanged.

    Args:
        pnum (Union[Peripheral, int]): Peripheral enum member
        pcmd (int): Command data byte value
    Returns:
        :obj:`Command` or :obj:`int`: Response command enum member
    Raises:
        InvalidPeripheralCommandValueError: Raised if pcmd is below ``dpa_constants.REQUEST_PCMD_MIN``,
            not above ``dpa_constants.REQUEST_PCMD_MAX``, or above ``dpa_constants.RESPONSE_PCMD_MAX``
        UnsupportedPeripheralError: Raised if pnum parameter value is not recognized as a member of any
            peripheral enum
        UnsupportedPeripheralCommandError: Raised if pcmd parameter value is not recognized as a member of
            the response command enum of the peripheral
    """
    if pcmd < dpa_constants.REQUEST_PCMD_MIN:
        raise InvalidPeripheralCommandValueError('Negative peripheral command values are not allowed.')
    # Response commands lie strictly above the request command range.
    if pcmd <= dpa_constants.REQUEST_PCMD_MAX or pcmd > dpa_constants.RESPONSE_PCMD_MAX:
        raise InvalidPeripheralCommandValueError('Response peripheral command should be value between 128 and 255.')
    if dpa_constants.PNUM_USER_MIN <= pnum <= dpa_constants.PNUM_USER_MAX:
        return pcmd

    # Select the response command enum that belongs to the peripheral.
    if pnum == EmbedPeripherals.COORDINATOR:
        command_enum = CoordinatorResponseCommands
    elif pnum == EmbedPeripherals.NODE:
        command_enum = NodeResponseCommands
    elif pnum == EmbedPeripherals.OS:
        command_enum = OSResponseCommands
    elif pnum == EmbedPeripherals.EEPROM:
        command_enum = EEPROMResponseCommands
    elif pnum == EmbedPeripherals.EEEPROM:
        command_enum = EEEPROMResponseCommands
    elif pnum == EmbedPeripherals.RAM:
        command_enum = RAMResponseCommands
    elif pnum == EmbedPeripherals.LEDR or pnum == EmbedPeripherals.LEDG:
        # Both LED peripherals share one command enum.
        command_enum = LEDResponseCommands
    elif pnum == EmbedPeripherals.IO:
        command_enum = IOResponseCommands
    elif pnum == EmbedPeripherals.THERMOMETER:
        command_enum = ThermometerResponseCommands
    elif pnum == EmbedPeripherals.UART:
        command_enum = UartResponseCommands
    elif pnum == EmbedPeripherals.FRC:
        command_enum = FrcResponseCommands
    elif pnum == EmbedPeripherals.EXPLORATION:
        command_enum = ExplorationResponseCommands
    elif pnum == Standards.DALI:
        command_enum = DALIResponseCommands
    elif pnum == Standards.BINARY_OUTPUT:
        command_enum = BinaryOutputResponseCommands
    elif pnum == Standards.SENSOR:
        command_enum = SensorResponseCommands
    elif pnum == Standards.LIGHT:
        command_enum = LightResponseCommands
    else:
        raise UnsupportedPeripheralError('Unknown or unsupported peripheral.')

    if pcmd not in command_enum:
        raise UnsupportedPeripheralCommandError('Unknown or unsupported peripheral command.')
    return command_enum(pcmd)
Return response command based on DPA peripheral and command data byte.
Arguments:
- pnum (Union[Peripheral, int]): Peripheral enum member
- pcmd (int): Command data byte value
Returns:
Command or int: Response command enum member
Raises:
- InvalidPeripheralCommandValueError: Raised if pcmd is a negative value, or if pcmd is not a value between 128 and 255
- UnsupportedPeripheralError: Raised if pnum parameter value is not recognized as a member of any peripheral enum
- UnsupportedPeripheralCommandError: Raised if pcmd parameter value is not recognized as a member of any peripheral command enum
221 @staticmethod 222 def pdata_from_dpa(dpa: bytes) -> Union[List[int], None]: 223 """Return PDATA from DPA response bytes. 224 225 Args: 226 dpa (bytes): DPA response message 227 Returns: 228 :obj:`list` of :obj:`int` or :obj:`None`: PDATA integer list or None of there are no PDATA 229 """ 230 if len(dpa) > 8: 231 return list(dpa[8:]) 232 return None
Return PDATA from DPA response bytes.
Arguments:
- dpa (bytes): DPA response message
Returns:
list of int or None: PDATA integer list or None if there are no PDATA
@staticmethod
def parse_dpa_into_members(dpa: bytes):
    """Split a DPA response into its individual members.

    Member positions are taken from ``dpa_constants.ResponsePacketMembers``. The two HWPID bytes are
    merged by :meth:`Common.hwpid_from_dpa` and PDATA are extracted by :meth:`Common.pdata_from_dpa`.

    Args:
        dpa (bytes): DPA response.

    Returns:
        :obj:`(int, int, int, int, int, int, Optional[List[int]])`: Response members in the order
        nadr, pnum, pcmd, hwpid, rcode, dpa_value, pdata
    """
    members = dpa_constants.ResponsePacketMembers
    nadr = dpa[members.NADR]
    pnum = dpa[members.PNUM]
    pcmd = dpa[members.PCMD]
    hwpid_high = dpa[members.HWPID_HI]
    hwpid_low = dpa[members.HWPID_LO]
    hwpid = Common.hwpid_from_dpa(hwpid_high, hwpid_low)
    rcode = dpa[members.RCODE]
    dpa_value = dpa[members.DPA_VALUE]
    return nadr, pnum, pcmd, hwpid, rcode, dpa_value, Common.pdata_from_dpa(dpa)
Parse DPA response into response members.
Arguments:
- dpa (bytes): DPA response.
Returns:
(int, int, int, int, int, int, Optional[List[int]]): Response members
@staticmethod
def serialize_to_json(mtype: MessageType, msgid: str, nadr: int, hwpid: int = 0xFFFF, params: Optional[dict] = None,
                      dpa_rsp_time: Optional[float] = None):
    """Build a JSON API request object from request parameters.

    Args:
        mtype (MessageType): Message type; its ``value`` is stored under ``mType``.
        msgid (str): Message ID.
        nadr (int): Device address.
        hwpid (int): Hardware profile ID. Defaults to 65535.
        params (dict, optional): Request parameters. An empty dict is used when omitted.
        dpa_rsp_time (float, optional): Request timeout. When given, it is multiplied by 1000,
            rounded up and stored under ``data.timeout``.

    Returns:
        :obj:`dict`: JSON-serialized request
    """
    mtype_value = mtype.value
    request: dict = {
        'nAdr': nadr,
        'hwpId': hwpid,
        'param': {} if params is None else params,
    }
    data: dict = {
        'msgId': msgid,
        'req': request,
        'returnVerbose': True,
    }
    if dpa_rsp_time is not None:
        data['timeout'] = math.ceil(dpa_rsp_time * 1000)
    return {'mType': mtype_value, 'data': data}
Serialize request parameters into JSON API request object.
Arguments:
- mtype (MessageType): Message type.
- msgid (str): Message ID.
- nadr (int): Device address.
- hwpid (int): Hardware profile ID. Defaults to 65535.
- params (dict, optional): Request parameters.
- dpa_rsp_time (float, optional): Request timeout.
Returns:
dict: JSON-serialized request
291 @staticmethod 292 def msgid_from_json(json: dict) -> str: 293 """Return response msgid from Daemon API JSON response. 294 295 Args: 296 json (dict): JSON API response 297 Returns: 298 :obj:`str`: JSON API response message ID 299 Raises: 300 JsonMsgidMissingError: Raised if Daemon API response does not contain the msgId key 301 """ 302 try: 303 return json['data']['msgId'] 304 except KeyError as err: 305 raise JsonMsgidMissingError(f'Object does not contain property {str(err)}') from err
Return response msgid from Daemon API JSON response.
Arguments:
- json (dict): JSON API response
Returns:
str: JSON API response message ID
Raises:
- JsonMsgidMissingError: Raised if Daemon API response does not contain the msgId key
307 @staticmethod 308 def mtype_str_from_json(json: dict) -> str: 309 """Return message type from Daemon API JSON response. 310 311 Args: 312 json (dict): JSON API response 313 Returns: 314 :obj:`str`: JSON API response message type string 315 Raises: 316 JsonMTypeMissingError: Raised if Daemon API response does not contain the mType key 317 """ 318 try: 319 return json['mType'] 320 except KeyError as err: 321 raise JsonMTypeMissingError(f'Object does not contain property {str(err)}') from err
Return message type from Daemon API JSON response.
Arguments:
- json (dict): JSON API response
Returns:
str: JSON API response message type string
Raises:
- JsonMTypeMissingError: Raised if Daemon API response does not contain the mType key
323 @staticmethod 324 def nadr_from_json(json: dict) -> int: 325 """Return response nadr from Daemon API JSON response. 326 327 Args: 328 json (dict): JSON API response 329 Returns: 330 :obj:`int`: JSON API response device address 331 Raises: 332 JsonNadrMissingError: Raised if Daemon API response does not contain the nAdr key 333 """ 334 try: 335 return json['data']['rsp']['nAdr'] 336 except KeyError as err: 337 raise JsonNadrMissingError(f'Object does not contain property {str(err)}') from err
Return response nadr from Daemon API JSON response.
Arguments:
- json (dict): JSON API response
Returns:
int: JSON API response device address
Raises:
- JsonNadrMissingError: Raised if Daemon API response does not contain the nAdr key
339 @staticmethod 340 def pnum_from_json(json: dict) -> int: 341 """Return response pnum from Daemon API JSON response. 342 343 Args: 344 json (dict): JSON API response 345 Returns: 346 :obj:`int`: JSON API response peripheral number 347 Raises: 348 JsonNadrMissingError: Raised if Daemon API response does not contain the pnum key 349 """ 350 try: 351 return json['data']['rsp']['pnum'] 352 except KeyError as err: 353 raise JsonNadrMissingError(f'Object does not contain property {str(err)}') from err
Return response pnum from Daemon API JSON response.
Arguments:
- json (dict): JSON API response
Returns:
int: JSON API response peripheral number
Raises:
- JsonNadrMissingError: Raised if Daemon API response does not contain the pnum key
355 @staticmethod 356 def pcmd_from_json(json: dict) -> int: 357 """Return response pcmd from Daemon API JSON response. 358 359 Args: 360 json (dict): JSON API response 361 Returns: 362 :obj:`int`: JSON API response peripheral command 363 Raises: 364 JsonNadrMissingError: Raised if Daemon API response does not contain the pcmd key 365 """ 366 try: 367 return json['data']['rsp']['pcmd'] 368 except KeyError as err: 369 raise JsonNadrMissingError(f'Object does not contain property {str(err)}') from err
Return response pcmd from Daemon API JSON response.
Arguments:
- json (dict): JSON API response
Returns:
int: JSON API response peripheral command
Raises:
- JsonNadrMissingError: Raised if Daemon API response does not contain the pcmd key
371 @staticmethod 372 def hwpid_from_json(json: dict) -> int: 373 """Return response hwpid from Daemon API JSON response. 374 375 Args: 376 json (dict): JSON API response 377 Returns: 378 :obj:`int`: JSON API response hardware profile ID 379 Raises: 380 JsonHwpidMissingError: Raised if Daemon API response does not contain the hwpId key 381 """ 382 try: 383 return json['data']['rsp']['hwpId'] 384 except KeyError as err: 385 raise JsonHwpidMissingError(f'Object does not contain property {str(err)}') from err
Return response hwpid from Daemon API JSON response.
Arguments:
- json (dict): JSON API response
Returns:
int: JSON API response hardware profile ID
Raises:
- JsonHwpidMissingError: Raised if Daemon API response does not contain the hwpId key
387 @staticmethod 388 def rcode_from_json(json: dict) -> int: 389 """Return response rcode from Daemon API JSON response. 390 391 Args: 392 json (dict): JSON API response 393 Returns: 394 :obj:`int`: JSON API response DPA rcode 395 Raises: 396 JsonRCodeMissingError: Raised if Daemon API response does not contain the rcode key 397 """ 398 try: 399 return json['data']['rsp']['rCode'] 400 except KeyError as err: 401 raise JsonRCodeMissingError(f'Object does not contain property {str(err)}') from err
Return response rcode from Daemon API JSON response.
Arguments:
- json (dict): JSON API response
Returns:
int: JSON API response DPA rcode
Raises:
- JsonRCodeMissingError: Raised if Daemon API response does not contain the rcode key
403 @staticmethod 404 def dpa_value_from_json(json: dict) -> int: 405 """Return response DPA value from Daemon API JSON response. 406 407 Args: 408 json (dict): JSON API response 409 Returns: 410 :obj:`int`: JSON API response dpa value 411 Raises: 412 JsonDpaValueMissingError: Raised if Daemon API response does not contain the dpaVal key 413 """ 414 try: 415 return json['data']['rsp']['dpaVal'] 416 except KeyError as err: 417 raise JsonDpaValueMissingError(f'Object does not contain property {str(err)}') from err
Return response DPA value from Daemon API JSON response.
Arguments:
- json (dict): JSON API response
Returns:
int: JSON API response dpa value
Raises:
- JsonDpaValueMissingError: Raised if Daemon API response does not contain the dpaVal key
419 @staticmethod 420 def pdata_from_json(json: dict) -> Union[List[int], None]: 421 """Return pdata from Daemon API JSON response if available. 422 423 Args: 424 json (dict): JSON API response 425 Returns: 426 :obj:`Union[List[int], None]`: JSON API response pdata or None if there are no PDATA 427 """ 428 pdata = None 429 try: 430 raw = json['data']['raw'] 431 response = None 432 if isinstance(raw, list): 433 response = raw[0]['response'] 434 elif isinstance(raw, dict): 435 response = raw['response'] 436 response = response.split('.') 437 if len(response) > 8: 438 pdata = [int(x, 16) for x in response[8:]] 439 except KeyError: 440 pdata = None 441 finally: 442 return pdata
Return pdata from Daemon API JSON response if available.
Arguments:
- json (dict): JSON API response
Returns:
Union[List[int], None]: JSON API response pdata or None if there are no PDATA
444 @staticmethod 445 def result_from_json(json: dict) -> dict: 446 """Return response result from Daemon API JSON response. 447 448 Args: 449 json (dict): JSON API response 450 Returns: 451 :obj:`dict`: JSON API response result object 452 Raises: 453 JsonResultMissingError: Raised if JSON API response does not contain the result key 454 """ 455 try: 456 return json['data']['rsp']['result'] 457 except KeyError as err: 458 raise JsonResultMissingError(f'Object does not contain property {str(err)}') from err
Return response result from Daemon API JSON response.
Arguments:
- json (dict): JSON API response
Returns:
dict: JSON API response result object
Raises:
- JsonResultMissingError: Raised if JSON API response does not contain the result key
460 @staticmethod 461 def status_from_json(json: dict) -> int: 462 """Return response status from Daemon API JSON response. 463 464 Args: 465 json (dict): JSON API response 466 Returns: 467 :obj:`int`: JSON API response status code 468 Raises: 469 JsonStatusMissingError: Raised if JSON API response does not contain the status key 470 """ 471 try: 472 return json['data']['status'] 473 except KeyError as err: 474 raise JsonStatusMissingError(f'Object does not contain property {str(err)}') from err
Return response status from Daemon API JSON response.
Arguments:
- json (dict): JSON API response
Returns:
int: JSON API response status code
Raises:
- JsonStatusMissingError: Raised if JSON API response does not contain the status key
476 @staticmethod 477 def generic_rdata_from_json(json: dict) -> str: 478 """Return generic response data from Daemon API JSON response. 479 480 Args: 481 json (dict): JSON API response 482 Returns: 483 :obj:`int`: JSON API response status code 484 Raises: 485 JsonStatusMissingError: Raised if JSON API response does not contain path to rData. 486 """ 487 try: 488 return json['data']['rsp']['rData'] 489 except KeyError as err: 490 raise JsonGenericDataMissingError(f'Object does not contain property {str(err)}') from err
Return generic response data from Daemon API JSON response.
Arguments:
- json (dict): JSON API response
Returns:
str: JSON API generic response data (rData)
Raises:
- JsonGenericDataMissingError: Raised if JSON API response does not contain path to rData.
@staticmethod
def parse_json_into_members(json: dict, omit_result: bool = False):
    """Split a JSON API response into its individual members.

    The result object is only read when ``omit_result`` is False and the response code equals
    ``dpa_constants.ResponseCodes.OK``; in every other case the result member is None.

    Args:
        json (dict): JSON API response
        omit_result (bool): Do not parse result
    Returns:
        :obj:`(str, int, int, int, int, List[int], dict)`: Response members in the order
        msgid, nadr, hwpid, rcode, dpa_value, pdata, result
    """
    msgid = Common.msgid_from_json(json=json)
    nadr = Common.nadr_from_json(json=json)
    hwpid = Common.hwpid_from_json(json=json)
    rcode = Common.rcode_from_json(json=json)
    dpa_value = Common.dpa_value_from_json(json=json)
    pdata = Common.pdata_from_json(json=json)
    result = None
    if not omit_result and rcode == dpa_constants.ResponseCodes.OK:
        result = Common.result_from_json(json=json)
    return msgid, nadr, hwpid, rcode, dpa_value, pdata, result
Parse JSON response into response members.
Arguments:
- json (dict): JSON API response
- omit_result (bool): Do not parse result
Returns:
(str, int, int, int, int, List[int], dict): Response members
@staticmethod
def string_to_mtype(string: str) -> MessageType:
    """Look up the message type enum member matching a message type string.

    The message type enums are searched in a fixed order and the first one containing the
    string provides the returned member.

    Args:
        string (str): Message type string
    Returns:
        :obj:`MessageType`: Message type enum member
    Raises:
        UnsupportedMessageTypeError: Raised if message type is not recognized as a member of any message type enum
    """
    families = (
        GenericMessages, ExplorationMessages, CoordinatorMessages, NodeMessages, OSMessages, EEPROMMessages,
        EEEPROMMessages, RAMMessages, LEDRMessages, LEDGMessages, IOMessages, ThermometerMessages,
        UartMessages, FrcMessages, DALIMessages, BinaryOutputMessages, SensorMessages, LightMessages,
    )
    family = next((candidate for candidate in families if string in candidate), None)
    if family is None:
        raise UnsupportedMessageTypeError(f'Unknown or unsupported message type: {string}.')
    return family(string)
Convert message type string to message type enum member value.
Arguments:
- string (str): Message type string
Returns:
MessageType: Message type enum member
Raises:
- UnsupportedMessageTypeError: Raised if message type is not recognized as a member of any message type enum
535 @staticmethod 536 def bitmap_to_nodes(bitmap: List[int], coordinator_shift: bool = False) -> List[int]: 537 """Convert node bitmap to list of nodes. 538 539 Args: 540 bitmap (List[int]): Node bitmap represented by list of integers 541 coordinator_shift (bool): Bitmap contains dummy coordinator value 542 Returns: 543 :obj:`list` of :obj:`int`: List of node addresses from bitmap 544 """ 545 nodes = [] 546 start = 0 if not coordinator_shift else 1 547 for i in range(start, len(bitmap * 8)): 548 if bitmap[int(i / 8)] & (1 << (i % 8)): 549 nodes.append(i) 550 return nodes
Convert node bitmap to list of nodes.
Arguments:
- bitmap (List[int]): Node bitmap represented by list of integers
- coordinator_shift (bool): Bitmap contains dummy coordinator value
Returns:
list of int: List of node addresses from bitmap
552 @staticmethod 553 def nodes_to_bitmap(nodes: List[int]) -> List[int]: 554 """Convert list of nodes to node bitmap. 555 556 Args: 557 nodes (List[int]): List of node addresses 558 Returns: 559 :obj:`list` of :obj:`int`: Nodes bitmap represented by list of 30 integers 560 """ 561 bitmap = [0] * 30 562 for node in nodes: 563 bitmap[math.floor(node / 8)] |= (1 << (node % 8)) 564 return bitmap
Convert list of nodes to node bitmap.
Arguments:
- nodes (List[int]): List of node addresses
Returns:
list of int: Nodes bitmap represented by list of 30 integers
566 @staticmethod 567 def bitmap_4byte_to_indexes(bitmap: List[int]) -> List[int]: 568 """Convert 4 byte bitmap to list of indexes. 569 570 Args: 571 bitmap (List[int]): Index bitmap. 572 573 Returns: 574 :obj:`List` of :obj:`int`: List of indexes. 575 """ 576 indexes = [0] * 32 577 for i in range(0, len(bitmap * 8)): 578 if bitmap[int(i / 8)] & (1 << (i % 8)): 579 indexes[i] = 1 580 return indexes
Convert 4 byte bitmap to list of indexes.
Arguments:
- bitmap (List[int]): Index bitmap.
Returns:
list of int: List of 32 flags; element i is 1 if bit i of the bitmap is set, 0 otherwise.
582 @staticmethod 583 def indexes_to_4byte_bitmap(values: List[int]) -> List[int]: 584 """Convert list of indexes to a 4 byte bitmap. 585 586 Args: 587 values (List[int]): List of indexes 588 Returns: 589 :obj:`list` of :obj:`int`: Index bitmap represented by list of 4 integers 590 """ 591 bitmap = [0] * 4 592 for value in values: 593 bitmap[math.floor(value / 8)] |= (1 << (value % 8)) 594 return bitmap
Convert list of indexes to a 4 byte bitmap.
Arguments:
- values (List[int]): List of indexes
Returns:
list of int: Index bitmap represented by list of 4 integers
596 @staticmethod 597 def is_hex_string(string: str) -> bool: 598 """Check if string contains only hexadecimal characters. 599 600 Args: 601 string (str): Input string 602 Returns: 603 :obj:`bool`: True if string contains only hexadecimal characters, False otherwise 604 """ 605 if len(string) == 0: 606 return False 607 return not set(string) - set('0123456789abcdefABCDEF')
Check if string contains only hexadecimal characters.
Arguments:
- string (str): Input string
Returns:
bool: True if string contains only hexadecimal characters, False otherwise
@staticmethod
def hex_string_to_list(string: str) -> List[int]:
    """Decode a hexadecimal string into a list of byte values.

    Each consecutive pair of characters is decoded as one value.

    Args:
        string (str): Hexadecimal string
    Returns:
        :obj:`list` of :obj:`int`: List of integers from hexadecimal string
    Raises:
        ValueError: Raised if string is of uneven length or contains non-hexadecimal characters
            (an empty string is rejected as non-hexadecimal)
    """
    if len(string) % 2:
        raise ValueError('Argument should be even length.')
    if not Common.is_hex_string(string):
        raise ValueError('Argument is not a hexadecimal string.')
    # The checks above guarantee complete pairs of hex digits with no whitespace.
    return list(bytes.fromhex(string))
Convert hexadecimal string to list of unsigned integers.
Arguments:
- string (str): Hexadecimal string
Returns:
list of int: List of integers from hexadecimal string
Raises:
- ValueError: Raised if string is of uneven length or contains non-hexadecimal characters
626 @staticmethod 627 def list_to_hex_string(values: List[int], separator: str = ' ', uppercase: bool = True) -> str: 628 """Convert list of unsigned integers to hexadecimal string. 629 630 Args: 631 values (List[int]): List of unsigned integers 632 separator (str, optional): Separator for byte string. Defaults to empty string. 633 uppercase (bool, optional): Output hex string uppercase. Defaults to False. 634 635 Returns: 636 :obj:`string`: Hexadecimal string representation of the list 637 """ 638 hex_format = 'X' if uppercase else 'x' 639 return separator.join(f'{value:02{hex_format}}' for value in values)
Convert list of unsigned integers to hexadecimal string.
Arguments:
- values (List[int]): List of unsigned integers
- separator (str, optional): Separator for byte string. Defaults to empty string.
- uppercase (bool, optional): Output hex string uppercase. Defaults to True.
Returns:
string: Hexadecimal string representation of the list
641 @staticmethod 642 def peripheral_list_to_bitmap(values: List[int]) -> List[int]: 643 """Convert list of peripheral numbers into a bitmap of peripherals. 644 645 For example, the coordinator peripheral would be represented by the 1st bit in bitmap. 646 647 Args: 648 values (List[int]): List of peripheral numbers 649 Returns: 650 :obj:`list` of :obj:`int`: Peripheral bitmap represented by list of 4 integers 651 """ 652 bitmap = [0 for _ in range(32)] 653 for value in values: 654 bitmap[value] = 1 655 byte_list = [] 656 for bits in [bitmap[i:i + 8] for i in range(0, len(bitmap), 8)]: 657 bits.reverse() 658 byte = 0 659 for bit in bits: 660 byte = (byte << 1) | bit 661 byte_list.append(byte) 662 return byte_list
Convert list of peripheral numbers into a bitmap of peripherals.
For example, the coordinator peripheral would be represented by the 1st bit in bitmap.
Arguments:
- values (List[int]): List of peripheral numbers
Returns:
list of int: Peripheral bitmap represented by list of 4 integers
664 @staticmethod 665 def values_in_byte_range(values: List[int]) -> bool: 666 """Check if list elements are within unsigned integer byte range. 667 668 Args: 669 values (List[int]): Input data 670 Returns: 671 :obj:`bool`: True if values are in range, False otherwise 672 """ 673 return len([value for value in values if value < 0 or value > 255]) == 0
Check if list elements are within unsigned integer byte range.
Arguments:
- values (List[int]): Input data
Returns:
bool: True if values are in range, False otherwise
675 @staticmethod 676 def byte_complement(value: int): 677 """Convert unsigned 1B value into a signed 1B value. 678 679 Args: 680 value (int): Input unsigned 1B value 681 Returns: 682 :obj:`int`: Signed 1B value 683 Raises: 684 ValueError: Raised when value is not in unsigned 8bit range. 685 """ 686 if not 0 <= value <= 0xFF: 687 raise ValueError('Not an unsigned 1B value.') 688 if value < 0x80: 689 return value 690 return value - 0x100
Convert unsigned 1B value into a signed 1B value.
Arguments:
- value (int): Input unsigned 1B value
Returns:
int: Signed 1B value
Raises:
- ValueError: Raised when value is not in unsigned 8bit range.
692 @staticmethod 693 def word_complement(value: int) -> int: 694 """Convert unsigned 2B value into a signed 2B value. 695 696 Args: 697 value (int): Input unsigned 2B value 698 Returns: 699 :obj:`int`: Signed 2B value 700 Raises: 701 ValueError: Raised when value is not in unsigned 16bit range. 702 """ 703 if not 0 <= value <= 0xFFFF: 704 raise ValueError('Not an unsigned 2B value.') 705 if value < 0x8000: 706 return value 707 return value - 0x10000
Convert unsigned 2B value into a signed 2B value.
Arguments:
- value (int): Input unsigned 2B value
Returns:
int: Signed 2B value
Raises:
- ValueError: Raised when value is not in unsigned 16bit range.
709 @staticmethod 710 def bcd_to_decimal(byte: int) -> int: 711 """Convert a Binary-Coded Decimal (BCD) value to decimal integer. 712 713 Args: 714 byte (int): BCD value to convert. 715 716 Returns: 717 :obj:`int`: Decimal value encoded in BCD. 718 719 Raises: 720 ValueError: Raised when byte argument is not a valid BCD value. 721 722 Examples: 723 >>> Common.bcd_to_decimal(0x42) 724 42 725 """ 726 high = (byte >> 4) & 0x0F 727 low = byte & 0x0F 728 if high > 9 or low > 9: 729 raise ValueError('Not a BCD value.') 730 return high * 10 + low
Convert a Binary-Coded Decimal (BCD) value to decimal integer.
Arguments:
- byte (int): BCD value to convert.
Returns:
int: Decimal value encoded in BCD.
Raises:
- ValueError: Raised when byte argument is not a valid BCD value.
Examples:
>>> Common.bcd_to_decimal(0x42) 42
@staticmethod
def dpa_build_date_to_str(par1: int, par2: int) -> str:
    """Render a DPA build date as a human-readable string.

    The output has the form D.M.YYYY; day and month are not zero-padded. The day is decoded
    from BCD, the month is the low nibble of ``par2`` and the year is 2010 plus the high
    nibble of ``par2``.

    Args:
        par1 (int): BCD-encoded day of DPA build date.
        par2 (int): Month (low nibble) and year offset from 2010 (high nibble) of DPA build date.

    Returns:
        str: Human-readable DPA build date string.

    Raises:
        ValueError: Raised when par1 is not a valid BCD value.
    """
    try:
        day = Common.bcd_to_decimal(par1)
    except ValueError as e:
        raise ValueError(f'Failed to convert DPA build date to string: {str(e)}') from e
    month = par2 & 0x0F
    year = 2010 + ((par2 >> 4) & 0x0F)
    return f'{day}.{month}.{year}'
Convert DPA build date in BCD format to human-readable string.
Outputs date string in the format D.M.YYYY; day and month are not zero-padded.
Arguments:
- par1 (int): BCD-encoded day of DPA build date.
- par2 (int): BCD-encoded month and year of DPA build date.
Returns:
str: Human-readable DPA build date string.
@staticmethod
def dpa_version_to_str(dpa_version: int) -> str:
    """Render a DPA version as a human-readable version string.

    The upper byte is decoded from BCD as the major version; the lower byte is printed as two
    uppercase hexadecimal digits as the minor version.

    Args:
        dpa_version (int): DPA version encoded in BCD format.

    Returns:
        str: Human-readable DPA version string.

    Raises:
        ValueError: Raised when the major version byte of dpa_version is not a valid BCD value.
    """
    try:
        major = Common.bcd_to_decimal(int(dpa_version / 256))
    except ValueError as e:
        raise ValueError(f'Failed to convert version to string: {str(e)}') from e
    minor = dpa_version & 0x00FF
    return f'{major}.{minor:02X}'
Convert DPA version in BCD format to human-readable version string.
Arguments:
- dpa_version (int): DPA version encoded in BCD format.
Returns:
str: Human-readable DPA version string.
Raises:
- ValueError: Raised when dpa_version is not a valid BCD value.
773 @staticmethod 774 def fletcher_checksum(init: int, data: List[int]) -> int: 775 """Calculate one's complement Fletcher checksum from list of bytes. 776 777 See https://doc.iqrf.org/DpaTechGuide/pages/FletcherCSharp.html 778 779 Args: 780 init (int): Initial value. 781 data (List[int]): List of bytes to calculate checksum from. 782 783 Returns: 784 :obj:`int`: 16bit Fletcher checksum integer value. 785 """ 786 checksum = init 787 for byte in data: 788 low = checksum & 0xFF 789 low += byte 790 if (low & 0x100) != 0: 791 low += 1 792 high = checksum >> 8 793 high += low & 0xFF 794 if (high & 0x100) != 0: 795 high += 1 796 checksum = (low & 0xFF) | (high & 0xFF) << 8 797 return checksum
Calculate one's complement Fletcher checksum from list of bytes.
See https://doc.iqrf.org/DpaTechGuide/pages/FletcherCSharp.html
Arguments:
- init (int): Initial value.
- data (List[int]): List of bytes to calculate checksum from.
Returns:
int: 16bit Fletcher checksum integer value.