hat.gateway.devices.modbus.master.remote_device         
                
                        
                        
"""Modbus master remote device.

Wraps a single remote Modbus device behind a shared `Connection`:
periodic polling of configured data points (via `_Reader`) and
on-demand writes of COIL / HOLDING_REGISTER data.
"""

import asyncio
import collections
import contextlib
import enum
import functools
import itertools
import logging
import math
import time
import typing

from hat import aio
from hat import json

from hat.gateway.devices.modbus.master import common
from hat.gateway.devices.modbus.master.connection import (DataType,
                                                          Error,
                                                          Connection)
from hat.gateway.devices.modbus.master.eventer_client import (RemoteDeviceStatusRes,  # NOQA
                                                              RemoteDeviceReadRes,  # NOQA
                                                              RemoteDeviceWriteRes,  # NOQA
                                                              Response)


mlog = logging.getLogger(__name__)


# async callback receiving responses produced by reads/status changes
ResponseCb: typing.TypeAlias = aio.AsyncCallable[[Response], None]


class _Status(enum.Enum):
    """Remote device connection status reported via RemoteDeviceStatusRes."""
    DISABLED = 'DISABLED'
    CONNECTING = 'CONNECTING'
    CONNECTED = 'CONNECTED'
    DISCONNECTED = 'DISCONNECTED'


class _DataInfo(typing.NamedTuple):
    """Static description of one configured data point.

    A data point is `bit_count` bits located `bit_offset` bits past the
    start of `start_address`, spanning `quantity` registers of
    `register_size` bits each (1 for coils/discrete inputs, 16 for
    registers - see `_get_register_size`).
    """
    data_type: DataType
    register_size: int
    start_address: int
    bit_count: int
    bit_offset: int
    quantity: int
    interval: float | None  # polling interval in seconds; None - not polled
    name: str


class _DataGroup(typing.NamedTuple):
    """Group of data points read together with a single Modbus read."""
    data_infos: list[_DataInfo]
    interval: float
    data_type: DataType
    start_address: int
    quantity: int


class RemoteDevice:
    """Single remote Modbus device accessed through a shared connection.

    Args:
        conf: device configuration (expects keys ``device_id``,
            ``timeout_poll_delay`` and ``data``)
        conn: established Modbus master connection
        name: device name used for logging
    """

    def __init__(self,
                 conf: json.Data,
                 conn: Connection,
                 name: str):
        self._conn = conn
        self._name = name
        self._device_id = conf['device_id']
        self._timeout_poll_delay = conf['timeout_poll_delay']
        # data points indexed by name for write lookups
        self._data_infos = {data_info.name: data_info
                            for data_info in _get_data_infos(conf)}
        # precomputed read groups used by readers created later
        self._data_groups = list(_group_data_infos(self._data_infos.values()))
        self._log = common.create_remote_device_logger_adapter(mlog, name,
                                                               self._device_id)

    @property
    def conn(self) -> Connection:
        """Underlying Modbus connection."""
        return self._conn

    @property
    def device_id(self) -> int:
        """Modbus unit/device identifier."""
        return self._device_id

    def create_reader(self, response_cb: ResponseCb) -> aio.Resource:
        """Create a polling reader delivering responses to `response_cb`.

        The reader's lifetime is bound to the connection (it runs in a
        subgroup of the connection's async group).
        """
        return _Reader(conn=self._conn,
                       name=self._name,
                       device_id=self._device_id,
                       timeout_poll_delay=self._timeout_poll_delay,
                       data_groups=self._data_groups,
                       response_cb=response_cb)

    async def write(self,
                    data_name: str,
                    request_id: str,
                    value: int
                    ) -> RemoteDeviceWriteRes | None:
        """Write `value` to the named data point.

        Returns ``None`` when the data point is unknown or its data type
        is not writable (only COIL and HOLDING_REGISTER are supported);
        otherwise returns a write response whose result is ``'SUCCESS'``
        or the name of the connection-level error.
        """
        data_info = self._data_infos.get(data_name)
        if not data_info:
            self._log.debug('data %s is not available', data_name)
            return

        if data_info.data_type == DataType.COIL:
            result = await self._write_coil(data_info, value)

        elif data_info.data_type == DataType.HOLDING_REGISTER:
            result = await self._write_holding_register(data_info, value)

        else:
            self._log.debug('write unsupported for %s', data_info.data_type)
            return

        return RemoteDeviceWriteRes(
            device_id=self._device_id,
            data_name=data_name,
            request_id=request_id,
            result=result.name if result else 'SUCCESS')

    async def _write_coil(self, data_info, value):
        # coils are 1 bit per address: decompose value into individual
        # bits, most significant first, starting at start_address+offset
        address = data_info.start_address + data_info.bit_offset
        registers = [(value >> (data_info.bit_count - i - 1)) & 1
                     for i in range(data_info.bit_count)]
        return await self._conn.write(device_id=self._device_id,
                                      data_type=data_info.data_type,
                                      start_address=address,
                                      values=registers)

    async def _write_holding_register(self, data_info, value):
        # Write a bit field that may start/end mid-register, in up to
        # three phases:
        #   1. leading partial register - mask write (function 22 style
        #      and/or masks) so neighbouring bits are preserved
        #   2. run of whole 16-bit registers - plain multi-register write
        #   3. trailing partial register - mask write again
        # Any phase returning an error aborts and returns that error.
        address = data_info.start_address + (data_info.bit_offset // 16)
        bit_count = data_info.bit_count
        bit_offset = data_info.bit_offset % 16

        if bit_offset:
            # phase 1: bits [bit_offset, bit_offset+mask_size) of the
            # first register are replaced; and_mask keeps the bits
            # outside that window, or_mask injects the new bits
            mask_prefix_size = bit_offset
            mask_suffix_size = max(16 - bit_offset - bit_count, 0)
            mask_size = 16 - mask_prefix_size - mask_suffix_size
            and_mask = (((0xFFFF << (16 - mask_prefix_size)) & 0xFFFF) |
                        ((0xFFFF << mask_suffix_size) >> 16))
            or_mask = (((value >> (bit_count - mask_size)) &
                        ((1 << mask_size) - 1)) <<
                       mask_suffix_size)
            result = await self._conn.write_mask(device_id=self._device_id,
                                                 address=address,
                                                 and_mask=and_mask,
                                                 or_mask=or_mask)
            if result:
                return result
            address += 1
            bit_count -= mask_size

        # phase 2: remaining value bits that fill whole registers,
        # most significant register first
        register_count = bit_count // 16
        if register_count:
            registers = [(value >> (bit_count - 16 * (i + 1))) & 0xFFFF
                         for i in range(register_count)]
            result = await self._conn.write(device_id=self._device_id,
                                            data_type=data_info.data_type,
                                            start_address=address,
                                            values=registers)
            if result:
                return result
            address += register_count
            bit_count -= 16 * register_count

        if not bit_count:
            return

        # phase 3: trailing bits occupy the top of the final register
        and_mask = (0xFFFF << (16 - bit_count)) >> 16
        or_mask = (value & ((1 << bit_count) - 1)) << (16 - bit_count)
        return await self._conn.write_mask(device_id=self._device_id,
                                           address=address,
                                           and_mask=and_mask,
                                           or_mask=or_mask)


class _Reader(aio.Resource):
    """Background poller for one remote device.

    Reads each data group on its configured interval and pushes status,
    read and change notifications through `response_cb`. Created by
    `RemoteDevice.create_reader`; closing the connection closes the
    reader (it runs in a subgroup of the connection's group).
    """

    def __init__(self, conn, name, device_id, timeout_poll_delay, data_groups,
                 response_cb):
        self._conn = conn
        self._device_id = device_id
        self._timeout_poll_delay = timeout_poll_delay
        self._response_cb = response_cb
        self._status = None
        self._async_group = conn.async_group.create_subgroup()
        self._log = common.create_remote_device_logger_adapter(mlog, name,
                                                               device_id)

        self.async_group.spawn(self._read_loop, data_groups)

    @property
    def async_group(self) -> aio.Group:
        return self._async_group

    async def _read_loop(self, data_groups):
        """Main polling loop; runs until the group closes or conn fails."""
        try:
            self._log.debug('starting read loop')
            loop = asyncio.get_running_loop()

            if not data_groups:
                # nothing to poll: report CONNECTED and park forever
                await self._set_status(_Status.CONNECTED)
                await loop.create_future()

            # per-group monotonic timestamp of last read (None - due now)
            last_read_times = [None for _ in data_groups]
            # last response per data name, used for change detection
            last_responses = {}
            await self._set_status(_Status.CONNECTING)

            while True:
                now = time.monotonic()
                sleep_dt = None
                read_data_groups = collections.deque()

                # select groups whose interval has elapsed; track the
                # shortest remaining wait for the sleep below
                for i, data_group in enumerate(data_groups):
                    last_read_time = last_read_times[i]
                    if last_read_time is not None:
                        dt = data_group.interval - now + last_read_time
                        if dt > 0:
                            if sleep_dt is None or dt < sleep_dt:
                                sleep_dt = dt
                            continue

                    read_data_groups.append(data_group)
                    last_read_times[i] = now

                if not read_data_groups:
                    await asyncio.sleep(sleep_dt)
                    continue

                timeout = False

                self._log.debug('reading data')
                for data_group in read_data_groups:
                    result = await self._conn.read(
                        device_id=self._device_id,
                        data_type=data_group.data_type,
                        start_address=data_group.start_address,
                        quantity=data_group.quantity)

                    # TIMEOUT is treated as device unreachable - stop
                    # this cycle and back off; other errors are reported
                    # per data point by _process_read_result
                    if isinstance(result, Error) and result.name == 'TIMEOUT':
                        timeout = True
                        break

                    await self._set_status(_Status.CONNECTED)

                    for data_info in data_group.data_infos:
                        last_response = last_responses.get(data_info.name)

                        response = self._process_read_result(
                            data_info, data_group.start_address,
                            result, last_response)

                        if response:
                            last_responses[data_info.name] = response
                            await aio.call(self._response_cb, response)

                if timeout:
                    await self._set_status(_Status.DISCONNECTED)
                    await asyncio.sleep(self._timeout_poll_delay)

                    # forget history so reconnect re-interrogates all
                    last_read_times = [None for _ in data_groups]
                    last_responses = {}
                    await self._set_status(_Status.CONNECTING)

        except ConnectionError:
            self._log.debug('connection closed')

        except Exception as e:
            self._log.error('read loop error: %s', e, exc_info=e)

        finally:
            self._log.debug('closing read loop')
            self.close()

            # always report DISABLED on shutdown, even while cancelled
            with contextlib.suppress(Exception):
                await aio.uncancellable(self._set_status(_Status.DISABLED))

    def _process_read_result(self, data_info, start_address, result,
                             last_response):
        """Turn one group read result into a read response for one point.

        Returns ``None`` when the value is unchanged since the last
        successful response; cause is ``'INTERROGATE'`` for the first
        value after a (re)connect, ``'CHANGE'`` otherwise.
        """
        if isinstance(result, Error):
            self._log.debug('data name %s: error response %s',
                            data_info.name, result)
            return RemoteDeviceReadRes(device_id=self._device_id,
                                       data_name=data_info.name,
                                       result=result.name,
                                       value=None,
                                       cause=None)

        # slice this point's registers out of the group read and decode
        offset = data_info.start_address - start_address
        value = _get_registers_value(
            data_info.register_size, data_info.bit_offset,
            data_info.bit_count,
            result[offset:offset+data_info.quantity])

        if last_response is None or last_response.result != 'SUCCESS':
            self._log.debug('data name %s: initial value %s',
                            data_info.name, value)
            return RemoteDeviceReadRes(device_id=self._device_id,
                                       data_name=data_info.name,
                                       result='SUCCESS',
                                       value=value,
                                       cause='INTERROGATE')

        if last_response.value != value:
            self._log.debug('data name %s: value change %s -> %s',
                            data_info.name, last_response.value, value)
            return RemoteDeviceReadRes(device_id=self._device_id,
                                       data_name=data_info.name,
                                       result='SUCCESS',
                                       value=value,
                                       cause='CHANGE')

        self._log.debug('data name %s: no value change', data_info.name)

    async def _set_status(self, status):
        """Report `status` via the response callback if it changed."""
        if self._status == status:
            return

        self._log.debug('changing remote device status %s -> %s',
                        self._status, status)
        self._status = status

        res = RemoteDeviceStatusRes(device_id=self._device_id,
                                    status=status.name)
        await aio.call(self._response_cb, res)


def _get_data_infos(conf):
    """Yield a `_DataInfo` for each entry of ``conf['data']``."""
    for i in conf['data']:
        data_type = DataType[i['data_type']]
        register_size = _get_register_size(data_type)
        bit_count = i['bit_count']
        bit_offset = i['bit_offset']

        yield _DataInfo(
            data_type=data_type,
            register_size=register_size,
            start_address=i['start_address'],
            bit_count=bit_count,
            bit_offset=bit_offset,
            # number of registers the bit field spans
            quantity=math.ceil((bit_count + bit_offset) / register_size),
            interval=i['interval'],
            name=i['name'])


def _group_data_infos(data_infos):
    """Partition polled data infos into read groups.

    Points without an interval are skipped (never polled); the rest are
    bucketed by (data_type, interval) and each bucket is packed into
    contiguous address ranges.
    """
    type_interval_infos_dict = collections.defaultdict(
        functools.partial(collections.defaultdict, collections.deque))

    for data_info in data_infos:
        data_type = data_info.data_type
        interval = data_info.interval

        if interval is None:
            continue

        type_interval_infos_dict[data_type][interval].append(data_info)

    for data_type, interval_infos_dict in type_interval_infos_dict.items():
        for interval, data_infos_queue in interval_infos_dict.items():
            yield from _group_data_infos_with_type_interval(
                data_infos_queue, data_type, interval)


def _group_data_infos_with_type_interval(data_infos, data_type, interval):
    """Greedily pack same-type/same-interval points into `_DataGroup`s.

    Points are sorted by address and merged into a group while their
    address range stays contiguous/overlapping with the group and the
    total quantity does not exceed the protocol limit for the data type.
    """
    data_infos_queue = sorted(data_infos,
                              key=lambda i: (i.start_address, i.quantity))
    data_infos_queue = collections.deque(data_infos_queue)

    while data_infos_queue:
        max_quantity = _get_max_quantity(data_type)
        start_address = None
        quantity = None
        data_infos = collections.deque()

        while data_infos_queue:
            data_info = data_infos_queue.popleft()

            if start_address is None:
                start_address = data_info.start_address

            if quantity is None:
                quantity = data_info.quantity

            elif data_info.start_address > start_address + quantity:
                # gap in addresses - start a new group with this point
                data_infos_queue.appendleft(data_info)
                break

            else:
                new_quantity = (data_info.quantity +
                                data_info.start_address -
                                start_address)

                if new_quantity > max_quantity:
                    # would exceed max read size - start a new group
                    data_infos_queue.appendleft(data_info)
                    break

                if new_quantity > quantity:
                    quantity = new_quantity

            data_infos.append(data_info)

        if start_address is None or quantity is None:
            continue

        yield _DataGroup(data_infos=data_infos,
                         interval=interval,
                         data_type=data_type,
                         start_address=start_address,
                         quantity=quantity)


def _get_register_size(data_type):
    """Return bits per register/address for `data_type` (1 or 16)."""
    if data_type in (DataType.COIL,
                     DataType.DISCRETE_INPUT):
        return 1

    if data_type in (DataType.HOLDING_REGISTER,
                     DataType.INPUT_REGISTER,
                     DataType.QUEUE):
        return 16

    raise ValueError('invalid data type')


def _get_max_quantity(data_type):
    """Return the maximum quantity readable in one request.

    2000 coils/discrete inputs and 125 registers match the Modbus
    application protocol limits; queue reads are one at a time.
    """
    if data_type in (DataType.COIL,
                     DataType.DISCRETE_INPUT):
        return 2000

    if data_type in (DataType.HOLDING_REGISTER,
                     DataType.INPUT_REGISTER):
        return 125

    if data_type == DataType.QUEUE:
        return 1

    raise ValueError('invalid data type')


def _get_registers_value(register_size, bit_offset, bit_count, values):
    """Decode a bit field from register values into an int.

    Registers are concatenated MSB-first, then bits
    [bit_offset, bit_offset+bit_count) are taken; missing bits past the
    end of `values` read as 0 (via the infinite zero padding).
    """
    result = 0
    bits = itertools.chain(_get_registers_bits(register_size, values),
                           itertools.repeat(0))
    for i in itertools.islice(bits, bit_offset, bit_offset + bit_count):
        result = (result << 1) | i
    return result


def _get_registers_bits(register_size, values):
    """Yield the bits of each register value, most significant first."""
    for value in values:
        for i in range(register_size):
            yield (value >> (register_size - i - 1)) & 1
            mlog        =
<Logger hat.gateway.devices.modbus.master.remote_device (WARNING)>
        
    
    
    
    
                
            ResponseCb: hat.drivers.modbus.transport.common.ErrorRes | hat.drivers.modbus.transport.common.ReadCoilsRes | hat.drivers.modbus.transport.common.ReadDiscreteInputsRes | hat.drivers.modbus.transport.common.ReadHoldingRegistersRes | hat.drivers.modbus.transport.common.ReadInputRegistersRes | hat.drivers.modbus.transport.common.WriteSingleCoilRes | hat.drivers.modbus.transport.common.WriteSingleRegisterRes | hat.drivers.modbus.transport.common.WriteMultipleCoilsRes | hat.drivers.modbus.transport.common.WriteMultipleRegistersRes | hat.drivers.modbus.transport.common.MaskWriteRegisterRes | hat.drivers.modbus.transport.common.ReadFifoQueueRes        =
            typing.Callable[[hat.gateway.devices.modbus.master.eventer_client.StatusRes | hat.gateway.devices.modbus.master.eventer_client.RemoteDeviceStatusRes | hat.gateway.devices.modbus.master.eventer_client.RemoteDeviceReadRes | hat.gateway.devices.modbus.master.eventer_client.RemoteDeviceWriteRes], None | collections.abc.Awaitable[None]]
        
    
    
    
    
                
            
    class
    RemoteDevice:
                
    
    
            58class RemoteDevice: 59 60 def __init__(self, 61 conf: json.Data, 62 conn: Connection, 63 name: str): 64 self._conn = conn 65 self._name = name 66 self._device_id = conf['device_id'] 67 self._timeout_poll_delay = conf['timeout_poll_delay'] 68 self._data_infos = {data_info.name: data_info 69 for data_info in _get_data_infos(conf)} 70 self._data_groups = list(_group_data_infos(self._data_infos.values())) 71 self._log = common.create_remote_device_logger_adapter(mlog, name, 72 self._device_id) 73 74 @property 75 def conn(self) -> Connection: 76 return self._conn 77 78 @property 79 def device_id(self) -> int: 80 return self._device_id 81 82 def create_reader(self, response_cb: ResponseCb) -> aio.Resource: 83 return _Reader(conn=self._conn, 84 name=self._name, 85 device_id=self._device_id, 86 timeout_poll_delay=self._timeout_poll_delay, 87 data_groups=self._data_groups, 88 response_cb=response_cb) 89 90 async def write(self, 91 data_name: str, 92 request_id: str, 93 value: int 94 ) -> RemoteDeviceWriteRes | None: 95 data_info = self._data_infos.get(data_name) 96 if not data_info: 97 self._log.debug('data %s is not available', data_name) 98 return 99 100 if data_info.data_type == DataType.COIL: 101 result = await self._write_coil(data_info, value) 102 103 elif data_info.data_type == DataType.HOLDING_REGISTER: 104 result = await self._write_holding_register(data_info, value) 105 106 else: 107 self._log.debug('write unsupported for %s', data_info.data_type) 108 return 109 110 return RemoteDeviceWriteRes( 111 device_id=self._device_id, 112 data_name=data_name, 113 request_id=request_id, 114 result=result.name if result else 'SUCCESS') 115 116 async def _write_coil(self, data_info, value): 117 address = data_info.start_address + data_info.bit_offset 118 registers = [(value >> (data_info.bit_count - i - 1)) & 1 119 for i in range(data_info.bit_count)] 120 return await self._conn.write(device_id=self._device_id, 121 data_type=data_info.data_type, 122 
start_address=address, 123 values=registers) 124 125 async def _write_holding_register(self, data_info, value): 126 address = data_info.start_address + (data_info.bit_offset // 16) 127 bit_count = data_info.bit_count 128 bit_offset = data_info.bit_offset % 16 129 130 if bit_offset: 131 mask_prefix_size = bit_offset 132 mask_suffix_size = max(16 - bit_offset - bit_count, 0) 133 mask_size = 16 - mask_prefix_size - mask_suffix_size 134 and_mask = (((0xFFFF << (16 - mask_prefix_size)) & 0xFFFF) | 135 ((0xFFFF << mask_suffix_size) >> 16)) 136 or_mask = (((value >> (bit_count - mask_size)) & 137 ((1 << mask_size) - 1)) << 138 mask_suffix_size) 139 result = await self._conn.write_mask(device_id=self._device_id, 140 address=address, 141 and_mask=and_mask, 142 or_mask=or_mask) 143 if result: 144 return result 145 address += 1 146 bit_count -= mask_size 147 148 register_count = bit_count // 16 149 if register_count: 150 registers = [(value >> (bit_count - 16 * (i + 1))) & 0xFFFF 151 for i in range(register_count)] 152 result = await self._conn.write(device_id=self._device_id, 153 data_type=data_info.data_type, 154 start_address=address, 155 values=registers) 156 if result: 157 return result 158 address += register_count 159 bit_count -= 16 * register_count 160 161 if not bit_count: 162 return 163 164 and_mask = (0xFFFF << (16 - bit_count)) >> 16 165 or_mask = (value & ((1 << bit_count) - 1)) << (16 - bit_count) 166 return await self._conn.write_mask(device_id=self._device_id, 167 address=address, 168 and_mask=and_mask, 169 or_mask=or_mask)
            
        RemoteDevice(	conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]],	conn: hat.gateway.devices.modbus.master.connection.Connection,	name: str)
                
    
    
            60 def __init__(self, 61 conf: json.Data, 62 conn: Connection, 63 name: str): 64 self._conn = conn 65 self._name = name 66 self._device_id = conf['device_id'] 67 self._timeout_poll_delay = conf['timeout_poll_delay'] 68 self._data_infos = {data_info.name: data_info 69 for data_info in _get_data_infos(conf)} 70 self._data_groups = list(_group_data_infos(self._data_infos.values())) 71 self._log = common.create_remote_device_logger_adapter(mlog, name, 72 self._device_id)
            
        def
        create_reader(	self,	response_cb: Callable[[hat.gateway.devices.modbus.master.eventer_client.StatusRes | hat.gateway.devices.modbus.master.eventer_client.RemoteDeviceStatusRes | hat.gateway.devices.modbus.master.eventer_client.RemoteDeviceReadRes | hat.gateway.devices.modbus.master.eventer_client.RemoteDeviceWriteRes], None | Awaitable[None]]) -> hat.aio.group.Resource:
                
    
    
            
    
                            
            
        async def
        write(	self,	data_name: str,	request_id: str,	value: int) -> hat.gateway.devices.modbus.master.eventer_client.RemoteDeviceWriteRes | None:
                
    
    
            90 async def write(self, 91 data_name: str, 92 request_id: str, 93 value: int 94 ) -> RemoteDeviceWriteRes | None: 95 data_info = self._data_infos.get(data_name) 96 if not data_info: 97 self._log.debug('data %s is not available', data_name) 98 return 99 100 if data_info.data_type == DataType.COIL: 101 result = await self._write_coil(data_info, value) 102 103 elif data_info.data_type == DataType.HOLDING_REGISTER: 104 result = await self._write_holding_register(data_info, value) 105 106 else: 107 self._log.debug('write unsupported for %s', data_info.data_type) 108 return 109 110 return RemoteDeviceWriteRes( 111 device_id=self._device_id, 112 data_name=data_name, 113 request_id=request_id, 114 result=result.name if result else 'SUCCESS')