hat.gateway.devices.modbus.master.remote_device

  1import asyncio
  2import collections
  3import contextlib
  4import enum
  5import functools
  6import itertools
  7import logging
  8import math
  9import time
 10import typing
 11
 12from hat import aio
 13from hat import json
 14from hat.drivers import modbus
 15
 16from hat.gateway.devices.modbus.master import common
 17from hat.gateway.devices.modbus.master.connection import Connection
 18
 19
# Module-level logger; wrapped by the per-device logger adapters created in
# `RemoteDevice` and `_Reader`.
mlog = logging.getLogger(__name__)


# Signature of the callback handed to `RemoteDevice.create_reader`; readers
# invoke it through `aio.call` with every status/read response they produce.
ResponseCb: typing.TypeAlias = aio.AsyncCallable[[common.Response], None]
 24
 25
class _Status(enum.Enum):
    """Remote device status tracked by `_Reader`.

    The member name is what gets reported in `RemoteDeviceStatusRes`.
    """
    # reported when the read loop is being closed
    DISABLED = 'DISABLED'
    # reported before the first poll and again after a timeout back-off
    CONNECTING = 'CONNECTING'
    # reported once the device answers a read request (or has nothing to poll)
    CONNECTED = 'CONNECTED'
    # reported when a read request times out
    DISCONNECTED = 'DISCONNECTED'
 31
 32
class _DataInfo(typing.NamedTuple):
    """Description of one configured data item (built by `_get_data_infos`)."""
    # modbus data type the item belongs to
    data_type: modbus.DataType
    # bits per addressable unit: 1 for coils/discrete inputs, 16 for registers
    register_size: int
    # address of the first unit the item refers to
    start_address: int
    # width of the value in bits
    bit_count: int
    # offset of the value, in bits, counted from the most significant bit of
    # the unit at `start_address`
    bit_offset: int
    # number of units covering `bit_offset + bit_count` bits
    quantity: int
    # polling interval; items with `None` are left out of the polling groups
    interval: float | None
    # unique data name (key used by `RemoteDevice.write` and in responses)
    name: str
 42
 43
class _DataGroup(typing.NamedTuple):
    """Set of data items that are fetched with a single read request."""
    # items whose address ranges fall inside this group's range
    data_infos: list[_DataInfo]
    # polling interval shared by all items of the group
    interval: float
    # modbus data type shared by all items of the group
    data_type: modbus.DataType
    # first address of the read request
    start_address: int
    # number of units requested, starting at `start_address`
    quantity: int
 50
 51
 52class RemoteDevice:
 53
 54    def __init__(self,
 55                 conf: json.Data,
 56                 conn: Connection,
 57                 name: str):
 58        self._conn = conn
 59        self._name = name
 60        self._device_id = conf['device_id']
 61        self._timeout_poll_delay = conf['timeout_poll_delay']
 62        self._data_infos = {data_info.name: data_info
 63                            for data_info in _get_data_infos(conf)}
 64        self._data_groups = list(_group_data_infos(self._data_infos.values()))
 65        self._log = common.create_remote_device_logger_adapter(mlog, name,
 66                                                               self._device_id)
 67
 68    @property
 69    def conn(self) -> Connection:
 70        return self._conn
 71
 72    @property
 73    def device_id(self) -> int:
 74        return self._device_id
 75
 76    def create_reader(self, response_cb: ResponseCb) -> aio.Resource:
 77        return _Reader(conn=self._conn,
 78                       name=self._name,
 79                       device_id=self._device_id,
 80                       timeout_poll_delay=self._timeout_poll_delay,
 81                       data_groups=self._data_groups,
 82                       response_cb=response_cb)
 83
 84    async def write(self,
 85                    data_name: str,
 86                    request_id: str,
 87                    value: int
 88                    ) -> common.RemoteDeviceWriteRes | None:
 89        data_info = self._data_infos.get(data_name)
 90        if not data_info:
 91            self._log.debug('data %s is not available', data_name)
 92            return
 93
 94        if data_info.data_type == modbus.DataType.COIL:
 95            res = await self._write_coil(data_info, value)
 96
 97        elif data_info.data_type == modbus.DataType.HOLDING_REGISTER:
 98            res = await self._write_holding_register(data_info, value)
 99
100        else:
101            self._log.debug('write unsupported for %s', data_info.data_type)
102            return
103
104        if isinstance(res, modbus.Error):
105            result = res.name
106
107        elif isinstance(res, common.Timeout):
108            result = 'TIMEOUT'
109
110        else:
111            result = 'SUCCESS'
112
113        return common.RemoteDeviceWriteRes(
114            device_id=self._device_id,
115            data_name=data_name,
116            request_id=request_id,
117            result=result)
118
119    async def _write_coil(self, data_info, value):
120        address = data_info.start_address + data_info.bit_offset
121        registers = [(value >> (data_info.bit_count - i - 1)) & 1
122                     for i in range(data_info.bit_count)]
123        req = modbus.WriteReq(device_id=self._device_id,
124                              data_type=data_info.data_type,
125                              start_address=address,
126                              values=registers)
127
128        return await self._conn.send(req)
129
130    async def _write_holding_register(self, data_info, value):
131        address = data_info.start_address + (data_info.bit_offset // 16)
132        bit_count = data_info.bit_count
133        bit_offset = data_info.bit_offset % 16
134
135        if bit_offset:
136            mask_prefix_size = bit_offset
137            mask_suffix_size = max(16 - bit_offset - bit_count, 0)
138            mask_size = 16 - mask_prefix_size - mask_suffix_size
139            and_mask = (((0xFFFF << (16 - mask_prefix_size)) & 0xFFFF) |
140                        ((0xFFFF << mask_suffix_size) >> 16))
141            or_mask = (((value >> (bit_count - mask_size)) &
142                        ((1 << mask_size) - 1)) <<
143                       mask_suffix_size)
144            req = modbus.WriteMaskReq(device_id=self._device_id,
145                                      address=address,
146                                      and_mask=and_mask,
147                                      or_mask=or_mask)
148
149            res = await self._conn.send(req)
150            if not isinstance(res, modbus.Success):
151                return res
152
153            address += 1
154            bit_count -= mask_size
155
156        register_count = bit_count // 16
157        if register_count:
158            registers = [(value >> (bit_count - 16 * (i + 1))) & 0xFFFF
159                         for i in range(register_count)]
160            req = modbus.WriteReq(device_id=self._device_id,
161                                  data_type=data_info.data_type,
162                                  start_address=address,
163                                  values=registers)
164
165            res = await self._conn.send(req)
166            if not isinstance(res, modbus.Success):
167                return res
168
169            address += register_count
170            bit_count -= 16 * register_count
171
172        if not bit_count:
173            return modbus.Success()
174
175        and_mask = (0xFFFF << (16 - bit_count)) >> 16
176        or_mask = (value & ((1 << bit_count) - 1)) << (16 - bit_count)
177        req = modbus.WriteMaskReq(device_id=self._device_id,
178                                  address=address,
179                                  and_mask=and_mask,
180                                  or_mask=or_mask)
181
182        return await self._conn.send(req)
183
184
class _Reader(aio.Resource):
    """Periodic poller of one remote device's data groups.

    The read loop is spawned in a subgroup of the connection's async group.
    Status changes and read results are passed to `response_cb`.
    """

    def __init__(self, conn, name, device_id, timeout_poll_delay, data_groups,
                 response_cb):
        self._conn = conn
        self._device_id = device_id
        self._timeout_poll_delay = timeout_poll_delay
        self._response_cb = response_cb
        self._status = None
        self._async_group = conn.async_group.create_subgroup()
        self._log = common.create_remote_device_logger_adapter(
            mlog, name, device_id)

        self.async_group.spawn(self._read_loop, data_groups)

    @property
    def async_group(self) -> aio.Group:
        """Async group controlling the read loop"""
        return self._async_group

    async def _read_loop(self, data_groups):
        try:
            self._log.debug('starting read loop')
            loop = asyncio.get_running_loop()

            if not data_groups:
                # nothing to poll - report connected and wait to be closed
                await self._set_status(_Status.CONNECTED)
                await loop.create_future()

            read_times = [None for _ in data_groups]
            reported = {}
            await self._set_status(_Status.CONNECTING)

            while True:
                due_groups, wait = self._take_due_groups(data_groups,
                                                         read_times)

                if not due_groups:
                    await asyncio.sleep(wait)
                    continue

                self._log.debug('reading data')
                for group in due_groups:
                    res = await self._conn.send(
                        modbus.ReadReq(device_id=self._device_id,
                                       data_type=group.data_type,
                                       start_address=group.start_address,
                                       quantity=group.quantity))

                    if isinstance(res, common.Timeout):
                        break

                    await self._set_status(_Status.CONNECTED)

                    for info in group.data_infos:
                        response = self._process_read_res(
                            info, group.start_address, res,
                            reported.get(info.name))

                        if response:
                            reported[info.name] = response
                            await aio.call(self._response_cb, response)

                else:
                    # every due group was answered
                    continue

                # a request timed out: back off and start from scratch so
                # that all data is reported again
                await self._set_status(_Status.DISCONNECTED)
                await asyncio.sleep(self._timeout_poll_delay)

                read_times = [None for _ in data_groups]
                reported = {}
                await self._set_status(_Status.CONNECTING)

        except ConnectionError:
            self._log.debug('connection closed')

        except Exception as e:
            self._log.error('read loop error: %s', e, exc_info=e)

        finally:
            self._log.debug('closing read loop')
            self.close()

            with contextlib.suppress(Exception):
                await aio.uncancellable(self._set_status(_Status.DISABLED))

    def _take_due_groups(self, data_groups, read_times):
        # Select groups that were never read or whose interval elapsed and
        # stamp them with the current time.  Also returns the shortest
        # remaining wait among the other groups (None if there are none).
        now = time.monotonic()
        due = collections.deque()
        waits = []

        for index, group in enumerate(data_groups):
            previous = read_times[index]
            if previous is not None:
                remaining = group.interval - now + previous
                if remaining > 0:
                    waits.append(remaining)
                    continue

            due.append(group)
            read_times[index] = now

        return due, min(waits, default=None)

    def _process_read_res(self, data_info, start_address, res, last_response):
        # Returns a read response to report, or None when the value equals
        # the previously reported one.
        if isinstance(res, modbus.Error):
            result = res.name
        elif isinstance(res, common.Timeout):
            result = 'TIMEOUT'
        else:
            result = 'SUCCESS'

        create_res = functools.partial(common.RemoteDeviceReadRes,
                                       device_id=self._device_id,
                                       data_name=data_info.name,
                                       result=result)

        if result != 'SUCCESS':
            self._log.debug('data name %s: error response %s',
                            data_info.name, result)
            return create_res(value=None, cause=None)

        # position of this data inside the group's response
        first = data_info.start_address - start_address
        value = _get_registers_value(data_info.register_size,
                                     data_info.bit_offset,
                                     data_info.bit_count,
                                     res[first:first+data_info.quantity])

        if last_response is None or last_response.result != 'SUCCESS':
            self._log.debug('data name %s: initial value %s',
                            data_info.name, value)
            return create_res(value=value, cause='INTERROGATE')

        if last_response.value == value:
            self._log.debug('data name %s: no value change', data_info.name)
            return None

        self._log.debug('data name %s: value change %s -> %s',
                        data_info.name, last_response.value, value)
        return create_res(value=value, cause='CHANGE')

    async def _set_status(self, status):
        # report only actual status changes
        if self._status == status:
            return

        self._log.debug('changing remote device status %s -> %s',
                        self._status, status)
        self._status = status

        await aio.call(self._response_cb,
                       common.RemoteDeviceStatusRes(device_id=self._device_id,
                                                    status=status.name))
344
345
def _get_data_infos(conf):
    """Yield a `_DataInfo` for every entry of ``conf['data']``."""
    for entry in conf['data']:
        kind = modbus.DataType[entry['data_type']]
        unit_size = _get_register_size(kind)
        width = entry['bit_count']
        offset = entry['bit_offset']
        address = entry['start_address']
        # number of coils/registers needed to cover offset + width bits
        units = math.ceil((width + offset) / unit_size)

        yield _DataInfo(data_type=kind,
                        register_size=unit_size,
                        start_address=address,
                        bit_count=width,
                        bit_offset=offset,
                        quantity=units,
                        interval=entry['interval'],
                        name=entry['name'])
362
363
def _group_data_infos(data_infos):
    """Yield polling groups for all data that has a polling interval.

    Data is bucketed by data type and then by interval; each bucket is split
    into groups by `_group_data_infos_with_type_interval`.
    """
    buckets = {}

    for info in data_infos:
        if info.interval is None:
            # not polled
            continue

        by_interval = buckets.setdefault(info.data_type, {})
        by_interval.setdefault(info.interval, []).append(info)

    for data_type, by_interval in buckets.items():
        for interval, infos in by_interval.items():
            yield from _group_data_infos_with_type_interval(
                infos, data_type, interval)
381
382
def _group_data_infos_with_type_interval(data_infos, data_type, interval):
    """Yield `_DataGroup`s covering `data_infos` of one type and interval.

    Data is processed in order of ``(start_address, quantity)``.  A group is
    extended with the next data item while that item starts no further than
    directly after the group's current range and the resulting range does not
    exceed the maximum request quantity of `data_type`.  Otherwise the item
    starts the next group.

    The first item of a group is always accepted, even if its own quantity is
    larger than the maximum request quantity.
    """
    pending = collections.deque(
        sorted(data_infos, key=lambda i: (i.start_address, i.quantity)))
    if not pending:
        return

    max_quantity = _get_max_quantity(data_type)

    while pending:
        first = pending.popleft()
        start_address = first.start_address
        quantity = first.quantity
        members = [first]

        while pending:
            candidate = pending[0]

            # gap between the group's range and the candidate
            if candidate.start_address > start_address + quantity:
                break

            span = (candidate.quantity +
                    candidate.start_address -
                    start_address)
            if span > max_quantity:
                break

            # candidate may lie completely inside the current range
            quantity = max(quantity, span)
            members.append(pending.popleft())

        yield _DataGroup(data_infos=members,
                         interval=interval,
                         data_type=data_type,
                         start_address=start_address,
                         quantity=quantity)
429
430
431def _get_register_size(data_type):
432    if data_type in (modbus.DataType.COIL,
433                     modbus.DataType.DISCRETE_INPUT):
434        return 1
435
436    if data_type in (modbus.DataType.HOLDING_REGISTER,
437                     modbus.DataType.INPUT_REGISTER,
438                     modbus.DataType.QUEUE):
439        return 16
440
441    raise ValueError('invalid data type')
442
443
def _get_max_quantity(data_type):
    """Return the largest quantity used in one read request of `data_type`.

    Raises:
        ValueError: `data_type` is not a known modbus data type

    """
    limits = (
        ((modbus.DataType.COIL, modbus.DataType.DISCRETE_INPUT), 2000),
        ((modbus.DataType.HOLDING_REGISTER,
          modbus.DataType.INPUT_REGISTER), 125),
        ((modbus.DataType.QUEUE, ), 1))

    for data_types, limit in limits:
        if data_type in data_types:
            return limit

    raise ValueError('invalid data type')
457
458
def _get_registers_value(register_size, bit_offset, bit_count, values):
    """Extract an unsigned integer from a sequence of coils/registers.

    The bits of `values` are taken most significant first; `bit_offset` bits
    are skipped and the following `bit_count` bits form the result.  Bits
    past the end of `values` are read as zero.
    """
    padded_bits = itertools.chain(_get_registers_bits(register_size, values),
                                  itertools.repeat(0))
    field = itertools.islice(padded_bits, bit_offset, bit_offset + bit_count)
    return functools.reduce(lambda acc, bit: (acc << 1) | bit, field, 0)
466
467
468def _get_registers_bits(register_size, values):
469    for value in values:
470        for i in range(register_size):
471            yield (value >> (register_size - i - 1)) & 1
class RemoteDevice:
 53class RemoteDevice:
 54
 55    def __init__(self,
 56                 conf: json.Data,
 57                 conn: Connection,
 58                 name: str):
 59        self._conn = conn
 60        self._name = name
 61        self._device_id = conf['device_id']
 62        self._timeout_poll_delay = conf['timeout_poll_delay']
 63        self._data_infos = {data_info.name: data_info
 64                            for data_info in _get_data_infos(conf)}
 65        self._data_groups = list(_group_data_infos(self._data_infos.values()))
 66        self._log = common.create_remote_device_logger_adapter(mlog, name,
 67                                                               self._device_id)
 68
 69    @property
 70    def conn(self) -> Connection:
 71        return self._conn
 72
 73    @property
 74    def device_id(self) -> int:
 75        return self._device_id
 76
 77    def create_reader(self, response_cb: ResponseCb) -> aio.Resource:
 78        return _Reader(conn=self._conn,
 79                       name=self._name,
 80                       device_id=self._device_id,
 81                       timeout_poll_delay=self._timeout_poll_delay,
 82                       data_groups=self._data_groups,
 83                       response_cb=response_cb)
 84
 85    async def write(self,
 86                    data_name: str,
 87                    request_id: str,
 88                    value: int
 89                    ) -> common.RemoteDeviceWriteRes | None:
 90        data_info = self._data_infos.get(data_name)
 91        if not data_info:
 92            self._log.debug('data %s is not available', data_name)
 93            return
 94
 95        if data_info.data_type == modbus.DataType.COIL:
 96            res = await self._write_coil(data_info, value)
 97
 98        elif data_info.data_type == modbus.DataType.HOLDING_REGISTER:
 99            res = await self._write_holding_register(data_info, value)
100
101        else:
102            self._log.debug('write unsupported for %s', data_info.data_type)
103            return
104
105        if isinstance(res, modbus.Error):
106            result = res.name
107
108        elif isinstance(res, common.Timeout):
109            result = 'TIMEOUT'
110
111        else:
112            result = 'SUCCESS'
113
114        return common.RemoteDeviceWriteRes(
115            device_id=self._device_id,
116            data_name=data_name,
117            request_id=request_id,
118            result=result)
119
120    async def _write_coil(self, data_info, value):
121        address = data_info.start_address + data_info.bit_offset
122        registers = [(value >> (data_info.bit_count - i - 1)) & 1
123                     for i in range(data_info.bit_count)]
124        req = modbus.WriteReq(device_id=self._device_id,
125                              data_type=data_info.data_type,
126                              start_address=address,
127                              values=registers)
128
129        return await self._conn.send(req)
130
131    async def _write_holding_register(self, data_info, value):
132        address = data_info.start_address + (data_info.bit_offset // 16)
133        bit_count = data_info.bit_count
134        bit_offset = data_info.bit_offset % 16
135
136        if bit_offset:
137            mask_prefix_size = bit_offset
138            mask_suffix_size = max(16 - bit_offset - bit_count, 0)
139            mask_size = 16 - mask_prefix_size - mask_suffix_size
140            and_mask = (((0xFFFF << (16 - mask_prefix_size)) & 0xFFFF) |
141                        ((0xFFFF << mask_suffix_size) >> 16))
142            or_mask = (((value >> (bit_count - mask_size)) &
143                        ((1 << mask_size) - 1)) <<
144                       mask_suffix_size)
145            req = modbus.WriteMaskReq(device_id=self._device_id,
146                                      address=address,
147                                      and_mask=and_mask,
148                                      or_mask=or_mask)
149
150            res = await self._conn.send(req)
151            if not isinstance(res, modbus.Success):
152                return res
153
154            address += 1
155            bit_count -= mask_size
156
157        register_count = bit_count // 16
158        if register_count:
159            registers = [(value >> (bit_count - 16 * (i + 1))) & 0xFFFF
160                         for i in range(register_count)]
161            req = modbus.WriteReq(device_id=self._device_id,
162                                  data_type=data_info.data_type,
163                                  start_address=address,
164                                  values=registers)
165
166            res = await self._conn.send(req)
167            if not isinstance(res, modbus.Success):
168                return res
169
170            address += register_count
171            bit_count -= 16 * register_count
172
173        if not bit_count:
174            return modbus.Success()
175
176        and_mask = (0xFFFF << (16 - bit_count)) >> 16
177        or_mask = (value & ((1 << bit_count) - 1)) << (16 - bit_count)
178        req = modbus.WriteMaskReq(device_id=self._device_id,
179                                  address=address,
180                                  and_mask=and_mask,
181                                  or_mask=or_mask)
182
183        return await self._conn.send(req)
RemoteDevice( conf: None | bool | int | float | str | List[ForwardRef('Data')] | Dict[str, ForwardRef('Data')], conn: hat.gateway.devices.modbus.master.connection.Connection, name: str)
55    def __init__(self,
56                 conf: json.Data,
57                 conn: Connection,
58                 name: str):
59        self._conn = conn
60        self._name = name
61        self._device_id = conf['device_id']
62        self._timeout_poll_delay = conf['timeout_poll_delay']
63        self._data_infos = {data_info.name: data_info
64                            for data_info in _get_data_infos(conf)}
65        self._data_groups = list(_group_data_infos(self._data_infos.values()))
66        self._log = common.create_remote_device_logger_adapter(mlog, name,
67                                                               self._device_id)
69    @property
70    def conn(self) -> Connection:
71        return self._conn
device_id: int
73    @property
74    def device_id(self) -> int:
75        return self._device_id
77    def create_reader(self, response_cb: ResponseCb) -> aio.Resource:
78        return _Reader(conn=self._conn,
79                       name=self._name,
80                       device_id=self._device_id,
81                       timeout_poll_delay=self._timeout_poll_delay,
82                       data_groups=self._data_groups,
83                       response_cb=response_cb)
async def write( self, data_name: str, request_id: str, value: int) -> hat.gateway.devices.modbus.master.common.RemoteDeviceWriteRes | None:
 85    async def write(self,
 86                    data_name: str,
 87                    request_id: str,
 88                    value: int
 89                    ) -> common.RemoteDeviceWriteRes | None:
 90        data_info = self._data_infos.get(data_name)
 91        if not data_info:
 92            self._log.debug('data %s is not available', data_name)
 93            return
 94
 95        if data_info.data_type == modbus.DataType.COIL:
 96            res = await self._write_coil(data_info, value)
 97
 98        elif data_info.data_type == modbus.DataType.HOLDING_REGISTER:
 99            res = await self._write_holding_register(data_info, value)
100
101        else:
102            self._log.debug('write unsupported for %s', data_info.data_type)
103            return
104
105        if isinstance(res, modbus.Error):
106            result = res.name
107
108        elif isinstance(res, common.Timeout):
109            result = 'TIMEOUT'
110
111        else:
112            result = 'SUCCESS'
113
114        return common.RemoteDeviceWriteRes(
115            device_id=self._device_id,
116            data_name=data_name,
117            request_id=request_id,
118            result=result)