hat.gateway.devices.modbus.master.remote_device

  1import asyncio
  2import collections
  3import contextlib
  4import enum
  5import functools
  6import itertools
  7import logging
  8import math
  9import time
 10import typing
 11
 12from hat import aio
 13from hat import json
 14
 15from hat.gateway.devices.modbus.master.connection import (DataType,
 16                                                          Error,
 17                                                          Connection)
 18from hat.gateway.devices.modbus.master.eventer_client import (RemoteDeviceStatusRes,  # NOQA
 19                                                              RemoteDeviceReadRes,  # NOQA
 20                                                              RemoteDeviceWriteRes,  # NOQA
 21                                                              Response)
 22
 23
 24mlog = logging.getLogger(__name__)
 25
 26
 27ResponseCb: typing.TypeAlias = aio.AsyncCallable[[Response], None]
 28
 29
class _Status(enum.Enum):
    """Remote device status reported by `_Reader`.

    The member name is sent as the ``status`` field of
    `RemoteDeviceStatusRes` each time the reader's status changes.
    """
    # set in the read loop's ``finally`` block, when the loop is closing
    DISABLED = 'DISABLED'
    # set before the first poll cycle and again after a timeout back-off
    CONNECTING = 'CONNECTING'
    # set after any read that did not end with a TIMEOUT error
    CONNECTED = 'CONNECTED'
    # set when a read ends with a TIMEOUT error
    DISCONNECTED = 'DISCONNECTED'
 35
 36
class _DataInfo(typing.NamedTuple):
    """Description of a single configured data point of a remote device.

    Instances are created by `_get_data_infos` from the ``data`` entries of
    the remote device configuration.
    """
    # modbus data type of the registers holding the value
    data_type: DataType
    # number of bits in one register of `data_type` (1 or 16, see
    # `_get_register_size`)
    register_size: int
    # address of the first register
    start_address: int
    # number of bits that make up the value
    bit_count: int
    # number of bits, counted from the start of the register at
    # `start_address`, that precede the value
    bit_offset: int
    # number of registers needed to cover `bit_offset + bit_count` bits
    quantity: int
    # polling interval, compared against `time.monotonic` differences;
    # data with ``None`` interval is left out of the read groups
    interval: float | None
    # data name, used as lookup key for writes and cached read responses
    name: str
 46
 47
class _DataGroup(typing.NamedTuple):
    """Set of data points that are read with a single read request.

    All members share the same data type and polling interval and lie inside
    the address range ``[start_address, start_address + quantity)``.
    """
    # data points served by this read request
    # NOTE(review): `_group_data_infos_with_type_interval` stores a
    # `collections.deque` here, not a `list`
    data_infos: list[_DataInfo]
    # polling interval shared by all members
    interval: float
    # modbus data type shared by all members
    data_type: DataType
    # address of the first register of the read request
    start_address: int
    # number of registers requested
    quantity: int
 54
 55
 56class RemoteDevice:
 57
 58    def __init__(self,
 59                 conf: json.Data,
 60                 conn: Connection,
 61                 log_prefix: str):
 62        self._conn = conn
 63        self._device_id = conf['device_id']
 64        self._timeout_poll_delay = conf['timeout_poll_delay']
 65        self._log_prefix = f"{log_prefix}: remote device id {self._device_id}"
 66        self._data_infos = {data_info.name: data_info
 67                            for data_info in _get_data_infos(conf)}
 68        self._data_groups = list(_group_data_infos(self._data_infos.values()))
 69
 70    @property
 71    def conn(self) -> Connection:
 72        return self._conn
 73
 74    @property
 75    def device_id(self) -> int:
 76        return self._device_id
 77
 78    def create_reader(self, response_cb: ResponseCb) -> aio.Resource:
 79        return _Reader(conn=self._conn,
 80                       device_id=self._device_id,
 81                       timeout_poll_delay=self._timeout_poll_delay,
 82                       data_groups=self._data_groups,
 83                       response_cb=response_cb,
 84                       log_prefix=self._log_prefix)
 85
 86    async def write(self,
 87                    data_name: str,
 88                    request_id: str,
 89                    value: int
 90                    ) -> RemoteDeviceWriteRes | None:
 91        data_info = self._data_infos.get(data_name)
 92        if not data_info:
 93            self._log(logging.DEBUG, 'data %s is not available', data_name)
 94            return
 95
 96        if data_info.data_type == DataType.COIL:
 97            result = await self._write_coil(data_info, value)
 98
 99        elif data_info.data_type == DataType.HOLDING_REGISTER:
100            result = await self._write_holding_register(data_info, value)
101
102        else:
103            self._log(logging.DEBUG, 'write unsupported for %s',
104                      data_info.data_type)
105            return
106
107        return RemoteDeviceWriteRes(
108            device_id=self._device_id,
109            data_name=data_name,
110            request_id=request_id,
111            result=result.name if result else 'SUCCESS')
112
113    async def _write_coil(self, data_info, value):
114        address = data_info.start_address + data_info.bit_offset
115        registers = [(value >> (data_info.bit_count - i - 1)) & 1
116                     for i in range(data_info.bit_count)]
117        return await self._conn.write(device_id=self._device_id,
118                                      data_type=data_info.data_type,
119                                      start_address=address,
120                                      values=registers)
121
122    async def _write_holding_register(self, data_info, value):
123        address = data_info.start_address + (data_info.bit_offset // 16)
124        bit_count = data_info.bit_count
125        bit_offset = data_info.bit_offset % 16
126
127        if bit_offset:
128            mask_prefix_size = bit_offset
129            mask_suffix_size = max(16 - bit_offset - bit_count, 0)
130            mask_size = 16 - mask_prefix_size - mask_suffix_size
131            and_mask = (((0xFFFF << (16 - mask_prefix_size)) & 0xFFFF) |
132                        ((0xFFFF << mask_suffix_size) >> 16))
133            or_mask = (((value >> (bit_count - mask_size)) &
134                        ((1 << mask_size) - 1)) <<
135                       mask_suffix_size)
136            result = await self._conn.write_mask(device_id=self._device_id,
137                                                 address=address,
138                                                 and_mask=and_mask,
139                                                 or_mask=or_mask)
140            if result:
141                return result
142            address += 1
143            bit_count -= mask_size
144
145        register_count = bit_count // 16
146        if register_count:
147            registers = [(value >> (bit_count - 16 * (i + 1))) & 0xFFFF
148                         for i in range(register_count)]
149            result = await self._conn.write(device_id=self._device_id,
150                                            data_type=data_info.data_type,
151                                            start_address=address,
152                                            values=registers)
153            if result:
154                return result
155            address += register_count
156            bit_count -= 16 * register_count
157
158        if not bit_count:
159            return
160
161        and_mask = (0xFFFF << (16 - bit_count)) >> 16
162        or_mask = (value & ((1 << bit_count) - 1)) << (16 - bit_count)
163        return await self._conn.write_mask(device_id=self._device_id,
164                                           address=address,
165                                           and_mask=and_mask,
166                                           or_mask=or_mask)
167
168    def _log(self, level, msg, *args, **kwargs):
169        if not mlog.isEnabledFor(level):
170            return
171
172        mlog.log(level, f"{self._log_prefix}: {msg}", *args, **kwargs)
173
174
class _Reader(aio.Resource):
    """Resource that periodically reads data groups of a remote device.

    A read loop is spawned on construction, in a subgroup of the connection's
    async group. Status changes and read responses are passed to
    `response_cb`.

    Args:
        conn: connection used for read requests
        device_id: identifier of the polled remote device
        timeout_poll_delay: delay before polling is retried after a read
            timeout
        data_groups: read requests to repeat, each with its own interval
        response_cb: callback receiving status and read responses
        log_prefix: text prepended to every log message

    """

    def __init__(self, conn, device_id, timeout_poll_delay, data_groups,
                 response_cb, log_prefix):
        self._conn = conn
        self._device_id = device_id
        self._timeout_poll_delay = timeout_poll_delay
        self._response_cb = response_cb
        self._log_prefix = log_prefix
        # last status passed to the callback (``None`` until first change)
        self._status = None
        self._async_group = conn.async_group.create_subgroup()

        self.async_group.spawn(self._read_loop, data_groups)

    @property
    def async_group(self) -> aio.Group:
        """Async group controlling the read loop"""
        return self._async_group

    async def _read_loop(self, data_groups):
        """Poll `data_groups` until closed or until the connection fails.

        Each group is read when its interval has elapsed since its previous
        read. A TIMEOUT result aborts the current cycle, reports
        DISCONNECTED, waits `timeout_poll_delay` and restarts polling with
        cleared state so that all values are reported again.

        """
        try:
            self._log(logging.DEBUG, 'starting read loop')
            loop = asyncio.get_running_loop()

            if not data_groups:
                # nothing to poll - report connected and wait until the
                # loop is cancelled
                await self._set_status(_Status.CONNECTED)
                await loop.create_future()

            # time of the last read per group (``None`` - not read yet)
            last_read_times = [None for _ in data_groups]
            # last reported response per data name
            last_responses = {}
            await self._set_status(_Status.CONNECTING)

            while True:
                now = time.monotonic()
                # shortest remaining wait among groups that are not yet due
                sleep_dt = None
                read_data_groups = collections.deque()

                for i, data_group in enumerate(data_groups):
                    last_read_time = last_read_times[i]
                    if last_read_time is not None:
                        dt = data_group.interval - now + last_read_time
                        if dt > 0:
                            if sleep_dt is None or dt < sleep_dt:
                                sleep_dt = dt
                            continue

                    read_data_groups.append(data_group)
                    last_read_times[i] = now

                if not read_data_groups:
                    await asyncio.sleep(sleep_dt)
                    continue

                timeout = False

                self._log(logging.DEBUG, 'reading data')
                for data_group in read_data_groups:
                    result = await self._conn.read(
                        device_id=self._device_id,
                        data_type=data_group.data_type,
                        start_address=data_group.start_address,
                        quantity=data_group.quantity)

                    if isinstance(result, Error) and result.name == 'TIMEOUT':
                        timeout = True
                        break

                    # any result other than a timeout (including other
                    # errors) is treated as the device being connected
                    await self._set_status(_Status.CONNECTED)

                    for data_info in data_group.data_infos:
                        last_response = last_responses.get(data_info.name)

                        response = self._process_read_result(
                            data_info, data_group.start_address,
                            result, last_response)

                        if response:
                            last_responses[data_info.name] = response
                            await aio.call(self._response_cb, response)

                if timeout:
                    await self._set_status(_Status.DISCONNECTED)
                    await asyncio.sleep(self._timeout_poll_delay)

                    # forget previous state so every group is read and
                    # every value is reported again
                    last_read_times = [None for _ in data_groups]
                    last_responses = {}
                    await self._set_status(_Status.CONNECTING)

        except ConnectionError:
            self._log(logging.DEBUG, 'connection closed')

        except Exception as e:
            self._log(logging.ERROR, 'read loop error: %s', e, exc_info=e)

        finally:
            self._log(logging.DEBUG, 'closing read loop')
            self.close()

            # best effort notification - callback failures are ignored
            with contextlib.suppress(Exception):
                await aio.uncancellable(self._set_status(_Status.DISABLED))

    def _process_read_result(self, data_info, start_address, result,
                             last_response):
        """Create a read response for `data_info` from a group read result.

        Args:
            data_info: data point being processed
            start_address: start address of the group read request
            result: error or sequence of register values returned by read
            last_response: previously reported response for this data point

        Returns:
            response with the error name if `result` is an error; response
            with cause ``'INTERROGATE'`` if there is no previous successful
            response; response with cause ``'CHANGE'`` if the value differs
            from the previous one; ``None`` if the value is unchanged

        """
        if isinstance(result, Error):
            self._log(logging.DEBUG, 'data name %s: error response %s',
                      data_info.name, result)
            return RemoteDeviceReadRes(device_id=self._device_id,
                                       data_name=data_info.name,
                                       result=result.name,
                                       value=None,
                                       cause=None)

        # position of this data point's first register inside group result
        offset = data_info.start_address - start_address
        value = _get_registers_value(
            data_info.register_size, data_info.bit_offset,
            data_info.bit_count,
            result[offset:offset+data_info.quantity])

        if last_response is None or last_response.result != 'SUCCESS':
            self._log(logging.DEBUG, 'data name %s: initial value %s',
                      data_info.name, value)
            return RemoteDeviceReadRes(device_id=self._device_id,
                                       data_name=data_info.name,
                                       result='SUCCESS',
                                       value=value,
                                       cause='INTERROGATE')

        if last_response.value != value:
            self._log(logging.DEBUG, 'data name %s: value change %s -> %s',
                      data_info.name, last_response.value, value)
            return RemoteDeviceReadRes(device_id=self._device_id,
                                       data_name=data_info.name,
                                       result='SUCCESS',
                                       value=value,
                                       cause='CHANGE')

        self._log(logging.DEBUG, 'data name %s: no value change',
                  data_info.name)

    async def _set_status(self, status):
        """Remember `status` and report it if it differs from the last one."""
        if self._status == status:
            return

        self._log(logging.DEBUG, 'changing remote device status %s -> %s',
                  self._status, status)
        self._status = status

        res = RemoteDeviceStatusRes(device_id=self._device_id,
                                    status=status.name)
        await aio.call(self._response_cb, res)

    def _log(self, level, msg, *args, **kwargs):
        """Log `msg` with this reader's prefix if `level` is enabled."""
        if not mlog.isEnabledFor(level):
            return

        # prefix is passed as a formatting argument so that a '%' inside it
        # can not be mistaken for a placeholder of the %-style message
        mlog.log(level, '%s: ' + msg, self._log_prefix, *args, **kwargs)
330
331
def _get_data_infos(conf):
    """Yield one `_DataInfo` for each entry of ``conf['data']``."""
    for entry in conf['data']:
        kind = DataType[entry['data_type']]
        bits_per_register = _get_register_size(kind)
        value_bits = entry['bit_count']
        leading_bits = entry['bit_offset']

        yield _DataInfo(
            data_type=kind,
            register_size=bits_per_register,
            start_address=entry['start_address'],
            bit_count=value_bits,
            bit_offset=leading_bits,
            # registers needed to cover both skipped and value bits
            quantity=math.ceil(
                (value_bits + leading_bits) / bits_per_register),
            interval=entry['interval'],
            name=entry['name'])
348
349
def _group_data_infos(data_infos):
    """Yield read groups for all data that has a polling interval.

    Data is first bucketed by data type and interval (in order of first
    appearance); each bucket is then split into address ranges by
    `_group_data_infos_with_type_interval`.

    """
    buckets = {}

    for info in data_infos:
        kind = info.data_type
        period = info.interval

        # data without interval is not polled
        if period is not None:
            per_interval = buckets.setdefault(kind, {})
            per_interval.setdefault(period, collections.deque()).append(info)

    for kind, per_interval in buckets.items():
        for period, bucket in per_interval.items():
            yield from _group_data_infos_with_type_interval(
                bucket, kind, period)
367
368
def _group_data_infos_with_type_interval(data_infos, data_type, interval):
    """Yield read groups for data sharing `data_type` and `interval`.

    Data is processed in order of start address (then quantity). A group is
    extended with the next data point as long as that data point starts no
    later than the first address after the group and the resulting range
    does not exceed the maximum quantity of a single read request.

    """
    pending = collections.deque(
        sorted(data_infos,
               key=lambda info: (info.start_address, info.quantity)))

    while pending:
        limit = _get_max_quantity(data_type)

        # first pending data point always opens a new group
        head = pending.popleft()
        group_start = head.start_address
        group_quantity = head.quantity
        members = collections.deque([head])

        while pending:
            candidate = pending[0]

            # gap between group and candidate - candidate opens next group
            if candidate.start_address > group_start + group_quantity:
                break

            span = (candidate.quantity +
                    candidate.start_address -
                    group_start)

            # request would become too large - candidate opens next group
            if span > limit:
                break

            pending.popleft()
            group_quantity = max(group_quantity, span)
            members.append(candidate)

        yield _DataGroup(data_infos=members,
                         interval=interval,
                         data_type=data_type,
                         start_address=group_start,
                         quantity=group_quantity)
415
416
def _get_register_size(data_type):
    """Return the number of bits in a single register of `data_type`.

    Raises:
        ValueError: if `data_type` is not a known data type

    """
    size_table = (
        ((DataType.COIL, DataType.DISCRETE_INPUT), 1),
        ((DataType.HOLDING_REGISTER,
          DataType.INPUT_REGISTER,
          DataType.QUEUE), 16))

    for members, size in size_table:
        if data_type in members:
            return size

    raise ValueError('invalid data type')
428
429
def _get_max_quantity(data_type):
    """Return the largest quantity allowed in one read of `data_type`.

    Raises:
        ValueError: if `data_type` is not a known data type

    """
    limit_table = (
        ((DataType.COIL, DataType.DISCRETE_INPUT), 2000),
        ((DataType.HOLDING_REGISTER, DataType.INPUT_REGISTER), 125),
        ((DataType.QUEUE,), 1))

    for members, limit in limit_table:
        if data_type in members:
            return limit

    raise ValueError('invalid data type')
443
444
def _get_registers_value(register_size, bit_offset, bit_count, values):
    """Return the integer stored in a bit range of consecutive registers.

    Registers are treated as one bit stream, most significant bit first.
    `bit_count` bits starting at `bit_offset` are combined into the result;
    bits past the end of `values` count as zero.

    """
    # endless zero padding keeps the slice valid past the last register
    padded_bits = itertools.chain(
        _get_registers_bits(register_size, values),
        itertools.repeat(0))
    selected_bits = itertools.islice(
        padded_bits, bit_offset, bit_offset + bit_count)
    return functools.reduce(lambda acc, bit: (acc << 1) | bit,
                            selected_bits, 0)
452
453
def _get_registers_bits(register_size, values):
    """Yield bits of each register, most significant bit first."""
    shifts = range(register_size - 1, -1, -1)
    for register in values:
        yield from ((register >> shift) & 1 for shift in shifts)
class RemoteDevice:
 57class RemoteDevice:
 58
 59    def __init__(self,
 60                 conf: json.Data,
 61                 conn: Connection,
 62                 log_prefix: str):
 63        self._conn = conn
 64        self._device_id = conf['device_id']
 65        self._timeout_poll_delay = conf['timeout_poll_delay']
 66        self._log_prefix = f"{log_prefix}: remote device id {self._device_id}"
 67        self._data_infos = {data_info.name: data_info
 68                            for data_info in _get_data_infos(conf)}
 69        self._data_groups = list(_group_data_infos(self._data_infos.values()))
 70
 71    @property
 72    def conn(self) -> Connection:
 73        return self._conn
 74
 75    @property
 76    def device_id(self) -> int:
 77        return self._device_id
 78
 79    def create_reader(self, response_cb: ResponseCb) -> aio.Resource:
 80        return _Reader(conn=self._conn,
 81                       device_id=self._device_id,
 82                       timeout_poll_delay=self._timeout_poll_delay,
 83                       data_groups=self._data_groups,
 84                       response_cb=response_cb,
 85                       log_prefix=self._log_prefix)
 86
 87    async def write(self,
 88                    data_name: str,
 89                    request_id: str,
 90                    value: int
 91                    ) -> RemoteDeviceWriteRes | None:
 92        data_info = self._data_infos.get(data_name)
 93        if not data_info:
 94            self._log(logging.DEBUG, 'data %s is not available', data_name)
 95            return
 96
 97        if data_info.data_type == DataType.COIL:
 98            result = await self._write_coil(data_info, value)
 99
100        elif data_info.data_type == DataType.HOLDING_REGISTER:
101            result = await self._write_holding_register(data_info, value)
102
103        else:
104            self._log(logging.DEBUG, 'write unsupported for %s',
105                      data_info.data_type)
106            return
107
108        return RemoteDeviceWriteRes(
109            device_id=self._device_id,
110            data_name=data_name,
111            request_id=request_id,
112            result=result.name if result else 'SUCCESS')
113
114    async def _write_coil(self, data_info, value):
115        address = data_info.start_address + data_info.bit_offset
116        registers = [(value >> (data_info.bit_count - i - 1)) & 1
117                     for i in range(data_info.bit_count)]
118        return await self._conn.write(device_id=self._device_id,
119                                      data_type=data_info.data_type,
120                                      start_address=address,
121                                      values=registers)
122
123    async def _write_holding_register(self, data_info, value):
124        address = data_info.start_address + (data_info.bit_offset // 16)
125        bit_count = data_info.bit_count
126        bit_offset = data_info.bit_offset % 16
127
128        if bit_offset:
129            mask_prefix_size = bit_offset
130            mask_suffix_size = max(16 - bit_offset - bit_count, 0)
131            mask_size = 16 - mask_prefix_size - mask_suffix_size
132            and_mask = (((0xFFFF << (16 - mask_prefix_size)) & 0xFFFF) |
133                        ((0xFFFF << mask_suffix_size) >> 16))
134            or_mask = (((value >> (bit_count - mask_size)) &
135                        ((1 << mask_size) - 1)) <<
136                       mask_suffix_size)
137            result = await self._conn.write_mask(device_id=self._device_id,
138                                                 address=address,
139                                                 and_mask=and_mask,
140                                                 or_mask=or_mask)
141            if result:
142                return result
143            address += 1
144            bit_count -= mask_size
145
146        register_count = bit_count // 16
147        if register_count:
148            registers = [(value >> (bit_count - 16 * (i + 1))) & 0xFFFF
149                         for i in range(register_count)]
150            result = await self._conn.write(device_id=self._device_id,
151                                            data_type=data_info.data_type,
152                                            start_address=address,
153                                            values=registers)
154            if result:
155                return result
156            address += register_count
157            bit_count -= 16 * register_count
158
159        if not bit_count:
160            return
161
162        and_mask = (0xFFFF << (16 - bit_count)) >> 16
163        or_mask = (value & ((1 << bit_count) - 1)) << (16 - bit_count)
164        return await self._conn.write_mask(device_id=self._device_id,
165                                           address=address,
166                                           and_mask=and_mask,
167                                           or_mask=or_mask)
168
169    def _log(self, level, msg, *args, **kwargs):
170        if not mlog.isEnabledFor(level):
171            return
172
173        mlog.log(level, f"{self._log_prefix}: {msg}", *args, **kwargs)
RemoteDevice( conf: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')], conn: hat.gateway.devices.modbus.master.connection.Connection, log_prefix: str)
59    def __init__(self,
60                 conf: json.Data,
61                 conn: Connection,
62                 log_prefix: str):
63        self._conn = conn
64        self._device_id = conf['device_id']
65        self._timeout_poll_delay = conf['timeout_poll_delay']
66        self._log_prefix = f"{log_prefix}: remote device id {self._device_id}"
67        self._data_infos = {data_info.name: data_info
68                            for data_info in _get_data_infos(conf)}
69        self._data_groups = list(_group_data_infos(self._data_infos.values()))
71    @property
72    def conn(self) -> Connection:
73        return self._conn
device_id: int
75    @property
76    def device_id(self) -> int:
77        return self._device_id
79    def create_reader(self, response_cb: ResponseCb) -> aio.Resource:
80        return _Reader(conn=self._conn,
81                       device_id=self._device_id,
82                       timeout_poll_delay=self._timeout_poll_delay,
83                       data_groups=self._data_groups,
84                       response_cb=response_cb,
85                       log_prefix=self._log_prefix)
async def write( self, data_name: str, request_id: str, value: int) -> hat.gateway.devices.modbus.master.eventer_client.RemoteDeviceWriteRes | None:
 87    async def write(self,
 88                    data_name: str,
 89                    request_id: str,
 90                    value: int
 91                    ) -> RemoteDeviceWriteRes | None:
 92        data_info = self._data_infos.get(data_name)
 93        if not data_info:
 94            self._log(logging.DEBUG, 'data %s is not available', data_name)
 95            return
 96
 97        if data_info.data_type == DataType.COIL:
 98            result = await self._write_coil(data_info, value)
 99
100        elif data_info.data_type == DataType.HOLDING_REGISTER:
101            result = await self._write_holding_register(data_info, value)
102
103        else:
104            self._log(logging.DEBUG, 'write unsupported for %s',
105                      data_info.data_type)
106            return
107
108        return RemoteDeviceWriteRes(
109            device_id=self._device_id,
110            data_name=data_name,
111            request_id=request_id,
112            result=result.name if result else 'SUCCESS')