hat.gateway.devices.modbus.master.remote_device
1import asyncio 2import collections 3import contextlib 4import enum 5import functools 6import itertools 7import logging 8import math 9import time 10import typing 11 12from hat import aio 13from hat import json 14from hat.drivers import modbus 15 16from hat.gateway.devices.modbus.master import common 17from hat.gateway.devices.modbus.master.connection import Connection 18 19 20mlog = logging.getLogger(__name__) 21 22 23ResponseCb: typing.TypeAlias = aio.AsyncCallable[[common.Response], None] 24 25 26class _Status(enum.Enum): 27 DISABLED = 'DISABLED' 28 CONNECTING = 'CONNECTING' 29 CONNECTED = 'CONNECTED' 30 DISCONNECTED = 'DISCONNECTED' 31 32 33class _DataInfo(typing.NamedTuple): 34 data_type: modbus.DataType 35 register_size: int 36 start_address: int 37 bit_count: int 38 bit_offset: int 39 quantity: int 40 interval: float | None 41 name: str 42 43 44class _DataGroup(typing.NamedTuple): 45 data_infos: list[_DataInfo] 46 interval: float 47 data_type: modbus.DataType 48 start_address: int 49 quantity: int 50 51 52class RemoteDevice: 53 54 def __init__(self, 55 conf: json.Data, 56 conn: Connection, 57 name: str): 58 self._conn = conn 59 self._name = name 60 self._device_id = conf['device_id'] 61 self._timeout_poll_delay = conf['timeout_poll_delay'] 62 self._data_infos = {data_info.name: data_info 63 for data_info in _get_data_infos(conf)} 64 self._data_groups = list(_group_data_infos(self._data_infos.values())) 65 self._log = common.create_remote_device_logger_adapter(mlog, name, 66 self._device_id) 67 68 @property 69 def conn(self) -> Connection: 70 return self._conn 71 72 @property 73 def device_id(self) -> int: 74 return self._device_id 75 76 def create_reader(self, response_cb: ResponseCb) -> aio.Resource: 77 return _Reader(conn=self._conn, 78 name=self._name, 79 device_id=self._device_id, 80 timeout_poll_delay=self._timeout_poll_delay, 81 data_groups=self._data_groups, 82 response_cb=response_cb) 83 84 async def write(self, 85 data_name: str, 86 request_id: str, 87 value: int 88 ) -> common.RemoteDeviceWriteRes | None: 89 data_info = self._data_infos.get(data_name) 90 if not data_info: 91 self._log.debug('data %s is not available', data_name) 92 return 93 94 if data_info.data_type == modbus.DataType.COIL: 95 res = await self._write_coil(data_info, value) 96 97 elif data_info.data_type == modbus.DataType.HOLDING_REGISTER: 98 res = await self._write_holding_register(data_info, value) 99 100 else: 101 self._log.debug('write unsupported for %s', data_info.data_type) 102 return 103 104 if isinstance(res, modbus.Error): 105 result = res.name 106 107 elif isinstance(res, common.Timeout): 108 result = 'TIMEOUT' 109 110 else: 111 result = 'SUCCESS' 112 113 return common.RemoteDeviceWriteRes( 114 device_id=self._device_id, 115 data_name=data_name, 116 request_id=request_id, 117 result=result) 118 119 async def _write_coil(self, data_info, value): 120 address = data_info.start_address + data_info.bit_offset 121 registers = [(value >> (data_info.bit_count - i - 1)) & 1 122 for i in range(data_info.bit_count)] 123 req = modbus.WriteReq(device_id=self._device_id, 124 data_type=data_info.data_type, 125 start_address=address, 126 values=registers) 127 128 return await self._conn.send(req) 129 130 async def _write_holding_register(self, data_info, value): 131 address = data_info.start_address + (data_info.bit_offset // 16) 132 bit_count = data_info.bit_count 133 bit_offset = data_info.bit_offset % 16 134 135 if bit_offset: 136 mask_prefix_size = bit_offset 137 mask_suffix_size = max(16 - bit_offset - bit_count, 0) 138 mask_size = 16 - mask_prefix_size - mask_suffix_size 139 and_mask = (((0xFFFF << (16 - mask_prefix_size)) & 0xFFFF) | 140 ((0xFFFF << mask_suffix_size) >> 16)) 141 or_mask = (((value >> (bit_count - mask_size)) & 142 ((1 << mask_size) - 1)) << 143 mask_suffix_size) 144 req = modbus.WriteMaskReq(device_id=self._device_id, 145 address=address, 146 and_mask=and_mask, 147 or_mask=or_mask) 148 149 res = await self._conn.send(req) 150 if not isinstance(res, modbus.Success): 151 return res 152 153 address += 1 154 bit_count -= mask_size 155 156 register_count = bit_count // 16 157 if register_count: 158 registers = [(value >> (bit_count - 16 * (i + 1))) & 0xFFFF 159 for i in range(register_count)] 160 req = modbus.WriteReq(device_id=self._device_id, 161 data_type=data_info.data_type, 162 start_address=address, 163 values=registers) 164 165 res = await self._conn.send(req) 166 if not isinstance(res, modbus.Success): 167 return res 168 169 address += register_count 170 bit_count -= 16 * register_count 171 172 if not bit_count: 173 return modbus.Success() 174 175 and_mask = (0xFFFF << (16 - bit_count)) >> 16 176 or_mask = (value & ((1 << bit_count) - 1)) << (16 - bit_count) 177 req = modbus.WriteMaskReq(device_id=self._device_id, 178 address=address, 179 and_mask=and_mask, 180 or_mask=or_mask) 181 182 return await self._conn.send(req) 183 184 185class _Reader(aio.Resource): 186 187 def __init__(self, conn, name, device_id, timeout_poll_delay, data_groups, 188 response_cb): 189 self._conn = conn 190 self._device_id = device_id 191 self._timeout_poll_delay = timeout_poll_delay 192 self._response_cb = response_cb 193 self._status = None 194 self._async_group = conn.async_group.create_subgroup() 195 self._log = common.create_remote_device_logger_adapter(mlog, name, 196 device_id) 197 198 self.async_group.spawn(self._read_loop, data_groups) 199 200 @property 201 def async_group(self) -> aio.Group: 202 return self._async_group 203 204 async def _read_loop(self, data_groups): 205 try: 206 self._log.debug('starting read loop') 207 loop = asyncio.get_running_loop() 208 209 if not data_groups: 210 await self._set_status(_Status.CONNECTED) 211 await loop.create_future() 212 213 last_read_times = [None for _ in data_groups] 214 last_responses = {} 215 await self._set_status(_Status.CONNECTING) 216 217 while True: 218 now = time.monotonic() 219 sleep_dt = None 220 read_data_groups = collections.deque() 221 222 for i, data_group in enumerate(data_groups): 223 last_read_time = last_read_times[i] 224 if last_read_time is not None: 225 dt = data_group.interval - now + last_read_time 226 if dt > 0: 227 if sleep_dt is None or dt < sleep_dt: 228 sleep_dt = dt 229 continue 230 231 read_data_groups.append(data_group) 232 last_read_times[i] = now 233 234 if not read_data_groups: 235 await asyncio.sleep(sleep_dt) 236 continue 237 238 timeout = False 239 240 self._log.debug('reading data') 241 for data_group in read_data_groups: 242 req = modbus.ReadReq( 243 device_id=self._device_id, 244 data_type=data_group.data_type, 245 start_address=data_group.start_address, 246 quantity=data_group.quantity) 247 248 res = await self._conn.send(req) 249 250 if isinstance(res, common.Timeout): 251 timeout = True 252 break 253 254 await self._set_status(_Status.CONNECTED) 255 256 for data_info in data_group.data_infos: 257 last_response = last_responses.get(data_info.name) 258 259 response = self._process_read_res( 260 data_info, data_group.start_address, res, 261 last_response) 262 263 if response: 264 last_responses[data_info.name] = response 265 await aio.call(self._response_cb, response) 266 267 if timeout: 268 await self._set_status(_Status.DISCONNECTED) 269 await asyncio.sleep(self._timeout_poll_delay) 270 271 last_read_times = [None for _ in data_groups] 272 last_responses = {} 273 await self._set_status(_Status.CONNECTING) 274 275 except ConnectionError: 276 self._log.debug('connection closed') 277 278 except Exception as e: 279 self._log.error('read loop error: %s', e, exc_info=e) 280 281 finally: 282 self._log.debug('closing read loop') 283 self.close() 284 285 with contextlib.suppress(Exception): 286 await aio.uncancellable(self._set_status(_Status.DISABLED)) 287 288 def _process_read_res(self, data_info, start_address, res, last_response): 289 if isinstance(res, modbus.Error): 290 result = res.name 291 292 elif isinstance(res, common.Timeout): 293 result = 'TIMEOUT' 294 295 else: 296 result = 'SUCCESS' 297 298 if result != 'SUCCESS': 299 self._log.debug('data name %s: error response %s', 300 data_info.name, result) 301 return common.RemoteDeviceReadRes(device_id=self._device_id, 302 data_name=data_info.name, 303 result=result, 304 value=None, 305 cause=None) 306 307 offset = data_info.start_address - start_address 308 value = _get_registers_value( 309 data_info.register_size, data_info.bit_offset, 310 data_info.bit_count, 311 res[offset:offset+data_info.quantity]) 312 313 if last_response is None or last_response.result != 'SUCCESS': 314 self._log.debug('data name %s: initial value %s', 315 data_info.name, value) 316 return common.RemoteDeviceReadRes(device_id=self._device_id, 317 data_name=data_info.name, 318 result=result, 319 value=value, 320 cause='INTERROGATE') 321 322 if last_response.value != value: 323 self._log.debug('data name %s: value change %s -> %s', 324 data_info.name, last_response.value, value) 325 return common.RemoteDeviceReadRes(device_id=self._device_id, 326 data_name=data_info.name, 327 result=result, 328 value=value, 329 cause='CHANGE') 330 331 self._log.debug('data name %s: no value change', data_info.name) 332 333 async def _set_status(self, status): 334 if self._status == status: 335 return 336 337 self._log.debug('changing remote device status %s -> %s', 338 self._status, status) 339 self._status = status 340 341 response = common.RemoteDeviceStatusRes(device_id=self._device_id, 342 status=status.name) 343 await aio.call(self._response_cb, response) 344 345 346def _get_data_infos(conf): 347 for i in conf['data']: 348 data_type = modbus.DataType[i['data_type']] 349 register_size = _get_register_size(data_type) 350 bit_count = i['bit_count'] 351 bit_offset = i['bit_offset'] 352 353 yield _DataInfo( 354 data_type=data_type, 355 register_size=register_size, 356 start_address=i['start_address'], 357 bit_count=bit_count, 358 bit_offset=bit_offset, 359 quantity=math.ceil((bit_count + bit_offset) / register_size), 360 interval=i['interval'], 361 name=i['name']) 362 363 364def _group_data_infos(data_infos): 365 type_interval_infos_dict = collections.defaultdict( 366 functools.partial(collections.defaultdict, collections.deque)) 367 368 for data_info in data_infos: 369 data_type = data_info.data_type 370 interval = data_info.interval 371 372 if interval is None: 373 continue 374 375 type_interval_infos_dict[data_type][interval].append(data_info) 376 377 for data_type, interval_infos_dict in type_interval_infos_dict.items(): 378 for interval, data_infos_queue in interval_infos_dict.items(): 379 yield from _group_data_infos_with_type_interval( 380 data_infos_queue, data_type, interval) 381 382 383def _group_data_infos_with_type_interval(data_infos, data_type, interval): 384 data_infos_queue = sorted(data_infos, 385 key=lambda i: (i.start_address, i.quantity)) 386 data_infos_queue = collections.deque(data_infos_queue) 387 388 while data_infos_queue: 389 max_quantity = _get_max_quantity(data_type) 390 start_address = None 391 quantity = None 392 data_infos = collections.deque() 393 394 while data_infos_queue: 395 data_info = data_infos_queue.popleft() 396 397 if start_address is None: 398 start_address = data_info.start_address 399 400 if quantity is None: 401 quantity = data_info.quantity 402 403 elif data_info.start_address > start_address + quantity: 404 data_infos_queue.appendleft(data_info) 405 break 406 407 else: 408 new_quantity = (data_info.quantity + 409 data_info.start_address - 410 start_address) 411 412 if new_quantity > max_quantity: 413 data_infos_queue.appendleft(data_info) 414 break 415 416 if new_quantity > quantity: 417 quantity = new_quantity 418 419 data_infos.append(data_info) 420 421 if start_address is None or quantity is None: 422 continue 423 424 yield _DataGroup(data_infos=data_infos, 425 interval=interval, 426 data_type=data_type, 427 start_address=start_address, 428 quantity=quantity) 429 430 431def _get_register_size(data_type): 432 if data_type in (modbus.DataType.COIL, 433 modbus.DataType.DISCRETE_INPUT): 434 return 1 435 436 if data_type in (modbus.DataType.HOLDING_REGISTER, 437 modbus.DataType.INPUT_REGISTER, 438 modbus.DataType.QUEUE): 439 return 16 440 441 raise ValueError('invalid data type') 442 443 444def _get_max_quantity(data_type): 445 if data_type in (modbus.DataType.COIL, 446 modbus.DataType.DISCRETE_INPUT): 447 return 2000 448 449 if data_type in (modbus.DataType.HOLDING_REGISTER, 450 modbus.DataType.INPUT_REGISTER): 451 return 125 452 453 if data_type == modbus.DataType.QUEUE: 454 return 1 455 456 raise ValueError('invalid data type') 457 458 459def _get_registers_value(register_size, bit_offset, bit_count, values): 460 result = 0 461 bits = itertools.chain(_get_registers_bits(register_size, values), 462 itertools.repeat(0)) 463 for i in itertools.islice(bits, bit_offset, bit_offset + bit_count): 464 result = (result << 1) | i 465 return result 466 467 468def _get_registers_bits(register_size, values): 469 for value in values: 470 for i in range(register_size): 471 yield (value >> (register_size - i - 1)) & 1
mlog =
<Logger hat.gateway.devices.modbus.master.remote_device (WARNING)>
ResponseCb: TypeAlias =
Callable[[hat.gateway.devices.modbus.master.common.StatusRes | hat.gateway.devices.modbus.master.common.RemoteDeviceStatusRes | hat.gateway.devices.modbus.master.common.RemoteDeviceReadRes | hat.gateway.devices.modbus.master.common.RemoteDeviceWriteRes], None | Awaitable[None]]
class
RemoteDevice:
53class RemoteDevice: 54 55 def __init__(self, 56 conf: json.Data, 57 conn: Connection, 58 name: str): 59 self._conn = conn 60 self._name = name 61 self._device_id = conf['device_id'] 62 self._timeout_poll_delay = conf['timeout_poll_delay'] 63 self._data_infos = {data_info.name: data_info 64 for data_info in _get_data_infos(conf)} 65 self._data_groups = list(_group_data_infos(self._data_infos.values())) 66 self._log = common.create_remote_device_logger_adapter(mlog, name, 67 self._device_id) 68 69 @property 70 def conn(self) -> Connection: 71 return self._conn 72 73 @property 74 def device_id(self) -> int: 75 return self._device_id 76 77 def create_reader(self, response_cb: ResponseCb) -> aio.Resource: 78 return _Reader(conn=self._conn, 79 name=self._name, 80 device_id=self._device_id, 81 timeout_poll_delay=self._timeout_poll_delay, 82 data_groups=self._data_groups, 83 response_cb=response_cb) 84 85 async def write(self, 86 data_name: str, 87 request_id: str, 88 value: int 89 ) -> common.RemoteDeviceWriteRes | None: 90 data_info = self._data_infos.get(data_name) 91 if not data_info: 92 self._log.debug('data %s is not available', data_name) 93 return 94 95 if data_info.data_type == modbus.DataType.COIL: 96 res = await self._write_coil(data_info, value) 97 98 elif data_info.data_type == modbus.DataType.HOLDING_REGISTER: 99 res = await self._write_holding_register(data_info, value) 100 101 else: 102 self._log.debug('write unsupported for %s', data_info.data_type) 103 return 104 105 if isinstance(res, modbus.Error): 106 result = res.name 107 108 elif isinstance(res, common.Timeout): 109 result = 'TIMEOUT' 110 111 else: 112 result = 'SUCCESS' 113 114 return common.RemoteDeviceWriteRes( 115 device_id=self._device_id, 116 data_name=data_name, 117 request_id=request_id, 118 result=result) 119 120 async def _write_coil(self, data_info, value): 121 address = data_info.start_address + data_info.bit_offset 122 registers = [(value >> (data_info.bit_count - i - 1)) & 1 123 for i in range(data_info.bit_count)] 124 req = modbus.WriteReq(device_id=self._device_id, 125 data_type=data_info.data_type, 126 start_address=address, 127 values=registers) 128 129 return await self._conn.send(req) 130 131 async def _write_holding_register(self, data_info, value): 132 address = data_info.start_address + (data_info.bit_offset // 16) 133 bit_count = data_info.bit_count 134 bit_offset = data_info.bit_offset % 16 135 136 if bit_offset: 137 mask_prefix_size = bit_offset 138 mask_suffix_size = max(16 - bit_offset - bit_count, 0) 139 mask_size = 16 - mask_prefix_size - mask_suffix_size 140 and_mask = (((0xFFFF << (16 - mask_prefix_size)) & 0xFFFF) | 141 ((0xFFFF << mask_suffix_size) >> 16)) 142 or_mask = (((value >> (bit_count - mask_size)) & 143 ((1 << mask_size) - 1)) << 144 mask_suffix_size) 145 req = modbus.WriteMaskReq(device_id=self._device_id, 146 address=address, 147 and_mask=and_mask, 148 or_mask=or_mask) 149 150 res = await self._conn.send(req) 151 if not isinstance(res, modbus.Success): 152 return res 153 154 address += 1 155 bit_count -= mask_size 156 157 register_count = bit_count // 16 158 if register_count: 159 registers = [(value >> (bit_count - 16 * (i + 1))) & 0xFFFF 160 for i in range(register_count)] 161 req = modbus.WriteReq(device_id=self._device_id, 162 data_type=data_info.data_type, 163 start_address=address, 164 values=registers) 165 166 res = await self._conn.send(req) 167 if not isinstance(res, modbus.Success): 168 return res 169 170 address += register_count 171 bit_count -= 16 * register_count 172 173 if not bit_count: 174 return modbus.Success() 175 176 and_mask = (0xFFFF << (16 - bit_count)) >> 16 177 or_mask = (value & ((1 << bit_count) - 1)) << (16 - bit_count) 178 req = modbus.WriteMaskReq(device_id=self._device_id, 179 address=address, 180 and_mask=and_mask, 181 or_mask=or_mask) 182 183 return await self._conn.send(req)
RemoteDevice( conf: None | bool | int | float | str | List[ForwardRef('Data')] | Dict[str, ForwardRef('Data')], conn: hat.gateway.devices.modbus.master.connection.Connection, name: str)
55 def __init__(self, 56 conf: json.Data, 57 conn: Connection, 58 name: str): 59 self._conn = conn 60 self._name = name 61 self._device_id = conf['device_id'] 62 self._timeout_poll_delay = conf['timeout_poll_delay'] 63 self._data_infos = {data_info.name: data_info 64 for data_info in _get_data_infos(conf)} 65 self._data_groups = list(_group_data_infos(self._data_infos.values())) 66 self._log = common.create_remote_device_logger_adapter(mlog, name, 67 self._device_id)
def
create_reader( self, response_cb: Callable[[hat.gateway.devices.modbus.master.common.StatusRes | hat.gateway.devices.modbus.master.common.RemoteDeviceStatusRes | hat.gateway.devices.modbus.master.common.RemoteDeviceReadRes | hat.gateway.devices.modbus.master.common.RemoteDeviceWriteRes], None | Awaitable[None]]) -> hat.aio.group.Resource:
async def
write( self, data_name: str, request_id: str, value: int) -> hat.gateway.devices.modbus.master.common.RemoteDeviceWriteRes | None:
85 async def write(self, 86 data_name: str, 87 request_id: str, 88 value: int 89 ) -> common.RemoteDeviceWriteRes | None: 90 data_info = self._data_infos.get(data_name) 91 if not data_info: 92 self._log.debug('data %s is not available', data_name) 93 return 94 95 if data_info.data_type == modbus.DataType.COIL: 96 res = await self._write_coil(data_info, value) 97 98 elif data_info.data_type == modbus.DataType.HOLDING_REGISTER: 99 res = await self._write_holding_register(data_info, value) 100 101 else: 102 self._log.debug('write unsupported for %s', data_info.data_type) 103 return 104 105 if isinstance(res, modbus.Error): 106 result = res.name 107 108 elif isinstance(res, common.Timeout): 109 result = 'TIMEOUT' 110 111 else: 112 result = 'SUCCESS' 113 114 return common.RemoteDeviceWriteRes( 115 device_id=self._device_id, 116 data_name=data_name, 117 request_id=request_id, 118 result=result)