iqrfpy.messages
Messages module.
This module serves as a single import for all messages.
1"""Messages module. 2 3This module serves as a single import for all messages. 4""" 5from .irequest import IRequest 6from .iresponse import IResponse 7from .async_response import AsyncResponse 8from .confirmation import Confirmation 9from .peripherals.generic.requests import GenericRequest 10from .peripherals.generic.responses import GenericResponse 11 12from .peripherals.coordinator.requests import ( 13 AddrInfoRequest as CoordinatorAddrInfoReq, 14 AuthorizeBondRequest as CoordinatorAuthorizeBondReq, 15 BackupRequest as CoordinatorBackupReq, 16 BondNodeRequest as CoordinatorBondNodeReq, 17 BondedDevicesRequest as CoordinatorBondedDevicesReq, 18 ClearAllBondsRequest as CoordinatorClearAllBondsReq, 19 DiscoveredDevicesRequest as CoordinatorDiscoveredDevicesReq, 20 DiscoveryRequest as CoordinatorDiscoveryReq, 21 RemoveBondRequest as CoordinatorRemoveBondReq, 22 RestoreRequest as CoordinatorRestoreReq, 23 SetDpaParamsRequest as CoordinatorSetDpaParamsReq, 24 SetHopsRequest as CoordinatorSetHopsReq, 25 SetMidRequest as CoordinatorSetMidReq, 26 SmartConnectRequest as CoordinatorSmartConnectReq, 27) 28 29from iqrfpy.peripherals.coordinator.responses import ( 30 AddrInfoResponse as CoordinatorAddrInfoRsp, 31 AuthorizeBondResponse as CoordinatorAuthorizeBondRsp, 32 BackupResponse as CoordinatorBackupRsp, 33 BondNodeResponse as CoordinatorBondNodeRsp, 34 BondedDevicesResponse as CoordinatorBondedDevicesRsp, 35 ClearAllBondsResponse as CoordinatorClearAllBondsRsp, 36 DiscoveredDevicesResponse as CoordinatorDiscoveredDevicesRsp, 37 DiscoveryResponse as CoordinatorDiscoveryRsp, 38 RemoveBondResponse as CoordinatorRemoveBondRsp, 39 RestoreResponse as CoordinatorRestoreRsp, 40 SetDpaParamsResponse as CoordinatorSetDpaParamsRsp, 41 SetHopsResponse as CoordinatorSetHopsRsp, 42 SetMidResponse as CoordinatorSetMidRsp, 43 SmartConnectResponse as CoordinatorSmartConnectRsp, 44) 45 46from .peripherals.eeeprom.requests import ( 47 ReadRequest as EeepromReadReq, 48 WriteRequest as 
EeepromWriteReq, 49) 50 51from .peripherals.eeeprom.responses import ( 52 ReadResponse as EeepromReadRsp, 53 WriteResponse as EeepromWriteRsp, 54) 55 56from .peripherals.eeprom.requests import ( 57 ReadRequest as EepromReadReq, 58 WriteRequest as EepromWriteReq, 59) 60 61from .peripherals.eeprom.responses import ( 62 ReadResponse as EepromReadRsp, 63 WriteResponse as EepromWriteRsp, 64) 65 66from .peripherals.exploration.requests import ( 67 PeripheralEnumerationRequest as ExplorationPeripheralEnumerationReq, 68 PeripheralInformationRequest as ExplorationPeripheralInformationReq, 69 MorePeripheralsInformationRequest as ExplorationMorePeripheralsInformationReq, 70) 71 72from .peripherals.exploration.responses import ( 73 PeripheralEnumerationResponse as ExplorationPeripheralEnumerationRsp, 74 PeripheralInformationResponse as ExplorationPeripheralInformationRsp, 75 MorePeripheralsInformationResponse as ExplorationMorePeripheralsInformationRsp, 76) 77 78from .peripherals.frc.requests import ( 79 SendRequest as FrcSendReq, 80 ExtraResultRequest as FrcExtraResultReq, 81 SendSelectiveRequest as FrcSendSelectiveReq, 82 SetFrcParamsRequest as FrcSetFrcParamsReq, 83) 84 85from .peripherals.frc.responses import ( 86 SendResponse as FrcSendRsp, 87 ExtraResultResponse as FrcExtraResultRsp, 88 SendSelectiveResponse as FrcSendSelectiveRsp, 89 SetFrcParamsResponse as FrcSetFrcParamsRsp, 90) 91 92from .peripherals.io.requests import ( 93 DirectionRequest as IoDirectionReq, 94 GetRequest as IoGetReq, 95 SetRequest as IoSetReq, 96) 97 98from .peripherals.io.responses import ( 99 DirectionResponse as IoDirectionRsp, 100 GetResponse as IoGetRsp, 101 SetResponse as IoSetRsp, 102) 103 104from .peripherals.ledg.requests import ( 105 SetOnRequest as LedgSetOnReq, 106 SetOffRequest as LedgSetOffReq, 107 PulseRequest as LedgPulseReq, 108 FlashingRequest as LedgFlashingReq, 109) 110 111from .peripherals.ledg.responses import ( 112 SetOnResponse as LedgSetOnRsp, 113 SetOffResponse as 
LedgSetOffRsp, 114 PulseResponse as LedgPulseRsp, 115 FlashingResponse as LedgFlashingRsp, 116) 117 118from .peripherals.ledr.requests import ( 119 SetOnRequest as LedrSetOnReq, 120 SetOffRequest as LedrSetOffReq, 121 PulseRequest as LedrPulseReq, 122 FlashingRequest as LedrFlashingReq, 123) 124 125from .peripherals.ledr.responses import ( 126 SetOnResponse as LedrSetOnRsp, 127 SetOffResponse as LedrSetOffRsp, 128 PulseResponse as LedrPulseRsp, 129 FlashingResponse as LedrFlashingRsp, 130) 131 132from .peripherals.node.requests import ( 133 ReadRequest as NodeReadReq, 134 RemoveBondRequest as NodeRemoveBondReq, 135 BackupRequest as NodeBackupReq, 136 RestoreRequest as NodeRestoreReq, 137 ValidateBondsRequest as NodeValidateBondsReq, 138) 139 140from .peripherals.node.responses import ( 141 ReadResponse as NodeReadRsp, 142 RemoveBondResponse as NodeRemoveBondRsp, 143 BackupResponse as NodeBackupRsp, 144 RestoreResponse as NodeRestoreRsp, 145 ValidateBondsResponse as NodeValidateBondsRsp, 146) 147 148from .peripherals.os.requests import ( 149 ReadRequest as OsReadReq, 150 ResetRequest as OsResetReq, 151 RestartRequest as OsRestartReq, 152 ReadTrConfRequest as OsReadTrConfReq, 153 WriteTrConfRequest as OsWriteTrConfReq, 154 WriteTrConfByteRequest as OsWriteTrConfByteReq, 155 RfpgmRequest as OsRfpgmReq, 156 SleepRequest as OsSleepReq, 157 SetSecurityRequest as OsSetSecurityReq, 158 BatchRequest as OsBatchReq, 159 SelectiveBatchRequest as OsSelectiveBatchReq, 160 IndicateRequest as OsIndicateReq, 161 FactorySettingsRequest as OsFactorySettingsReq, 162 TestRfSignalRequest as OsTestRfSignalReq, 163 LoadCodeRequest as OsLoadCodeReq, 164) 165 166from .peripherals.os.responses import ( 167 ReadResponse as OsReadRsp, 168 ResetResponse as OsResetRsp, 169 RestartResponse as OsRestartRsp, 170 ReadTrConfResponse as OsReadTrConfRsp, 171 WriteTrConfResponse as OsWriteTrConfRsp, 172 WriteTrConfByteResponse as OsWriteTrConfByteRsp, 173 RfpgmResponse as OsRfpgmRsp, 174 SleepResponse 
as OsSleepRsp, 175 SetSecurityResponse as OsSetSecurityRsp, 176 BatchResponse as OsBatchRsp, 177 SelectiveBatchResponse as OsSelectiveBatchRsp, 178 IndicateResponse as OsIndicateRsp, 179 FactorySettingsResponse as OsFactorySettingsRsp, 180 TestRfSignalResponse as OsTestRfSignalRsp, 181 LoadCodeResponse as OsLoadCodeRsp, 182) 183 184from .peripherals.ram.requests import ( 185 ReadRequest as RamReadReq, 186 WriteRequest as RamWriteReq, 187 ReadAnyRequest as RamReadAnyReq, 188) 189 190from .peripherals.ram.responses import ( 191 ReadResponse as RamReadRsp, 192 WriteResponse as RamWriteRsp, 193 ReadAnyResponse as RamReadAnyRsp, 194) 195 196from .peripherals.thermometer.requests.read import ReadRequest as ThermometerReadReq 197from .peripherals.thermometer.responses.read import ReadResponse as ThermometerReadRsp 198 199from .peripherals.uart.requests import ( 200 OpenRequest as UartOpenReq, 201 CloseRequest as UartCloseReq, 202 WriteReadRequest as UartWriteReadReq, 203 ClearWriteReadRequest as UartClearWriteReadReq, 204) 205 206from .peripherals.uart.responses import ( 207 OpenResponse as UartOpenRsp, 208 CloseResponse as UartCloseRsp, 209 WriteReadResponse as UartWriteReadRsp, 210 ClearWriteReadResponse as UartClearWriteReadRsp, 211) 212 213from .peripherals.binaryoutput.requests import ( 214 EnumerateRequest as BinaryOutputEnumerateReq, 215 SetOutputRequest as BinaryOutputSetOutputReq, 216) 217 218from .peripherals.binaryoutput.responses import ( 219 EnumerateResponse as BinaryOutputEnumerateRsp, 220 SetOutputResponse as BinaryOutputSetOutputRsp, 221) 222 223from .peripherals.sensor.requests import ( 224 EnumerateRequest as SensorEnumerateReq, 225 ReadSensorsRequest as SensorReadSensorsReq, 226 ReadSensorsWithTypesRequest as SensorReadWithTypesReq, 227) 228 229from .peripherals.sensor.responses import ( 230 EnumerateResponse as SensorEnumerateRsp, 231 ReadSensorsResponse as SensorReadSensorsRsp, 232 ReadSensorsWithTypesResponse as SensorReadWithTypesRsp, 233) 234 
235from .objects import ( 236 BinaryOutputState, 237 CoordinatorAuthorizeBondParams, 238 CoordinatorDpaParam, 239 ExplorationPerEnumData, 240 ExplorationPerInfoData, 241 IoTriplet, 242 NodeReadData, 243 NodeValidateBondsParams, 244 OsBatchData, 245 OsIndicateParam, 246 OsLoadCodeFlags, 247 OsReadData, 248 OsSecurityTypeParam, 249 OsSleepParams, 250 OsTrConfByte, 251 OsTrConfData, 252 SensorWrittenData, 253) 254 255from .utils.sensor_parser import SensorData 256 257 258__all__ = ( 259 # .irequest 260 'IRequest', 261 # .iresponse 262 'IResponse', 263 # .async_response 264 'AsyncResponse', 265 # .confirmation 266 'Confirmation', 267 # .peripherals.generic 268 'GenericRequest', 269 'GenericResponse', 270 # .peripherals.coordinator 271 'CoordinatorAddrInfoReq', 272 'CoordinatorAddrInfoRsp', 273 'CoordinatorAuthorizeBondReq', 274 'CoordinatorAuthorizeBondRsp', 275 'CoordinatorBackupReq', 276 'CoordinatorBackupRsp', 277 'CoordinatorBondNodeReq', 278 'CoordinatorBondNodeRsp', 279 'CoordinatorBondedDevicesReq', 280 'CoordinatorBondedDevicesRsp', 281 'CoordinatorClearAllBondsReq', 282 'CoordinatorClearAllBondsRsp', 283 'CoordinatorDiscoveredDevicesReq', 284 'CoordinatorDiscoveredDevicesRsp', 285 'CoordinatorDiscoveryReq', 286 'CoordinatorDiscoveryRsp', 287 'CoordinatorRemoveBondReq', 288 'CoordinatorRemoveBondRsp', 289 'CoordinatorRestoreReq', 290 'CoordinatorRestoreRsp', 291 'CoordinatorSetDpaParamsReq', 292 'CoordinatorSetDpaParamsRsp', 293 'CoordinatorSetHopsReq', 294 'CoordinatorSetHopsRsp', 295 'CoordinatorSetMidReq', 296 'CoordinatorSetMidRsp', 297 'CoordinatorSmartConnectReq', 298 'CoordinatorSmartConnectRsp', 299 # .peripherals.eeeprom 300 'EeepromReadReq', 301 'EeepromReadRsp', 302 'EeepromWriteReq', 303 'EeepromWriteRsp', 304 # .peripherals.eeprom 305 'EepromReadReq', 306 'EepromReadRsp', 307 'EepromWriteReq', 308 'EepromWriteRsp', 309 # .peripherals.exploration 310 'ExplorationPeripheralEnumerationReq', 311 'ExplorationPeripheralEnumerationRsp', 312 
'ExplorationPeripheralInformationReq', 313 'ExplorationPeripheralInformationRsp', 314 'ExplorationMorePeripheralsInformationReq', 315 'ExplorationMorePeripheralsInformationRsp', 316 # .peripherals.frc 317 'FrcSendReq', 318 'FrcSendRsp', 319 'FrcExtraResultReq', 320 'FrcExtraResultRsp', 321 'FrcSendSelectiveReq', 322 'FrcSendSelectiveRsp', 323 'FrcSetFrcParamsReq', 324 'FrcSetFrcParamsRsp', 325 # .peripherals.io 326 'IoDirectionReq', 327 'IoDirectionRsp', 328 'IoGetReq', 329 'IoGetRsp', 330 'IoSetReq', 331 'IoSetRsp', 332 # .peripherals.ledg 333 'LedgSetOnReq', 334 'LedgSetOnRsp', 335 'LedgSetOffReq', 336 'LedgSetOffRsp', 337 'LedgPulseReq', 338 'LedgPulseRsp', 339 'LedgFlashingReq', 340 'LedgFlashingRsp', 341 # .peripherals.ledr 342 'LedrSetOnReq', 343 'LedrSetOnRsp', 344 'LedrSetOffReq', 345 'LedrSetOffRsp', 346 'LedrPulseReq', 347 'LedrPulseRsp', 348 'LedrFlashingReq', 349 'LedrFlashingRsp', 350 # .peripherals.node 351 'NodeReadReq', 352 'NodeReadRsp', 353 'NodeRemoveBondReq', 354 'NodeRemoveBondRsp', 355 'NodeBackupReq', 356 'NodeBackupRsp', 357 'NodeRestoreReq', 358 'NodeRestoreRsp', 359 'NodeValidateBondsReq', 360 'NodeValidateBondsRsp', 361 # .peripherals.os 362 'OsReadReq', 363 'OsReadRsp', 364 'OsResetReq', 365 'OsResetRsp', 366 'OsRestartReq', 367 'OsRestartRsp', 368 'OsReadTrConfReq', 369 'OsReadTrConfRsp', 370 'OsWriteTrConfReq', 371 'OsWriteTrConfRsp', 372 'OsWriteTrConfByteReq', 373 'OsWriteTrConfByteRsp', 374 'OsRfpgmReq', 375 'OsRfpgmRsp', 376 'OsSleepReq', 377 'OsSleepRsp', 378 'OsSetSecurityReq', 379 'OsSetSecurityRsp', 380 'OsBatchReq', 381 'OsBatchRsp', 382 'OsSelectiveBatchReq', 383 'OsSelectiveBatchRsp', 384 'OsIndicateReq', 385 'OsIndicateRsp', 386 'OsFactorySettingsReq', 387 'OsFactorySettingsRsp', 388 'OsTestRfSignalReq', 389 'OsTestRfSignalRsp', 390 'OsLoadCodeReq', 391 'OsLoadCodeRsp', 392 # .peripherals.ram 393 'RamReadReq', 394 'RamReadRsp', 395 'RamWriteReq', 396 'RamWriteRsp', 397 'RamReadAnyReq', 398 'RamReadAnyRsp', 399 # 
.peripherals.thermometer 400 'ThermometerReadReq', 401 'ThermometerReadRsp', 402 # .peripherals.uart 403 'UartOpenReq', 404 'UartOpenRsp', 405 'UartCloseReq', 406 'UartCloseRsp', 407 'UartWriteReadReq', 408 'UartWriteReadRsp', 409 'UartClearWriteReadReq', 410 'UartClearWriteReadRsp', 411 # .peripherals.binaryoutput 412 'BinaryOutputEnumerateReq', 413 'BinaryOutputEnumerateRsp', 414 'BinaryOutputSetOutputReq', 415 'BinaryOutputSetOutputRsp', 416 # .peripherals.sensor 417 'SensorEnumerateReq', 418 'SensorEnumerateRsp', 419 'SensorReadSensorsReq', 420 'SensorReadSensorsRsp', 421 'SensorReadWithTypesReq', 422 'SensorReadWithTypesRsp', 423 # .objects 424 'BinaryOutputState', 425 'CoordinatorAuthorizeBondParams', 426 'CoordinatorDpaParam', 427 'ExplorationPerEnumData', 428 'ExplorationPerInfoData', 429 'IoTriplet', 430 'NodeReadData', 431 'NodeValidateBondsParams', 432 'OsBatchData', 433 'OsIndicateParam', 434 'OsLoadCodeFlags', 435 'OsReadData', 436 'OsSecurityTypeParam', 437 'OsSleepParams', 438 'OsTrConfByte', 439 'OsTrConfData', 440 'SensorWrittenData', 441 # .utils.sensor_parser 442 'SensorData', 443)
class IRequest(ABC):
    """This abstract class serves as an interface for embedded peripherals and standards request messages.

    The class provides getters and setters for shared request properties in the form of the @property decorator.
    Shared properties such as device address, peripheral, peripheral command and hwpid are validated here.
    The class also provides generic DPA and JSON serialization methods for derived classes to amend.
    While the abstract class cannot be instantiated, its methods can be called from a derived class using super.

    Network address (nadr) specifies target device: 0 (coordinator), 1-239 (node), 252 (local device),
    254 (temporary address) and 255 (broadcast), other values are reserved. Values in range 0-255.

    Peripheral number (pnum) specifies peripheral to use: 0-31 are reserved for embedded peripherals and
    64-127 are reserved for IQRF standard peripherals. Peripheral number can be passed as a member of one
    of the peripheral enum class members (see iqrfpy.enums.peripherals) or plain integer. Values in range 0-255.

    Peripheral command (pcmd) specifies a command to execute, each peripheral implements different commands.
    See DPA framework document for specific command numbers. Peripheral command can be passed as a member of one
    of the peripheral command enum class members (see iqrfpy.enums.commands) or plain integer.
    Peripheral number and peripheral command are pre-defined for each command message this library implements,
    however they will be required for user-defined messages. Values in range 0-255, but note that only values
    0-127 are used for request commands, as response command value is equal to request command value plus 128 (0x80).

    Hardware profile ID (hwpid) specifies a unique device (by its functionality), this argument can be used to filter
    which devices execute a command specified by request message. Only devices with matching hwpid will execute
    a command, unless the value 65535 (0xFFFF) is used, which ignores hwpid checking. Values in range 0-65535.

    Packet data (pdata) can be required or optional data sent with the request message, depending on the command.
    On a general level, packet data is treated as a list of unsigned 1B integers. This means that when defining a user
    command and a request message, request data should be serialized into a list of previously mentioned
    format and stored in the pdata member.

    For more detailed information, see the IQRF DPA Framework documentation.

    Message type (m_type) is an optional argument, however, it is required when using JSON API as message type
    is equivalent to a combination of peripheral number and peripheral command (see iqrfpy.enums.message_types).
    For example, `OSMessages.READ` (iqrfEmbedOs_Read) is equivalent to `EmbedPeripherals.OS` (2) and
    `OSRequestCommands.READ` (0).

    Request parameters (params) is an optional dictionary argument, that is used when constructing a JSON API request.
    When creating a user message, use the params member to propagate your request parameters into the final
    JSON request object.

    DPA response time (dpa_rsp_time) is an optional argument specifying DPA request timeout, not time between reception
    of a request and delivery of response. Note that handling of this argument is the responsibility of a transport
    and system facilitating communication between the library and devices of the IQRF network.

    Device processing time (dev_process_time) is an optional argument used to specify time allotted to the device
    executing a task beyond simple data processing. For example, a device may need to measure some quantity
    for a certain amount of time for the measured value to be valid. Note that handling of this argument
    is the responsibility of a transport and system facilitating communication between the library and devices
    of the IQRF network.

    Message ID (msgid) is an optional argument used only by the JSON API at this time. It is used to pair
    JSON API requests and responses as the message type argument only allows to pair requests of the same type,
    but not pairing a specific request and response, given the asynchronous nature of the JSON API.
    """

    __slots__ = '_nadr', '_pnum', '_pcmd', '_mtype', '_hwpid', '_pdata', '_msgid', '_params', '_dpa_rsp_time', \
        '_dev_process_time'

    def __init__(self, nadr: int, pnum: Union[Peripheral, int], pcmd: Union[Command, int],
                 hwpid: int = dpa_constants.HWPID_MAX, pdata: Optional[List[int]] = None,
                 m_type: Optional[MessageType] = None, msgid: Optional[str] = None,
                 params: Optional[dict] = None, dpa_rsp_time: Optional[float] = None,
                 dev_process_time: Optional[float] = None):
        """IRequest constructor.

        Args:
            nadr (int): Network address.
            pnum (Union[Peripheral, int]): Peripheral number.
            pcmd (Union[Command, int]): Peripheral command.
            hwpid (int, optional): Hardware profile ID. Defaults to 65535 (Ignore HWPID check).
            pdata (List[int], optional): DPA request data. Defaults to None.
            m_type (MessageType, optional): JSON API message type. Defaults to None.
            msgid (str, optional): JSON API message ID. Defaults to None. If the parameter is not specified, a random
                UUIDv4 string is generated and used.
            params (dict, optional): JSON API request data. Defaults to None.
            dpa_rsp_time (float, optional): DPA request timeout in seconds. Defaults to None.
            dev_process_time (float, optional): Device processing time. Defaults to None.
        """
        self._nadr = nadr
        self._pnum = pnum
        self._pcmd = pcmd
        self._hwpid = hwpid
        self._pdata = pdata
        self._mtype = m_type
        # A fresh UUIDv4 is generated per instance when no message ID is supplied.
        self._msgid = msgid if msgid is not None else str(uuid4())
        # A new dict is created per instance so that requests never share a params object.
        self._params = params if params is not None else {}
        self._dpa_rsp_time = dpa_rsp_time
        self._dev_process_time = dev_process_time
        # Validation runs after assignment; an invalid value raises and aborts construction.
        self._validate_base(nadr, pnum, pcmd, hwpid, dpa_rsp_time, dev_process_time)

    def _validate_base(self, nadr: int, pnum: Union[Peripheral, int], pcmd: Union[Command, int], hwpid: int,
                       dpa_rsp_time: Optional[float], dev_process_time: Optional[float]) -> None:
        """Shared members validation method.

        Validates shared request members to ensure the data can fit into the DPA packet and JSON request
        object and do not violate rules for expected format of these properties.

        Args:
            nadr (int): Network address.
            pnum (Union[Peripheral, int]): Peripheral number.
            pcmd (Union[Command, int]): Peripheral command.
            hwpid (int): Hardware profile ID.
            dpa_rsp_time (float, optional): DPA request timeout in seconds. May be None.
            dev_process_time (float, optional): Device processing time. May be None.
        """
        self._validate_nadr(nadr)
        self._validate_pnum(pnum)
        self._validate_pcmd(pcmd)
        self._validate_hwpid(hwpid)
        self._validate_dpa_rsp_time(dpa_rsp_time)
        self._validate_dev_process_time(dev_process_time)

    @staticmethod
    def _validate_nadr(nadr: int):
        """Validates network address parameter.

        Args:
            nadr (int): Network address value.

        Raises:
            RequestNadrInvalidError: If nadr is less than 0 or greater than 255.
        """
        if not dpa_constants.BYTE_MIN <= nadr <= dpa_constants.BYTE_MAX:
            raise RequestNadrInvalidError('NADR should be between 0 and 255.')

    @property
    def nadr(self) -> int:
        """:obj:`int`: Network address.

        Getter and setter.
        """
        return self._nadr

    @nadr.setter
    def nadr(self, value: int):
        self._validate_nadr(value)
        self._nadr = value

    @staticmethod
    def _validate_pnum(pnum: Union[Peripheral, int]):
        """Validates peripheral number parameter.

        Args:
            pnum (Union[Peripheral, int]): Peripheral number value.

        Raises:
            RequestPnumInvalidError: If pnum is less than 0 or greater than 255.
        """
        if not dpa_constants.BYTE_MIN <= pnum <= dpa_constants.BYTE_MAX:
            raise RequestPnumInvalidError('PNUM should be between 0 and 255.')

    @property
    def pnum(self) -> Union[Peripheral, int]:
        """:obj:`iqrfpy.enums.peripherals.Peripheral` or :obj:`int`: Peripheral number.

        Getter and setter.
        """
        return self._pnum

    @pnum.setter
    def pnum(self, value: Union[Peripheral, int]):
        self._validate_pnum(pnum=value)
        self._pnum = value

    @staticmethod
    def _validate_pcmd(pcmd: Union[Command, int]):
        """Validates peripheral command parameter.

        Args:
            pcmd (Union[Command, int]): Peripheral command value.

        Raises:
            RequestPcmdInvalidError: If pcmd is less than 0 or greater than 255.
        """
        if not dpa_constants.BYTE_MIN <= pcmd <= dpa_constants.BYTE_MAX:
            raise RequestPcmdInvalidError('PCMD should be between 0 and 255.')

    @property
    def pcmd(self) -> Union[Command, int]:
        """:obj:`iqrfpy.enums.commands.Command` or :obj:`int`: Peripheral command.

        Getter and setter.
        """
        return self._pcmd

    @pcmd.setter
    def pcmd(self, value: Union[Command, int]):
        self._validate_pcmd(pcmd=value)
        self._pcmd = value

    @staticmethod
    def _validate_hwpid(hwpid: int):
        """Validates Hardware profile ID parameter.

        Args:
            hwpid (int): Hardware profile ID value.

        Raises:
            RequestHwpidInvalidError: If hwpid is less than 0 or greater than 65535.
        """
        if not dpa_constants.HWPID_MIN <= hwpid <= dpa_constants.HWPID_MAX:
            raise RequestHwpidInvalidError('HWPID should be between 0 and 65535.')

    @property
    def hwpid(self) -> int:
        """:obj:`int`: Hardware profile ID.

        Getter and setter.
        """
        return self._hwpid

    @hwpid.setter
    def hwpid(self, value: int):
        # Mirrors the other validated properties: reject out-of-range values before storing.
        self._validate_hwpid(value)
        self._hwpid = value

    @staticmethod
    def _validate_dpa_rsp_time(dpa_rsp_time: Optional[float] = None):
        """Validates dpa timeout parameter.

        None is accepted and skips validation; zero is accepted as well.

        Args:
            dpa_rsp_time (float, optional): DPA timeout value in seconds. Defaults to None.

        Raises:
            RequestParameterInvalidValueError: If dpa_rsp_time is not None and is less than 0.
        """
        if dpa_rsp_time is None:
            return
        if dpa_rsp_time < 0:
            raise RequestParameterInvalidValueError('DPA response time should be a positive integer.')

    @property
    def dpa_rsp_time(self) -> Optional[float]:
        """:obj:`float` or :obj:`None`: DPA timeout.

        Getter and setter.
        """
        return self._dpa_rsp_time

    @dpa_rsp_time.setter
    def dpa_rsp_time(self, value: Optional[float] = None):
        self._validate_dpa_rsp_time(value)
        self._dpa_rsp_time = value

    @staticmethod
    def _validate_dev_process_time(dev_process_time: Optional[float] = None):
        """Validates device processing time parameter.

        None is accepted and skips validation; zero is accepted as well.

        Args:
            dev_process_time (float, optional): Device processing time value. Defaults to None.

        Raises:
            RequestParameterInvalidValueError: If dev_process_time is not None and is less than 0.
        """
        if dev_process_time is None:
            return
        if dev_process_time < 0:
            raise RequestParameterInvalidValueError('Device processing time should be a positive integer.')

    @property
    def dev_process_time(self) -> Optional[float]:
        """:obj:`float` or :obj:`None`: Device processing time.

        Getter and setter.
        """
        return self._dev_process_time

    @dev_process_time.setter
    def dev_process_time(self, value: Optional[float] = None):
        self._validate_dev_process_time(value)
        self._dev_process_time = value

    @property
    def msgid(self) -> str:
        """:obj:`str`: Message ID.

        Getter and setter. The setter stores the value without validation.
        """
        return self._msgid

    @msgid.setter
    def msgid(self, value: str):
        self._msgid = value

    @property
    def mtype(self) -> Optional[MessageType]:
        """:obj:`MessageType` or :obj:`None`: Message type.

        Getter only. The value is None when no message type was passed to the constructor.
        """
        return self._mtype

    @abstractmethod
    def to_dpa(self, mutable: bool = False) -> Union[bytes, bytearray]:
        """DPA request serialization method.

        Serializes a request to DPA request packet format. Each request contains the DPA request header.
        If pdata is specified, it is appended to the DPA request header. The request can be serialized
        into both mutable and immutable (default) byte formats.

        Args:
            mutable (bool, optional): Serialize into mutable byte representation of DPA request packet.
                Defaults to False.

        Returns:
            :obj:`bytes`: Immutable byte representation of DPA request packet.\n
            :obj:`bytearray`: Mutable byte representation of DPA request packet (if argument mutable is True).
        """
        raise NotImplementedError('Abstract method not implemented.')

    @abstractmethod
    def to_json(self) -> dict:
        """JSON API request serialization method.

        Serializes a request to JSON API request object. Each request contains message type and data.
        Data carries message ID for pairing requests with responses, and `req` object that contains network address,
        hwpid and optional request parameters (equivalent to DPA packet data) if `params` is not None.
        If DPA timeout is specified, the value is multiplied by 1000 as JSON API accepts timeout in milliseconds.

        Returns:
            :obj:`dict`: JSON API request object.
        """
        raise NotImplementedError('Abstract method not implemented')
This abstract class serves as an interface for embedded peripherals and standards request messages.
The class provides getters and setters for shared request properties in the form of the @property decorator. Shared properties such as device address, peripheral, peripheral command and hwpid are validated here. The class also provides generic DPA and JSON serialization methods for derived classes to amend. While the abstract class cannot be instantiated, its methods can be called from a derived class using super.
Network address (nadr) specifies target device: 0 (coordinator), 1-239 (node), 252 (local device), 254 (temporary address) and 255 (broadcast), other values are reserved. Values in range 0-255.
Peripheral number (pnum) specifies peripheral to use: 0-31 are reserved for embedded peripherals and 64-127 are reserved for IQRF standard peripherals. Peripheral number can be passed as a member of one of the peripheral enum class members (see iqrfpy.enums.peripherals) or plain integer. Values in range 0-255.
Peripheral command (pcmd) specifies a command to execute, each peripheral implements different commands. See DPA framework document for specific command numbers. Peripheral command can be passed as a member of one of the peripheral command enum class members (see iqrfpy.enums.commands) or plain integer. Peripheral number and peripheral command are pre-defined for each command message this library implements, however they will be required for user-defined messages. Values in range 0-255, but note that only values 0-127 are used for request commands, as response command value is equal to request command value plus 128 (0x80).
Hardware profile ID (hwpid) specifies a unique device (by its functionality), this argument can be used to filter which devices execute a command specified by request message. Only devices with matching hwpid will execute a command, unless the value 65535 (0xFFFF) is used, which ignores hwpid checking. Values in range 0-65535.
Packet data (pdata) can be required or optional data sent with the request message, depending on the command. On a general level, packet data is treated as a list of unsigned 1B integers. This means that when defining a user command and a request message, request data should be serialized into a list of previously mentioned format and stored in the pdata member.
For more detailed information, see the IQRF DPA Framework documentation.
Message type (m_type) is an optional argument, however, it is required when using JSON API as message type is equivalent to a combination of peripheral number and peripheral command (see iqrfpy.enums.message_types). For example, `OSMessages.READ` (iqrfEmbedOs_Read) is equivalent to `EmbedPeripherals.OS` (2) and `OSRequestCommands.READ` (0).
Request parameters (params) is an optional dictionary argument, that is used when constructing a JSON API request. When creating a user message, use the params member to propagate your request parameters into the final JSON request object.
DPA response time (dpa_rsp_time) is an optional argument specifying DPA request timeout, not time between reception of a request and delivery of response. Note that handling of this argument is the responsibility of a transport and system facilitating communication between the library and devices of the IQRF network.
Device processing time (dev_process_time) is an optional argument used to specify time allotted to the device executing a task beyond simple data processing. For example, a device may need to measure some quantity for a certain amount of time for the measured value to be valid. Note that handling of this argument is the responsibility of a transport and system facilitating communication between the library and devices of the IQRF network.
Message ID (msgid) is an optional argument used only by the JSON API at this time. It is used to pair JSON API requests and responses as the message type argument only allows to pair requests of the same type, but not pairing a specific request and response, given the asynchronous nature of the JSON API.
def __init__(self, nadr: int, pnum: Union[Peripheral, int], pcmd: Union[Command, int],
             hwpid: int = dpa_constants.HWPID_MAX, pdata: Optional[List[int]] = None,
             m_type: Optional[MessageType] = None, msgid: Optional[str] = None,
             params: Optional[dict] = None, dpa_rsp_time: Optional[float] = None,
             dev_process_time: Optional[float] = None):
    """Store the shared request members and validate them.

    Args:
        nadr (int): Network address.
        pnum (Union[Peripheral, int]): Peripheral number.
        pcmd (Union[Command, int]): Peripheral command.
        hwpid (int, optional): Hardware profile ID. Defaults to 65535 (Ignore HWPID check).
        pdata (List[int], optional): DPA request data. Defaults to None.
        m_type (MessageType, optional): JSON API message type. Defaults to None.
        msgid (str, optional): JSON API message ID. When None, a random UUIDv4 string is generated and used.
        params (dict, optional): JSON API request data. When None, an empty dict is stored.
        dpa_rsp_time (float, optional): DPA request timeout in seconds. Defaults to None.
        dev_process_time (float, optional): Device processing time. Defaults to None.
    """
    # DPA header members.
    self._nadr, self._pnum, self._pcmd, self._hwpid = nadr, pnum, pcmd, hwpid
    self._pdata = pdata
    # JSON API members; the message ID and params get per-instance fallbacks.
    self._mtype = m_type
    self._msgid = str(uuid4()) if msgid is None else msgid
    self._params = {} if params is None else params
    # Timing members.
    self._dpa_rsp_time = dpa_rsp_time
    self._dev_process_time = dev_process_time
    # Validation runs last; a failing check raises and aborts construction.
    self._validate_base(nadr, pnum, pcmd, hwpid, dpa_rsp_time, dev_process_time)
IRequest constructor.
Arguments:
- nadr (int): Network address.
- pnum (Union[Peripheral, int]): Peripheral number.
- pcmd (Union[Command, int]): Peripheral command.
- hwpid (int, optional): Hardware profile ID. Defaults to 65535 (Ignore HWPID check).
- pdata (List[int], optional): DPA request data. Defaults to None.
- m_type (MessageType, optional): JSON API message type. Defaults to None.
- msgid (str, optional): JSON API message ID. Defaults to None. If the parameter is not specified, a random UUIDv4 string is generated and used.
- params (dict, optional): JSON API request data. Defaults to None.
- dpa_rsp_time (float, optional): DPA request timeout in seconds. Defaults to None.
- dev_process_time (float, optional): Device processing time. Defaults to None.
iqrfpy.enums.peripherals.Peripheral
or int
: Peripheral number.
Getter and setter.
iqrfpy.enums.commands.Command
or int
: Peripheral command.
Getter and setter.
@abstractmethod
def to_dpa(self, mutable: bool = False) -> Union[bytes, bytearray]:
    """DPA request serialization method.

    Serializes a request to DPA request packet format. Each request contains the DPA request header.
    If pdata is specified, it is appended to the DPA request header. The request can be serialized
    into both mutable and immutable (default) byte formats.

    This is the abstract declaration; concrete request classes provide the actual serialization.

    Args:
        mutable (bool, optional): Serialize into mutable byte representation of DPA request packet.
            Defaults to False.

    Returns:
        :obj:`bytes`: Immutable byte representation of DPA request packet.\n
        :obj:`bytearray`: Mutable byte representation of DPA request packet (if argument mutable is True).

    Raises:
        NotImplementedError: Always, when this abstract body itself is executed.
    """
    raise NotImplementedError('Abstract method not implemented.')
DPA request serialization method.
Serializes a request to DPA request packet format. Each request contains the DPA request header. If pdata is specified, it is appended to the DPA request header. The request can be serialized into both mutable and immutable (default) byte formats.
Arguments:
- mutable (bool, optional): Serialize into mutable byte representation of DPA request packet. Defaults to False.
Returns:
bytes
: Immutable byte representation of DPA request packet.
bytearray
: Mutable byte representation of DPA request packet (if argument mutable is True).
@abstractmethod
def to_json(self) -> dict:
    """JSON API request serialization method.

    Serializes a request to JSON API request object. Each request contains message type and data.
    Data carries message ID for pairing requests with responses, and `req` object that contains network address,
    hwpid and optional request parameters (equivalent to DPA packet data) if `params` is not None.
    If DPA timeout is specified, the value is multiplied by 1000 as JSON API accepts timeout in milliseconds.

    This is the abstract declaration; concrete request classes provide the actual serialization.

    Returns:
        :obj:`dict`: JSON API request object.

    Raises:
        NotImplementedError: Always, when this abstract body itself is executed.
    """
    raise NotImplementedError('Abstract method not implemented')
JSON API request serialization method.
Serializes a request to JSON API request object. Each request contains message type and data.
Data carries message ID for pairing requests with responses, and req
object that contains network address,
hwpid and optional request parameters (equivalent to DPA packet data) if params
is not None.
If DPA timeout is specified, the value is multiplied by 1000 as JSON API accepts timeout in milliseconds.
Returns:
dict
: JSON API request object.
class IResponse(ABC):
    """This class serves as an interface for embedded peripherals and standards response messages.

    This class does not provide any deserialization logic in its factory methods as they simply serve as abstracts.

    Once a factory method is implemented, it should be used to parse responses and create response message objects.

    All property getters are declared abstract, so a concrete subclass has to override each of them
    (as well as both factory methods) before it can be instantiated.
    """

    # Message ID constant; presumably marks asynchronous messages - confirm against its users.
    ASYNC_MSGID = 'async'

    def __init__(self, nadr: int, pnum: Union[Peripheral, int], pcmd: Union[Command, int],
                 hwpid: int = dpa_constants.HWPID_MAX, rcode: int = 0, dpa_value: int = 0,
                 pdata: Optional[List[int]] = None, m_type: Optional[Union[MessageType, str]] = None,
                 msgid: Optional[str] = None, result: Optional[dict] = None):
        """IResponse constructor.

        Stores the passed values on the instance as-is; no validation is performed here.

        Args:
            nadr (int): Device address.
            pnum (Union[Peripheral, int]): Peripheral number.
            pcmd (Union[Command, int]): Peripheral command.
            hwpid (int, optional): Hardware profile ID. Defaults to `dpa_constants.HWPID_MAX`.
            rcode (int, optional): DPA error code. Defaults to 0.
            dpa_value (int, optional): DPA value. Defaults to 0.
            pdata (List[int], optional): DPA response data. Defaults to None.
            m_type (Union[MessageType, str], optional): Message type. Defaults to None.
            msgid (str, optional): Message ID. Defaults to None.
            result (dict, optional): JSON API response data. Defaults to None.
        """
        self._nadr = nadr
        self._pnum = pnum
        self._pcmd = pcmd
        self._mtype = m_type
        self._hwpid = hwpid
        self._rcode = rcode
        self._dpa_value = dpa_value
        self._pdata = pdata
        self._msgid = msgid
        self._result = result

    @property
    @abstractmethod
    def nadr(self) -> int:
        """:obj:`int`: Device address.

        Getter only.
        """
        return self._nadr

    @property
    @abstractmethod
    def pnum(self) -> Union[Peripheral, int]:
        """:obj:`Peripheral` or :obj:`int`: Peripheral number.

        Getter only.
        """
        return self._pnum

    @property
    @abstractmethod
    def pcmd(self) -> Union[Command, int]:
        """:obj:`Command` or :obj:`int`: Peripheral command.

        Getter only.
        """
        return self._pcmd

    @property
    @abstractmethod
    def mtype(self) -> Optional[Union[MessageType, str]]:
        """:obj:`MessageType` or :obj:`str` or :obj:`None`: Message type.

        Getter only.
        """
        return self._mtype

    @property
    @abstractmethod
    def hwpid(self) -> int:
        """:obj:`int`: Hardware profile ID.

        Getter only.
        """
        return self._hwpid

    @property
    @abstractmethod
    def rcode(self) -> int:
        """:obj:`int`: DPA error code.

        Getter only.
        """
        return self._rcode

    @property
    @abstractmethod
    def dpa_value(self) -> int:
        """:obj:`int`: DPA value.

        Getter only.
        """
        return self._dpa_value

    @property
    @abstractmethod
    def pdata(self) -> Optional[List[int]]:
        """:obj:`list` of :obj:`int` or :obj:`None`: DPA response data.

        Getter only.
        """
        return self._pdata

    @property
    @abstractmethod
    def result(self) -> Optional[dict]:
        """:obj:`dict` or :obj:`None`: JSON API response data.

        Getter only.
        """
        return self._result

    @property
    @abstractmethod
    def msgid(self) -> Optional[str]:
        """:obj:`str` or :obj:`None`: Message ID.

        Getter only.
        """
        return self._msgid

    @staticmethod
    def validate_dpa_response(data: bytes) -> None:
        """Validate DPA response for base response length.

        The check itself is delegated to `DpaValidator.base_response_length`.

        Args:
            data (bytes): DPA response.
        """
        DpaValidator.base_response_length(data)

    @classmethod
    @abstractmethod
    def from_dpa(cls, dpa: bytes) -> 'IResponse':
        """Factory method. Parse DPA response into Response object.

        Abstract: this declaration has no body, subclasses supply the parsing.

        Args:
            dpa (bytes): DPA response.
        """

    @classmethod
    @abstractmethod
    def from_json(cls, json: dict) -> 'IResponse':
        """Factory method. Parse JSON response into Response object.

        Abstract: this declaration has no body, subclasses supply the parsing.

        Args:
            json (dict): JSON API response.
        """
This class serves as an interface for embedded peripherals and standards response messages.
This class does not provide any deserialization logic in its factory methods as they simply serve as abstracts.
Once a factory method is implemented, it should be used to parse responses and create response message objects.
Peripheral
or int
: Peripheral number.
Getter only.
MessageType
or str
or None
: Message type.
Getter only.
@staticmethod
def validate_dpa_response(data: bytes) -> None:
    """Validate DPA response for base response length.

    The check itself is delegated to `DpaValidator.base_response_length`; this method returns
    nothing. NOTE(review): the validator presumably raises on a too-short packet - confirm
    the exception type in `DpaValidator`.

    Args:
        data (bytes): DPA response.
    """
    DpaValidator.base_response_length(data)
Validate DPA response for base response length.
Arguments:
- data (bytes): DPA response.
@classmethod
@abstractmethod
def from_dpa(cls, dpa: bytes) -> 'IResponse':
    """Factory method. Parse DPA response into Response object.

    Abstract: this declaration has no body, subclasses supply the parsing.

    Args:
        dpa (bytes): DPA response.
    """
Factory method. Parse DPA response into Response object.
Arguments:
- dpa (bytes): DPA response.
@classmethod
@abstractmethod
def from_json(cls, json: dict) -> 'IResponse':
    """Factory method. Parse JSON response into Response object.

    Abstract: this declaration has no body, subclasses supply the parsing.

    Args:
        json (dict): JSON API response.
    """
Factory method. Parse JSON response into Response object.
Arguments:
- json (dict): JSON API response.
class AsyncResponse(IResponseGetterMixin):
    """Asynchronous response class.

    Captures and resolves asynchronous responses. Because this is a generic response class,
    the JSON API carries the data only as a string of bytes in hexadecimal representation
    separated by dots.
    """

    def __init__(self, nadr: int, pnum: Peripheral, pcmd: Command, hwpid: int = dpa_constants.HWPID_MAX,
                 rcode: int = 0x80, dpa_value: int = 0, pdata: Union[List[int], None] = None,
                 msgid: Optional[str] = None, result: Optional[dict] = None):
        """AsyncResponse constructor.

        Forwards every argument to the parent constructor and fixes the message type
        to `GenericMessages.RAW`.

        Args:
            nadr (int): Device address.
            pnum (Peripheral): Peripheral.
            pcmd (Command): Peripheral command.
            hwpid (int, optional): Hardware profile ID. Defaults to 65535, this value ignores HWPID check.
            rcode (int, optional): Response code. Defaults to 128.
            dpa_value (int, optional): DPA value. Defaults to 0.
            pdata (List[int], optional): DPA response data. Defaults to None.
            msgid (str, optional): Message ID. Defaults to None.
            result (dict, optional): JSON response data. Defaults to None.
        """
        super().__init__(
            nadr=nadr,
            pnum=pnum,
            pcmd=pcmd,
            m_type=GenericMessages.RAW,
            hwpid=hwpid,
            rcode=rcode,
            dpa_value=dpa_value,
            pdata=pdata,
            msgid=msgid,
            result=result,
        )

    @classmethod
    def from_dpa(cls, dpa: bytes) -> 'AsyncResponse':
        """Asynchronous response DPA factory method.

        Validates the base response length, reads the header members from the packet and
        constructs :obj:`AsyncResponse` object. The whole packet is stored as `pdata`; `result`
        is populated only for an asynchronous response code with more than 8 bytes of packet.

        Args:
            dpa (bytes): DPA response bytes.

        Returns:
            :obj:`AsyncResponse`: Asynchronous response message object.
        """
        DpaValidator.base_response_length(dpa=dpa)
        member = ResponsePacketMembers
        nadr = dpa[member.NADR]
        hwpid = Common.hwpid_from_dpa(dpa[member.HWPID_HI], dpa[member.HWPID_LO])
        pnum = Common.pnum_from_dpa(dpa[member.PNUM])
        pcmd = Common.request_pcmd_from_dpa(pnum, dpa[member.PCMD])
        rcode = dpa[member.RCODE]
        dpa_value = dpa[member.DPA_VALUE]
        carries_async_data = rcode == ResponseCodes.ASYNC_RESPONSE and len(dpa) > 8
        result = {'rData': list(dpa)} if carries_async_data else None
        return cls(
            nadr=nadr,
            pnum=pnum,
            pcmd=pcmd,
            hwpid=hwpid,
            rcode=rcode,
            dpa_value=dpa_value,
            pdata=list(dpa),
            result=result,
        )

    @classmethod
    def from_json(cls, json: dict) -> 'AsyncResponse':
        """Asynchronous response JSON factory method.

        Reads the dot-separated hexadecimal packet from ``json['data']['rsp']['rData']``,
        extracts the header members from it and constructs :obj:`AsyncResponse` object.

        Args:
            json (dict): JSON API response.

        Returns:
            :obj:`AsyncResponse`: Asynchronous response message object.
        """
        msgid = Common.msgid_from_json(json)
        result = json['data']['rsp']
        hex_digits = result['rData'].replace('.', '')
        packet_bytes = bytes.fromhex(hex_digits)
        packet_values = Common.hex_string_to_list(hex_digits)
        member = ResponsePacketMembers
        nadr = packet_values[member.NADR]
        hwpid = Common.hwpid_from_dpa(packet_values[member.HWPID_HI], packet_values[member.HWPID_LO])
        pnum = Common.pnum_from_dpa(packet_values[member.PNUM])
        pcmd = Common.request_pcmd_from_dpa(pnum, packet_values[member.PCMD])
        rcode = packet_values[member.RCODE]
        dpa_value = packet_values[member.DPA_VALUE]
        return cls(
            nadr=nadr,
            pnum=pnum,
            pcmd=pcmd,
            hwpid=hwpid,
            rcode=rcode,
            dpa_value=dpa_value,
            pdata=list(packet_bytes),
            msgid=msgid,
            result=result,
        )
Asynchronous response class.
This class is used to capture and resolve asynchronous responses. Because this is a generic response class, the data is only sent as a string of bytes in hexadecimal representation separated by dots.
def __init__(self, nadr: int, pnum: Peripheral, pcmd: Command, hwpid: int = dpa_constants.HWPID_MAX,
             rcode: int = 0x80, dpa_value: int = 0, pdata: Union[List[int], None] = None,
             msgid: Optional[str] = None, result: Optional[dict] = None):
    """AsyncResponse constructor.

    Forwards every argument to the parent constructor and fixes the message type
    to `GenericMessages.RAW`.

    Args:
        nadr (int): Device address.
        pnum (Peripheral): Peripheral.
        pcmd (Command): Peripheral command.
        hwpid (int, optional): Hardware profile ID. Defaults to 65535, this value ignores HWPID check.
        rcode (int, optional): Response code. Defaults to 128 (0x80).
        dpa_value (int, optional): DPA value. Defaults to 0.
        pdata (List[int], optional): DPA response data. Defaults to None.
        msgid (str, optional): Message ID. Defaults to None.
        result (dict, optional): JSON response data. Defaults to None.
    """
    super().__init__(
        nadr=nadr,
        pnum=pnum,
        pcmd=pcmd,
        # The message type is not caller-configurable for this class.
        m_type=GenericMessages.RAW,
        hwpid=hwpid,
        rcode=rcode,
        dpa_value=dpa_value,
        pdata=pdata,
        msgid=msgid,
        result=result
    )
AsyncResponse constructor.
Arguments:
- nadr (int): Device address.
- pnum (Peripheral): Peripheral.
- pcmd (Command): Peripheral command.
- hwpid (int, optional): Hardware profile ID. Defaults to 65535, this value ignores HWPID check.
- rcode (int, optional): Response code. Defaults to 128.
- dpa_value (int, optional): DPA value. Defaults to 0.
- pdata (List[int], optional): DPA response data. Defaults to None.
- msgid (str, optional): Message ID. Defaults to None.
- result (dict, optional): JSON response data. Defaults to None.
@classmethod
def from_dpa(cls, dpa: bytes) -> 'AsyncResponse':
    """Asynchronous response DPA factory method.

    Validates the base response length, reads the header members from the packet and
    constructs :obj:`AsyncResponse` object. The whole packet is stored as `pdata`; `result`
    is populated only for an asynchronous response code with more than 8 bytes of packet.

    Args:
        dpa (bytes): DPA response bytes.

    Returns:
        :obj:`AsyncResponse`: Asynchronous response message object.
    """
    DpaValidator.base_response_length(dpa=dpa)
    member = ResponsePacketMembers
    nadr = dpa[member.NADR]
    hwpid = Common.hwpid_from_dpa(dpa[member.HWPID_HI], dpa[member.HWPID_LO])
    pnum = Common.pnum_from_dpa(dpa[member.PNUM])
    pcmd = Common.request_pcmd_from_dpa(pnum, dpa[member.PCMD])
    rcode = dpa[member.RCODE]
    dpa_value = dpa[member.DPA_VALUE]
    carries_async_data = rcode == ResponseCodes.ASYNC_RESPONSE and len(dpa) > 8
    result = {'rData': list(dpa)} if carries_async_data else None
    return cls(
        nadr=nadr,
        pnum=pnum,
        pcmd=pcmd,
        hwpid=hwpid,
        rcode=rcode,
        dpa_value=dpa_value,
        pdata=list(dpa),
        result=result,
    )
Asynchronous response DPA factory method.
Parses DPA data and constructs AsyncResponse
object.
Arguments:
- dpa (bytes): DPA response bytes.
Returns:
AsyncResponse
: Asynchronous response message object.
@classmethod
def from_json(cls, json: dict) -> 'AsyncResponse':
    """Asynchronous response JSON factory method.

    Reads the dot-separated hexadecimal packet from ``json['data']['rsp']['rData']``,
    extracts the header members from it and constructs :obj:`AsyncResponse` object.

    Args:
        json (dict): JSON API response.

    Returns:
        :obj:`AsyncResponse`: Asynchronous response message object.
    """
    msgid = Common.msgid_from_json(json)
    result = json['data']['rsp']
    hex_digits = result['rData'].replace('.', '')
    packet_bytes = bytes.fromhex(hex_digits)
    packet_values = Common.hex_string_to_list(hex_digits)
    member = ResponsePacketMembers
    nadr = packet_values[member.NADR]
    hwpid = Common.hwpid_from_dpa(packet_values[member.HWPID_HI], packet_values[member.HWPID_LO])
    pnum = Common.pnum_from_dpa(packet_values[member.PNUM])
    pcmd = Common.request_pcmd_from_dpa(pnum, packet_values[member.PCMD])
    rcode = packet_values[member.RCODE]
    dpa_value = packet_values[member.DPA_VALUE]
    return cls(
        nadr=nadr,
        pnum=pnum,
        pcmd=pcmd,
        hwpid=hwpid,
        rcode=rcode,
        dpa_value=dpa_value,
        pdata=list(packet_bytes),
        msgid=msgid,
        result=result,
    )
Asynchronous response JSON factory method.
Parses JSON API response and constructs AsyncResponse
object.
Arguments:
- json (dict): JSON API response.
Returns:
AsyncResponse
: Asynchronous response message object.
class Confirmation(IResponseGetterMixin):
    """Confirmation message class.

    DPA confirmation is used to confirm reception of DPA request by node device at coordinator.
    The message carries DPA value, request hops (number of hops used to deliver DPA request to node device),
    response hops (number of hops used to deliver DPA response from node device to coordinator),
    and timeslot (see DPA documentation for timeslot length calculation).
    """

    __slots__ = '_request_hops', '_response_hops', '_timeslot'

    def __init__(self, nadr: int, pnum: Union[Peripheral, int], pcmd: Union[Command, int], hwpid: int, dpa_value: int,
                 rcode: int, pdata: Optional[List[int]] = None, result: Optional[dict] = None):
        """Confirmation constructor.

        Args:
            nadr (int): Device address.
            pnum (Union[Peripheral, int]): Peripheral.
            pcmd (Union[Command, int]): Peripheral command.
            hwpid (int): Hardware profile ID.
            dpa_value (int): DPA value.
            rcode (int): Response code.
            pdata (List[int], optional): DPA confirmation data. Defaults to None.
            result (dict, optional): Confirmation data with keys ``requestHops``, ``responseHops``
                and ``timeslot``. Defaults to None, in which case the values are taken from the first
                three items of ``pdata`` (request hops, timeslot, response hops).

        Raises:
            TypeError: If ``result`` is None and ``pdata`` does not hold at least three items.
            KeyError: If ``result`` is given but lacks one of the expected keys.
        """
        if result is None:
            # `result` is declared optional, so do not crash on subscripting None: rebuild it
            # from pdata using the same byte order from_dpa() uses (pdata == dpa[8:]).
            if pdata is None or len(pdata) < 3:
                raise TypeError(
                    'Confirmation requires either result or pdata with request hops, timeslot and response hops.'
                )
            result = {'requestHops': pdata[0], 'responseHops': pdata[2], 'timeslot': pdata[1]}
        super().__init__(
            nadr=nadr,
            pcmd=pcmd,
            pnum=pnum,
            hwpid=hwpid,
            rcode=rcode,
            dpa_value=dpa_value,
            pdata=pdata,
            result=result
        )
        self._request_hops: int = result['requestHops']
        self._response_hops: int = result['responseHops']
        self._timeslot: int = result['timeslot']

    @property
    def request_hops(self) -> int:
        """:obj:`int`: Request hops.

        Getter only.
        """
        return self._request_hops

    @property
    def response_hops(self) -> int:
        """:obj:`int`: Response hops.

        Getter only.
        """
        return self._response_hops

    @property
    def timeslot(self) -> int:
        """:obj:`int`: Timeslot.

        Getter only.
        """
        return self._timeslot

    @classmethod
    def from_dpa(cls, dpa: bytes) -> 'Confirmation':
        """Confirmation DPA factory method.

        Parses DPA confirmation message and constructs :obj:`Confirmation` object.
        The packet length and confirmation code are checked by `DpaValidator` first.

        Args:
            dpa (bytes): DPA confirmation bytes.

        Returns:
            :obj:`Confirmation`: Confirmation message object.
        """
        DpaValidator.confirmation_length(dpa=dpa)
        DpaValidator.confirmation_code(dpa=dpa)
        nadr = dpa[ResponsePacketMembers.NADR]
        pnum = Common.pnum_from_dpa(dpa[ResponsePacketMembers.PNUM])
        pcmd = Common.request_pcmd_from_dpa(pnum, dpa[ResponsePacketMembers.PCMD])
        hwpid = Common.hwpid_from_dpa(dpa[ResponsePacketMembers.HWPID_HI], dpa[ResponsePacketMembers.HWPID_LO])
        rcode = dpa[ResponsePacketMembers.RCODE]
        dpa_value = dpa[ResponsePacketMembers.DPA_VALUE]
        pdata = list(dpa[8:])
        # Byte 8 is request hops, byte 9 the timeslot and byte 10 response hops.
        result = {'requestHops': dpa[8], 'responseHops': dpa[10], 'timeslot': dpa[9]}
        return cls(nadr=nadr, pnum=pnum, pcmd=pcmd, hwpid=hwpid, rcode=rcode, dpa_value=dpa_value,
                   pdata=pdata, result=result)

    @classmethod
    def from_json(cls, json: dict) -> 'Confirmation':
        """Confirmation JSON factory method.

        This method is not implemented as JSON API does not support standalone Confirmation messages.

        Raises:
            NotImplementedError: This method is not implemented.
        """
        raise NotImplementedError('from_json() method not implemented.')
Confirmation message class.
DPA confirmation is used to confirm reception of DPA request by node device at coordinator. The message carries DPA value, request hops (number of hops used to deliver DPA request to node device), response hops (number of hops used to deliver DPA response from node device to coordinator), and timeslot (see DPA documentation for timeslot length calculation).
def __init__(self, nadr: int, pnum: Union[Peripheral, int], pcmd: Union[Command, int], hwpid: int, dpa_value: int,
             rcode: int, pdata: Optional[List[int]] = None, result: Optional[dict] = None):
    """Confirmation constructor.

    Args:
        nadr (int): Device address.
        pnum (Union[Peripheral, int]): Peripheral.
        pcmd (Union[Command, int]): Peripheral command.
        hwpid (int): Hardware profile ID.
        dpa_value (int): DPA value.
        rcode (int): Response code.
        pdata (List[int], optional): DPA confirmation data. Defaults to None.
        result (dict, optional): Confirmation data with keys ``requestHops``, ``responseHops``
            and ``timeslot``. Defaults to None, in which case the values are taken from the first
            three items of ``pdata`` (request hops, timeslot, response hops).

    Raises:
        TypeError: If ``result`` is None and ``pdata`` does not hold at least three items.
        KeyError: If ``result`` is given but lacks one of the expected keys.
    """
    if result is None:
        # `result` is declared optional, so do not crash on subscripting None: rebuild it
        # from pdata in the order request hops, timeslot, response hops.
        if pdata is None or len(pdata) < 3:
            raise TypeError(
                'Confirmation requires either result or pdata with request hops, timeslot and response hops.'
            )
        result = {'requestHops': pdata[0], 'responseHops': pdata[2], 'timeslot': pdata[1]}
    super().__init__(
        nadr=nadr,
        pcmd=pcmd,
        pnum=pnum,
        hwpid=hwpid,
        rcode=rcode,
        dpa_value=dpa_value,
        pdata=pdata,
        result=result
    )
    self._request_hops: int = result['requestHops']
    self._response_hops: int = result['responseHops']
    self._timeslot: int = result['timeslot']
Confirmation constructor.
Arguments:
- nadr (int): Device address.
- pnum (Union[Peripheral, int]): Peripheral.
- pcmd (Union[Command, int]): Peripheral command.
- hwpid (int): Hardware profile ID.
- rcode (int): Response code.
- dpa_value (int): DPA value.
- pdata (List[int], optional): DPA response data. Defaults to None.
- result (dict, optional): JSON response data. Defaults to None.
@classmethod
def from_dpa(cls, dpa: bytes) -> 'Confirmation':
    """Confirmation DPA factory method.

    Checks the packet length and confirmation code through `DpaValidator`, then parses
    the DPA confirmation message and constructs :obj:`Confirmation` object.

    Args:
        dpa (bytes): DPA confirmation bytes.

    Returns:
        :obj:`Confirmation`: Confirmation message object.
    """
    DpaValidator.confirmation_length(dpa=dpa)
    DpaValidator.confirmation_code(dpa=dpa)
    member = ResponsePacketMembers
    nadr = dpa[member.NADR]
    pnum = Common.pnum_from_dpa(dpa[member.PNUM])
    pcmd = Common.request_pcmd_from_dpa(pnum, dpa[member.PCMD])
    hwpid = Common.hwpid_from_dpa(dpa[member.HWPID_HI], dpa[member.HWPID_LO])
    rcode = dpa[member.RCODE]
    dpa_value = dpa[member.DPA_VALUE]
    pdata = list(dpa[8:])
    # Byte 8 is request hops, byte 9 the timeslot and byte 10 response hops.
    result = {
        'requestHops': dpa[8],
        'responseHops': dpa[10],
        'timeslot': dpa[9],
    }
    return cls(
        nadr=nadr,
        pnum=pnum,
        pcmd=pcmd,
        hwpid=hwpid,
        rcode=rcode,
        dpa_value=dpa_value,
        pdata=pdata,
        result=result,
    )
Confirmation DPA factory method.
Parses DPA confirmation message and constructs Confirmation
object.
Arguments:
- dpa (bytes): DPA confirmation bytes.
Returns:
Confirmation
: Confirmation message object.
@classmethod
def from_json(cls, json: dict) -> 'Confirmation':
    """Confirmation JSON factory method.

    This method is not implemented as JSON API does not support standalone Confirmation messages.

    Args:
        json (dict): JSON API response. Unused, the method raises unconditionally.

    Raises:
        NotImplementedError: This method is not implemented.
    """
    raise NotImplementedError('from_json() method not implemented.')
Confirmation JSON factory method.
This method is not implemented as JSON API does not support standalone Confirmation messages.
Raises:
- NotImplementedError: This method is not implemented.
class GenericRequest(IRequest):
    """Generic request class.

    A request built from raw members; its message type is always `GenericMessages.RAW`.
    """

    def __init__(self, nadr: int, pnum: Union[Peripheral, int], pcmd: Union[Command, int],
                 hwpid: int = dpa_constants.HWPID_MAX, pdata: Optional[List[int]] = None,
                 dpa_rsp_time: Optional[float] = None, dev_process_time: Optional[float] = None,
                 msgid: Optional[str] = None):
        """Generic request constructor.

        Args:
            nadr (int): Device address.
            pnum (Union[Peripheral, int]): Peripheral number.
            pcmd (Union[Command, int]): Peripheral command.
            hwpid (int, optional): Hardware profile ID. Defaults to 65535 (Ignore HWPID check).
            pdata (List[int], optional): Request data. Defaults to None.
            dpa_rsp_time (float, optional): DPA request timeout in seconds. Defaults to None.
            dev_process_time (float, optional): Device processing time. Defaults to None.
            msgid (str, optional): JSON API message ID. Defaults to None. If the parameter is not specified, a random
                UUIDv4 string is generated and used.
        """
        super().__init__(
            nadr=nadr,
            pnum=pnum,
            pcmd=pcmd,
            m_type=GenericMessages.RAW,
            hwpid=hwpid,
            pdata=pdata,
            dpa_rsp_time=dpa_rsp_time,
            dev_process_time=dev_process_time,
            msgid=msgid,
        )

    def to_dpa(self, mutable: bool = False) -> Union[bytes, bytearray]:
        """DPA request serialization method.

        Args:
            mutable (bool, optional): Serialize into mutable byte representation of DPA request packet.
                Defaults to False.

        Returns:
            :obj:`bytes`: Immutable byte representation of DPA request packet.\n
            :obj:`bytearray`: Mutable byte representation of DPA request packet (if argument mutable is True).
        """
        return Common.serialize_to_dpa(
            nadr=self._nadr,
            pnum=self._pnum,
            pcmd=self._pcmd,
            hwpid=self._hwpid,
            pdata=self._pdata,
            mutable=mutable,
        )

    def to_json(self) -> dict:
        """JSON API request serialization method.

        The DPA packet is rendered as dot-separated, two-digit lowercase hexadecimal bytes.
        A timeout (rounded up to whole milliseconds) is included only if a DPA response
        time was set.

        Returns:
            :obj:`dict`: JSON API request object.
        """
        hwpid_lo = self._hwpid & 0xFF
        hwpid_hi = (self._hwpid >> 8) & 0xFF
        # Second byte (high byte of the address) is always zero.
        packet = [self._nadr, 0, self._pnum, self._pcmd, hwpid_lo, hwpid_hi]
        if self._pdata is not None:
            packet += self._pdata
        message_type = self._mtype.value
        data = {
            'msgId': self._msgid,
            'req': {'rData': '.'.join(f'{octet:02x}' for octet in packet)},
            'returnVerbose': True,
        }
        if self._dpa_rsp_time is not None:
            data['timeout'] = math.ceil(self._dpa_rsp_time * 1000)
        return {'mType': message_type, 'data': data}
Generic request class.
def __init__(self, nadr: int, pnum: Union[Peripheral, int], pcmd: Union[Command, int],
             hwpid: int = dpa_constants.HWPID_MAX, pdata: Optional[List[int]] = None,
             dpa_rsp_time: Optional[float] = None, dev_process_time: Optional[float] = None,
             msgid: Optional[str] = None):
    """Generic request constructor.

    Forwards every argument to the parent constructor and fixes the message type
    to `GenericMessages.RAW`.

    Args:
        nadr (int): Device address.
        pnum (Union[Peripheral, int]): Peripheral number.
        pcmd (Union[Command, int]): Peripheral command.
        hwpid (int, optional): Hardware profile ID. Defaults to 65535 (Ignore HWPID check).
        pdata (List[int], optional): Request data. Defaults to None.
        dpa_rsp_time (float, optional): DPA request timeout in seconds. Defaults to None.
        dev_process_time (float, optional): Device processing time. Defaults to None.
        msgid (str, optional): JSON API message ID. Defaults to None. If the parameter is not specified, a random
            UUIDv4 string is generated and used.
    """
    super().__init__(
        nadr=nadr,
        pnum=pnum,
        pcmd=pcmd,
        # The message type is not caller-configurable for this class.
        m_type=GenericMessages.RAW,
        hwpid=hwpid,
        pdata=pdata,
        dpa_rsp_time=dpa_rsp_time,
        dev_process_time=dev_process_time,
        msgid=msgid
    )
Generic request constructor.
Arguments:
- nadr (int): Device address.
- pnum (Union[Peripheral, int]): Peripheral number.
- pcmd (Union[Command, int]): Peripheral command.
- hwpid (int, optional): Hardware profile ID. Defaults to 65535 (Ignore HWPID check).
- pdata (List[int], optional): Request data. Defaults to None.
- dpa_rsp_time (float, optional): DPA request timeout in seconds. Defaults to None.
- dev_process_time (float, optional): Device processing time. Defaults to None.
- msgid (str, optional): JSON API message ID. Defaults to None. If the parameter is not specified, a random UUIDv4 string is generated and used.
def to_dpa(self, mutable: bool = False) -> Union[bytes, bytearray]:
    """DPA request serialization method.

    Delegates to `Common.serialize_to_dpa` with the stored address, peripheral, command,
    HWPID and request data.

    Args:
        mutable (bool, optional): Serialize into mutable byte representation of DPA request packet.
            Defaults to False.

    Returns:
        :obj:`bytes`: Immutable byte representation of DPA request packet.\n
        :obj:`bytearray`: Mutable byte representation of DPA request packet (if argument mutable is True).
    """
    return Common.serialize_to_dpa(nadr=self._nadr, pnum=self._pnum, pcmd=self._pcmd, hwpid=self._hwpid,
                                   pdata=self._pdata, mutable=mutable)
DPA request serialization method.
Arguments:
- mutable (bool, optional): Serialize into mutable byte representation of DPA request packet. Defaults to False.
Returns:
bytes
: Immutable byte representation of DPA request packet.
bytearray
: Mutable byte representation of DPA request packet (if argument mutable is True).
def to_json(self) -> dict:
    """JSON API request serialization method.

    The DPA packet is rendered as dot-separated, two-digit lowercase hexadecimal bytes.
    A timeout (rounded up to whole milliseconds) is included only if a DPA response
    time was set.

    Returns:
        :obj:`dict`: JSON API request object.
    """
    hwpid_lo = self._hwpid & 0xFF
    hwpid_hi = (self._hwpid >> 8) & 0xFF
    # Second byte (high byte of the address) is always zero.
    packet = [self._nadr, 0, self._pnum, self._pcmd, hwpid_lo, hwpid_hi]
    if self._pdata is not None:
        packet += self._pdata
    message_type = self._mtype.value
    data = {
        'msgId': self._msgid,
        'req': {'rData': '.'.join(f'{octet:02x}' for octet in packet)},
        'returnVerbose': True,
    }
    if self._dpa_rsp_time is not None:
        data['timeout'] = math.ceil(self._dpa_rsp_time * 1000)
    return {'mType': message_type, 'data': data}
JSON API request serialization method.
Returns:
dict
: JSON API request object.
Inherited Members
class GenericResponse(IResponseGetterMixin):
    """Generic response class.

    A response built from raw members; its message type is always `GenericMessages.RAW`.
    """

    def __init__(self, nadr: int, pnum: Union[Peripheral, int], pcmd: Union[Command, int],
                 hwpid: int = dpa_constants.HWPID_MAX, rcode: int = 0, dpa_value: int = 0,
                 pdata: Optional[List[int]] = None, msgid: Optional[str] = None):
        """Generic response constructor.

        Args:
            nadr (int): Device address.
            pnum (Union[Peripheral, int]): Peripheral number.
            pcmd (Union[Command, int]): Peripheral command.
            hwpid (int, optional): Hardware profile ID. Defaults to 65535, this value ignores HWPID check.
            rcode (int, optional): Response code. Defaults to 0.
            dpa_value (int, optional): DPA value. Defaults to 0.
            pdata (List[int], optional): Raw PDATA. Defaults to None.
            msgid (str, optional): Message ID. Defaults to None.
        """
        # NOTE(review): pdata is forwarded as `result` too - confirm this is intended.
        super().__init__(
            nadr=nadr,
            pnum=pnum,
            pcmd=pcmd,
            m_type=GenericMessages.RAW,
            hwpid=hwpid,
            rcode=rcode,
            dpa_value=dpa_value,
            msgid=msgid,
            pdata=pdata,
            result=pdata,
        )

    @classmethod
    def from_dpa(cls, dpa: bytes) -> 'GenericResponse':
        """DPA response factory method.

        Parses DPA data and constructs GenericResponse object.

        Args:
            dpa (bytes): DPA response bytes.

        Returns:
            GenericResponse: Response message object.
        """
        DpaValidator.base_response_length(dpa=dpa)
        nadr, pnum, pcmd, hwpid, rcode, dpa_value, pdata = Common.parse_dpa_into_members(dpa=dpa)
        return cls(
            nadr=nadr,
            pnum=pnum,
            pcmd=pcmd,
            hwpid=hwpid,
            rcode=rcode,
            dpa_value=dpa_value,
            pdata=pdata,
        )

    @classmethod
    def from_json(cls, json: dict) -> 'GenericResponse':
        """JSON response factory method.

        Parses JSON API response and constructs GenericResponse object. The raw packet is
        read as dot-separated hexadecimal bytes; everything past the first 8 bytes becomes
        `pdata`, or None when there is nothing past them.

        Args:
            json (dict): JSON API response.

        Returns:
            GenericResponse: Response message object.
        """
        JsonValidator.response_received(json=json)
        msgid = Common.msgid_from_json(json=json)
        rdata = Common.generic_rdata_from_json(json=json)
        packet = [int(octet, 16) for octet in rdata.split('.')]
        member = ResponsePacketMembers
        nadr = packet[member.NADR]
        pnum = packet[member.PNUM]
        pcmd = packet[member.PCMD]
        hwpid = Common.hwpid_from_dpa(packet[member.HWPID_HI], packet[member.HWPID_LO])
        rcode = packet[member.RCODE]
        dpa_value = packet[member.DPA_VALUE]
        pdata = None
        if len(packet) > 8:
            pdata = packet[8:]
        return cls(
            nadr=nadr,
            pnum=pnum,
            pcmd=pcmd,
            hwpid=hwpid,
            rcode=rcode,
            dpa_value=dpa_value,
            pdata=pdata,
            msgid=msgid,
        )
Generic response class.
def __init__(self, nadr: int, pnum: Union[Peripheral, int], pcmd: Union[Command, int],
             hwpid: int = dpa_constants.HWPID_MAX, rcode: int = 0, dpa_value: int = 0,
             pdata: Optional[List[int]] = None, msgid: Optional[str] = None):
    """Generic response constructor.

    Forwards every argument to the parent constructor and fixes the message type
    to `GenericMessages.RAW`.

    Args:
        nadr (int): Device address.
        pnum (Union[Peripheral, int]): Peripheral number.
        pcmd (Union[Command, int]): Peripheral command.
        hwpid (int, optional): Hardware profile ID. Defaults to 65535, this value ignores HWPID check.
        rcode (int, optional): Response code. Defaults to 0.
        dpa_value (int, optional): DPA value. Defaults to 0.
        pdata (List[int], optional): Raw PDATA. Defaults to None.
        msgid (str, optional): Message ID. Defaults to None.
    """
    super().__init__(
        nadr=nadr,
        pnum=pnum,
        pcmd=pcmd,
        m_type=GenericMessages.RAW,
        hwpid=hwpid,
        rcode=rcode,
        dpa_value=dpa_value,
        msgid=msgid,
        pdata=pdata,
        # NOTE(review): pdata is forwarded as `result` too - confirm this is intended.
        result=pdata,
    )
Generic response constructor.
Arguments:
- nadr (int): Device address.
- pnum (Union[Peripheral, int]): Peripheral number.
- pcmd (Union[Command, int]): Peripheral command.
- hwpid (int, optional): Hardware profile ID. Defaults to 65535, this value ignores HWPID check.
- rcode (int, optional): Response code. Defaults to 0.
- dpa_value (int, optional): DPA value. Defaults to 0.
- pdata (List[int], optional): Raw PDATA. Defaults to None.
- msgid (str, optional): Message ID. Defaults to None.
@classmethod
def from_dpa(cls, dpa: bytes) -> 'GenericResponse':
    """DPA response factory method.

    Parses DPA data and constructs GenericResponse object.

    Args:
        dpa (bytes): DPA response bytes.

    Returns:
        GenericResponse: Response message object.
    """
    # NOTE(review): presumably rejects packets shorter than a base DPA response - confirm in DpaValidator.
    DpaValidator.base_response_length(dpa=dpa)
    nadr, pnum, pcmd, hwpid, rcode, dpa_value, pdata = Common.parse_dpa_into_members(dpa=dpa)
    # msgid is not passed, so the constructor default is used.
    return cls(nadr=nadr, pnum=pnum, pcmd=pcmd, hwpid=hwpid, rcode=rcode, dpa_value=dpa_value, pdata=pdata)
DPA response factory method.
Parses DPA data and constructs GenericResponse object.
Arguments:
- dpa (bytes): DPA response bytes.
Returns:
GenericResponse: Response message object.
@classmethod
def from_json(cls, json: dict) -> 'GenericResponse':
    """JSON response factory method.

    Parses JSON API response and constructs GenericResponse object.

    Args:
        json (dict): JSON API response.

    Returns:
        GenericResponse: Response message object.
    """
    JsonValidator.response_received(json=json)
    msgid = Common.msgid_from_json(json=json)
    # The response data is a dot-separated string of hexadecimal byte values.
    raw = [int(x, 16) for x in Common.generic_rdata_from_json(json=json).split('.')]
    nadr = raw[ResponsePacketMembers.NADR]
    pnum = raw[ResponsePacketMembers.PNUM]
    pcmd = raw[ResponsePacketMembers.PCMD]
    hwpid = Common.hwpid_from_dpa(raw[ResponsePacketMembers.HWPID_HI], raw[ResponsePacketMembers.HWPID_LO])
    rcode = raw[ResponsePacketMembers.RCODE]
    dpa_value = raw[ResponsePacketMembers.DPA_VALUE]
    # Everything from index 8 onward is PDATA; a response without PDATA yields None, not an empty list.
    pdata = raw[8:] if len(raw) > 8 else None
    return cls(nadr=nadr, pnum=pnum, pcmd=pcmd, hwpid=hwpid, rcode=rcode, dpa_value=dpa_value, pdata=pdata,
               msgid=msgid)
JSON response factory method.
Parses JSON API response and constructs GenericResponse object.
Arguments:
- json (dict): JSON API response.
Returns:
GenericResponse: Response message object.
class BinaryOutputState:
    """Binary Output State class.

    Pairs a binary output index with the state value to be written to it.
    """

    __slots__ = '_index', '_state'

    def __init__(self, index: int, state: int):
        """Binary Output State constructor.

        Each binary output can be configured using the :obj:`state` value as follows:
        - 0 - sets output to OFF state
        - 1 - sets output to ON state
        - 2-127 - sets output to ON state for the next 2-127 minutes
        - 128 - reserved
        - 129-255 - sets output to ON state for the next 1-127 seconds

        Args:
            index (int): Binary output index.
            state (int): State to set.

        Raises:
            RequestParameterInvalidValueError: If either value is out of range.
        """
        self._validate(index=index, state=state)
        self._index, self._state = index, state

    def _validate(self, index: int, state: int):
        """Check both constructor parameters, index first.

        Args:
            index (int): Binary output index.
            state (int): State to set.
        """
        self._validate_index(index=index)
        self._validate_state(state=state)

    @staticmethod
    def _validate_index(index: int):
        """Check that the binary output index lies within the allowed range.

        Args:
            index (int): Binary output index.

        Raises:
            RequestParameterInvalidValueError: If index is less than 0 or greater than 31.
        """
        if dpa_constants.BINOUT_INDEX_MIN <= index <= dpa_constants.BINOUT_INDEX_MAX:
            return
        raise RequestParameterInvalidValueError('Index value should be between 0 and 31.')

    @property
    def index(self) -> int:
        """:obj:`int`: Binary output index.

        Getter and setter.
        """
        return self._index

    @index.setter
    def index(self, value: int):
        self._validate_index(index=value)
        self._index = value

    @staticmethod
    def _validate_state(state: int):
        """Check that the state fits into a single byte.

        Args:
            state (int): State to set.

        Raises:
            RequestParameterInvalidValueError: If state is less than 0 or greater than 255.
        """
        if dpa_constants.BYTE_MIN <= state <= dpa_constants.BYTE_MAX:
            return
        raise RequestParameterInvalidValueError('State value should be between 0 and 255.')

    @property
    def state(self) -> int:
        """:obj:`int`: Binary output state.

        Getter and setter.
        """
        return self._state

    @state.setter
    def state(self, value: int):
        self._validate_state(state=value)
        self._state = value

    def to_json(self) -> dict:
        """Serialize Binary Output State to JSON API param.

        The ``time`` member is emitted in seconds and only for timed states.

        Returns:
            dict: JSON-serialized Binary Output State
        """
        state = self.state
        seconds = None
        if 2 <= state <= 127:
            # Minute-based range, converted to seconds.
            seconds = state * 60
        elif 128 <= state <= 255:
            # Second-based range; the reserved value 128 maps to 0 here.
            seconds = state - 128
        payload = {'index': self.index, 'state': state > 0}
        if seconds is not None:
            payload['time'] = seconds
        return payload
Binary Output State class.
def __init__(self, index: int, state: int):
    """Binary Output State constructor.

    Each binary output can be configured using the :obj:`state` value as follows:
    - 0 - sets output to OFF state
    - 1 - sets output to ON state
    - 2-127 - sets output to ON state for the next 2-127 minutes
    - 128 - reserved
    - 129-255 - sets output to ON state for the next 1-127 seconds

    Args:
        index (int): Binary output index.
        state (int): State to set.
    """
    # Validate before assigning so that an invalid object is never constructed.
    self._validate(index=index, state=state)
    self._index = index
    self._state = state
Binary Output State constructor.
Each binary output can be configured using the state value as follows:
- 0 - sets output to OFF state
- 1 - sets output to ON state
- 2-127 - sets output to ON state for the next 2-127 minutes
- 128 - reserved
- 129-255 - sets output to ON state for the next 1-127 seconds
Arguments:
- index (int): Binary output index.
- state (int): State to set.
def to_json(self) -> dict:
    """Serialize Binary Output State to JSON API param.

    The ``time`` member is emitted in seconds and only for timed states.

    Returns:
        dict: JSON-serialized Binary Output State
    """
    state = self.state
    seconds = None
    if 2 <= state <= 127:
        # Minute-based range, converted to seconds.
        seconds = state * 60
    elif 128 <= state <= 255:
        # Second-based range; the reserved value 128 maps to 0 here.
        seconds = state - 128
    payload = {'index': self.index, 'state': state > 0}
    if seconds is not None:
        payload['time'] = seconds
    return payload
Serialize Binary Output State to JSON API param.
Returns:
dict: JSON-serialized Binary Output State
class CoordinatorAuthorizeBondParams:
    """AuthorizeBondParams class.

    Parameters for AuthorizeBond request. Each object represents a single
    pair of device address and MID.
    """

    __slots__ = '_req_addr', '_mid'

    def __init__(self, req_addr: int, mid: int):
        """AuthorizeBondParams constructor.

        Args:
            req_addr (int): Requested node address.
            mid (int): Module ID.

        Raises:
            RequestParameterInvalidValueError: If either value is out of range.
        """
        self._validate(req_addr=req_addr, mid=mid)
        self._req_addr = req_addr
        self._mid = mid

    def _validate(self, req_addr: int, mid: int):
        """Validates pair parameters.

        Args:
            req_addr (int): Requested node address.
            mid (int): Module ID.
        """
        self._validate_req_addr(req_addr=req_addr)
        self._validate_mid(mid=mid)

    @staticmethod
    def _validate_req_addr(req_addr: int):
        """Validates requested node address.

        Args:
            req_addr (int): Requested node address.

        Raises:
            RequestParameterInvalidValueError: If req_addr is less than 0 or greater than 255.
        """
        if not dpa_constants.BYTE_MIN <= req_addr <= dpa_constants.BYTE_MAX:
            # The message reports the range that is actually enforced (one byte).
            raise RequestParameterInvalidValueError('Requested address value should be between 0 and 255.')

    @property
    def req_addr(self) -> int:
        """:obj:`int`: Requested node address.

        Getter and setter.
        """
        return self._req_addr

    @req_addr.setter
    def req_addr(self, value: int):
        self._validate_req_addr(req_addr=value)
        self._req_addr = value

    @staticmethod
    def _validate_mid(mid: int):
        """Validates module ID.

        Args:
            mid (int): Module ID.

        Raises:
            RequestParameterInvalidValueError: If mid is less than 0 or greater than 4294967295.
        """
        if not dpa_constants.MID_MIN <= mid <= dpa_constants.MID_MAX:
            raise RequestParameterInvalidValueError('MID value should be an unsigned 32bit integer.')

    @property
    def mid(self) -> int:
        """:obj:`int`: Module ID.

        Getter and setter.
        """
        return self._mid

    @mid.setter
    def mid(self, value: int):
        self._validate_mid(mid=value)
        self._mid = value
AuthorizeBondParams class.
Parameters for AuthorizeBond request. Each object represents a single pair of device address and MID.
def __init__(self, req_addr: int, mid: int):
    """AuthorizeBondParams constructor.

    Args:
        req_addr (int): Requested node address.
        mid (int): Module ID.
    """
    # Validate before assigning so that an invalid pair is never constructed.
    self._validate(req_addr=req_addr, mid=mid)
    self._req_addr = req_addr
    self._mid = mid
AuthorizeBondParams constructor.
Arguments:
- req_addr (int): Device address
- mid (int): Module ID
class CoordinatorDpaParam(IntEnum):
    """DpaParam class.

    Parameters for SetDpaParams request.
    """
    # NOTE(review): member meanings are not shown in this file; the names presumably mirror the
    # DPA value type options of the coordinator Set DPA params command - confirm against the DPA spec.
    LAST_RSSI = 0
    VOLTAGE = 1
    SYSTEM = 2
    USER_SPECIFIED = 3
DpaParam class.
Parameters for SetDpaParams request.
Inherited Members
- enum.Enum
- name
- value
- builtins.int
- conjugate
- bit_length
- bit_count
- to_bytes
- from_bytes
- as_integer_ratio
- real
- imag
- numerator
- denominator
@dataclass
class ExplorationPerEnumData:
    """Exploration Peripheral Enumeration Data class.

    Holds the members of a peripheral enumeration result as plain attributes.
    """

    __slots__ = 'dpa_version', 'user_per_nr', 'embedded_pers', 'hwpid', 'hwpid_ver', 'flags', 'user_per'

    def __init__(self, result: dict, os_read: bool = False):
        """Peripheral Enumeration Data constructor.

        Args:
            result (dict): Peripheral enumeration result.
            os_read (boolean): Data parsed from OS Read response. When set, the flags
                are read from the ``flagsEnum`` member instead of ``flags``.
        """
        if os_read:
            flags_member = 'flagsEnum'
        else:
            flags_member = 'flags'
        self.dpa_version: int = result['dpaVer']
        """DPA version."""
        self.user_per_nr: int = result['perNr']
        """Number of non-embedded peripherals implemented by custom DPA handler."""
        self.embedded_pers: List[int] = result['embeddedPers']
        """List of embedded peripherals implemented by device."""
        self.hwpid: int = result['hwpid']
        """Hardware profile ID."""
        self.hwpid_ver: int = result['hwpidVer']
        """Hardware profile version."""
        self.flags: int = result[flags_member]
        """Device function flags."""
        self.user_per: List[int] = result['userPer']
        """List of non-embedded peripherals implemented by device."""
Exploration Peripheral Enumeration Data class.
def __init__(self, result: dict, os_read: bool = False):
    """Peripheral Enumeration Data constructor.

    Args:
        result (dict): Peripheral enumeration result.
        os_read (boolean): Data parsed from OS Read response. When set, the flags
            are read from the ``flagsEnum`` member instead of ``flags``.

    Raises:
        KeyError: If an expected member is missing from ``result``.
    """
    self.dpa_version: int = result['dpaVer']
    """DPA version."""
    self.user_per_nr: int = result['perNr']
    """Number of non-embedded peripherals implemented by custom DPA handler."""
    self.embedded_pers: List[int] = result['embeddedPers']
    """List of embedded peripherals implemented by device."""
    self.hwpid: int = result['hwpid']
    """Hardware profile ID."""
    self.hwpid_ver: int = result['hwpidVer']
    """Hardware profile version."""
    # The flags member is named differently depending on which response the data came from.
    self.flags: int = result['flagsEnum' if os_read else 'flags']
    """Device function flags."""
    self.user_per: List[int] = result['userPer']
    """List of non-embedded peripherals implemented by device."""
Peripheral Enumeration Data constructor.
Arguments:
- result (dict): Peripheral enumeration result.
- os_read (boolean): Data parsed from OS Read response.
@dataclass
class ExplorationPerInfoData:
    """Exploration Peripheral Information Data class."""

    __slots__ = 'perte', 'pert', 'par1', 'par2'

    def __init__(self, result: dict):
        """Peripheral Information Data constructor.

        Args:
            result (dict): Peripheral information result; must contain the members
                ``perTe``, ``perT``, ``par1`` and ``par2``.

        Raises:
            KeyError: If an expected member is missing from ``result``.
        """
        self.perte: int = result['perTe']
        """Extended peripheral characteristic."""
        self.pert: Union[PeripheralTypes, int] = result['perT']
        """Peripheral type. If the peripheral is not supported or enabled, value is equal
        to :obj:`PERIPHERAL_TYPE_DUMMY`."""
        self.par1 = result['par1']
        """Optional peripheral specific information."""
        self.par2 = result['par2']
        """Optional peripheral specific information."""
Exploration Peripheral Information Data class.
def __init__(self, result: dict):
    """Peripheral Information Data constructor.

    Args:
        result (dict): Peripheral information result; must contain the members
            ``perTe``, ``perT``, ``par1`` and ``par2``.

    Raises:
        KeyError: If an expected member is missing from ``result``.
    """
    self.perte: int = result['perTe']
    """Extended peripheral characteristic."""
    self.pert: Union[PeripheralTypes, int] = result['perT']
    """Peripheral type. If the peripheral is not supported or enabled, value is equal
    to :obj:`PERIPHERAL_TYPE_DUMMY`."""
    self.par1 = result['par1']
    """Optional peripheral specific information."""
    self.par2 = result['par2']
    """Optional peripheral specific information."""
Peripheral Information Data constructor.
Arguments:
- result (dict): Peripheral information result.
Peripheral type. If the peripheral is not supported or enabled, value is equal to PERIPHERAL_TYPE_DUMMY.
class IoTriplet:
    """IO Triplet parameter class.

    Groups a port number, a pin mask and the value to write; each member must fit into one byte.
    """

    __slots__ = '_port', '_mask', '_value'

    def __init__(self, port: Union[IoConstants, int], mask: int, value: int):
        """IO Triplet constructor.

        Args:
            port (Union[IoConstants, int]): Port number.
            mask (int): Port pin mask.
            value (int): Value to write.

        Raises:
            RequestParameterInvalidValueError: If any member is less than 0 or greater than 255.
        """
        self._validate(port=port, mask=mask, value=value)
        self._port = port
        self._mask = mask
        self._value = value

    def _validate(self, port: int, mask: int, value: int):
        """Validate IO triplet parameters.

        Args:
            port (int): Port number.
            mask (int): Port pin mask.
            value (int): Value to write.
        """
        self._validate_port(port=port)
        self._validate_mask(mask=mask)
        self._validate_value(value=value)

    @staticmethod
    def _validate_port(port: Union[IoConstants, int]):
        """Validate port number.

        Args:
            port (Union[IoConstants, int]): Port number.

        Raises:
            RequestParameterInvalidValueError: If port is less than 0 or greater than 255.
        """
        if not BYTE_MIN <= port <= BYTE_MAX:
            raise RequestParameterInvalidValueError('Port should be between 0 and 255.')

    @property
    def port(self) -> Union[IoConstants, int]:
        """:obj:`int` or :obj:`IoConstants`: Port number.

        Getter and setter.
        """
        return self._port

    @port.setter
    def port(self, val: Union[IoConstants, int]):
        self._validate_port(port=val)
        # Store into _port; writing to _value here would clobber the value member
        # and leave the port unchanged.
        self._port = val

    @staticmethod
    def _validate_mask(mask: int):
        """Validate port pin mask.

        Args:
            mask (int): Port pin mask.

        Raises:
            RequestParameterInvalidValueError: If mask is less than 0 or greater than 255.
        """
        if not BYTE_MIN <= mask <= BYTE_MAX:
            raise RequestParameterInvalidValueError('Mask should be between 0 and 255.')

    @property
    def mask(self) -> int:
        """:obj:`int`: Port pin mask.

        Getter and setter.
        """
        return self._mask

    @mask.setter
    def mask(self, val: int):
        self._validate_mask(mask=val)
        self._mask = val

    @staticmethod
    def _validate_value(value: int):
        """Validate value to write.

        Args:
            value (int): Value to write.

        Raises:
            RequestParameterInvalidValueError: If value is less than 0 or greater than 255.
        """
        if not BYTE_MIN <= value <= BYTE_MAX:
            raise RequestParameterInvalidValueError('Value should be between 0 and 255.')

    @property
    def value(self) -> int:
        """:obj:`int`: Value to write.

        Getter and setter.
        """
        return self._value

    @value.setter
    def value(self, val: int):
        self._validate_value(value=val)
        self._value = val
IO Triplet parameter class.
def __init__(self, port: Union[IoConstants, int], mask: int, value: int):
    """IO Triplet constructor.

    Args:
        port (Union[IoConstants, int]): Port number.
        mask (int): Port pin mask.
        value (int): Value to write.
    """
    # Validate before assigning so that an invalid triplet is never constructed.
    self._validate(port=port, mask=mask, value=value)
    self._port = port
    self._mask = mask
    self._value = value
IO Triplet constructor.
Arguments:
- port (Union[IoConstants, int]): Port number.
- mask (int): Port pin mask.
- value (int): Value to write.
@dataclass
class NodeReadData:
    """Node Read Data class.

    Copies the members of a node read result dictionary into plain attributes.
    """

    __slots__ = 'ntw_addr', 'ntw_vrn', 'ntw_zin', 'ntw_did', 'ntw_pvrn', 'ntw_useraddr', 'ntw_id', 'ntw_vrnfnz', \
        'ntw_cfg', 'flags'

    def __init__(self, data: dict):
        """Node Read Data constructor.

        Args:
            data (dict): Node read result.

        Raises:
            KeyError: If an expected member is missing from ``data``.
        """
        self.ntw_addr: int = data['ntwADDR']
        """Logical node address."""
        self.ntw_vrn: int = data['ntwVRN']
        """Virtual routing number."""
        self.ntw_zin: int = data['ntwZIN']
        """Zone index (zone number + 1)."""
        self.ntw_did: int = data['ntwDID']
        """Discovery ID."""
        self.ntw_pvrn: int = data['ntwPVRN']
        """Parent virtual routing number."""
        self.ntw_useraddr: int = data['ntwUSERADDRESS']
        """User address."""
        self.ntw_id: int = data['ntwID']
        """Network identification."""
        self.ntw_vrnfnz: int = data['ntwVRNFNZ']
        """Virtual routing number of first node in zone."""
        self.ntw_cfg: int = data['ntwCFG']
        """Network configuration."""
        self.flags: int = data['flags']
        """Node flags."""
Node Read Data class.
def __init__(self, data: dict):
    """Node Read Data constructor.

    Args:
        data (dict): Node read result.

    Raises:
        KeyError: If an expected member is missing from ``data``.
    """
    self.ntw_addr: int = data['ntwADDR']
    """Logical node address."""
    self.ntw_vrn: int = data['ntwVRN']
    """Virtual routing number."""
    self.ntw_zin: int = data['ntwZIN']
    """Zone index (zone number + 1)."""
    self.ntw_did: int = data['ntwDID']
    """Discovery ID."""
    self.ntw_pvrn: int = data['ntwPVRN']
    """Parent virtual routing number."""
    self.ntw_useraddr: int = data['ntwUSERADDRESS']
    """User address."""
    self.ntw_id: int = data['ntwID']
    """Network identification."""
    self.ntw_vrnfnz: int = data['ntwVRNFNZ']
    """Virtual routing number of first node in zone."""
    self.ntw_cfg: int = data['ntwCFG']
    """Network configuration."""
    self.flags: int = data['flags']
    """Node flags."""
Node Read Data constructor.
Arguments:
- data (dict): Node read result.
class NodeValidateBondsParams:
    """Node Validate Bonds parameters class.

    Represents one pair of node device address and module ID.
    """

    __slots__ = '_bond_addr', '_mid'

    def __init__(self, bond_addr: int, mid: int):
        """Validate Bonds parameters constructor.

        Args:
            bond_addr (int): Node device address.
            mid (int): Node module ID.

        Raises:
            RequestParameterInvalidValueError: If either value is out of range.
        """
        self._validate(bond_addr=bond_addr, mid=mid)
        self._bond_addr, self._mid = bond_addr, mid

    def _validate(self, bond_addr: int, mid: int):
        """Check both parameters, address first.

        Args:
            bond_addr (int): Node device address.
            mid (int): Node module ID.
        """
        self._validate_bond_addr(bond_addr)
        self._validate_mid(mid)

    @staticmethod
    def _validate_bond_addr(bond_addr: int):
        """Check that the node device address fits into a single byte.

        Args:
            bond_addr (int): Node device address.

        Raises:
            RequestParameterInvalidValueError: If bond_addr is less than 0 or greater than 255.
        """
        if dpa_constants.BYTE_MIN <= bond_addr <= dpa_constants.BYTE_MAX:
            return
        raise RequestParameterInvalidValueError('Bond address value should be between 0 and 255.')

    @property
    def bond_addr(self) -> int:
        """:obj:`int`: Node device address.

        Getter and setter.
        """
        return self._bond_addr

    @bond_addr.setter
    def bond_addr(self, value: int):
        self._validate_bond_addr(value)
        self._bond_addr = value

    @staticmethod
    def _validate_mid(mid: int):
        """Check that the module ID is within the allowed range.

        Args:
            mid (int): Node module ID.

        Raises:
            RequestParameterInvalidValueError: If mid is less than 0 or greater than 4294967295.
        """
        if dpa_constants.MID_MIN <= mid <= dpa_constants.MID_MAX:
            return
        raise RequestParameterInvalidValueError('MID value should be an unsigned 32bit integer.')

    @property
    def mid(self):
        """:obj:`int`: Module ID.

        Getter and setter.
        """
        return self._mid

    @mid.setter
    def mid(self, value):
        self._validate_mid(value)
        self._mid = value
Node Validate Bonds parameters class.
def __init__(self, bond_addr: int, mid: int):
    """Validate Bonds parameters constructor.

    Args:
        bond_addr (int): Node device address.
        mid (int): Node module ID.
    """
    # Validate before assigning so that an invalid pair is never constructed.
    self._validate(bond_addr=bond_addr, mid=mid)
    self._bond_addr = bond_addr
    self._mid = mid
Validate Bonds parameters constructor.
Arguments:
- bond_addr (int): Node device address.
- mid (int): Node module ID.
class OsBatchData:
    """Batch Data class.

    Describes a single request embedded in a batch: peripheral, command, HWPID and optional data.
    """

    __slots__ = '_pnum', '_pcmd', '_hwpid', '_pdata'

    def __init__(self, pnum: Union[Peripheral, int], pcmd: Union[Command, int], hwpid: int = dpa_constants.HWPID_MAX,
                 pdata: Optional[List[int]] = None):
        """Batch Data constructor.

        Args:
            pnum (Union[Peripheral, int]): Peripheral number.
            pcmd (Union[Command, int]): Peripheral command.
            hwpid (int): Hardware profile ID. Defaults to 65535, this value ignores HWPID check.
            pdata (List[int], optional): Request data. None is stored as an empty list.

        Raises:
            RequestParameterInvalidValueError: If any parameter is out of range.
        """
        self._validate(pnum=pnum, pcmd=pcmd, hwpid=hwpid, pdata=pdata)
        self._pnum, self._pcmd, self._hwpid = pnum, pcmd, hwpid
        if pdata is None:
            self._pdata = []
        else:
            self._pdata = pdata

    def _validate(self, pnum: Union[Peripheral, int], pcmd: Union[Command, int], hwpid: int,
                  pdata: Optional[List[int]] = None):
        """Check all batch parameters in the order pnum, pcmd, hwpid, pdata.

        Args:
            pnum (Union[Peripheral, int]): Peripheral number.
            pcmd (Union[Command, int]): Peripheral command.
            hwpid (int): Hardware profile ID.
            pdata (List[int], optional): Request data.
        """
        self._validate_pnum(pnum)
        self._validate_pcmd(pcmd)
        self._validate_hwpid(hwpid)
        self._validate_pdata(pdata)

    @staticmethod
    def _validate_pnum(pnum: Union[Peripheral, int]):
        """Check that the peripheral number fits into a single byte.

        Args:
            pnum (Union[Peripheral, int]): Peripheral number.

        Raises:
            RequestParameterInvalidValueError: If pnum is less than 0 or greater than 255.
        """
        if dpa_constants.BYTE_MIN <= pnum <= dpa_constants.BYTE_MAX:
            return
        raise RequestParameterInvalidValueError('PNUM value should be between 0 and 255.')

    @property
    def pnum(self) -> Union[Peripheral, int]:
        """:obj:`Peripheral` or :obj:`int`: Peripheral number.

        Getter and setter.
        """
        return self._pnum

    @pnum.setter
    def pnum(self, value: Union[Peripheral, int]):
        self._validate_pnum(value)
        self._pnum = value

    @staticmethod
    def _validate_pcmd(pcmd: Union[Command, int]):
        """Check that the peripheral command fits into a single byte.

        Args:
            pcmd (Union[Command, int]): Peripheral command.

        Raises:
            RequestParameterInvalidValueError: If pcmd is less than 0 or greater than 255.
        """
        if dpa_constants.BYTE_MIN <= pcmd <= dpa_constants.BYTE_MAX:
            return
        raise RequestParameterInvalidValueError('PCMD value should be between 0 and 255.')

    @property
    def pcmd(self) -> Union[Command, int]:
        """:obj:`Command` or :obj:`int`: Peripheral command.

        Getter and setter.
        """
        return self._pcmd

    @pcmd.setter
    def pcmd(self, value: Union[Command, int]):
        self._validate_pcmd(value)
        self._pcmd = value

    @staticmethod
    def _validate_hwpid(hwpid: int):
        """Check that the hardware profile ID is within the allowed range.

        Args:
            hwpid (int): Hardware profile ID.

        Raises:
            RequestParameterInvalidValueError: If hwpid is less than 0 or greater than 65535.
        """
        if dpa_constants.HWPID_MIN <= hwpid <= dpa_constants.HWPID_MAX:
            return
        raise RequestParameterInvalidValueError('HWPID value should be between 0 and 65535.')

    @property
    def hwpid(self) -> int:
        """:obj:`int`: Hardware profile ID.

        Getter and setter.
        """
        return self._hwpid

    @hwpid.setter
    def hwpid(self, value: int):
        self._validate_hwpid(value)
        self._hwpid = value

    @staticmethod
    def _validate_pdata(pdata: Optional[List[int]] = None):
        """Check request data; None is accepted without further checks.

        Args:
            pdata (List[int], optional): Request data.

        Raises:
            RequestParameterInvalidValueError: If the values are rejected by the byte range check.
        """
        if pdata is not None and not Common.values_in_byte_range(pdata):
            raise RequestParameterInvalidValueError('PDATA values should be between 0 and 255.')

    @property
    def pdata(self) -> Optional[List[int]]:
        """:obj:`list` of :obj:`int` or :obj:`None`: Request data.

        Getter and setter.
        """
        return self._pdata

    @pdata.setter
    def pdata(self, value: Optional[List[int]] = None):
        self._validate_pdata(value)
        if value is None:
            self._pdata = []
        else:
            self._pdata = value

    def to_pdata(self) -> List[int]:
        """Serialize batch data into DPA request pdata.

        The first byte is the record length, counting the length byte itself.

        Returns:
            :obj:`list` of :obj:`int`: Serialized DPA request pdata.
        """
        hwpid_lo = self._hwpid & 0xFF
        hwpid_hi = (self._hwpid >> 8) & 0xFF
        record = [self._pnum, self._pcmd, hwpid_lo, hwpid_hi] + self._pdata
        return [len(record) + 1] + record

    def to_json(self) -> dict:
        """Serialize batch data into JSON API request data.

        Values are rendered as lowercase hexadecimal strings; ``rdata`` is only
        present when there is request data.

        Returns:
            :obj:`dict`: Serialized JSON API request data.
        """
        record = {}
        record['pnum'] = f'{self._pnum:02x}'
        record['pcmd'] = f'{self._pcmd:02x}'
        record['hwpid'] = f'{self._hwpid:04x}'
        has_data = self._pdata is not None and len(self._pdata) > 0
        if has_data:
            record['rdata'] = '.'.join(f'{x:02x}' for x in self._pdata)
        return record
Batch Data class.
def __init__(self, pnum: Union[Peripheral, int], pcmd: Union[Command, int], hwpid: int = dpa_constants.HWPID_MAX,
             pdata: Optional[List[int]] = None):
    """Batch Data constructor.

    Args:
        pnum (Union[Peripheral, int]): Peripheral number.
        pcmd (Union[Command, int]): Peripheral command.
        hwpid (int): Hardware profile ID. Defaults to 65535, this value ignores HWPID check.
        pdata (List[int], optional): Request data. None is stored as an empty list.
    """
    # Validate before assigning so that an invalid object is never constructed.
    self._validate(pnum=pnum, pcmd=pcmd, hwpid=hwpid, pdata=pdata)
    self._pnum = pnum
    self._pcmd = pcmd
    self._hwpid = hwpid
    # Normalize missing data to an empty list.
    self._pdata = pdata if pdata is not None else []
Batch Data constructor.
Arguments:
- pnum (Union[Peripheral, int]): Peripheral number.
- pcmd (Union[Command, int]): Peripheral command.
- hwpid (int): Hardware profile ID. Defaults to 65535, this value ignores HWPID check.
- pdata (List[int], optional): Request data.
Peripheral or int: Peripheral number.
Getter and setter.
Command or int: Peripheral command.
Getter and setter.
def to_pdata(self) -> List[int]:
    """Serialize batch data into DPA request pdata.

    The first byte is the record length, counting the length byte itself.

    Returns:
        :obj:`list` of :obj:`int`: Serialized DPA request pdata.
    """
    # HWPID is emitted low byte first.
    hwpid_lo = self._hwpid & 0xFF
    hwpid_hi = (self._hwpid >> 8) & 0xFF
    record = [self._pnum, self._pcmd, hwpid_lo, hwpid_hi] + self._pdata
    return [len(record) + 1] + record
Serialize batch data into DPA request pdata.
Returns:
list of int: Serialized DPA request pdata.
def to_json(self) -> dict:
    """Serialize batch data into JSON API request data.

    Values are rendered as lowercase hexadecimal strings; ``rdata`` is only
    present when there is request data.

    Returns:
        :obj:`dict`: Serialized JSON API request data.
    """
    record = {}
    record['pnum'] = f'{self._pnum:02x}'
    record['pcmd'] = f'{self._pcmd:02x}'
    record['hwpid'] = f'{self._hwpid:04x}'
    has_data = self._pdata is not None and len(self._pdata) > 0
    if has_data:
        # Data bytes are joined with dots.
        record['rdata'] = '.'.join(f'{x:02x}' for x in self._pdata)
    return record
Serialize batch data into JSON API request data.
Returns:
dict: Serialized JSON API request data.
class OsIndicateParam(IntEnumMember):
    """OS Indicate control params enum."""

    # NOTE(review): the duration suffixes presumably mean 1 second and 10 seconds of indication -
    # confirm against the DPA specification of the OS Indicate command.
    OFF = 0
    ON = 1
    INDICATE_1S = 2
    INDICATE_10S = 3
OS Indicate control params enum.
Inherited Members
- enum.Enum
- name
- value
- builtins.int
- conjugate
- bit_length
- bit_count
- to_bytes
- from_bytes
- as_integer_ratio
- real
- imag
- numerator
- denominator
class OsLoadCodeFlags:
    """OS Load Code flags class.

    Combines a load code action and a code type into a single flags value.
    """
    __slots__ = '_action', '_code_type'

    def __init__(self, action: OsLoadCodeAction, code_type: OsLoadCodeType):
        """Load Code flags constructor.

        Args:
            action (OsLoadCodeAction): Load code action.
            code_type (OsLoadCodeType): Load code code type.
        """
        # Values are stored as given; no validation is performed here.
        self._action = action
        self._code_type = code_type

    @property
    def action(self) -> OsLoadCodeAction:
        """:obj:`OsLoadCodeAction`: Load Code action.

        Getter and setter.
        """
        return self._action

    @action.setter
    def action(self, value: OsLoadCodeAction):
        self._action = value

    @property
    def code_type(self) -> OsLoadCodeType:
        """:obj:`OsLoadCodeType`: Load Code code type.

        Getter and setter.
        """
        return self._code_type

    @code_type.setter
    def code_type(self, value: OsLoadCodeType):
        self._code_type = value

    def serialize(self) -> int:
        """Serialize flags into an integer value.

        Returns:
            :obj:`int`: Flags value.
        """
        # The action is OR-ed in unshifted; the code type is shifted left by one bit.
        return self._action | (self._code_type << 1)
OS Load Code flags class.
def __init__(self, action: OsLoadCodeAction, code_type: OsLoadCodeType):
    """Load Code flags constructor.

    Args:
        action (OsLoadCodeAction): Load code action.
        code_type (OsLoadCodeType): Load code code type.
    """
    # Values are stored as given; no validation is performed here.
    self._action = action
    self._code_type = code_type
Load Code flags constructor.
Arguments:
- action (OsLoadCodeAction): Load code action.
- code_type (OsLoadCodeType): Load code code type.
@dataclass
class OsReadData:
    """OS Read Data class.

    Copies the members of an OS Read result dictionary into plain attributes.
    """

    __slots__ = 'mid', 'os_version', 'tr_mcu_type', 'os_build', 'rssi', 'supply_voltage', 'flags', 'slot_limits', 'ibk', \
        'per_enum'

    def __init__(self, data: dict):
        """Read Data constructor.

        Args:
            data (dict): OS Read data.

        Raises:
            KeyError: If an expected member is missing from ``data``.
        """
        self.mid: int = data['mid']
        """Module ID."""
        self.os_version: int = data['osVersion']
        """OS version."""
        self.tr_mcu_type: TrMcuTypeData = TrMcuTypeData(data['trMcuType'])
        """MCU type and TR series."""
        self.os_build: int = data['osBuild']
        """OS build."""
        self.rssi: int = data['rssi']
        """RSSI."""
        self.supply_voltage: int = data['supplyVoltage']
        """Supply voltage."""
        self.flags: int = data['flags']
        """OS flags."""
        self.slot_limits: int = data['slotLimits']
        """Slot limits."""
        # NOTE(review): annotated as int; confirm the actual type of 'ibk' in the OS Read response.
        self.ibk: int = data['ibk']
        """Individual bonding key."""
        # The enumeration members are read from the same dict; os_read=True is passed to the parser.
        self.per_enum: ExplorationPerEnumData = ExplorationPerEnumData(data, os_read=True)
        """Peripheral enumeration data."""
OS Read Data class.
def __init__(self, data: dict):
    """Read Data constructor.

    Args:
        data (dict): OS Read data.

    Raises:
        KeyError: If an expected member is missing from ``data``.
    """
    self.mid: int = data['mid']
    """Module ID."""
    self.os_version: int = data['osVersion']
    """OS version."""
    self.tr_mcu_type: TrMcuTypeData = TrMcuTypeData(data['trMcuType'])
    """MCU type and TR series."""
    self.os_build: int = data['osBuild']
    """OS build."""
    self.rssi: int = data['rssi']
    """RSSI."""
    self.supply_voltage: int = data['supplyVoltage']
    """Supply voltage."""
    self.flags: int = data['flags']
    """OS flags."""
    self.slot_limits: int = data['slotLimits']
    """Slot limits."""
    # NOTE(review): annotated as int; confirm the actual type of 'ibk' in the OS Read response.
    self.ibk: int = data['ibk']
    """Individual bonding key."""
    # The enumeration members are read from the same dict; os_read=True is passed to the parser.
    self.per_enum: ExplorationPerEnumData = ExplorationPerEnumData(data, os_read=True)
    """Peripheral enumeration data."""
Read Data constructor.
Arguments:
- data (dict): OS Read data.
class OsSecurityTypeParam(IntEnumMember):
    """OS Security type enum."""

    # NOTE(review): presumably selects which secret an OS security request sets -
    # confirm against the DPA specification.
    ACCESS_PASSWORD = 0
    USER_KEY = 1
OS Security type enum.
Inherited Members
- enum.Enum
- name
- value
- builtins.int
- conjugate
- bit_length
- bit_count
- to_bytes
- from_bytes
- as_integer_ratio
- real
- imag
- numerator
- denominator
class OsSleepParams:
    """OS Sleep Params class.

    Holds the sleep time and the control flags of a sleep request.
    """

    __slots__ = '_time', 'wake_up_on_negative_edge', 'calibrate_before_sleep', 'flash_led_after_sleep', \
        'wake_up_on_positive_edge', 'use_milliseconds', 'use_deep_sleep'

    def __init__(self, time: int = 0, wake_up_on_negative_edge: bool = False, calibrate_before_sleep: bool = False,
                 flash_led_after_sleep: bool = False, wake_up_on_positive_edge: bool = False,
                 use_milliseconds: bool = False, use_deep_sleep: bool = False):
        """Sleep Params constructor.

        Args:
            time (int): Sleep time, a 16-bit value (0-65535).
            wake_up_on_negative_edge (bool): Wake up on PORT B, bit 4 negative edge change.
            calibrate_before_sleep (bool): Run calibration process before sleep.
            flash_led_after_sleep (bool): Flash green LED once after waking up.
            wake_up_on_positive_edge (bool): Wake up on PORT B, bit 4 positive edge change.
            use_milliseconds (bool): Use sleep time with unit of 32.768 ms instead of 2.097s.
            use_deep_sleep (bool): The IC is shutdown during sleep.

        Raises:
            RequestParameterInvalidValueError: If time is less than 0 or greater than 65535.
        """
        self._validate_time(time)
        self._time = time
        self.wake_up_on_negative_edge = wake_up_on_negative_edge
        """Wake up on PORT B, bit 4 negative edge change."""
        self.calibrate_before_sleep = calibrate_before_sleep
        """Run calibration process before sleep."""
        self.flash_led_after_sleep = flash_led_after_sleep
        """Flash green LED once after waking up."""
        self.wake_up_on_positive_edge = wake_up_on_positive_edge
        """Wake up on PORT B, bit 4 positive edge change."""
        self.use_milliseconds = use_milliseconds
        """Use sleep time with unit of 32.768 ms instead of 2.097s."""
        self.use_deep_sleep = use_deep_sleep
        """The IC is shutdown during sleep."""

    @staticmethod
    def _validate_time(time: int):
        """Validate sleep time parameter.

        Args:
            time (int): Sleep time.

        Raises:
            RequestParameterInvalidValueError: If time is less than 0 or greater than 65535.
        """
        if not dpa_constants.WORD_MIN <= time <= dpa_constants.WORD_MAX:
            raise RequestParameterInvalidValueError('Time value should be between 0 and 65535.')

    @property
    def time(self) -> int:
        """:obj:`int`: Sleep time.

        Getter and setter.
        """
        return self._time

    @time.setter
    def time(self, value: int):
        self._validate_time(value)
        self._time = value

    def _calculate_control(self) -> int:
        """Convert flags into a single value.

        Bit layout, from bit 0 up: wake up on negative edge, calibrate before sleep,
        flash LED after sleep, wake up on positive edge, use milliseconds, use deep sleep.

        Returns:
            :obj:`int`: Flags value.
        """
        return self.wake_up_on_negative_edge | (self.calibrate_before_sleep << 1) | \
            (self.flash_led_after_sleep << 2) | (self.wake_up_on_positive_edge << 3) | \
            (self.use_milliseconds << 4) | (self.use_deep_sleep << 5)

    def to_pdata(self) -> List[int]:
        """Serialize sleep parameters into DPA request pdata.

        Returns:
            :obj:`list` of :obj:`int`: Sleep time (low byte, high byte) followed by the control byte.
        """
        # Both bytes are taken from the same backing field.
        return [self._time & 0xFF, (self._time >> 8) & 0xFF, self._calculate_control()]

    def to_json(self) -> dict:
        """Serializes sleep parameters into JSON API request data.

        Returns:
            :obj:`dict`: Serialized JSON API request data.
        """
        return {
            'time': self._time,
            'control': self._calculate_control()
        }
OS Sleep Params class.
def __init__(
    self,
    time: int = 0,
    wake_up_on_negative_edge: bool = False,
    calibrate_before_sleep: bool = False,
    flash_led_after_sleep: bool = False,
    wake_up_on_positive_edge: bool = False,
    use_milliseconds: bool = False,
    use_deep_sleep: bool = False,
):
    """Sleep Params constructor.

    The sleep time is validated before anything is stored; the flags are stored as given.

    Args:
        time (int): Sleep time.
        wake_up_on_negative_edge (bool): Wake up on PORT B, bit 4 negative edge change.
        calibrate_before_sleep (bool): Run calibration process before sleep.
        flash_led_after_sleep (bool): Flash green LED once after waking up.
        wake_up_on_positive_edge (bool): Wake up on PORT B, bit 4 positive edge change.
        use_milliseconds (bool): Use sleep time with unit of 32.768 ms instead of 2.097s.
        use_deep_sleep (bool): The IC is shutdown during sleep.
    """
    # Reject an invalid time before any attribute is written.
    self._validate_time(time)
    self._time = time

    self.wake_up_on_negative_edge = wake_up_on_negative_edge
    """Wake up on PORT B, bit 4 negative edge change."""

    self.calibrate_before_sleep = calibrate_before_sleep
    """Run calibration process before sleep."""

    self.flash_led_after_sleep = flash_led_after_sleep
    """Flash green LED once after waking up."""

    self.wake_up_on_positive_edge = wake_up_on_positive_edge
    """Wake up on PORT B, bit 4 positive edge change."""

    self.use_milliseconds = use_milliseconds
    """Use sleep time with unit of 32.768 ms instead of 2.097s."""

    self.use_deep_sleep = use_deep_sleep
    """The IC is shutdown during sleep."""
Sleep Params constructor.
Arguments:
- time (int): Sleep time.
- wake_up_on_negative_edge (bool): Wake up on PORT B, bit 4 negative edge change.
- calibrate_before_sleep (bool): Run calibration process before sleep.
- flash_led_after_sleep (bool): Flash green LED once after waking up.
- wake_up_on_positive_edge (bool): Wake up on PORT B, bit 4 positive edge change.
- use_milliseconds (bool): Use sleep time with unit of 32.768 ms instead of 2.097s.
- use_deep_sleep (bool): The IC is shutdown during sleep.
def to_pdata(self) -> List[int]:
    """Serialize sleep parameters into DPA request pdata.

    Returns:
        :obj:`list` of :obj:`int`: Sleep time low byte, sleep time high byte and control byte.
    """
    sleep_time = self._time
    low_byte = sleep_time & 0xFF
    high_byte = (sleep_time >> 8) & 0xFF
    control = self._calculate_control()
    return [low_byte, high_byte, control]
Serialize sleep parameters into DPA request pdata.
Returns:
list of int: Serialized DPA request pdata.
def to_json(self) -> dict:
    """Serializes sleep parameters into JSON API request data.

    Returns:
        :obj:`dict`: Serialized JSON API request data with keys ``time`` and ``control``.
    """
    json_data = {'time': self._time}
    json_data['control'] = self._calculate_control()
    return json_data
Serializes sleep parameters into JSON API request data.
Returns:
dict: Serialized JSON API request data.
class OsTrConfByte:
    """TR Configuration Byte class.

    Groups the address, value and mask of a single TR configuration byte and
    serializes them into DPA request pdata or JSON API request data.
    """

    __slots__ = '_address', '_value', '_mask'

    def __init__(self, address: int, value: int, mask: int):
        """TR Configuration Byte constructor.

        Args:
            address (int): Configuration memory block address.
            value (int): Value to set.
            mask (int): Bits of configuration byte to modify.

        Raises:
            RequestParameterInvalidValueError: If any parameter is less than 0 or greater than 255.
        """
        self._validate(address=address, value=value, mask=mask)
        self._address = address
        self._value = value
        self._mask = mask

    def __eq__(self, other: 'OsTrConfByte'):
        """Rich comparison method, comparing this and another object to determine equality based on properties.

        Args:
            other (OsTrConfByte): Object to compare with.

        Returns:
            bool: True if the objects are equivalent, False otherwise. ``NotImplemented`` is returned
            for objects of a different type, so such comparisons evaluate to False instead of raising.
        """
        if not isinstance(other, OsTrConfByte):
            return NotImplemented
        return self.address == other.address and \
            self.value == other.value and \
            self.mask == other.mask

    def _validate(self, address: int, value: int, mask: int):
        """Validate TR Configuration Byte parameters.

        Args:
            address (int): Configuration memory block address.
            value (int): Value to set.
            mask (int): Bits of configuration byte to modify.

        Raises:
            RequestParameterInvalidValueError: If any parameter is less than 0 or greater than 255.
        """
        self._validate_address(address=address)
        self._validate_value(value=value)
        self._validate_mask(mask=mask)

    @staticmethod
    def _validate_address(address: int):
        """Validate address parameter.

        Args:
            address (int): Configuration memory block address.

        Raises:
            RequestParameterInvalidValueError: If address is less than 0 or greater than 255.
        """
        if not dpa_constants.BYTE_MIN <= address <= dpa_constants.BYTE_MAX:
            raise RequestParameterInvalidValueError('Address value should be between 0 and 255.')

    @property
    def address(self) -> int:
        """:obj:`int`: Configuration memory block address.

        Getter and setter.
        """
        return self._address

    @address.setter
    def address(self, value: int):
        self._validate_address(address=value)
        self._address = value

    @staticmethod
    def _validate_value(value: int):
        """Validate value parameter.

        Args:
            value (int): Value to set.

        Raises:
            RequestParameterInvalidValueError: If value is less than 0 or greater than 255.
        """
        if not dpa_constants.BYTE_MIN <= value <= dpa_constants.BYTE_MAX:
            raise RequestParameterInvalidValueError('Value should be between 0 and 255.')

    @property
    def value(self) -> int:
        """:obj:`int`: Value to set.

        Getter and setter.
        """
        return self._value

    @value.setter
    def value(self, val: int):
        self._validate_value(value=val)
        self._value = val

    @staticmethod
    def _validate_mask(mask: int):
        """Validate mask parameter.

        Args:
            mask (int): Bits of configuration byte to modify.

        Raises:
            RequestParameterInvalidValueError: If mask is less than 0 or greater than 255.
        """
        if not dpa_constants.BYTE_MIN <= mask <= dpa_constants.BYTE_MAX:
            raise RequestParameterInvalidValueError('Mask value should be between 0 and 255.')

    @property
    def mask(self) -> int:
        """:obj:`int`: Bits of configuration byte to modify.

        Getter and setter.
        """
        return self._mask

    @mask.setter
    def mask(self, value: int):
        self._validate_mask(mask=value)
        self._mask = value

    def to_pdata(self) -> List[int]:
        """Serialize TR configuration byte members into pdata.

        Returns:
            :obj:`list` of :obj:`int`: Address, value and mask, in this order.
        """
        return [self._address, self._value, self._mask]

    def to_json(self) -> dict:
        """Serialize TR configuration byte members into JSON.

        Returns:
            :obj:`dict`: Serialized TR configuration byte JSON request data.
        """
        return {
            'address': self._address,
            'value': self._value,
            'mask': self._mask
        }
TR Configuration Byte class.
def __init__(self, address: int, value: int, mask: int):
    """TR Configuration Byte constructor.

    All three parameters are validated before any of them is stored.

    Args:
        address (int): Configuration memory block address.
        value (int): Value to set.
        mask (int): Bits of configuration byte to modify.
    """
    self._validate(address=address, value=value, mask=mask)
    self._address, self._value, self._mask = address, value, mask
TR Configuration Byte constructor.
Arguments:
- address (int): Configuration memory block address.
- value (int): Value to set.
- mask (int): Bits of configuration byte to modify.
def to_pdata(self) -> List[int]:
    """Serialize TR configuration byte members into pdata.

    Returns:
        :obj:`list` of :obj:`int`: Address, value and mask, in this order.
    """
    pdata = [self._address]
    pdata.append(self._value)
    pdata.append(self._mask)
    return pdata
Serialize TR configuration byte members into pdata.
Returns:
list of int: Serialized TR configuration byte pdata.
def to_json(self) -> dict:
    """Serialize TR configuration byte members into JSON.

    Returns:
        :obj:`dict`: Serialized TR configuration byte JSON request data
        with keys ``address``, ``value`` and ``mask``.
    """
    return dict(address=self._address, value=self._value, mask=self._mask)
Serialize TR configuration byte members into JSON.
Returns:
dict: Serialized TR configuration byte JSON request data.
class OsTrConfData:
    """OS TR Configuration data.

    Holds the individual TR configuration items, validates the numeric ones,
    converts the whole configuration from and to DPA pdata and produces
    :obj:`OsTrConfByte` objects for the individual configuration items.
    """

    __slots__ = '_embedded_peripherals', '_custom_dpa_handler', '_dpa_peer_to_peer', '_routing_off', '_io_setup', \
        '_user_peer_to_peer', '_stay_awake_when_not_bonded', '_std_and_lp_network', '_rf_output_power', \
        '_rf_signal_filter', '_lp_rf_timeout', '_uart_baud_rate', '_alternative_dsm_channel', '_local_frc', \
        '_rf_channel_a', '_rf_channel_b', '_reserved_block_0', '_reserved_block_1', '_reserved_block_2'

    def __init__(self, embedded_peripherals: Optional[List[Union[EmbedPeripherals, int]]] = None,
                 custom_dpa_handler: bool = False, dpa_peer_to_peer: bool = False, routing_off: bool = False,
                 io_setup: bool = False, user_peer_to_peer: bool = False, stay_awake_when_not_bonded: bool = False,
                 std_and_lp_network: bool = False, rf_output_power: int = 7, rf_signal_filter: int = 5,
                 lp_rf_timeout: int = 6,
                 uart_baud_rate: Union[dpa_constants.BaudRates, int] = dpa_constants.BaudRates.B9600,
                 alternative_dsm_channel: int = 0, local_frc: bool = False, rf_channel_a: int = 52,
                 rf_channel_b: int = 2, reserved_block_0: Optional[List[int]] = None,
                 reserved_block_1: Optional[List[int]] = None, reserved_block_2: Optional[List[int]] = None):
        """TR Configuration data constructor.

        Args:
            embedded_peripherals (List[int]): Enabled embedded peripherals. Defaults to an empty list.
            custom_dpa_handler (bool): Custom DPA handler in use.
            dpa_peer_to_peer (bool): DPA peer-to-peer enabled.
            routing_off (bool): Node does not route packets in background.
            io_setup (bool): Run IO Setup early during module boot time.
            user_peer_to_peer (bool): Receive peer-to-peer packets and raise PeerToPeer event.
            stay_awake_when_not_bonded (bool): Stay awake during bonding process.
            std_and_lp_network (bool): Control STD+LP network.
            rf_output_power (int): RF output power.
            rf_signal_filter (int): RF signal filter.
            lp_rf_timeout (int): LP RF timeout.
            uart_baud_rate (Union[dpa_constants.BaudRates, int]): UART baud rate.
            alternative_dsm_channel (int): Alternative DSM channel.
            local_frc (bool): Local FRC reception enabled.
            rf_channel_a (int): RF Channel A.
            rf_channel_b (int): RF Channel B.
            reserved_block_0 (List[int], optional): Reserved data block, 2 bytes. Defaults to zeros.
            reserved_block_1 (List[int], optional): Reserved data block, 3 bytes. Defaults to zeros.
            reserved_block_2 (List[int], optional): Reserved data block, 13 bytes. Defaults to zeros.

        Raises:
            RequestParameterInvalidValueError: If a validated parameter is out of range or a reserved
                block has an unexpected length.
        """
        # None is used instead of mutable defaults; fresh lists are created per instance.
        if embedded_peripherals is None:
            embedded_peripherals = []
        if reserved_block_0 is None:
            reserved_block_0 = [0] * 2
        if reserved_block_1 is None:
            reserved_block_1 = [0] * 3
        if reserved_block_2 is None:
            reserved_block_2 = [0] * 13
        self._validate(embedded_peripherals=embedded_peripherals, rf_output_power=rf_output_power,
                       rf_signal_filter=rf_signal_filter, lp_rf_timeout=lp_rf_timeout, baud_rate=uart_baud_rate,
                       alternative_dsm=alternative_dsm_channel, rf_channel_a=rf_channel_a, rf_channel_b=rf_channel_b,
                       reserved_block_0=reserved_block_0, reserved_block_1=reserved_block_1,
                       reserved_block_2=reserved_block_2)
        self._embedded_peripherals = embedded_peripherals
        self._custom_dpa_handler = custom_dpa_handler
        self._dpa_peer_to_peer = dpa_peer_to_peer
        self._routing_off = routing_off
        self._io_setup = io_setup
        self._user_peer_to_peer = user_peer_to_peer
        self._stay_awake_when_not_bonded = stay_awake_when_not_bonded
        self._std_and_lp_network = std_and_lp_network
        self._rf_output_power = rf_output_power
        self._rf_signal_filter = rf_signal_filter
        self._lp_rf_timeout = lp_rf_timeout
        self._uart_baud_rate = uart_baud_rate
        self._alternative_dsm_channel = alternative_dsm_channel
        self._local_frc = local_frc
        self._rf_channel_a = rf_channel_a
        self._rf_channel_b = rf_channel_b
        self._reserved_block_0 = reserved_block_0
        self._reserved_block_1 = reserved_block_1
        self._reserved_block_2 = reserved_block_2

    def __eq__(self, other: 'OsTrConfData'):
        """Rich comparison method, comparing this and another object to determine equality based on properties.

        Every stored configuration item is compared. List-valued items (embedded peripherals,
        reserved blocks) are compared with ``==``, so element order matters.

        Args:
            other (OsTrConfData): Object to compare with.

        Returns:
            bool: True if the objects are equivalent, False otherwise. ``NotImplemented`` is returned
            for objects of a different type, so such comparisons evaluate to False instead of raising.
        """
        if not isinstance(other, OsTrConfData):
            return NotImplemented
        return self._embedded_peripherals == other._embedded_peripherals and \
            self._custom_dpa_handler == other._custom_dpa_handler and \
            self._dpa_peer_to_peer == other._dpa_peer_to_peer and \
            self._routing_off == other._routing_off and \
            self._io_setup == other._io_setup and \
            self._user_peer_to_peer == other._user_peer_to_peer and \
            self._stay_awake_when_not_bonded == other._stay_awake_when_not_bonded and \
            self._std_and_lp_network == other._std_and_lp_network and \
            self._rf_output_power == other._rf_output_power and \
            self._rf_signal_filter == other._rf_signal_filter and \
            self._lp_rf_timeout == other._lp_rf_timeout and \
            self._uart_baud_rate == other._uart_baud_rate and \
            self._alternative_dsm_channel == other._alternative_dsm_channel and \
            self._local_frc == other._local_frc and \
            self._rf_channel_a == other._rf_channel_a and \
            self._rf_channel_b == other._rf_channel_b and \
            self._reserved_block_0 == other._reserved_block_0 and \
            self._reserved_block_1 == other._reserved_block_1 and \
            self._reserved_block_2 == other._reserved_block_2

    def _validate(self, embedded_peripherals: List[int], rf_output_power: int, rf_signal_filter: int,
                  lp_rf_timeout: int, baud_rate: Union[dpa_constants.BaudRates, int], alternative_dsm: int,
                  rf_channel_a: int, rf_channel_b: int, reserved_block_0: List[int],
                  reserved_block_1: List[int], reserved_block_2: List[int]):
        """Validate all range-checked TR configuration parameters.

        Args:
            embedded_peripherals (List[int]): Enabled embedded peripherals.
            rf_output_power (int): RF output power.
            rf_signal_filter (int): RF signal filter.
            lp_rf_timeout (int): LP RF timeout.
            baud_rate (Union[dpa_constants.BaudRates, int]): UART baud rate.
            alternative_dsm (int): Alternative DSM channel.
            rf_channel_a (int): RF Channel A.
            rf_channel_b (int): RF Channel B.
            reserved_block_0 (List[int]): Reserved data block.
            reserved_block_1 (List[int]): Reserved data block.
            reserved_block_2 (List[int]): Reserved data block.

        Raises:
            RequestParameterInvalidValueError: If any of the individual validations fails.
        """
        self._validate_embedded_peripherals(embedded_peripherals)
        self._validate_rf_output_power(rf_output_power)
        self._validate_rf_signal_filter(rf_signal_filter)
        self._validate_lp_rf_timeout(lp_rf_timeout)
        self._validate_uart_baud_rate(baud_rate)
        self._validate_alternative_dsm_channel(alternative_dsm)
        self._validate_rf_channel_a(rf_channel_a)
        self._validate_rf_channel_b(rf_channel_b)
        self._validate_reserved_block_0(reserved_block_0)
        self._validate_reserved_block_1(reserved_block_1)
        self._validate_reserved_block_2(reserved_block_2)

    @staticmethod
    def _validate_embedded_peripherals(embedded_peripherals: List[Union[EmbedPeripherals, int]]) -> None:
        """Validate embedded peripherals parameter.

        Args:
            embedded_peripherals (List[Union[EmbedPeripherals, int]]): Enabled embedded peripherals.

        Raises:
            RequestParameterInvalidValueError: If the list holds more than 32 values or if a value
                is less than 0 or greater than 31.
        """
        if len(embedded_peripherals) > 32:
            raise RequestParameterInvalidValueError('Embedded peripherals should be at most 32 values.')
        # default=0 keeps an empty list valid.
        if min(embedded_peripherals, default=0) < 0 or max(embedded_peripherals, default=0) > 31:
            raise RequestParameterInvalidValueError('Embedded peripherals values should be between 0 and 31.')

    @property
    def embedded_peripherals(self) -> List[Union[EmbedPeripherals, int]]:
        """:obj:`list` of :obj:`EmbedPeripherals` or :obj:`int`: Enabled embedded peripherals.

        Getter and setter.
        """
        return self._embedded_peripherals

    @embedded_peripherals.setter
    def embedded_peripherals(self, value: List[Union[EmbedPeripherals, int]]):
        self._validate_embedded_peripherals(embedded_peripherals=value)
        self._embedded_peripherals = value

    def enable_embedded_peripheral(self, peripheral: Union[EmbedPeripherals, int]) -> None:
        """Enables embedded peripheral.

        The peripheral is appended to the list of enabled peripherals unless it is already present.

        Args:
            peripheral (:obj:`EmbedPeripherals` or :obj:`int`): Embedded peripheral.

        Raises:
            ValueError: If peripheral value is less than 0 or greater than 31.
        """
        if not (0 <= peripheral <= 31):
            raise ValueError('Peripheral value should be between 0 and 31')
        if peripheral not in self._embedded_peripherals:
            self._embedded_peripherals.append(peripheral)

    def disable_embedded_peripheral(self, peripheral: Union[EmbedPeripherals, int]) -> None:
        """Disables embedded peripheral.

        The peripheral is removed from the list of enabled peripherals if it is present.

        Args:
            peripheral (:obj:`EmbedPeripherals` or :obj:`int`): Embedded peripheral.

        Raises:
            ValueError: If peripheral value is less than 0 or greater than 31.
        """
        if not (0 <= peripheral <= 31):
            raise ValueError('Peripheral value should be between 0 and 31')
        if peripheral in self._embedded_peripherals:
            self._embedded_peripherals.remove(peripheral)

    def get_embedded_peripheral_byte(self, peripheral: Union[EmbedPeripherals, int]) -> OsTrConfByte:
        """Returns embedded peripheral configuration byte.

        Peripherals 0-7 map to address 1, 8-15 to address 2, 16-23 to address 3 and 24-31 to address 4.
        The mask always selects the bit of the requested peripheral; the value carries that bit
        only when the peripheral is enabled, so the byte of a disabled peripheral clears the bit.

        Args:
            peripheral (:obj:`EmbedPeripherals` or :obj:`int`): Embedded peripheral.

        Raises:
            ValueError: If peripheral value is less than 0 or greater than 31.

        Returns:
            :obj:`OsTrConfByte`: Embedded peripheral configuration byte.
        """
        if not (0 <= peripheral <= 31):
            raise ValueError('Peripheral value should be between 0 and 31')
        address = 1 + peripheral // 8
        bit = 1 << (peripheral % 8)
        # The mask must select the bit even when the value is 0; a mask equal to
        # the value would be 0 for a disabled peripheral and modify nothing.
        value = bit if peripheral in self._embedded_peripherals else 0
        return OsTrConfByte(
            address=address,
            value=value,
            mask=bit
        )

    @property
    def custom_dpa_handler(self) -> bool:
        """:obj:`bool`: Custom DPA handler in use.

        Getter and setter.
        """
        return self._custom_dpa_handler

    @custom_dpa_handler.setter
    def custom_dpa_handler(self, value: bool):
        self._custom_dpa_handler = value

    def get_custom_dpa_handler_byte(self) -> OsTrConfByte:
        """Returns custom DPA handler configuration byte.

        Returns:
            :obj:`OsTrConfByte`: Custom DPA handler configuration byte.
        """
        return OsTrConfByte(
            address=TrConfByteAddrs.DPA_CONFIG_BITS_0,
            value=1 if self._custom_dpa_handler else 0,
            mask=TrConfBitMasks.CUSTOM_DPA_HANDLER
        )

    @property
    def dpa_peer_to_peer(self) -> bool:
        """:obj:`bool`: DPA peer-to-peer enabled.

        Getter and setter.
        """
        return self._dpa_peer_to_peer

    @dpa_peer_to_peer.setter
    def dpa_peer_to_peer(self, value: bool):
        self._dpa_peer_to_peer = value

    def get_dpa_peer_to_peer_byte(self) -> OsTrConfByte:
        """Return DPA peer-to-peer configuration byte.

        Returns:
            :obj:`OsTrConfByte`: DPA peer-to-peer configuration byte.
        """
        return OsTrConfByte(
            address=TrConfByteAddrs.DPA_CONFIG_BITS_0,
            value=2 if self._dpa_peer_to_peer else 0,
            mask=TrConfBitMasks.DPA_PEER_TO_PEER
        )

    @property
    def routing_off(self) -> bool:
        """:obj:`bool`: Node does not route packets in background.

        Getter and setter.
        """
        return self._routing_off

    @routing_off.setter
    def routing_off(self, value: bool):
        self._routing_off = value

    def get_routing_off_byte(self) -> OsTrConfByte:
        """Returns routing off configuration byte.

        Returns:
            :obj:`OsTrConfByte`: Routing off configuration byte.
        """
        return OsTrConfByte(
            address=TrConfByteAddrs.DPA_CONFIG_BITS_0,
            value=8 if self._routing_off else 0,
            mask=TrConfBitMasks.ROUTING_OFF
        )

    @property
    def io_setup(self) -> bool:
        """:obj:`bool`: Run IO Setup early during module boot time.

        Getter and setter.
        """
        return self._io_setup

    @io_setup.setter
    def io_setup(self, value: bool):
        self._io_setup = value

    def get_io_setup_byte(self) -> OsTrConfByte:
        """Returns IO setup configuration byte.

        Returns:
            :obj:`OsTrConfByte`: IO setup configuration byte.
        """
        return OsTrConfByte(
            address=TrConfByteAddrs.DPA_CONFIG_BITS_0,
            value=16 if self._io_setup else 0,
            mask=TrConfBitMasks.IO_SETUP
        )

    @property
    def user_peer_to_peer(self) -> bool:
        """:obj:`bool`: Receive peer-to-peer packets and raise PeerToPeer event.

        Getter and setter.
        """
        return self._user_peer_to_peer

    @user_peer_to_peer.setter
    def user_peer_to_peer(self, value: bool):
        self._user_peer_to_peer = value

    def get_user_peer_to_peer_byte(self) -> OsTrConfByte:
        """Returns user peer-to-peer configuration byte.

        Returns:
            :obj:`OsTrConfByte`: User peer-to-peer configuration byte.
        """
        return OsTrConfByte(
            address=TrConfByteAddrs.DPA_CONFIG_BITS_0,
            value=32 if self._user_peer_to_peer else 0,
            mask=TrConfBitMasks.USER_PEER_TO_PEER,
        )

    @property
    def stay_awake_when_not_bonded(self) -> bool:
        """:obj:`bool`: Stay awake during bonding process.

        Getter and setter.
        """
        return self._stay_awake_when_not_bonded

    @stay_awake_when_not_bonded.setter
    def stay_awake_when_not_bonded(self, value: bool):
        self._stay_awake_when_not_bonded = value

    def get_stay_awake_when_not_bonded_byte(self) -> OsTrConfByte:
        """Returns stay awake when not bonded configuration byte.

        Returns:
            :obj:`OsTrConfByte`: Stay awake when not bonded configuration byte.
        """
        return OsTrConfByte(
            address=TrConfByteAddrs.DPA_CONFIG_BITS_0,
            value=64 if self._stay_awake_when_not_bonded else 0,
            mask=TrConfBitMasks.STAY_AWAKE_WHEN_NOT_BONDED,
        )

    @property
    def std_and_lp_network(self) -> bool:
        """:obj:`bool`: Control STD+LP network.

        Getter and setter.
        """
        return self._std_and_lp_network

    @std_and_lp_network.setter
    def std_and_lp_network(self, value: bool):
        self._std_and_lp_network = value

    def get_std_and_lp_network_byte(self) -> OsTrConfByte:
        """Returns STD and LP network configuration byte.

        Returns:
            :obj:`OsTrConfByte`: STD and LP network configuration byte.
        """
        return OsTrConfByte(
            address=TrConfByteAddrs.DPA_CONFIG_BITS_0,
            value=128 if self._std_and_lp_network else 0,
            mask=TrConfBitMasks.STD_AND_LP_NETWORK
        )

    @staticmethod
    def _validate_rf_output_power(rf_output_power: int) -> None:
        """Validate rf output power parameter.

        Args:
            rf_output_power (int): RF output power.

        Raises:
            RequestParameterInvalidValueError: If rf_output_power is less than 0 or greater than 255.
        """
        if not dpa_constants.BYTE_MIN <= rf_output_power <= dpa_constants.BYTE_MAX:
            raise RequestParameterInvalidValueError('RF output power value should be between 0 and 255.')

    @property
    def rf_output_power(self) -> int:
        """:obj:`int`: RF output power.

        Getter and setter.
        """
        return self._rf_output_power

    @rf_output_power.setter
    def rf_output_power(self, value: int):
        self._validate_rf_output_power(rf_output_power=value)
        self._rf_output_power = value

    def get_rf_output_power_byte(self) -> OsTrConfByte:
        """Returns RF output power configuration byte.

        Returns:
            :obj:`OsTrConfByte`: RF output power configuration byte.
        """
        return OsTrConfByte(
            address=TrConfByteAddrs.RF_OUTPUT_POWER,
            value=self._rf_output_power,
            mask=0xFF
        )

    @staticmethod
    def _validate_rf_signal_filter(rf_signal_filter: int) -> None:
        """Validate rf signal filter parameter.

        Args:
            rf_signal_filter (int): RF signal filter.

        Raises:
            RequestParameterInvalidValueError: If rf_signal_filter is less than 0 or greater than 255.
        """
        if not dpa_constants.BYTE_MIN <= rf_signal_filter <= dpa_constants.BYTE_MAX:
            raise RequestParameterInvalidValueError('RF signal filter value should be between 0 and 255.')

    @property
    def rf_signal_filter(self) -> int:
        """:obj:`int`: RF signal filter.

        Getter and setter.
        """
        return self._rf_signal_filter

    @rf_signal_filter.setter
    def rf_signal_filter(self, value: int):
        self._validate_rf_signal_filter(rf_signal_filter=value)
        self._rf_signal_filter = value

    def get_rf_signal_filter_byte(self) -> OsTrConfByte:
        """Returns RF signal filter configuration byte.

        Returns:
            :obj:`OsTrConfByte`: RF signal filter configuration byte.
        """
        return OsTrConfByte(
            address=TrConfByteAddrs.RF_SIGNAL_FILTER,
            value=self._rf_signal_filter,
            mask=0xFF
        )

    @staticmethod
    def _validate_lp_rf_timeout(lp_rf_timeout: int) -> None:
        """Validate lp rf timeout parameter.

        Args:
            lp_rf_timeout (int): LP RF timeout.

        Raises:
            RequestParameterInvalidValueError: If lp_rf_timeout is less than 0 or greater than 255.
        """
        if not dpa_constants.BYTE_MIN <= lp_rf_timeout <= dpa_constants.BYTE_MAX:
            raise RequestParameterInvalidValueError('LP RF timeout value should be between 0 and 255.')

    @property
    def lp_rf_timeout(self) -> int:
        """:obj:`int`: LP RF timeout.

        Getter and setter.
        """
        return self._lp_rf_timeout

    @lp_rf_timeout.setter
    def lp_rf_timeout(self, value: int):
        self._validate_lp_rf_timeout(lp_rf_timeout=value)
        self._lp_rf_timeout = value

    def get_lp_rf_timeout_byte(self) -> OsTrConfByte:
        """Returns LP RF timeout configuration byte.

        Returns:
            :obj:`OsTrConfByte`: LP RF timeout configuration byte.
        """
        return OsTrConfByte(
            address=TrConfByteAddrs.LP_RF_TIMEOUT,
            value=self._lp_rf_timeout,
            mask=0xFF
        )

    @staticmethod
    def _validate_uart_baud_rate(uart_baud_rate: Union[dpa_constants.BaudRates, int]) -> None:
        """Validate uart baud rate parameter.

        Args:
            uart_baud_rate (Union[BaudRates, int]): UART Baud rate.

        Raises:
            RequestParameterInvalidValueError: If uart_baud_rate is less than 0 or greater than 255.
        """
        if not dpa_constants.BYTE_MIN <= uart_baud_rate <= dpa_constants.BYTE_MAX:
            raise RequestParameterInvalidValueError('UART baud rate value should be between 0 and 255.')

    @property
    def uart_baud_rate(self) -> Union[dpa_constants.BaudRates, int]:
        """:obj:`BaudRates` or :obj:`int`: UART baud rate.

        Getter and setter.
        """
        return self._uart_baud_rate

    @uart_baud_rate.setter
    def uart_baud_rate(self, value: Union[dpa_constants.BaudRates, int]):
        self._validate_uart_baud_rate(uart_baud_rate=value)
        self._uart_baud_rate = value

    def get_uart_baud_rate_byte(self) -> OsTrConfByte:
        """Returns UART baud rate configuration byte.

        Returns:
            :obj:`OsTrConfByte`: UART baud rate configuration byte.
        """
        return OsTrConfByte(
            address=TrConfByteAddrs.UART_BAUD_RATE,
            value=self._uart_baud_rate,
            mask=0xFF
        )

    @staticmethod
    def _validate_alternative_dsm_channel(alternative_dsm_channels: int) -> None:
        """Validate alternative dsm channel parameters.

        Args:
            alternative_dsm_channels (int): Alternative DSM channel.

        Raises:
            RequestParameterInvalidValueError: If alternative_dsm_channels is less than 0 or greater than 255.
        """
        if not dpa_constants.BYTE_MIN <= alternative_dsm_channels <= dpa_constants.BYTE_MAX:
            raise RequestParameterInvalidValueError('Alternative DSM channel value should be between 0 and 255.')

    @property
    def alternative_dsm_channel(self) -> int:
        """:obj:`int`: Alternative DSM channel.

        Getter and setter.
        """
        return self._alternative_dsm_channel

    @alternative_dsm_channel.setter
    def alternative_dsm_channel(self, value: int):
        self._validate_alternative_dsm_channel(alternative_dsm_channels=value)
        self._alternative_dsm_channel = value

    def get_alternative_dsm_channel_byte(self) -> OsTrConfByte:
        """Returns alternative DSM channel configuration byte.

        Returns:
            :obj:`OsTrConfByte`: Alternative DSM channel configuration byte.
        """
        return OsTrConfByte(
            address=TrConfByteAddrs.ALTERNATIVE_DSM_CHANNEL,
            value=self._alternative_dsm_channel,
            mask=0xFF
        )

    @property
    def local_frc(self) -> bool:
        """:obj:`bool`: Local FRC reception enabled.

        Getter and setter.
        """
        return self._local_frc

    @local_frc.setter
    def local_frc(self, value: bool):
        self._local_frc = value

    def get_local_frc_byte(self) -> OsTrConfByte:
        """Returns local FRC configuration byte.

        Returns:
            :obj:`OsTrConfByte`: Local FRC configuration byte.
        """
        return OsTrConfByte(
            address=TrConfByteAddrs.DPA_CONFIG_BITS_1,
            value=1 if self._local_frc else 0,
            mask=TrConfBitMasks.LOCAL_FRC
        )

    @staticmethod
    def _validate_rf_channel_a(rf_channel_a: int) -> None:
        """Validate RF Channel A Parameter.

        Args:
            rf_channel_a (int): RF Channel A.

        Raises:
            RequestParameterInvalidValueError: If rf_channel_a is less than 0 or greater than 255.
        """
        if not dpa_constants.BYTE_MIN <= rf_channel_a <= dpa_constants.BYTE_MAX:
            raise RequestParameterInvalidValueError('RF channel A value should be between 0 and 255.')

    @property
    def rf_channel_a(self) -> int:
        """:obj:`int`: RF Channel A.

        Getter and setter.
        """
        return self._rf_channel_a

    @rf_channel_a.setter
    def rf_channel_a(self, value: int):
        self._validate_rf_channel_a(rf_channel_a=value)
        self._rf_channel_a = value

    def get_rf_channel_a_byte(self) -> OsTrConfByte:
        """Returns RF channel A configuration byte.

        Returns:
            :obj:`OsTrConfByte`: RF channel A configuration byte.
        """
        return OsTrConfByte(
            address=TrConfByteAddrs.RF_CHANNEL_A,
            value=self._rf_channel_a,
            mask=0xFF
        )

    @staticmethod
    def _validate_rf_channel_b(rf_channel_b: int) -> None:
        """Validate RF Channel B Parameter.

        Args:
            rf_channel_b (int): RF Channel B.

        Raises:
            RequestParameterInvalidValueError: If rf_channel_b is less than 0 or greater than 255.
        """
        if not dpa_constants.BYTE_MIN <= rf_channel_b <= dpa_constants.BYTE_MAX:
            raise RequestParameterInvalidValueError('RF channel B value should be between 0 and 255.')

    @property
    def rf_channel_b(self) -> int:
        """:obj:`int`: RF Channel B.

        Getter and setter.
        """
        return self._rf_channel_b

    @rf_channel_b.setter
    def rf_channel_b(self, value: int):
        self._validate_rf_channel_b(rf_channel_b=value)
        self._rf_channel_b = value

    def get_rf_channel_b_byte(self) -> OsTrConfByte:
        """Returns RF channel B configuration byte.

        Returns:
            :obj:`OsTrConfByte`: RF channel B configuration byte.
        """
        return OsTrConfByte(
            address=TrConfByteAddrs.RF_CHANNEL_B,
            value=self._rf_channel_b,
            mask=0xFF
        )

    @staticmethod
    def _validate_reserved_block_0(data: List[int]) -> None:
        """Validate undocumented data bytes.

        Args:
            data (List[int]): Reserved data bytes.

        Raises:
            RequestParameterInvalidValueError: If data does not contain 2 values or if values are not
                in range from 0 to 255.
        """
        if len(data) != 2:
            raise RequestParameterInvalidValueError('Reserved block 0 should be 2B long.')
        if not Common.values_in_byte_range(data):
            raise RequestParameterInvalidValueError('Reserved block 0 values should be between 0 and 255.')

    @staticmethod
    def _validate_reserved_block_1(data: List[int]) -> None:
        """Validate undocumented data bytes.

        Args:
            data (List[int]): Reserved data bytes.

        Raises:
            RequestParameterInvalidValueError: If data does not contain 3 values or if values are not
                in range from 0 to 255.
        """
        if len(data) != 3:
            raise RequestParameterInvalidValueError('Reserved block 1 should be 3B long.')
        if not Common.values_in_byte_range(data):
            raise RequestParameterInvalidValueError('Reserved block 1 values should be between 0 and 255.')

    @staticmethod
    def _validate_reserved_block_2(data: List[int]) -> None:
        """Validate undocumented data bytes.

        Args:
            data (List[int]): Reserved data bytes.

        Raises:
            RequestParameterInvalidValueError: If data does not contain 13 values or if values are not
                in range from 0 to 255.
        """
        if len(data) != 13:
            raise RequestParameterInvalidValueError('Reserved block 2 should be 13B long.')
        if not Common.values_in_byte_range(data):
            raise RequestParameterInvalidValueError('Reserved block 2 values should be between 0 and 255.')

    @classmethod
    def from_pdata(cls, data: Union[List[int], bytearray]) -> 'OsTrConfData':
        """Deserialize DPA response pdata into OS TR Configuration data object.

        Layout read from ``data``: bytes 0-3 embedded peripherals bitmap, byte 4 configuration bits,
        bytes 5-6 reserved block 0, byte 7 RF output power, byte 8 RF signal filter, byte 9 LP RF
        timeout, byte 10 UART baud rate, byte 11 alternative DSM channel, byte 12 bit 0 local FRC,
        bytes 13-15 reserved block 1, byte 16 RF channel A, byte 17 RF channel B and all remaining
        bytes reserved block 2.

        Args:
            data (Union[List[int], bytearray]): DPA response pdata.

        Raises:
            IndexError: If data holds fewer than 18 values.
            RequestParameterInvalidValueError: If the extracted values do not pass the constructor
                validation, e.g. when reserved block 2 is not 13 values long.

        Returns:
            :obj:`OsTrConfData`: Deserialized OS TR Configuration data object.
        """
        if isinstance(data, bytearray):
            data = list(data)
        embed_pers_data = data[0:4]
        embedded_pers = []
        for i in range(len(embed_pers_data) * 8):
            if embed_pers_data[i // 8] & (1 << (i % 8)):
                # NOTE(review): relies on EmbedPeripherals supporting membership tests
                # with plain ints; the enum is defined outside this view.
                if i in EmbedPeripherals:
                    embedded_pers.append(EmbedPeripherals(i))
                else:
                    embedded_pers.append(i)
        embedded_peripherals = embedded_pers
        custom_dpa_handler = bool(data[4] & 1)
        dpa_peer_to_peer = bool(data[4] & 2)
        routing_off = bool(data[4] & 8)
        io_setup = bool(data[4] & 16)
        user_peer_to_peer = bool(data[4] & 32)
        stay_awake_when_not_bonded = bool(data[4] & 64)
        std_and_lp_network = bool(data[4] & 128)
        rf_output_power = data[7]
        rf_signal_filter = data[8]
        lp_rf_timeout = data[9]
        uart_baud_rate = data[10]
        alternative_dsm_channel = data[11]
        local_frc = bool(data[12] & 1)
        rf_channel_a = data[16]
        rf_channel_b = data[17]
        reserved_block_0 = data[5:7]
        reserved_block_1 = data[13:16]
        reserved_block_2 = data[18:]
        return cls(embedded_peripherals=embedded_peripherals, custom_dpa_handler=custom_dpa_handler,
                   dpa_peer_to_peer=dpa_peer_to_peer, routing_off=routing_off, io_setup=io_setup,
                   user_peer_to_peer=user_peer_to_peer, stay_awake_when_not_bonded=stay_awake_when_not_bonded,
                   std_and_lp_network=std_and_lp_network, rf_output_power=rf_output_power,
                   rf_signal_filter=rf_signal_filter, lp_rf_timeout=lp_rf_timeout,
                   uart_baud_rate=uart_baud_rate, alternative_dsm_channel=alternative_dsm_channel,
                   local_frc=local_frc, rf_channel_a=rf_channel_a, rf_channel_b=rf_channel_b,
                   reserved_block_0=reserved_block_0, reserved_block_1=reserved_block_1,
                   reserved_block_2=reserved_block_2)

    def to_pdata(self, to_bytes: bool = False) -> Union[List[int], bytearray]:
        """Serialize OS TR Configuration data object to DPA request pdata.

        The output order mirrors the layout read by :meth:`from_pdata`.

        Args:
            to_bytes (bool): Serialize into bytes.

        Returns:
            :obj:`list` of :obj:`int`: Serialized OS TR Configuration data.\n
            :obj:`bytearray`: Serialize OS TR Configuration data bytes if to_bytes is True.
        """
        # NOTE(review): presumably returns the 4-byte peripheral bitmap as a list of ints;
        # the helper is defined outside this view.
        embed_pers = Common.peripheral_list_to_bitmap(self.embedded_peripherals)
        # Bit 2 of the configuration bits is not driven by any flag of this class.
        conf_bits_0 = int(self.custom_dpa_handler) | int(self.dpa_peer_to_peer) << 1 | int(self.routing_off) << 3 | \
            int(self.io_setup) << 4 | int(self.user_peer_to_peer) << 5 | \
            int(self.stay_awake_when_not_bonded) << 6 | int(self.std_and_lp_network) << 7
        pdata = embed_pers + [conf_bits_0] + self._reserved_block_0 + \
            [
                self.rf_output_power,
                self.rf_signal_filter,
                self.lp_rf_timeout,
                self.uart_baud_rate,
                self.alternative_dsm_channel,
                int(self.local_frc)
            ] + self._reserved_block_1 + \
            [
                self.rf_channel_a,
                self.rf_channel_b
            ] + self._reserved_block_2
        if to_bytes:
            return bytearray(pdata)
        return pdata

    @staticmethod
    def calculate_checksum(data: List[int]) -> int:
        """Calculates checksum from TR configuration data.

        The checksum is the XOR of all values, seeded with 0x5F.

        Args:
            data (List[int]): List of integers representing TR configuration data.

        Returns:
            int: Checksum value.
        """
        checksum = 0x5F
        for val in data:
            checksum ^= val
        return checksum
OS TR Configuration data.
def __init__(self, embedded_peripherals: Optional[List[Union[EmbedPeripherals, int]]] = None,
             custom_dpa_handler: bool = False, dpa_peer_to_peer: bool = False, routing_off: bool = False,
             io_setup: bool = False, user_peer_to_peer: bool = False, stay_awake_when_not_bonded: bool = False,
             std_and_lp_network: bool = False, rf_output_power: int = 7, rf_signal_filter: int = 5,
             lp_rf_timeout: int = 6,
             uart_baud_rate: Union[dpa_constants.BaudRates, int] = dpa_constants.BaudRates.B9600,
             alternative_dsm_channel: int = 0, local_frc: bool = False, rf_channel_a: int = 52,
             rf_channel_b: int = 2, reserved_block_0: Optional[List[int]] = None,
             reserved_block_1: Optional[List[int]] = None, reserved_block_2: Optional[List[int]] = None):
    """TR Configuration data constructor.

    List arguments are validated and then copied, so later changes made through this
    object (for example enabling a peripheral) never modify the caller's lists.

    Args:
        embedded_peripherals (List[Union[EmbedPeripherals, int]], optional): Enabled embedded
            peripherals. Defaults to an empty list.
        custom_dpa_handler (bool): Custom DPA handler in use.
        dpa_peer_to_peer (bool): DPA peer-to-peer enabled.
        routing_off (bool): Node does not route packets in background.
        io_setup (bool): Run IO Setup early during module boot time.
        user_peer_to_peer (bool): Receive peer-to-peer packets and raise PeerToPeer event.
        stay_awake_when_not_bonded (bool): Stay awake during bonding process.
        std_and_lp_network (bool): Control STD+LP network.
        rf_output_power (int): RF output power.
        rf_signal_filter (int): RF signal filter.
        lp_rf_timeout (int): LP RF timeout.
        uart_baud_rate (Union[dpa_constants.BaudRates, int]): UART baud rate.
        alternative_dsm_channel (int): Alternative DSM channel.
        local_frc (bool): Local FRC reception enabled.
        rf_channel_a (int): RF Channel A.
        rf_channel_b (int): RF Channel B.
        reserved_block_0 (List[int], optional): Reserved data block. Defaults to 2 zero bytes.
        reserved_block_1 (List[int], optional): Reserved data block. Defaults to 3 zero bytes.
        reserved_block_2 (List[int], optional): Reserved data block. Defaults to 13 zero bytes.

    Raises:
        RequestParameterInvalidValueError: Presumably raised by ``self._validate`` when a value is
            out of range (the validators visible in this class raise it) - confirm against
            ``_validate``.
    """
    if embedded_peripherals is None:
        embedded_peripherals = []
    if reserved_block_0 is None:
        reserved_block_0 = [0] * 2
    if reserved_block_1 is None:
        reserved_block_1 = [0] * 3
    if reserved_block_2 is None:
        reserved_block_2 = [0] * 13
    self._validate(embedded_peripherals=embedded_peripherals, rf_output_power=rf_output_power,
                   rf_signal_filter=rf_signal_filter, lp_rf_timeout=lp_rf_timeout, baud_rate=uart_baud_rate,
                   alternative_dsm=alternative_dsm_channel, rf_channel_a=rf_channel_a, rf_channel_b=rf_channel_b,
                   reserved_block_0=reserved_block_0, reserved_block_1=reserved_block_1,
                   reserved_block_2=reserved_block_2)
    # Copy after validation: enable/disable_embedded_peripheral mutate this list in place,
    # which would otherwise leak into the list object owned by the caller.
    self._embedded_peripherals = list(embedded_peripherals)
    self._custom_dpa_handler = custom_dpa_handler
    self._dpa_peer_to_peer = dpa_peer_to_peer
    self._routing_off = routing_off
    self._io_setup = io_setup
    self._user_peer_to_peer = user_peer_to_peer
    self._stay_awake_when_not_bonded = stay_awake_when_not_bonded
    self._std_and_lp_network = std_and_lp_network
    self._rf_output_power = rf_output_power
    self._rf_signal_filter = rf_signal_filter
    self._lp_rf_timeout = lp_rf_timeout
    self._uart_baud_rate = uart_baud_rate
    self._alternative_dsm_channel = alternative_dsm_channel
    self._local_frc = local_frc
    self._rf_channel_a = rf_channel_a
    self._rf_channel_b = rf_channel_b
    # Reserved blocks are copied as well so the stored lists are independent of the arguments.
    self._reserved_block_0 = list(reserved_block_0)
    self._reserved_block_1 = list(reserved_block_1)
    self._reserved_block_2 = list(reserved_block_2)
TR Configuration data constructor.
Arguments:
- embedded_peripherals (List[Union[EmbedPeripherals, int]], optional): Enabled embedded peripherals.
- custom_dpa_handler (bool): Custom DPA handler in use.
- dpa_peer_to_peer (bool): DPA peer-to-peer enabled.
- routing_off (bool): Node does not route packets in background.
- io_setup (bool): Run IO Setup early during module boot time.
- user_peer_to_peer (bool): Receive peer-to-peer packets and raise PeerToPeer event.
- stay_awake_when_not_bonded (bool): Stay awake during bonding process.
- std_and_lp_network (bool): Control STD+LP network.
- rf_output_power (int): RF output power.
- rf_signal_filter (int): RF signal filter.
- lp_rf_timeout (int): LP RF timeout.
- uart_baud_rate (Union[dpa_constants.BaudRates, int]): UART baud rate.
- alternative_dsm_channel (int): Alternative DSM channel.
- local_frc (bool): Local FRC reception enabled.
- rf_channel_a (int): RF Channel A.
- rf_channel_b (int): RF Channel B.
- reserved_block_0 (List[int], optional): Reserved data block.
- reserved_block_1 (List[int], optional): Reserved data block.
- reserved_block_2 (List[int], optional): Reserved data block.
list of EmbedPeripherals or int: Enabled embedded peripherals.
Getter and setter.
def enable_embedded_peripheral(self, peripheral: Union[EmbedPeripherals, int]) -> None:
    """Mark an embedded peripheral as enabled.

    The peripheral is appended to the list of enabled peripherals unless it is
    already present in it.

    Args:
        peripheral (:obj:`EmbedPeripherals` or :obj:`int`): Embedded peripheral.

    Raises:
        ValueError: If peripheral value is less than 0 or greater than 31.
    """
    in_range = 0 <= peripheral <= 31
    if not in_range:
        raise ValueError('Peripheral value should be between 0 and 31')
    enabled = self._embedded_peripherals
    if peripheral in enabled:
        # Already enabled, keep the list free of duplicates.
        return
    enabled.append(peripheral)
Enables embedded peripheral.
Arguments:
- peripheral (EmbedPeripherals or int): Embedded peripheral.
Raises:
- ValueError: If peripheral value is less than 0 or greater than 31.
def disable_embedded_peripheral(self, peripheral: Union[EmbedPeripherals, int]) -> None:
    """Mark an embedded peripheral as disabled.

    The peripheral is removed from the list of enabled peripherals; nothing happens
    when it is not in the list.

    Args:
        peripheral (:obj:`EmbedPeripherals` or :obj:`int`): Embedded peripheral.

    Raises:
        ValueError: If peripheral value is less than 0 or greater than 31.
    """
    in_range = 0 <= peripheral <= 31
    if not in_range:
        raise ValueError('Peripheral value should be between 0 and 31')
    enabled = self._embedded_peripherals
    if peripheral not in enabled:
        # Not enabled, nothing to remove.
        return
    enabled.remove(peripheral)
Disables embedded peripheral.
Arguments:
- peripheral (EmbedPeripherals or int): Embedded peripheral.
Raises:
- ValueError: If peripheral value is less than 0 or greater than 31.
def get_embedded_peripheral_byte(self, peripheral: Union[EmbedPeripherals, int]) -> OsTrConfByte:
    """Returns embedded peripheral configuration byte.

    The returned byte targets the single bit that belongs to the peripheral: the mask
    is always that bit, and the value has the bit set only when the peripheral is
    currently enabled. Keeping the mask set for a disabled peripheral matches the other
    flag getters of this class, which pair a 0 value with a constant bit mask.

    Args:
        peripheral (:obj:`EmbedPeripherals` or :obj:`int`): Embedded peripheral.

    Raises:
        ValueError: If peripheral value is less than 0 or greater than 31.

    Returns:
        :obj:`OsTrConfByte`: Embedded peripheral configuration byte.
    """
    if not (0 <= peripheral <= 31):
        raise ValueError('Peripheral value should be between 0 and 31')
    # Peripherals 0-7 map to address 1, 8-15 to 2, 16-23 to 3 and 24-31 to 4.
    address = 1 + peripheral // 8
    bit = 1 << (peripheral % 8)
    # NOTE(review): the mask presumably selects which bits get written, so a zero mask
    # could never clear the bit of a disabled peripheral - confirm against the DPA guide.
    return OsTrConfByte(
        address=address,
        value=bit if peripheral in self._embedded_peripherals else 0,
        mask=bit
    )
Returns embedded peripheral configuration byte.
Arguments:
- peripheral (EmbedPeripherals or int): Embedded peripheral.
Raises:
- ValueError: If peripheral value is less than 0 or greater than 31.
Returns:
OsTrConfByte
: Embedded peripheral configuration byte.
def get_custom_dpa_handler_byte(self) -> OsTrConfByte:
    """Build the configuration byte for the custom DPA handler flag.

    Returns:
        :obj:`OsTrConfByte`: Byte addressed at DPA_CONFIG_BITS_0 with value 1 when the
        custom DPA handler flag is set and 0 otherwise, masked with CUSTOM_DPA_HANDLER.
    """
    address = TrConfByteAddrs.DPA_CONFIG_BITS_0
    if self._custom_dpa_handler:
        value = 1
    else:
        value = 0
    return OsTrConfByte(address=address, value=value, mask=TrConfBitMasks.CUSTOM_DPA_HANDLER)
Returns custom DPA handler configuration byte.
Returns:
OsTrConfByte
: Custom DPA handler configuration byte.
def get_dpa_peer_to_peer_byte(self) -> OsTrConfByte:
    """Build the configuration byte for the DPA peer-to-peer flag.

    Returns:
        :obj:`OsTrConfByte`: Byte addressed at DPA_CONFIG_BITS_0 with value 2 when DPA
        peer-to-peer is set and 0 otherwise, masked with DPA_PEER_TO_PEER.
    """
    address = TrConfByteAddrs.DPA_CONFIG_BITS_0
    if self._dpa_peer_to_peer:
        value = 2
    else:
        value = 0
    return OsTrConfByte(address=address, value=value, mask=TrConfBitMasks.DPA_PEER_TO_PEER)
Return DPA peer-to-peer configuration byte.
Returns:
OsTrConfByte
: DPA peer-to-peer configuration byte.
def get_routing_off_byte(self) -> OsTrConfByte:
    """Build the configuration byte for the routing off flag.

    Returns:
        :obj:`OsTrConfByte`: Byte addressed at DPA_CONFIG_BITS_0 with value 8 when routing
        off is set and 0 otherwise, masked with ROUTING_OFF.
    """
    address = TrConfByteAddrs.DPA_CONFIG_BITS_0
    if self._routing_off:
        value = 8
    else:
        value = 0
    return OsTrConfByte(address=address, value=value, mask=TrConfBitMasks.ROUTING_OFF)
def get_io_setup_byte(self) -> OsTrConfByte:
    """Build the configuration byte for the IO setup flag.

    Returns:
        :obj:`OsTrConfByte`: Byte addressed at DPA_CONFIG_BITS_0 with value 16 when IO setup
        is set and 0 otherwise, masked with IO_SETUP.
    """
    address = TrConfByteAddrs.DPA_CONFIG_BITS_0
    if self._io_setup:
        value = 16
    else:
        value = 0
    return OsTrConfByte(address=address, value=value, mask=TrConfBitMasks.IO_SETUP)
bool
: Receive peer-to-peer packets and raise PeerToPeer event.
Getter and setter.
def get_user_peer_to_peer_byte(self) -> OsTrConfByte:
    """Build the configuration byte for the user peer-to-peer flag.

    Returns:
        :obj:`OsTrConfByte`: Byte addressed at DPA_CONFIG_BITS_0 with value 32 when user
        peer-to-peer is set and 0 otherwise, masked with USER_PEER_TO_PEER.
    """
    address = TrConfByteAddrs.DPA_CONFIG_BITS_0
    if self._user_peer_to_peer:
        value = 32
    else:
        value = 0
    return OsTrConfByte(address=address, value=value, mask=TrConfBitMasks.USER_PEER_TO_PEER)
Returns user peer-to-peer configuration byte.
Returns:
OsTrConfByte
: User peer-to-peer configuration byte.
def get_stay_awake_when_not_bonded_byte(self) -> OsTrConfByte:
    """Build the configuration byte for the stay awake when not bonded flag.

    Returns:
        :obj:`OsTrConfByte`: Byte addressed at DPA_CONFIG_BITS_0 with value 64 when the flag
        is set and 0 otherwise, masked with STAY_AWAKE_WHEN_NOT_BONDED.
    """
    address = TrConfByteAddrs.DPA_CONFIG_BITS_0
    if self._stay_awake_when_not_bonded:
        value = 64
    else:
        value = 0
    return OsTrConfByte(address=address, value=value, mask=TrConfBitMasks.STAY_AWAKE_WHEN_NOT_BONDED)
Returns stay awake when not bonded configuration byte.
Returns:
OsTrConfByte
: Stay awake when not bonded configuration byte.
def get_std_and_lp_network_byte(self) -> OsTrConfByte:
    """Build the configuration byte for the STD and LP network flag.

    Returns:
        :obj:`OsTrConfByte`: Byte addressed at DPA_CONFIG_BITS_0 with value 128 when the flag
        is set and 0 otherwise, masked with STD_AND_LP_NETWORK.
    """
    address = TrConfByteAddrs.DPA_CONFIG_BITS_0
    if self._std_and_lp_network:
        value = 128
    else:
        value = 0
    return OsTrConfByte(address=address, value=value, mask=TrConfBitMasks.STD_AND_LP_NETWORK)
Returns STD and LP network configuration byte.
Returns:
OsTrConfByte
: STD and LP network configuration byte.
def get_rf_output_power_byte(self) -> OsTrConfByte:
    """Build the configuration byte carrying the RF output power.

    Returns:
        :obj:`OsTrConfByte`: Byte addressed at RF_OUTPUT_POWER holding the stored RF output
        power with a full-byte (0xFF) mask.
    """
    address = TrConfByteAddrs.RF_OUTPUT_POWER
    value = self._rf_output_power
    return OsTrConfByte(address=address, value=value, mask=0xFF)
Returns RF output power configuration byte.
Returns:
OsTrConfByte
: RF output power configuration byte.
def get_rf_signal_filter_byte(self) -> OsTrConfByte:
    """Build the configuration byte carrying the RF signal filter.

    Returns:
        :obj:`OsTrConfByte`: Byte addressed at RF_SIGNAL_FILTER holding the stored RF signal
        filter with a full-byte (0xFF) mask.
    """
    address = TrConfByteAddrs.RF_SIGNAL_FILTER
    value = self._rf_signal_filter
    return OsTrConfByte(address=address, value=value, mask=0xFF)
Returns RF signal filter configuration byte.
Returns:
OsTrConfByte
: RF signal filter configuration byte.
def get_lp_rf_timeout_byte(self) -> OsTrConfByte:
    """Build the configuration byte carrying the LP RF timeout.

    Returns:
        :obj:`OsTrConfByte`: Byte addressed at LP_RF_TIMEOUT holding the stored LP RF timeout
        with a full-byte (0xFF) mask.
    """
    address = TrConfByteAddrs.LP_RF_TIMEOUT
    value = self._lp_rf_timeout
    return OsTrConfByte(address=address, value=value, mask=0xFF)
BaudRates
or int
: UART baud rate.
Getter and setter.
def get_uart_baud_rate_byte(self) -> OsTrConfByte:
    """Build the configuration byte carrying the UART baud rate.

    Returns:
        :obj:`OsTrConfByte`: Byte addressed at UART_BAUD_RATE holding the stored UART baud
        rate with a full-byte (0xFF) mask.
    """
    address = TrConfByteAddrs.UART_BAUD_RATE
    value = self._uart_baud_rate
    return OsTrConfByte(address=address, value=value, mask=0xFF)
Returns UART baud rate configuration byte.
Returns:
OsTrConfByte
: UART baud rate configuration byte.
def get_alternative_dsm_channel_byte(self) -> OsTrConfByte:
    """Build the configuration byte carrying the alternative DSM channel.

    Returns:
        :obj:`OsTrConfByte`: Byte addressed at ALTERNATIVE_DSM_CHANNEL holding the stored
        alternative DSM channel with a full-byte (0xFF) mask.
    """
    address = TrConfByteAddrs.ALTERNATIVE_DSM_CHANNEL
    value = self._alternative_dsm_channel
    return OsTrConfByte(address=address, value=value, mask=0xFF)
Returns alternative DSM channel configuration byte.
Returns:
OsTrConfByte
: Alternative DSM channel configuration byte.
def get_local_frc_byte(self) -> OsTrConfByte:
    """Build the configuration byte for the local FRC flag.

    Returns:
        :obj:`OsTrConfByte`: Byte addressed at DPA_CONFIG_BITS_1 with value 1 when local FRC
        is set and 0 otherwise, masked with LOCAL_FRC.
    """
    address = TrConfByteAddrs.DPA_CONFIG_BITS_1
    if self._local_frc:
        value = 1
    else:
        value = 0
    return OsTrConfByte(address=address, value=value, mask=TrConfBitMasks.LOCAL_FRC)
def get_rf_channel_a_byte(self) -> OsTrConfByte:
    """Build the configuration byte carrying RF channel A.

    Returns:
        :obj:`OsTrConfByte`: Byte addressed at RF_CHANNEL_A holding the stored RF channel A
        with a full-byte (0xFF) mask.
    """
    address = TrConfByteAddrs.RF_CHANNEL_A
    value = self._rf_channel_a
    return OsTrConfByte(address=address, value=value, mask=0xFF)
def get_rf_channel_b_byte(self) -> OsTrConfByte:
    """Build the configuration byte carrying RF channel B.

    Returns:
        :obj:`OsTrConfByte`: Byte addressed at RF_CHANNEL_B holding the stored RF channel B
        with a full-byte (0xFF) mask.
    """
    address = TrConfByteAddrs.RF_CHANNEL_B
    value = self._rf_channel_b
    return OsTrConfByte(address=address, value=value, mask=0xFF)
@classmethod
def from_pdata(cls, data: Union[List[int], bytearray]) -> 'OsTrConfData':
    """Deserialize DPA response pdata into OS TR Configuration data object.

    Byte layout read by this method:

    * 0-3: embedded peripherals bitmap (bit ``n`` of byte ``k`` is peripheral ``8 * k + n``)
    * 4: DPA configuration bits 0 (bits 0, 1, 3, 4, 5, 6 and 7 are decoded)
    * 5-6: reserved block 0
    * 7: RF output power
    * 8: RF signal filter
    * 9: LP RF timeout
    * 10: UART baud rate
    * 11: alternative DSM channel
    * 12: DPA configuration bits 1 (only bit 0, local FRC, is decoded)
    * 13-15: reserved block 1
    * 16: RF channel A
    * 17: RF channel B
    * 18 and above: reserved block 2

    Args:
        data (Union[List[int], bytearray]): TR configuration bytes. Any other sequence of
            integers (for example :obj:`bytes` or a tuple) is converted to a list first.

    Raises:
        IndexError: If data holds fewer than 18 values.

    Returns:
        :obj:`OsTrConfData`: Deserialized OS TR Configuration data object.
    """
    # Work on a plain list so the reserved-block slices below are lists as well;
    # slices of bytes or tuples could not be concatenated with lists in to_pdata().
    if not isinstance(data, list):
        data = list(data)
    embedded_peripherals = []
    for byte_index, bitmap_byte in enumerate(data[0:4]):
        for bit in range(8):
            if not bitmap_byte & (1 << bit):
                continue
            peripheral = byte_index * 8 + bit
            # Known peripherals become enum members, unknown ones stay plain integers.
            if peripheral in EmbedPeripherals:
                embedded_peripherals.append(EmbedPeripherals(peripheral))
            else:
                embedded_peripherals.append(peripheral)
    config_bits_0 = data[4]
    return cls(embedded_peripherals=embedded_peripherals,
               custom_dpa_handler=bool(config_bits_0 & 1),
               dpa_peer_to_peer=bool(config_bits_0 & 2),
               routing_off=bool(config_bits_0 & 8),
               io_setup=bool(config_bits_0 & 16),
               user_peer_to_peer=bool(config_bits_0 & 32),
               stay_awake_when_not_bonded=bool(config_bits_0 & 64),
               std_and_lp_network=bool(config_bits_0 & 128),
               rf_output_power=data[7],
               rf_signal_filter=data[8],
               lp_rf_timeout=data[9],
               uart_baud_rate=data[10],
               alternative_dsm_channel=data[11],
               local_frc=bool(data[12] & 1),
               rf_channel_a=data[16],
               rf_channel_b=data[17],
               reserved_block_0=data[5:7],
               reserved_block_1=data[13:16],
               reserved_block_2=data[18:])
Deserialize DPA response pdata into OS TR Configuration data object.
Returns:
OsTrConfData
: Deserialized OS TR Configuration data object.
def to_pdata(self, to_bytes: bool = False) -> Union[List[int], bytearray]:
    """Serialize OS TR Configuration data object to DPA request pdata.

    The output is the embedded peripherals bitmap, DPA configuration bits 0, reserved
    block 0, RF output power, RF signal filter, LP RF timeout, UART baud rate,
    alternative DSM channel, the local FRC flag, reserved block 1, RF channels A and B
    and finally reserved block 2.

    Args:
        to_bytes (bool): Serialize into bytes.

    Returns:
        :obj:`list` of :obj:`int`: Serialized OS TR Configuration data.
        :obj:`bytearray`: Serialized OS TR Configuration data bytes if to_bytes is True.
    """
    peripheral_bitmap = Common.peripheral_list_to_bitmap(self.embedded_peripherals)
    # (flag, bit position) pairs that make up DPA configuration bits 0; bit 2 is not used here.
    flag_positions = (
        (self.custom_dpa_handler, 0),
        (self.dpa_peer_to_peer, 1),
        (self.routing_off, 3),
        (self.io_setup, 4),
        (self.user_peer_to_peer, 5),
        (self.stay_awake_when_not_bonded, 6),
        (self.std_and_lp_network, 7),
    )
    config_bits_0 = 0
    for flag, position in flag_positions:
        config_bits_0 |= int(flag) << position
    rf_and_uart = [
        self.rf_output_power,
        self.rf_signal_filter,
        self.lp_rf_timeout,
        self.uart_baud_rate,
        self.alternative_dsm_channel,
        int(self.local_frc),
    ]
    rf_channels = [self.rf_channel_a, self.rf_channel_b]
    serialized = peripheral_bitmap + [config_bits_0] + self._reserved_block_0 + rf_and_uart + \
        self._reserved_block_1 + rf_channels + self._reserved_block_2
    return bytearray(serialized) if to_bytes else serialized
Serialize OS TR Configuration data object to DPA request pdata.
Arguments:
- to_bytes (bool): Serialize into bytes.
Returns:
list of int: Serialized OS TR Configuration data.
bytearray: Serialized OS TR Configuration data bytes if to_bytes is True.
790 @staticmethod 791 def calculate_checksum(data: List[int]) -> int: 792 """Calculates checksum from TR configuration data. 793 794 Args: 795 data (List[int]): List of integers representing TR configuration data. 796 797 Returns: 798 int: Checksum value. 799 """ 800 checksum = 0x5F 801 for val in data: 802 checksum ^= val 803 return checksum
Calculates checksum from TR configuration data.
Arguments:
- data (List[int]): List of integers representing TR configuration data.
Returns:
int: Checksum value.
class SensorWrittenData:
    """Pair of a sensor index and the data bytes to be written to that sensor."""

    __slots__ = ('_index', '_data')

    def __init__(self, index: int, data: List[int]):
        """Create written data for a single sensor.

        Both arguments are validated before they are stored.

        Args:
            index (int): Sensor index.
            data (List[int]): Data to write.
        """
        self._validate(index=index, data=data)
        self._index, self._data = index, data

    def _validate(self, index: int, data: List[int]):
        """Run the index check followed by the data check.

        Args:
            index (int): Sensor index.
            data (List[int]): Data to write.
        """
        self._validate_index(index)
        self._validate_data(data)

    @staticmethod
    def _validate_index(index: int):
        """Check that the sensor index lies within the allowed range.

        Args:
            index (int): Sensor index.

        Raises:
            RequestParameterInvalidValueError: If index is outside of SENSOR_INDEX_MIN to
                SENSOR_INDEX_MAX (0 to 31 according to the error message).
        """
        if dpa_constants.SENSOR_INDEX_MIN <= index <= dpa_constants.SENSOR_INDEX_MAX:
            return
        raise RequestParameterInvalidValueError('Index value should be between 0 and 31.')

    @property
    def index(self) -> int:
        """:obj:`int`: Sensor index.

        Getter and setter.
        """
        return self._index

    @index.setter
    def index(self, value: int):
        self._validate_index(value)
        self._index = value

    @staticmethod
    def _validate_data(data: List[int]):
        """Check that every value to write fits into a byte.

        Args:
            data (List[int]): Data to write.

        Raises:
            RequestParameterInvalidValueError: If data contains values not in range from 0 to 255.
        """
        if Common.values_in_byte_range(data):
            return
        raise RequestParameterInvalidValueError('Data values should be between 0 and 255.')

    @property
    def data(self) -> List[int]:
        """:obj:`list` of :obj:`int`: Data to write.

        Getter and setter.
        """
        return self._data

    @data.setter
    def data(self, value: List[int]):
        self._validate_data(value)
        self._data = value

    def to_pdata(self) -> List[int]:
        """Serialize the sensor index followed by the data to write.

        Returns:
            :obj:`list` of :obj:`int`: Serialized written data.
        """
        head = [self.index]
        return head + self.data
Sensor Written Data class.
def __init__(self, index: int, data: List[int]):
    """Create written data for a single sensor.

    Both arguments are validated before they are stored.

    Args:
        index (int): Sensor index.
        data (List[int]): Data to write.
    """
    self._validate(index=index, data=data)
    self._index, self._data = index, data
Sensor Written Data constructor.
Arguments:
- index (int): Sensor index.
- data (List[int]): Data to write.
def to_pdata(self) -> List[int]:
    """Serialize the sensor index followed by the data to write.

    Returns:
        :obj:`list` of :obj:`int`: Serialized written data.
    """
    head = [self.index]
    return head + self.data
Serialize index and data to parameters.
Returns:
list of int: Serialized written data.
@dataclass
class SensorData:
    """Sensor (quantity) description together with its measured value."""

    __slots__ = (
        'sensor_type',
        'index',
        'name',
        'short_name',
        'unit',
        'decimal_places',
        'frc_commands',
        'value',
    )

    sensor_type: SensorTypes
    index: int
    name: str
    short_name: str
    unit: str
    decimal_places: int
    frc_commands: List[int]
    value: Union[int, float, List[int], SensorFrcErrors, None]

    def __init__(self, sensor_type: SensorTypes, index: int, name: str,
                 short_name: str, unit: str, decimal_places: int, frc_commands: List[int],
                 value: Union[int, float, List[int], SensorFrcErrors, None] = None):
        """Store sensor (quantity) information and an optional measured value.

        Args:
            sensor_type (SensorTypes): Sensor type (represents a quantity).
            index (int): Index of sensor.
            name (str): Quantity name.
            short_name (str): Short quantity name.
            unit (str): Quantity unit.
            decimal_places (int): Precision.
            frc_commands (List[int]): Implemented FRC commands.
            value (Union[int, float, List[int], SensorFrcErrors, None]): Measured value,
                None when not given.
        """
        self.sensor_type = sensor_type
        """Sensor type (represents a quantity)."""
        self.index = index
        """Index of sensor."""
        self.name = name
        """Quantity name."""
        self.short_name = short_name
        """Short quantity name."""
        self.unit = unit
        """Quantity unit."""
        self.decimal_places = decimal_places
        """Precision."""
        self.frc_commands = frc_commands
        """Implemented FRC commands."""
        self.value = value
        """Measured value."""
Class representing Sensor (quantity) information and value.
def __init__(self, sensor_type: SensorTypes, index: int, name: str,
             short_name: str, unit: str, decimal_places: int, frc_commands: List[int],
             value: Union[int, float, List[int], SensorFrcErrors, None] = None):
    """Store sensor (quantity) information and an optional measured value.

    Args:
        sensor_type (SensorTypes): Sensor type (represents a quantity).
        index (int): Index of sensor.
        name (str): Quantity name.
        short_name (str): Short quantity name.
        unit (str): Quantity unit.
        decimal_places (int): Precision.
        frc_commands (List[int]): Implemented FRC commands.
        value (Union[int, float, List[int], SensorFrcErrors, None]): Measured value,
            None when not given.
    """
    self.sensor_type = sensor_type
    """Sensor type (represents a quantity)."""
    self.index = index
    """Index of sensor."""
    self.name = name
    """Quantity name."""
    self.short_name = short_name
    """Short quantity name."""
    self.unit = unit
    """Quantity unit."""
    self.decimal_places = decimal_places
    """Precision."""
    self.frc_commands = frc_commands
    """Implemented FRC commands."""
    self.value = value
    """Measured value."""
Class representing Sensor (quantity) information and value.
Arguments:
- sensor_type (SensorTypes): Sensor type (represents a quantity).
- index (int): Index of sensor.
- name (str): Quantity name.
- short_name (str): Short quantity name.
- unit (str): Quantity unit.
- decimal_places (int): Precision.
- frc_commands (List[int]): Implemented FRC commands.
- value (Union[int, float, List[int], SensorFrcErrors, None]): Measured value.
Measured value.