hat.gateway.devices.modbus.master.remote_device
"""Remote device support for the modbus master gateway device.

Provides `RemoteDevice`, which holds the data configuration of a single
remote device, writes values to it and creates readers that periodically
poll configured data and report changes through a response callback.
"""

import asyncio
import collections
import contextlib
import enum
import functools
import itertools
import logging
import math
import time
import typing

from hat import aio
from hat import json

from hat.gateway.devices.modbus.master.connection import (DataType,
                                                          Error,
                                                          Connection)
from hat.gateway.devices.modbus.master.eventer_client import (RemoteDeviceStatusRes,  # NOQA
                                                              RemoteDeviceReadRes,  # NOQA
                                                              RemoteDeviceWriteRes,  # NOQA
                                                              Response)


mlog = logging.getLogger(__name__)


# callback (optionally async) receiving every response produced by a reader
ResponseCb: typing.TypeAlias = aio.AsyncCallable[[Response], None]


class _Status(enum.Enum):
    """Remote device status; member name is sent in status responses."""
    DISABLED = 'DISABLED'
    CONNECTING = 'CONNECTING'
    CONNECTED = 'CONNECTED'
    DISCONNECTED = 'DISCONNECTED'


class _DataInfo(typing.NamedTuple):
    """Description of a single configured data value."""
    data_type: DataType
    # bits per addressable unit, as returned by `_get_register_size`
    register_size: int
    # address of the first unit containing this data
    start_address: int
    # number of bits that make up the value
    bit_count: int
    # offset of the value's first bit, counted from the most significant
    # bit of the unit at `start_address`
    bit_offset: int
    # number of units needed to cover `bit_offset + bit_count` bits
    quantity: int
    # polling interval; data with `None` interval is never polled
    interval: float | None
    name: str


class _DataGroup(typing.NamedTuple):
    """Data infos of the same type and interval read with one request."""
    data_infos: list[_DataInfo]
    interval: float
    data_type: DataType
    # address of the first unit read for the whole group
    start_address: int
    # number of units read for the whole group
    quantity: int


class RemoteDevice:
    """Single remote device accessed through a shared connection.

    Args:
        conf: remote device configuration; the keys `device_id`,
            `timeout_poll_delay` and `data` are read
        conn: connection used for all read and write requests
        log_prefix: prefix prepended to every log message

    """

    def __init__(self,
                 conf: json.Data,
                 conn: Connection,
                 log_prefix: str):
        self._conn = conn
        self._device_id = conf['device_id']
        self._timeout_poll_delay = conf['timeout_poll_delay']
        self._log_prefix = f"{log_prefix}: remote device id {self._device_id}"
        # data infos are indexed by name; a repeated name replaces the
        # earlier entry
        self._data_infos = {data_info.name: data_info
                            for data_info in _get_data_infos(conf)}
        self._data_groups = list(_group_data_infos(self._data_infos.values()))

    @property
    def conn(self) -> Connection:
        """Connection used by this remote device."""
        return self._conn

    @property
    def device_id(self) -> int:
        """Device identifier taken from configuration."""
        return self._device_id

    def create_reader(self, response_cb: ResponseCb) -> aio.Resource:
        """Create a reader which polls all data groups of this device.

        Status and read responses are passed to `response_cb`.

        """
        return _Reader(conn=self._conn,
                       device_id=self._device_id,
                       timeout_poll_delay=self._timeout_poll_delay,
                       data_groups=self._data_groups,
                       response_cb=response_cb,
                       log_prefix=self._log_prefix)

    async def write(self,
                    data_name: str,
                    request_id: str,
                    value: int
                    ) -> RemoteDeviceWriteRes | None:
        """Write `value` to the data identified by `data_name`.

        Returns `None` when the data name is not configured or when its
        data type is neither coil nor holding register. Otherwise returns
        a write response whose result is ``'SUCCESS'`` or the name of the
        error returned by the connection.

        """
        data_info = self._data_infos.get(data_name)
        if not data_info:
            self._log(logging.DEBUG, 'data %s is not available', data_name)
            return

        if data_info.data_type == DataType.COIL:
            result = await self._write_coil(data_info, value)

        elif data_info.data_type == DataType.HOLDING_REGISTER:
            result = await self._write_holding_register(data_info, value)

        else:
            self._log(logging.DEBUG, 'write unsupported for %s',
                      data_info.data_type)
            return

        return RemoteDeviceWriteRes(
            device_id=self._device_id,
            data_name=data_name,
            request_id=request_id,
            result=result.name if result else 'SUCCESS')

    async def _write_coil(self, data_info, value):
        """Write `bit_count` coils, most significant bit of `value` first."""
        address = data_info.start_address + data_info.bit_offset
        registers = [(value >> (data_info.bit_count - i - 1)) & 1
                     for i in range(data_info.bit_count)]
        return await self._conn.write(device_id=self._device_id,
                                      data_type=data_info.data_type,
                                      start_address=address,
                                      values=registers)

    async def _write_holding_register(self, data_info, value):
        """Write a bit field that may span several 16 bit registers.

        The field is written in up to three steps: a masked write of the
        leading partial register, a plain write of all fully covered
        registers and a masked write of the trailing partial register.
        The first truthy result (error) is returned immediately; `None`
        is returned when the last executed step yields nothing.

        """
        address = data_info.start_address + (data_info.bit_offset // 16)
        bit_count = data_info.bit_count
        bit_offset = data_info.bit_offset % 16

        if bit_offset:
            # field starts inside a register - only `mask_size` bits
            # between the untouched prefix and suffix are replaced
            mask_prefix_size = bit_offset
            mask_suffix_size = max(16 - bit_offset - bit_count, 0)
            mask_size = 16 - mask_prefix_size - mask_suffix_size
            # ones in `and_mask` mark prefix and suffix bits
            # NOTE(review): presumably write_mask stores
            # (current & and_mask) | or_mask - confirm with Connection
            and_mask = (((0xFFFF << (16 - mask_prefix_size)) & 0xFFFF) |
                        ((0xFFFF << mask_suffix_size) >> 16))
            or_mask = (((value >> (bit_count - mask_size)) &
                        ((1 << mask_size) - 1)) <<
                       mask_suffix_size)
            result = await self._conn.write_mask(device_id=self._device_id,
                                                 address=address,
                                                 and_mask=and_mask,
                                                 or_mask=or_mask)
            if result:
                return result
            address += 1
            bit_count -= mask_size

        # registers completely covered by the remaining bits
        register_count = bit_count // 16
        if register_count:
            registers = [(value >> (bit_count - 16 * (i + 1))) & 0xFFFF
                         for i in range(register_count)]
            result = await self._conn.write(device_id=self._device_id,
                                            data_type=data_info.data_type,
                                            start_address=address,
                                            values=registers)
            if result:
                return result
            address += register_count
            bit_count -= 16 * register_count

        if not bit_count:
            return

        # remaining bits occupy the most significant part of last register
        and_mask = (0xFFFF << (16 - bit_count)) >> 16
        or_mask = (value & ((1 << bit_count) - 1)) << (16 - bit_count)
        return await self._conn.write_mask(device_id=self._device_id,
                                           address=address,
                                           and_mask=and_mask,
                                           or_mask=or_mask)

    def _log(self, level, msg, *args, **kwargs):
        """Log `msg` prefixed with this device's log prefix."""
        if not mlog.isEnabledFor(level):
            return

        # logging applies `msg % args` only when args are given, so the
        # prefix is escaped only in that case
        prefix = self._log_prefix
        if args:
            prefix = prefix.replace('%', '%%')

        mlog.log(level, f"{prefix}: {msg}", *args, **kwargs)


class _Reader(aio.Resource):
    """Resource polling data groups of a single remote device.

    The read loop is spawned in a subgroup of the connection's async
    group as soon as the reader is created.

    """

    def __init__(self, conn, device_id, timeout_poll_delay, data_groups,
                 response_cb, log_prefix):
        self._conn = conn
        self._device_id = device_id
        self._timeout_poll_delay = timeout_poll_delay
        self._response_cb = response_cb
        self._log_prefix = log_prefix
        self._status = None
        self._async_group = conn.async_group.create_subgroup()

        self.async_group.spawn(self._read_loop, data_groups)

    @property
    def async_group(self) -> aio.Group:
        """Async group controlling the read loop."""
        return self._async_group

    async def _read_loop(self, data_groups):
        """Poll `data_groups` according to their intervals until closed."""
        try:
            self._log(logging.DEBUG, 'starting read loop')
            loop = asyncio.get_running_loop()

            if not data_groups:
                # nothing to poll - report connected and wait on a future
                # that is never resolved
                await self._set_status(_Status.CONNECTED)
                await loop.create_future()

            # `None` marks a group which was not read yet
            last_read_times = [None for _ in data_groups]
            last_responses = {}
            await self._set_status(_Status.CONNECTING)

            while True:
                now = time.monotonic()
                sleep_dt = None
                read_data_groups = collections.deque()

                for i, data_group in enumerate(data_groups):
                    last_read_time = last_read_times[i]
                    if last_read_time is not None:
                        # time left until this group is due again
                        dt = data_group.interval - now + last_read_time
                        if dt > 0:
                            if sleep_dt is None or dt < sleep_dt:
                                sleep_dt = dt
                            continue

                    read_data_groups.append(data_group)
                    last_read_times[i] = now

                if not read_data_groups:
                    # sleep until the earliest group becomes due
                    await asyncio.sleep(sleep_dt)
                    continue

                timeout = False

                self._log(logging.DEBUG, 'reading data')
                for data_group in read_data_groups:
                    result = await self._conn.read(
                        device_id=self._device_id,
                        data_type=data_group.data_type,
                        start_address=data_group.start_address,
                        quantity=data_group.quantity)

                    if isinstance(result, Error) and result.name == 'TIMEOUT':
                        timeout = True
                        break

                    # any result other than timeout, including other
                    # errors, is treated as a sign of a connected device
                    await self._set_status(_Status.CONNECTED)

                    for data_info in data_group.data_infos:
                        last_response = last_responses.get(data_info.name)

                        response = self._process_read_result(
                            data_info, data_group.start_address,
                            result, last_response)

                        if response:
                            last_responses[data_info.name] = response
                            await aio.call(self._response_cb, response)

                if timeout:
                    await self._set_status(_Status.DISCONNECTED)
                    await asyncio.sleep(self._timeout_poll_delay)

                    # forget previous state so that all groups are read
                    # again and reported as initial values
                    last_read_times = [None for _ in data_groups]
                    last_responses = {}
                    await self._set_status(_Status.CONNECTING)

        except ConnectionError:
            self._log(logging.DEBUG, 'connection closed')

        except Exception as e:
            self._log(logging.ERROR, 'read loop error: %s', e, exc_info=e)

        finally:
            self._log(logging.DEBUG, 'closing read loop')
            self.close()

            # final status notification is best effort
            with contextlib.suppress(Exception):
                await aio.uncancellable(self._set_status(_Status.DISABLED))

    def _process_read_result(self, data_info, start_address, result,
                             last_response):
        """Convert a group read result into a response for `data_info`.

        Returns a read response for an error result, for the first
        successful value and for a changed value. Returns `None` when the
        value is the same as in the last successful response.

        """
        if isinstance(result, Error):
            self._log(logging.DEBUG, 'data name %s: error response %s',
                      data_info.name, result)
            return RemoteDeviceReadRes(device_id=self._device_id,
                                       data_name=data_info.name,
                                       result=result.name,
                                       value=None,
                                       cause=None)

        # position of this data inside the values read for the group
        offset = data_info.start_address - start_address
        value = _get_registers_value(
            data_info.register_size, data_info.bit_offset,
            data_info.bit_count,
            result[offset:offset+data_info.quantity])

        if last_response is None or last_response.result != 'SUCCESS':
            self._log(logging.DEBUG, 'data name %s: initial value %s',
                      data_info.name, value)
            return RemoteDeviceReadRes(device_id=self._device_id,
                                       data_name=data_info.name,
                                       result='SUCCESS',
                                       value=value,
                                       cause='INTERROGATE')

        if last_response.value != value:
            self._log(logging.DEBUG, 'data name %s: value change %s -> %s',
                      data_info.name, last_response.value, value)
            return RemoteDeviceReadRes(device_id=self._device_id,
                                       data_name=data_info.name,
                                       result='SUCCESS',
                                       value=value,
                                       cause='CHANGE')

        self._log(logging.DEBUG, 'data name %s: no value change',
                  data_info.name)

    async def _set_status(self, status):
        """Store `status` and notify the callback if it has changed."""
        if self._status == status:
            return

        self._log(logging.DEBUG, 'changing remote device status %s -> %s',
                  self._status, status)
        self._status = status

        res = RemoteDeviceStatusRes(device_id=self._device_id,
                                    status=status.name)
        await aio.call(self._response_cb, res)

    def _log(self, level, msg, *args, **kwargs):
        """Log `msg` prefixed with this reader's log prefix."""
        if not mlog.isEnabledFor(level):
            return

        # logging applies `msg % args` only when args are given, so the
        # prefix is escaped only in that case
        prefix = self._log_prefix
        if args:
            prefix = prefix.replace('%', '%%')

        mlog.log(level, f"{prefix}: {msg}", *args, **kwargs)


def _get_data_infos(conf):
    """Yield a `_DataInfo` for every entry of ``conf['data']``."""
    for i in conf['data']:
        data_type = DataType[i['data_type']]
        register_size = _get_register_size(data_type)
        bit_count = i['bit_count']
        bit_offset = i['bit_offset']

        yield _DataInfo(
            data_type=data_type,
            register_size=register_size,
            start_address=i['start_address'],
            bit_count=bit_count,
            bit_offset=bit_offset,
            # units needed to cover all bits up to the end of the value
            quantity=math.ceil((bit_count + bit_offset) / register_size),
            interval=i['interval'],
            name=i['name'])

def _group_data_infos(data_infos):
    """Yield data groups covering all data infos which have an interval.

    Data infos are first split by data type and polling interval; every
    such subset is then merged into address ranges by
    `_group_data_infos_with_type_interval`. Data infos whose interval is
    `None` are skipped.

    """
    by_type_interval = collections.defaultdict(
        functools.partial(collections.defaultdict, collections.deque))

    for data_info in data_infos:
        if data_info.interval is None:
            continue

        by_type_interval[data_info.data_type][data_info.interval].append(
            data_info)

    for data_type, by_interval in by_type_interval.items():
        for interval, infos in by_interval.items():
            yield from _group_data_infos_with_type_interval(
                infos, data_type, interval)


def _group_data_infos_with_type_interval(data_infos, data_type, interval):
    """Yield groups of overlapping or adjacent data infos.

    Data infos are processed in order of (start address, quantity). A
    data info joins the current group when it starts no later than the
    first address after the group and the resulting range does not exceed
    `_get_max_quantity(data_type)`; otherwise it starts a new group. The
    first data info of a group is accepted regardless of its quantity.

    """
    pending = collections.deque(
        sorted(data_infos, key=lambda i: (i.start_address, i.quantity)))
    if not pending:
        return

    # limit depends only on data type, so it is resolved once
    max_quantity = _get_max_quantity(data_type)

    while pending:
        first = pending.popleft()
        start_address = first.start_address
        quantity = first.quantity
        group_infos = collections.deque([first])

        while pending:
            candidate = pending[0]

            # gap between the group and the candidate - close the group
            if candidate.start_address > start_address + quantity:
                break

            new_quantity = (candidate.quantity +
                            candidate.start_address -
                            start_address)

            if new_quantity > max_quantity:
                break

            pending.popleft()
            if new_quantity > quantity:
                quantity = new_quantity

            group_infos.append(candidate)

        yield _DataGroup(data_infos=group_infos,
                         interval=interval,
                         data_type=data_type,
                         start_address=start_address,
                         quantity=quantity)


def _get_register_size(data_type):
    """Return number of bits in a single unit of `data_type`.

    Raises:
        ValueError: when `data_type` is not a known data type

    """
    if data_type in (DataType.COIL,
                     DataType.DISCRETE_INPUT):
        return 1

    if data_type in (DataType.HOLDING_REGISTER,
                     DataType.INPUT_REGISTER,
                     DataType.QUEUE):
        return 16

    raise ValueError('invalid data type')


def _get_max_quantity(data_type):
    """Return the largest quantity allowed in one group of `data_type`.

    Raises:
        ValueError: when `data_type` is not a known data type

    """
    # NOTE(review): 2000 and 125 presumably mirror the Modbus per-request
    # read limits for bits and registers - confirm against specification
    if data_type in (DataType.COIL,
                     DataType.DISCRETE_INPUT):
        return 2000

    if data_type in (DataType.HOLDING_REGISTER,
                     DataType.INPUT_REGISTER):
        return 125

    if data_type == DataType.QUEUE:
        return 1

    raise ValueError('invalid data type')


def _get_registers_value(register_size, bit_offset, bit_count, values):
    """Return integer built from `bit_count` bits starting at `bit_offset`.

    Bits are taken from `values` most significant bit first. Bits past
    the end of `values` are read as zero.

    """
    result = 0
    bits = itertools.chain(_get_registers_bits(register_size, values),
                           itertools.repeat(0))
    for bit in itertools.islice(bits, bit_offset, bit_offset + bit_count):
        result = (result << 1) | bit
    return result


def _get_registers_bits(register_size, values):
    """Yield bits of every value, most significant bit first."""
    for value in values:
        for shift in range(register_size - 1, -1, -1):
            yield (value >> shift) & 1
mlog =
<Logger hat.gateway.devices.modbus.master.remote_device (WARNING)>
ResponseCb: TypeAlias =
Callable[[hat.gateway.devices.modbus.master.eventer_client.StatusRes | hat.gateway.devices.modbus.master.eventer_client.RemoteDeviceStatusRes | hat.gateway.devices.modbus.master.eventer_client.RemoteDeviceReadRes | hat.gateway.devices.modbus.master.eventer_client.RemoteDeviceWriteRes], Optional[Awaitable[NoneType]]]
class
RemoteDevice:
class RemoteDevice:
    """Single remote device accessed through a shared connection.

    Args:
        conf: remote device configuration; the keys `device_id`,
            `timeout_poll_delay` and `data` are read
        conn: connection used for all read and write requests
        log_prefix: prefix prepended to every log message

    """

    def __init__(self,
                 conf: json.Data,
                 conn: Connection,
                 log_prefix: str):
        self._conn = conn
        self._device_id = conf['device_id']
        self._timeout_poll_delay = conf['timeout_poll_delay']
        self._log_prefix = f"{log_prefix}: remote device id {self._device_id}"
        # data infos are indexed by name; a repeated name replaces the
        # earlier entry
        self._data_infos = {data_info.name: data_info
                            for data_info in _get_data_infos(conf)}
        self._data_groups = list(_group_data_infos(self._data_infos.values()))

    @property
    def conn(self) -> Connection:
        """Connection used by this remote device."""
        return self._conn

    @property
    def device_id(self) -> int:
        """Device identifier taken from configuration."""
        return self._device_id

    def create_reader(self, response_cb: ResponseCb) -> aio.Resource:
        """Create a reader which polls all data groups of this device.

        Status and read responses are passed to `response_cb`.

        """
        return _Reader(conn=self._conn,
                       device_id=self._device_id,
                       timeout_poll_delay=self._timeout_poll_delay,
                       data_groups=self._data_groups,
                       response_cb=response_cb,
                       log_prefix=self._log_prefix)

    async def write(self,
                    data_name: str,
                    request_id: str,
                    value: int
                    ) -> RemoteDeviceWriteRes | None:
        """Write `value` to the data identified by `data_name`.

        Returns `None` when the data name is not configured or when its
        data type is neither coil nor holding register. Otherwise returns
        a write response whose result is ``'SUCCESS'`` or the name of the
        error returned by the connection.

        """
        data_info = self._data_infos.get(data_name)
        if not data_info:
            self._log(logging.DEBUG, 'data %s is not available', data_name)
            return

        if data_info.data_type == DataType.COIL:
            result = await self._write_coil(data_info, value)

        elif data_info.data_type == DataType.HOLDING_REGISTER:
            result = await self._write_holding_register(data_info, value)

        else:
            self._log(logging.DEBUG, 'write unsupported for %s',
                      data_info.data_type)
            return

        return RemoteDeviceWriteRes(
            device_id=self._device_id,
            data_name=data_name,
            request_id=request_id,
            result=result.name if result else 'SUCCESS')

    async def _write_coil(self, data_info, value):
        """Write `bit_count` coils, most significant bit of `value` first."""
        address = data_info.start_address + data_info.bit_offset
        registers = [(value >> (data_info.bit_count - i - 1)) & 1
                     for i in range(data_info.bit_count)]
        return await self._conn.write(device_id=self._device_id,
                                      data_type=data_info.data_type,
                                      start_address=address,
                                      values=registers)

    async def _write_holding_register(self, data_info, value):
        """Write a bit field that may span several 16 bit registers.

        The field is written in up to three steps: a masked write of the
        leading partial register, a plain write of all fully covered
        registers and a masked write of the trailing partial register.
        The first truthy result (error) is returned immediately; `None`
        is returned when the last executed step yields nothing.

        """
        address = data_info.start_address + (data_info.bit_offset // 16)
        bit_count = data_info.bit_count
        bit_offset = data_info.bit_offset % 16

        if bit_offset:
            # field starts inside a register - only `mask_size` bits
            # between the untouched prefix and suffix are replaced
            mask_prefix_size = bit_offset
            mask_suffix_size = max(16 - bit_offset - bit_count, 0)
            mask_size = 16 - mask_prefix_size - mask_suffix_size
            # ones in `and_mask` mark prefix and suffix bits
            # NOTE(review): presumably write_mask stores
            # (current & and_mask) | or_mask - confirm with Connection
            and_mask = (((0xFFFF << (16 - mask_prefix_size)) & 0xFFFF) |
                        ((0xFFFF << mask_suffix_size) >> 16))
            or_mask = (((value >> (bit_count - mask_size)) &
                        ((1 << mask_size) - 1)) <<
                       mask_suffix_size)
            result = await self._conn.write_mask(device_id=self._device_id,
                                                 address=address,
                                                 and_mask=and_mask,
                                                 or_mask=or_mask)
            if result:
                return result
            address += 1
            bit_count -= mask_size

        # registers completely covered by the remaining bits
        register_count = bit_count // 16
        if register_count:
            registers = [(value >> (bit_count - 16 * (i + 1))) & 0xFFFF
                         for i in range(register_count)]
            result = await self._conn.write(device_id=self._device_id,
                                            data_type=data_info.data_type,
                                            start_address=address,
                                            values=registers)
            if result:
                return result
            address += register_count
            bit_count -= 16 * register_count

        if not bit_count:
            return

        # remaining bits occupy the most significant part of last register
        and_mask = (0xFFFF << (16 - bit_count)) >> 16
        or_mask = (value & ((1 << bit_count) - 1)) << (16 - bit_count)
        return await self._conn.write_mask(device_id=self._device_id,
                                           address=address,
                                           and_mask=and_mask,
                                           or_mask=or_mask)

    def _log(self, level, msg, *args, **kwargs):
        """Log `msg` prefixed with this device's log prefix."""
        if not mlog.isEnabledFor(level):
            return

        # logging applies `msg % args` only when args are given, so the
        # prefix is escaped only in that case
        prefix = self._log_prefix
        if args:
            prefix = prefix.replace('%', '%%')

        mlog.log(level, f"{prefix}: {msg}", *args, **kwargs)
RemoteDevice( conf: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')], conn: hat.gateway.devices.modbus.master.connection.Connection, log_prefix: str)
def __init__(self,
             conf: json.Data,
             conn: Connection,
             log_prefix: str):
    """Initialize remote device state from its configuration.

    Reads `device_id` and `timeout_poll_delay` from `conf`, builds the
    name-indexed data infos and derives the data groups used for polling.

    """
    self._conn = conn
    self._device_id = conf['device_id']
    self._timeout_poll_delay = conf['timeout_poll_delay']
    self._log_prefix = f"{log_prefix}: remote device id {self._device_id}"

    # a repeated data name replaces the previously stored entry
    infos_by_name = {}
    for data_info in _get_data_infos(conf):
        infos_by_name[data_info.name] = data_info
    self._data_infos = infos_by_name

    self._data_groups = [*_group_data_infos(self._data_infos.values())]
def
create_reader( self, response_cb: Callable[[hat.gateway.devices.modbus.master.eventer_client.StatusRes | hat.gateway.devices.modbus.master.eventer_client.RemoteDeviceStatusRes | hat.gateway.devices.modbus.master.eventer_client.RemoteDeviceReadRes | hat.gateway.devices.modbus.master.eventer_client.RemoteDeviceWriteRes], Optional[Awaitable[NoneType]]]) -> hat.aio.group.Resource:
async def
write( self, data_name: str, request_id: str, value: int) -> hat.gateway.devices.modbus.master.eventer_client.RemoteDeviceWriteRes | None:
async def write(self,
                data_name: str,
                request_id: str,
                value: int
                ) -> RemoteDeviceWriteRes | None:
    """Write `value` to the data identified by `data_name`.

    Returns `None` when the data name is not configured or when its data
    type is neither coil nor holding register. Otherwise returns a write
    response whose result is ``'SUCCESS'`` or the name of the returned
    error.

    """
    data_info = self._data_infos.get(data_name)
    if not data_info:
        self._log(logging.DEBUG, 'data %s is not available', data_name)
        return None

    # select the writer matching the data type
    data_type = data_info.data_type
    if data_type == DataType.COIL:
        write_fn = self._write_coil
    elif data_type == DataType.HOLDING_REGISTER:
        write_fn = self._write_holding_register
    else:
        self._log(logging.DEBUG, 'write unsupported for %s', data_type)
        return None

    error = await write_fn(data_info, value)
    result_name = error.name if error else 'SUCCESS'

    return RemoteDeviceWriteRes(device_id=self._device_id,
                                data_name=data_name,
                                request_id=request_id,
                                result=result_name)