hat.gateway.devices.modbus.master.remote_device

  1import asyncio
  2import collections
  3import contextlib
  4import enum
  5import functools
  6import itertools
  7import logging
  8import math
  9import time
 10import typing
 11
 12from hat import aio
 13from hat import json
 14
 15from hat.gateway.devices.modbus.master import common
 16from hat.gateway.devices.modbus.master.connection import (DataType,
 17                                                          Error,
 18                                                          Connection)
 19from hat.gateway.devices.modbus.master.eventer_client import (RemoteDeviceStatusRes,  # NOQA
 20                                                              RemoteDeviceReadRes,  # NOQA
 21                                                              RemoteDeviceWriteRes,  # NOQA
 22                                                              Response)
 23
 24
 25mlog = logging.getLogger(__name__)
 26
 27
 28ResponseCb: typing.TypeAlias = aio.AsyncCallable[[Response], None]
 29
 30
 31class _Status(enum.Enum):
 32    DISABLED = 'DISABLED'
 33    CONNECTING = 'CONNECTING'
 34    CONNECTED = 'CONNECTED'
 35    DISCONNECTED = 'DISCONNECTED'
 36
 37
 38class _DataInfo(typing.NamedTuple):
 39    data_type: DataType
 40    register_size: int
 41    start_address: int
 42    bit_count: int
 43    bit_offset: int
 44    quantity: int
 45    interval: float | None
 46    name: str
 47
 48
 49class _DataGroup(typing.NamedTuple):
 50    data_infos: list[_DataInfo]
 51    interval: float
 52    data_type: DataType
 53    start_address: int
 54    quantity: int
 55
 56
 57class RemoteDevice:
 58
 59    def __init__(self,
 60                 conf: json.Data,
 61                 conn: Connection,
 62                 name: str):
 63        self._conn = conn
 64        self._name = name
 65        self._device_id = conf['device_id']
 66        self._timeout_poll_delay = conf['timeout_poll_delay']
 67        self._data_infos = {data_info.name: data_info
 68                            for data_info in _get_data_infos(conf)}
 69        self._data_groups = list(_group_data_infos(self._data_infos.values()))
 70        self._log = common.create_remote_device_logger_adapter(mlog, name,
 71                                                               self._device_id)
 72
 73    @property
 74    def conn(self) -> Connection:
 75        return self._conn
 76
 77    @property
 78    def device_id(self) -> int:
 79        return self._device_id
 80
 81    def create_reader(self, response_cb: ResponseCb) -> aio.Resource:
 82        return _Reader(conn=self._conn,
 83                       name=self._name,
 84                       device_id=self._device_id,
 85                       timeout_poll_delay=self._timeout_poll_delay,
 86                       data_groups=self._data_groups,
 87                       response_cb=response_cb)
 88
 89    async def write(self,
 90                    data_name: str,
 91                    request_id: str,
 92                    value: int
 93                    ) -> RemoteDeviceWriteRes | None:
 94        data_info = self._data_infos.get(data_name)
 95        if not data_info:
 96            self._log.debug('data %s is not available', data_name)
 97            return
 98
 99        if data_info.data_type == DataType.COIL:
100            result = await self._write_coil(data_info, value)
101
102        elif data_info.data_type == DataType.HOLDING_REGISTER:
103            result = await self._write_holding_register(data_info, value)
104
105        else:
106            self._log.debug('write unsupported for %s', data_info.data_type)
107            return
108
109        return RemoteDeviceWriteRes(
110            device_id=self._device_id,
111            data_name=data_name,
112            request_id=request_id,
113            result=result.name if result else 'SUCCESS')
114
115    async def _write_coil(self, data_info, value):
116        address = data_info.start_address + data_info.bit_offset
117        registers = [(value >> (data_info.bit_count - i - 1)) & 1
118                     for i in range(data_info.bit_count)]
119        return await self._conn.write(device_id=self._device_id,
120                                      data_type=data_info.data_type,
121                                      start_address=address,
122                                      values=registers)
123
124    async def _write_holding_register(self, data_info, value):
125        address = data_info.start_address + (data_info.bit_offset // 16)
126        bit_count = data_info.bit_count
127        bit_offset = data_info.bit_offset % 16
128
129        if bit_offset:
130            mask_prefix_size = bit_offset
131            mask_suffix_size = max(16 - bit_offset - bit_count, 0)
132            mask_size = 16 - mask_prefix_size - mask_suffix_size
133            and_mask = (((0xFFFF << (16 - mask_prefix_size)) & 0xFFFF) |
134                        ((0xFFFF << mask_suffix_size) >> 16))
135            or_mask = (((value >> (bit_count - mask_size)) &
136                        ((1 << mask_size) - 1)) <<
137                       mask_suffix_size)
138            result = await self._conn.write_mask(device_id=self._device_id,
139                                                 address=address,
140                                                 and_mask=and_mask,
141                                                 or_mask=or_mask)
142            if result:
143                return result
144            address += 1
145            bit_count -= mask_size
146
147        register_count = bit_count // 16
148        if register_count:
149            registers = [(value >> (bit_count - 16 * (i + 1))) & 0xFFFF
150                         for i in range(register_count)]
151            result = await self._conn.write(device_id=self._device_id,
152                                            data_type=data_info.data_type,
153                                            start_address=address,
154                                            values=registers)
155            if result:
156                return result
157            address += register_count
158            bit_count -= 16 * register_count
159
160        if not bit_count:
161            return
162
163        and_mask = (0xFFFF << (16 - bit_count)) >> 16
164        or_mask = (value & ((1 << bit_count) - 1)) << (16 - bit_count)
165        return await self._conn.write_mask(device_id=self._device_id,
166                                           address=address,
167                                           and_mask=and_mask,
168                                           or_mask=or_mask)
169
170
171class _Reader(aio.Resource):
172
173    def __init__(self, conn, name, device_id, timeout_poll_delay, data_groups,
174                 response_cb):
175        self._conn = conn
176        self._device_id = device_id
177        self._timeout_poll_delay = timeout_poll_delay
178        self._response_cb = response_cb
179        self._status = None
180        self._async_group = conn.async_group.create_subgroup()
181        self._log = common.create_remote_device_logger_adapter(mlog, name,
182                                                               device_id)
183
184        self.async_group.spawn(self._read_loop, data_groups)
185
186    @property
187    def async_group(self) -> aio.Group:
188        return self._async_group
189
190    async def _read_loop(self, data_groups):
191        try:
192            self._log.debug('starting read loop')
193            loop = asyncio.get_running_loop()
194
195            if not data_groups:
196                await self._set_status(_Status.CONNECTED)
197                await loop.create_future()
198
199            last_read_times = [None for _ in data_groups]
200            last_responses = {}
201            await self._set_status(_Status.CONNECTING)
202
203            while True:
204                now = time.monotonic()
205                sleep_dt = None
206                read_data_groups = collections.deque()
207
208                for i, data_group in enumerate(data_groups):
209                    last_read_time = last_read_times[i]
210                    if last_read_time is not None:
211                        dt = data_group.interval - now + last_read_time
212                        if dt > 0:
213                            if sleep_dt is None or dt < sleep_dt:
214                                sleep_dt = dt
215                            continue
216
217                    read_data_groups.append(data_group)
218                    last_read_times[i] = now
219
220                if not read_data_groups:
221                    await asyncio.sleep(sleep_dt)
222                    continue
223
224                timeout = False
225
226                self._log.debug('reading data')
227                for data_group in read_data_groups:
228                    result = await self._conn.read(
229                        device_id=self._device_id,
230                        data_type=data_group.data_type,
231                        start_address=data_group.start_address,
232                        quantity=data_group.quantity)
233
234                    if isinstance(result, Error) and result.name == 'TIMEOUT':
235                        timeout = True
236                        break
237
238                    await self._set_status(_Status.CONNECTED)
239
240                    for data_info in data_group.data_infos:
241                        last_response = last_responses.get(data_info.name)
242
243                        response = self._process_read_result(
244                            data_info, data_group.start_address,
245                            result, last_response)
246
247                        if response:
248                            last_responses[data_info.name] = response
249                            await aio.call(self._response_cb, response)
250
251                if timeout:
252                    await self._set_status(_Status.DISCONNECTED)
253                    await asyncio.sleep(self._timeout_poll_delay)
254
255                    last_read_times = [None for _ in data_groups]
256                    last_responses = {}
257                    await self._set_status(_Status.CONNECTING)
258
259        except ConnectionError:
260            self._log.debug('connection closed')
261
262        except Exception as e:
263            self._log.error('read loop error: %s', e, exc_info=e)
264
265        finally:
266            self._log.debug('closing read loop')
267            self.close()
268
269            with contextlib.suppress(Exception):
270                await aio.uncancellable(self._set_status(_Status.DISABLED))
271
272    def _process_read_result(self, data_info, start_address, result,
273                             last_response):
274        if isinstance(result, Error):
275            self._log.debug('data name %s: error response %s',
276                            data_info.name, result)
277            return RemoteDeviceReadRes(device_id=self._device_id,
278                                       data_name=data_info.name,
279                                       result=result.name,
280                                       value=None,
281                                       cause=None)
282
283        offset = data_info.start_address - start_address
284        value = _get_registers_value(
285            data_info.register_size, data_info.bit_offset,
286            data_info.bit_count,
287            result[offset:offset+data_info.quantity])
288
289        if last_response is None or last_response.result != 'SUCCESS':
290            self._log.debug('data name %s: initial value %s',
291                            data_info.name, value)
292            return RemoteDeviceReadRes(device_id=self._device_id,
293                                       data_name=data_info.name,
294                                       result='SUCCESS',
295                                       value=value,
296                                       cause='INTERROGATE')
297
298        if last_response.value != value:
299            self._log.debug('data name %s: value change %s -> %s',
300                            data_info.name, last_response.value, value)
301            return RemoteDeviceReadRes(device_id=self._device_id,
302                                       data_name=data_info.name,
303                                       result='SUCCESS',
304                                       value=value,
305                                       cause='CHANGE')
306
307        self._log.debug('data name %s: no value change', data_info.name)
308
309    async def _set_status(self, status):
310        if self._status == status:
311            return
312
313        self._log.debug('changing remote device status %s -> %s',
314                        self._status, status)
315        self._status = status
316
317        res = RemoteDeviceStatusRes(device_id=self._device_id,
318                                    status=status.name)
319        await aio.call(self._response_cb, res)
320
321
322def _get_data_infos(conf):
323    for i in conf['data']:
324        data_type = DataType[i['data_type']]
325        register_size = _get_register_size(data_type)
326        bit_count = i['bit_count']
327        bit_offset = i['bit_offset']
328
329        yield _DataInfo(
330            data_type=data_type,
331            register_size=register_size,
332            start_address=i['start_address'],
333            bit_count=bit_count,
334            bit_offset=bit_offset,
335            quantity=math.ceil((bit_count + bit_offset) / register_size),
336            interval=i['interval'],
337            name=i['name'])
338
339
340def _group_data_infos(data_infos):
341    type_interval_infos_dict = collections.defaultdict(
342        functools.partial(collections.defaultdict, collections.deque))
343
344    for data_info in data_infos:
345        data_type = data_info.data_type
346        interval = data_info.interval
347
348        if interval is None:
349            continue
350
351        type_interval_infos_dict[data_type][interval].append(data_info)
352
353    for data_type, interval_infos_dict in type_interval_infos_dict.items():
354        for interval, data_infos_queue in interval_infos_dict.items():
355            yield from _group_data_infos_with_type_interval(
356                data_infos_queue, data_type, interval)
357
358
359def _group_data_infos_with_type_interval(data_infos, data_type, interval):
360    data_infos_queue = sorted(data_infos,
361                              key=lambda i: (i.start_address, i.quantity))
362    data_infos_queue = collections.deque(data_infos_queue)
363
364    while data_infos_queue:
365        max_quantity = _get_max_quantity(data_type)
366        start_address = None
367        quantity = None
368        data_infos = collections.deque()
369
370        while data_infos_queue:
371            data_info = data_infos_queue.popleft()
372
373            if start_address is None:
374                start_address = data_info.start_address
375
376            if quantity is None:
377                quantity = data_info.quantity
378
379            elif data_info.start_address > start_address + quantity:
380                data_infos_queue.appendleft(data_info)
381                break
382
383            else:
384                new_quantity = (data_info.quantity +
385                                data_info.start_address -
386                                start_address)
387
388                if new_quantity > max_quantity:
389                    data_infos_queue.appendleft(data_info)
390                    break
391
392                if new_quantity > quantity:
393                    quantity = new_quantity
394
395            data_infos.append(data_info)
396
397        if start_address is None or quantity is None:
398            continue
399
400        yield _DataGroup(data_infos=data_infos,
401                         interval=interval,
402                         data_type=data_type,
403                         start_address=start_address,
404                         quantity=quantity)
405
406
407def _get_register_size(data_type):
408    if data_type in (DataType.COIL,
409                     DataType.DISCRETE_INPUT):
410        return 1
411
412    if data_type in (DataType.HOLDING_REGISTER,
413                     DataType.INPUT_REGISTER,
414                     DataType.QUEUE):
415        return 16
416
417    raise ValueError('invalid data type')
418
419
420def _get_max_quantity(data_type):
421    if data_type in (DataType.COIL,
422                     DataType.DISCRETE_INPUT):
423        return 2000
424
425    if data_type in (DataType.HOLDING_REGISTER,
426                     DataType.INPUT_REGISTER):
427        return 125
428
429    if data_type == DataType.QUEUE:
430        return 1
431
432    raise ValueError('invalid data type')
433
434
435def _get_registers_value(register_size, bit_offset, bit_count, values):
436    result = 0
437    bits = itertools.chain(_get_registers_bits(register_size, values),
438                           itertools.repeat(0))
439    for i in itertools.islice(bits, bit_offset, bit_offset + bit_count):
440        result = (result << 1) | i
441    return result
442
443
444def _get_registers_bits(register_size, values):
445    for value in values:
446        for i in range(register_size):
447            yield (value >> (register_size - i - 1)) & 1
ResponseCb: hat.drivers.modbus.transport.common.ErrorRes | hat.drivers.modbus.transport.common.ReadCoilsRes | hat.drivers.modbus.transport.common.ReadDiscreteInputsRes | hat.drivers.modbus.transport.common.ReadHoldingRegistersRes | hat.drivers.modbus.transport.common.ReadInputRegistersRes | hat.drivers.modbus.transport.common.WriteSingleCoilRes | hat.drivers.modbus.transport.common.WriteSingleRegisterRes | hat.drivers.modbus.transport.common.WriteMultipleCoilsRes | hat.drivers.modbus.transport.common.WriteMultipleRegistersRes | hat.drivers.modbus.transport.common.MaskWriteRegisterRes | hat.drivers.modbus.transport.common.ReadFifoQueueRes = typing.Callable[[hat.gateway.devices.modbus.master.eventer_client.StatusRes | hat.gateway.devices.modbus.master.eventer_client.RemoteDeviceStatusRes | hat.gateway.devices.modbus.master.eventer_client.RemoteDeviceReadRes | hat.gateway.devices.modbus.master.eventer_client.RemoteDeviceWriteRes], None | collections.abc.Awaitable[None]]
class RemoteDevice:
 58class RemoteDevice:
 59
 60    def __init__(self,
 61                 conf: json.Data,
 62                 conn: Connection,
 63                 name: str):
 64        self._conn = conn
 65        self._name = name
 66        self._device_id = conf['device_id']
 67        self._timeout_poll_delay = conf['timeout_poll_delay']
 68        self._data_infos = {data_info.name: data_info
 69                            for data_info in _get_data_infos(conf)}
 70        self._data_groups = list(_group_data_infos(self._data_infos.values()))
 71        self._log = common.create_remote_device_logger_adapter(mlog, name,
 72                                                               self._device_id)
 73
 74    @property
 75    def conn(self) -> Connection:
 76        return self._conn
 77
 78    @property
 79    def device_id(self) -> int:
 80        return self._device_id
 81
 82    def create_reader(self, response_cb: ResponseCb) -> aio.Resource:
 83        return _Reader(conn=self._conn,
 84                       name=self._name,
 85                       device_id=self._device_id,
 86                       timeout_poll_delay=self._timeout_poll_delay,
 87                       data_groups=self._data_groups,
 88                       response_cb=response_cb)
 89
 90    async def write(self,
 91                    data_name: str,
 92                    request_id: str,
 93                    value: int
 94                    ) -> RemoteDeviceWriteRes | None:
 95        data_info = self._data_infos.get(data_name)
 96        if not data_info:
 97            self._log.debug('data %s is not available', data_name)
 98            return
 99
100        if data_info.data_type == DataType.COIL:
101            result = await self._write_coil(data_info, value)
102
103        elif data_info.data_type == DataType.HOLDING_REGISTER:
104            result = await self._write_holding_register(data_info, value)
105
106        else:
107            self._log.debug('write unsupported for %s', data_info.data_type)
108            return
109
110        return RemoteDeviceWriteRes(
111            device_id=self._device_id,
112            data_name=data_name,
113            request_id=request_id,
114            result=result.name if result else 'SUCCESS')
115
116    async def _write_coil(self, data_info, value):
117        address = data_info.start_address + data_info.bit_offset
118        registers = [(value >> (data_info.bit_count - i - 1)) & 1
119                     for i in range(data_info.bit_count)]
120        return await self._conn.write(device_id=self._device_id,
121                                      data_type=data_info.data_type,
122                                      start_address=address,
123                                      values=registers)
124
125    async def _write_holding_register(self, data_info, value):
126        address = data_info.start_address + (data_info.bit_offset // 16)
127        bit_count = data_info.bit_count
128        bit_offset = data_info.bit_offset % 16
129
130        if bit_offset:
131            mask_prefix_size = bit_offset
132            mask_suffix_size = max(16 - bit_offset - bit_count, 0)
133            mask_size = 16 - mask_prefix_size - mask_suffix_size
134            and_mask = (((0xFFFF << (16 - mask_prefix_size)) & 0xFFFF) |
135                        ((0xFFFF << mask_suffix_size) >> 16))
136            or_mask = (((value >> (bit_count - mask_size)) &
137                        ((1 << mask_size) - 1)) <<
138                       mask_suffix_size)
139            result = await self._conn.write_mask(device_id=self._device_id,
140                                                 address=address,
141                                                 and_mask=and_mask,
142                                                 or_mask=or_mask)
143            if result:
144                return result
145            address += 1
146            bit_count -= mask_size
147
148        register_count = bit_count // 16
149        if register_count:
150            registers = [(value >> (bit_count - 16 * (i + 1))) & 0xFFFF
151                         for i in range(register_count)]
152            result = await self._conn.write(device_id=self._device_id,
153                                            data_type=data_info.data_type,
154                                            start_address=address,
155                                            values=registers)
156            if result:
157                return result
158            address += register_count
159            bit_count -= 16 * register_count
160
161        if not bit_count:
162            return
163
164        and_mask = (0xFFFF << (16 - bit_count)) >> 16
165        or_mask = (value & ((1 << bit_count) - 1)) << (16 - bit_count)
166        return await self._conn.write_mask(device_id=self._device_id,
167                                           address=address,
168                                           and_mask=and_mask,
169                                           or_mask=or_mask)
RemoteDevice( conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]], conn: hat.gateway.devices.modbus.master.connection.Connection, name: str)
60    def __init__(self,
61                 conf: json.Data,
62                 conn: Connection,
63                 name: str):
64        self._conn = conn
65        self._name = name
66        self._device_id = conf['device_id']
67        self._timeout_poll_delay = conf['timeout_poll_delay']
68        self._data_infos = {data_info.name: data_info
69                            for data_info in _get_data_infos(conf)}
70        self._data_groups = list(_group_data_infos(self._data_infos.values()))
71        self._log = common.create_remote_device_logger_adapter(mlog, name,
72                                                               self._device_id)
74    @property
75    def conn(self) -> Connection:
76        return self._conn
device_id: int
78    @property
79    def device_id(self) -> int:
80        return self._device_id
82    def create_reader(self, response_cb: ResponseCb) -> aio.Resource:
83        return _Reader(conn=self._conn,
84                       name=self._name,
85                       device_id=self._device_id,
86                       timeout_poll_delay=self._timeout_poll_delay,
87                       data_groups=self._data_groups,
88                       response_cb=response_cb)
async def write( self, data_name: str, request_id: str, value: int) -> hat.gateway.devices.modbus.master.eventer_client.RemoteDeviceWriteRes | None:
 90    async def write(self,
 91                    data_name: str,
 92                    request_id: str,
 93                    value: int
 94                    ) -> RemoteDeviceWriteRes | None:
 95        data_info = self._data_infos.get(data_name)
 96        if not data_info:
 97            self._log.debug('data %s is not available', data_name)
 98            return
 99
100        if data_info.data_type == DataType.COIL:
101            result = await self._write_coil(data_info, value)
102
103        elif data_info.data_type == DataType.HOLDING_REGISTER:
104            result = await self._write_holding_register(data_info, value)
105
106        else:
107            self._log.debug('write unsupported for %s', data_info.data_type)
108            return
109
110        return RemoteDeviceWriteRes(
111            device_id=self._device_id,
112            data_name=data_name,
113            request_id=request_id,
114            result=result.name if result else 'SUCCESS')