iqrfpy.async_response
Async response module.
This module contains the asynchronous DPA response class.
"""Async response module.

Provides the response class used for asynchronous DPA messages.
"""

from typing import List, Optional, Union
from iqrfpy.enums.peripherals import Peripheral
from iqrfpy.enums.commands import Command
from iqrfpy.enums.message_types import GenericMessages
from iqrfpy.iresponse import IResponseGetterMixin
from iqrfpy.utils.common import Common
import iqrfpy.utils.dpa as dpa_constants
from iqrfpy.utils.dpa import ResponsePacketMembers, ResponseCodes
from iqrfpy.utils.validators import DpaValidator

__all__ = ['AsyncResponse']


class AsyncResponse(IResponseGetterMixin):
    """Response class for asynchronous DPA messages.

    Asynchronous responses are handled generically: the message type is always
    ``GenericMessages.RAW`` and, in the JSON API form, the packet is carried as a string
    of dot-separated hexadecimal bytes.
    """

    def __init__(self, nadr: int, pnum: Peripheral, pcmd: Command, hwpid: int = dpa_constants.HWPID_MAX,
                 rcode: int = 0x80, dpa_value: int = 0, pdata: Union[List[int], None] = None,
                 msgid: Optional[str] = None, result: Optional[dict] = None):
        """Create an asynchronous response.

        Every argument is handed to the parent initializer unchanged; the message type
        is fixed to ``GenericMessages.RAW``.

        Args:
            nadr (int): Device address.
            pnum (Peripheral): Peripheral.
            pcmd (Command): Peripheral command.
            hwpid (int, optional): Hardware profile ID. Defaults to 65535, this value ignores HWPID check.
            rcode (int, optional): Response code. Defaults to 128.
            dpa_value (int, optional): DPA value. Defaults to 0.
            pdata (List[int], optional): DPA response data. Defaults to None.
            msgid (str, optional): Message ID. Defaults to None.
            result (dict, optional): JSON response data. Defaults to None.
        """
        super().__init__(
            nadr=nadr, pnum=pnum, pcmd=pcmd, m_type=GenericMessages.RAW,
            hwpid=hwpid, rcode=rcode, dpa_value=dpa_value,
            pdata=pdata, msgid=msgid, result=result
        )

    @classmethod
    def from_dpa(cls, dpa: bytes) -> 'AsyncResponse':
        """Build an :obj:`AsyncResponse` from raw DPA response bytes.

        The packet length is validated first, then the individual packet members are
        read from their fixed positions.

        Args:
            dpa (bytes): DPA response bytes.

        Returns:
            :obj:`AsyncResponse`: Asynchronous response message object.
        """
        DpaValidator.base_response_length(dpa=dpa)
        members = ResponsePacketMembers
        nadr = dpa[members.NADR]
        hwpid = Common.hwpid_from_dpa(dpa[members.HWPID_HI], dpa[members.HWPID_LO])
        pnum = Common.pnum_from_dpa(dpa[members.PNUM])
        pcmd = Common.request_pcmd_from_dpa(pnum, dpa[members.PCMD])
        rcode = dpa[members.RCODE]
        dpa_value = dpa[members.DPA_VALUE]
        # The result dict is only filled in for an asynchronous response code on a packet
        # longer than 8 bytes; here rData holds the whole packet as a list of ints.
        has_result = rcode == ResponseCodes.ASYNC_RESPONSE and len(dpa) > 8
        result = {'rData': list(dpa)} if has_result else None
        return cls(
            nadr=nadr,
            pnum=pnum,
            pcmd=pcmd,
            hwpid=hwpid,
            rcode=rcode,
            dpa_value=dpa_value,
            pdata=list(dpa),
            result=result,
        )

    @classmethod
    def from_json(cls, json: dict) -> 'AsyncResponse':
        """Build an :obj:`AsyncResponse` from a JSON API response.

        The packet is read from ``json['data']['rsp']['rData']``, a string of hexadecimal
        bytes separated by dots.

        Args:
            json (dict): JSON API response.

        Returns:
            :obj:`AsyncResponse`: Asynchronous response message object.
        """
        msgid = Common.msgid_from_json(json)
        rsp = json['data']['rsp']
        # Strip the dot separators so the packet is one contiguous hex string.
        hex_packet = rsp['rData'].replace('.', '')
        raw_packet = bytes.fromhex(hex_packet)
        fields = Common.hex_string_to_list(hex_packet)
        members = ResponsePacketMembers
        nadr = fields[members.NADR]
        hwpid = Common.hwpid_from_dpa(fields[members.HWPID_HI], fields[members.HWPID_LO])
        pnum = Common.pnum_from_dpa(fields[members.PNUM])
        pcmd = Common.request_pcmd_from_dpa(pnum, fields[members.PCMD])
        return cls(
            nadr=nadr,
            pnum=pnum,
            pcmd=pcmd,
            hwpid=hwpid,
            rcode=fields[members.RCODE],
            dpa_value=fields[members.DPA_VALUE],
            pdata=list(raw_packet),
            msgid=msgid,
            result=rsp,
        )
class AsyncResponse(IResponseGetterMixin):
    """Response class for asynchronous DPA messages.

    Asynchronous responses are handled generically: the message type is always
    ``GenericMessages.RAW`` and, in the JSON API form, the packet is carried as a string
    of dot-separated hexadecimal bytes.
    """

    def __init__(self, nadr: int, pnum: Peripheral, pcmd: Command, hwpid: int = dpa_constants.HWPID_MAX,
                 rcode: int = 0x80, dpa_value: int = 0, pdata: Union[List[int], None] = None,
                 msgid: Optional[str] = None, result: Optional[dict] = None):
        """Create an asynchronous response.

        Every argument is handed to the parent initializer unchanged; the message type
        is fixed to ``GenericMessages.RAW``.

        Args:
            nadr (int): Device address.
            pnum (Peripheral): Peripheral.
            pcmd (Command): Peripheral command.
            hwpid (int, optional): Hardware profile ID. Defaults to 65535, this value ignores HWPID check.
            rcode (int, optional): Response code. Defaults to 128.
            dpa_value (int, optional): DPA value. Defaults to 0.
            pdata (List[int], optional): DPA response data. Defaults to None.
            msgid (str, optional): Message ID. Defaults to None.
            result (dict, optional): JSON response data. Defaults to None.
        """
        super().__init__(
            nadr=nadr, pnum=pnum, pcmd=pcmd, m_type=GenericMessages.RAW,
            hwpid=hwpid, rcode=rcode, dpa_value=dpa_value,
            pdata=pdata, msgid=msgid, result=result
        )

    @classmethod
    def from_dpa(cls, dpa: bytes) -> 'AsyncResponse':
        """Build an :obj:`AsyncResponse` from raw DPA response bytes.

        The packet length is validated first, then the individual packet members are
        read from their fixed positions.

        Args:
            dpa (bytes): DPA response bytes.

        Returns:
            :obj:`AsyncResponse`: Asynchronous response message object.
        """
        DpaValidator.base_response_length(dpa=dpa)
        members = ResponsePacketMembers
        nadr = dpa[members.NADR]
        hwpid = Common.hwpid_from_dpa(dpa[members.HWPID_HI], dpa[members.HWPID_LO])
        pnum = Common.pnum_from_dpa(dpa[members.PNUM])
        pcmd = Common.request_pcmd_from_dpa(pnum, dpa[members.PCMD])
        rcode = dpa[members.RCODE]
        dpa_value = dpa[members.DPA_VALUE]
        # The result dict is only filled in for an asynchronous response code on a packet
        # longer than 8 bytes; here rData holds the whole packet as a list of ints.
        has_result = rcode == ResponseCodes.ASYNC_RESPONSE and len(dpa) > 8
        result = {'rData': list(dpa)} if has_result else None
        return cls(
            nadr=nadr,
            pnum=pnum,
            pcmd=pcmd,
            hwpid=hwpid,
            rcode=rcode,
            dpa_value=dpa_value,
            pdata=list(dpa),
            result=result,
        )

    @classmethod
    def from_json(cls, json: dict) -> 'AsyncResponse':
        """Build an :obj:`AsyncResponse` from a JSON API response.

        The packet is read from ``json['data']['rsp']['rData']``, a string of hexadecimal
        bytes separated by dots.

        Args:
            json (dict): JSON API response.

        Returns:
            :obj:`AsyncResponse`: Asynchronous response message object.
        """
        msgid = Common.msgid_from_json(json)
        rsp = json['data']['rsp']
        # Strip the dot separators so the packet is one contiguous hex string.
        hex_packet = rsp['rData'].replace('.', '')
        raw_packet = bytes.fromhex(hex_packet)
        fields = Common.hex_string_to_list(hex_packet)
        members = ResponsePacketMembers
        nadr = fields[members.NADR]
        hwpid = Common.hwpid_from_dpa(fields[members.HWPID_HI], fields[members.HWPID_LO])
        pnum = Common.pnum_from_dpa(fields[members.PNUM])
        pcmd = Common.request_pcmd_from_dpa(pnum, fields[members.PCMD])
        return cls(
            nadr=nadr,
            pnum=pnum,
            pcmd=pcmd,
            hwpid=hwpid,
            rcode=fields[members.RCODE],
            dpa_value=fields[members.DPA_VALUE],
            pdata=list(raw_packet),
            msgid=msgid,
            result=rsp,
        )
Asynchronous response class.
This class is used to capture and resolve asynchronous responses. Because it is a generic response class, the data is only sent as a string of bytes in hexadecimal representation, separated by dots.
AsyncResponse( nadr: int, pnum: iqrfpy.enums.peripherals.Peripheral, pcmd: iqrfpy.enums.commands.Command, hwpid: int = 65535, rcode: int = 128, dpa_value: int = 0, pdata: Optional[List[int]] = None, msgid: Optional[str] = None, result: Optional[dict] = None)
def __init__(self, nadr: int, pnum: Peripheral, pcmd: Command, hwpid: int = dpa_constants.HWPID_MAX,
             rcode: int = 0x80, dpa_value: int = 0, pdata: Union[List[int], None] = None,
             msgid: Optional[str] = None, result: Optional[dict] = None):
    """Create an asynchronous response.

    Every argument is handed to the parent initializer unchanged; the message type
    is fixed to ``GenericMessages.RAW``.

    Args:
        nadr (int): Device address.
        pnum (Peripheral): Peripheral.
        pcmd (Command): Peripheral command.
        hwpid (int, optional): Hardware profile ID. Defaults to 65535, this value ignores HWPID check.
        rcode (int, optional): Response code. Defaults to 128.
        dpa_value (int, optional): DPA value. Defaults to 0.
        pdata (List[int], optional): DPA response data. Defaults to None.
        msgid (str, optional): Message ID. Defaults to None.
        result (dict, optional): JSON response data. Defaults to None.
    """
    super().__init__(
        nadr=nadr, pnum=pnum, pcmd=pcmd, m_type=GenericMessages.RAW,
        hwpid=hwpid, rcode=rcode, dpa_value=dpa_value,
        pdata=pdata, msgid=msgid, result=result
    )
AsyncResponse constructor.
Arguments:
- nadr (int): Device address.
- pnum (Peripheral): Peripheral.
- pcmd (Command): Peripheral command.
- hwpid (int, optional): Hardware profile ID. Defaults to 65535, this value ignores HWPID check.
- rcode (int, optional): Response code. Defaults to 128.
- dpa_value (int, optional): DPA value. Defaults to 0.
- pdata (List[int], optional): DPA response data. Defaults to None.
- msgid (str, optional): Message ID. Defaults to None.
- result (dict, optional): JSON response data. Defaults to None.
@classmethod
def from_dpa(cls, dpa: bytes) -> 'AsyncResponse':
    """Build an :obj:`AsyncResponse` from raw DPA response bytes.

    The packet length is validated first, then the individual packet members are
    read from their fixed positions.

    Args:
        dpa (bytes): DPA response bytes.

    Returns:
        :obj:`AsyncResponse`: Asynchronous response message object.
    """
    DpaValidator.base_response_length(dpa=dpa)
    members = ResponsePacketMembers
    nadr = dpa[members.NADR]
    hwpid = Common.hwpid_from_dpa(dpa[members.HWPID_HI], dpa[members.HWPID_LO])
    pnum = Common.pnum_from_dpa(dpa[members.PNUM])
    pcmd = Common.request_pcmd_from_dpa(pnum, dpa[members.PCMD])
    rcode = dpa[members.RCODE]
    dpa_value = dpa[members.DPA_VALUE]
    # The result dict is only filled in for an asynchronous response code on a packet
    # longer than 8 bytes; here rData holds the whole packet as a list of ints.
    has_result = rcode == ResponseCodes.ASYNC_RESPONSE and len(dpa) > 8
    result = {'rData': list(dpa)} if has_result else None
    return cls(
        nadr=nadr,
        pnum=pnum,
        pcmd=pcmd,
        hwpid=hwpid,
        rcode=rcode,
        dpa_value=dpa_value,
        pdata=list(dpa),
        result=result,
    )
Asynchronous response DPA factory method.
Parses DPA data and constructs an AsyncResponse object.
Arguments:
- dpa (bytes): DPA response bytes.
Returns:
AsyncResponse: Asynchronous response message object.
@classmethod
def from_json(cls, json: dict) -> 'AsyncResponse':
    """Build an :obj:`AsyncResponse` from a JSON API response.

    The packet is read from ``json['data']['rsp']['rData']``, a string of hexadecimal
    bytes separated by dots.

    Args:
        json (dict): JSON API response.

    Returns:
        :obj:`AsyncResponse`: Asynchronous response message object.
    """
    msgid = Common.msgid_from_json(json)
    rsp = json['data']['rsp']
    # Strip the dot separators so the packet is one contiguous hex string.
    hex_packet = rsp['rData'].replace('.', '')
    raw_packet = bytes.fromhex(hex_packet)
    fields = Common.hex_string_to_list(hex_packet)
    members = ResponsePacketMembers
    nadr = fields[members.NADR]
    hwpid = Common.hwpid_from_dpa(fields[members.HWPID_HI], fields[members.HWPID_LO])
    pnum = Common.pnum_from_dpa(fields[members.PNUM])
    pcmd = Common.request_pcmd_from_dpa(pnum, fields[members.PCMD])
    return cls(
        nadr=nadr,
        pnum=pnum,
        pcmd=pcmd,
        hwpid=hwpid,
        rcode=fields[members.RCODE],
        dpa_value=fields[members.DPA_VALUE],
        pdata=list(raw_packet),
        msgid=msgid,
        result=rsp,
    )
Asynchronous response JSON factory method.
Parses JSON API response and constructs an AsyncResponse object.
Arguments:
- json (dict): JSON API response.
Returns:
AsyncResponse: Asynchronous response message object.