hat.gateway.devices.modbus.master.connection

  1import asyncio
  2import enum
  3import logging
  4import time
  5import typing
  6
  7from hat import json
  8from hat import aio
  9from hat.drivers import modbus
 10from hat.drivers import serial
 11from hat.drivers import tcp
 12
 13
 14mlog = logging.getLogger(__name__)
 15
 16
 17DataType: typing.TypeAlias = modbus.DataType
 18
 19
 20Error = enum.Enum('Error', [
 21    'INVALID_FUNCTION_CODE',
 22    'INVALID_DATA_ADDRESS',
 23    'INVALID_DATA_VALUE',
 24    'FUNCTION_ERROR',
 25    'GATEWAY_PATH_UNAVAILABLE',
 26    'GATEWAY_TARGET_DEVICE_FAILED_TO_RESPOND',
 27    'TIMEOUT'])
 28
 29
 30async def connect(conf: json.Data,
 31                  log_prefix: str,
 32                  request_queue_size: int = 1024
 33                  ) -> 'Connection':
 34    transport_conf = conf['transport']
 35    modbus_type = modbus.ModbusType[conf['modbus_type']]
 36
 37    if transport_conf['type'] == 'TCP':
 38        addr = tcp.Address(transport_conf['host'], transport_conf['port'])
 39        master = await modbus.create_tcp_master(
 40            modbus_type=modbus_type,
 41            addr=addr,
 42            response_timeout=conf['request_timeout'])
 43
 44    elif transport_conf['type'] == 'SERIAL':
 45        port = transport_conf['port']
 46        baudrate = transport_conf['baudrate']
 47        bytesize = serial.ByteSize[transport_conf['bytesize']]
 48        parity = serial.Parity[transport_conf['parity']]
 49        stopbits = serial.StopBits[transport_conf['stopbits']]
 50        xonxoff = transport_conf['flow_control']['xonxoff']
 51        rtscts = transport_conf['flow_control']['rtscts']
 52        dsrdtr = transport_conf['flow_control']['dsrdtr']
 53        silent_interval = transport_conf['silent_interval']
 54        master = await modbus.create_serial_master(
 55            modbus_type=modbus_type,
 56            port=port,
 57            baudrate=baudrate,
 58            bytesize=bytesize,
 59            parity=parity,
 60            stopbits=stopbits,
 61            xonxoff=xonxoff,
 62            rtscts=rtscts,
 63            dsrdtr=dsrdtr,
 64            silent_interval=silent_interval,
 65            response_timeout=conf['request_timeout'])
 66
 67    else:
 68        raise ValueError('unsupported link type')
 69
 70    return Connection(conf=conf,
 71                      master=master,
 72                      log_prefix=log_prefix,
 73                      request_queue_size=request_queue_size)
 74
 75
 76class Connection(aio.Resource):
 77
 78    def __init__(self,
 79                 conf: json.Data,
 80                 master: modbus.Master,
 81                 log_prefix: str,
 82                 request_queue_size: int):
 83        self._conf = conf
 84        self._log_prefix = log_prefix
 85        self._master = master
 86        self._request_queue = aio.Queue(request_queue_size)
 87
 88        self.async_group.spawn(self._request_loop)
 89
 90    @property
 91    def async_group(self) -> aio.Group:
 92        return self._master.async_group
 93
 94    async def read(self,
 95                   device_id: int,
 96                   data_type: modbus.DataType,
 97                   start_address: int,
 98                   quantity: int
 99                   ) -> list[int] | Error:
100        self._log(logging.DEBUG, 'enqueuing read request')
101        return await self._request(self._master.read, device_id, data_type,
102                                   start_address, quantity)
103
104    async def write(self,
105                    device_id: int,
106                    data_type: modbus.DataType,
107                    start_address: int,
108                    values: list[int]
109                    ) -> Error | None:
110        self._log(logging.DEBUG, 'enqueuing write request')
111        return await self._request(self._master.write, device_id, data_type,
112                                   start_address, values)
113
114    async def write_mask(self,
115                         device_id: int,
116                         address: int,
117                         and_mask: int,
118                         or_mask: int
119                         ) -> Error | None:
120        self._log(logging.DEBUG, 'enqueuing write mask request')
121        return await self._request(self._master.write_mask, device_id,
122                                   address, and_mask, or_mask)
123
124    async def _request_loop(self):
125        future = None
126        result_t = None
127
128        try:
129            self._log(logging.DEBUG, 'starting request loop')
130            while True:
131                fn, args, delayed_count, future = await self._request_queue.get()  # NOQA
132                self._log(logging.DEBUG, 'dequed request')
133
134                if result_t is not None and self._conf['request_delay'] > 0:
135                    dt = time.monotonic() - result_t
136                    if dt < self._conf['request_delay']:
137                        await asyncio.sleep(self._conf['request_delay'] - dt)
138
139                if future.done():
140                    continue
141
142                delayed_count -= 1
143                immediate_count = (
144                    self._conf['request_retry_immediate_count'] + 1)
145
146                while True:
147                    try:
148                        immediate_count -= 1
149                        result = await fn(*args)
150                        result_t = time.monotonic()
151
152                        self._log(logging.DEBUG, 'received result %s', result)
153                        if isinstance(result, modbus.Error):
154                            result = Error[result.name]
155
156                        if not future.done():
157                            self._log(logging.DEBUG, 'setting request result')
158                            future.set_result(result)
159
160                        break
161
162                    except TimeoutError:
163                        self._log(logging.DEBUG, 'single request timeout')
164
165                        if immediate_count > 0:
166                            self._log(logging.DEBUG, 'immediate request retry')
167                            continue
168
169                        if delayed_count > 0:
170                            self._log(logging.DEBUG, 'delayed request retry')
171                            self.async_group.spawn(self._delay_request, fn,
172                                                   args, delayed_count, future)
173                            future = None
174
175                        elif not future.done():
176                            self._log(logging.DEBUG,
177                                      'request resulting in timeout')
178                            future.set_result(Error.TIMEOUT)
179
180                        break
181
182                    except Exception as e:
183                        self._log(logging.DEBUG, 'setting request exception')
184                        if not future.done():
185                            future.set_exception(e)
186                        raise
187
188        except ConnectionError:
189            self._log(logging.DEBUG, 'connection closed')
190
191        except Exception as e:
192            self._log(logging.ERROR, 'request loop error: %s', e, exc_info=e)
193
194        finally:
195            self._log(logging.DEBUG, 'closing request loop')
196            self.close()
197            self._request_queue.close()
198
199            while True:
200                if future and not future.done():
201                    future.set_exception(ConnectionError())
202                if self._request_queue.empty():
203                    break
204                _, __, ___, future = self._request_queue.get_nowait()
205
206    async def _request(self, fn, *args):
207        try:
208            future = asyncio.Future()
209            delayed_count = self._conf['request_retry_delayed_count'] + 1
210            await self._request_queue.put((fn, args, delayed_count, future))
211
212            return await future
213
214        except aio.QueueClosedError:
215            raise ConnectionError()
216
217    async def _delay_request(self, fn, args, delayed_count, future):
218        try:
219            await asyncio.sleep(self._conf['request_retry_delay'])
220            await self._request_queue.put((fn, args, delayed_count, future))
221            future = None
222
223        except aio.QueueClosedError:
224            pass
225
226        finally:
227            if future and not future.done():
228                future.set_exception(ConnectionError())
229
230    def _log(self, level, msg, *args, **kwargs):
231        if not mlog.isEnabledFor(level):
232            return
233
234        mlog.log(level, f"{self._log_prefix}: {msg}", *args, **kwargs)
class DataType(enum.Enum):
class DataType(enum.Enum):
    """Kind of Modbus data addressed by a read or write request.

    Member descriptions follow the usual Modbus terminology - verify
    against the driver before relying on them.
    """

    # single bit, readable and writable
    COIL = 1
    # single bit, read-only
    DISCRETE_INPUT = 2
    # 16-bit word, readable and writable
    HOLDING_REGISTER = 3
    # 16-bit word, read-only
    INPUT_REGISTER = 4
    # presumably a FIFO queue of registers - TODO confirm
    QUEUE = 5

An enumeration.

COIL = <DataType.COIL: 1>
DISCRETE_INPUT = <DataType.DISCRETE_INPUT: 2>
HOLDING_REGISTER = <DataType.HOLDING_REGISTER: 3>
INPUT_REGISTER = <DataType.INPUT_REGISTER: 4>
QUEUE = <DataType.QUEUE: 5>
class Error(enum.Enum):

An enumeration.

INVALID_FUNCTION_CODE = <Error.INVALID_FUNCTION_CODE: 1>
INVALID_DATA_ADDRESS = <Error.INVALID_DATA_ADDRESS: 2>
INVALID_DATA_VALUE = <Error.INVALID_DATA_VALUE: 3>
FUNCTION_ERROR = <Error.FUNCTION_ERROR: 4>
GATEWAY_PATH_UNAVAILABLE = <Error.GATEWAY_PATH_UNAVAILABLE: 5>
GATEWAY_TARGET_DEVICE_FAILED_TO_RESPOND = <Error.GATEWAY_TARGET_DEVICE_FAILED_TO_RESPOND: 6>
TIMEOUT = <Error.TIMEOUT: 7>
async def connect( conf: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')], log_prefix: str, request_queue_size: int = 1024) -> Connection:
async def connect(conf: json.Data,
                  log_prefix: str,
                  request_queue_size: int = 1024
                  ) -> 'Connection':
    """Create a Modbus master described by `conf` and wrap it.

    Args:
        conf: configuration; ``conf['transport']['type']`` selects the
            ``'TCP'`` or ``'SERIAL'`` master, ``conf['modbus_type']`` names
            a `modbus.ModbusType` member and ``conf['request_timeout']`` is
            passed to the master as its response timeout
        log_prefix: text prepended to every log message of the connection
        request_queue_size: capacity of the connection's request queue

    Returns:
        Connection wrapping the newly created master.

    Raises:
        ValueError: transport type is neither ``'TCP'`` nor ``'SERIAL'``

    """
    transport = conf['transport']
    modbus_type = modbus.ModbusType[conf['modbus_type']]
    transport_type = transport['type']

    if transport_type == 'TCP':
        master = await modbus.create_tcp_master(
            modbus_type=modbus_type,
            addr=tcp.Address(transport['host'], transport['port']),
            response_timeout=conf['request_timeout'])

    elif transport_type == 'SERIAL':
        # keyword order mirrors the order in which settings are read
        master = await modbus.create_serial_master(
            modbus_type=modbus_type,
            port=transport['port'],
            baudrate=transport['baudrate'],
            bytesize=serial.ByteSize[transport['bytesize']],
            parity=serial.Parity[transport['parity']],
            stopbits=serial.StopBits[transport['stopbits']],
            xonxoff=transport['flow_control']['xonxoff'],
            rtscts=transport['flow_control']['rtscts'],
            dsrdtr=transport['flow_control']['dsrdtr'],
            silent_interval=transport['silent_interval'],
            response_timeout=conf['request_timeout'])

    else:
        raise ValueError('unsupported link type')

    return Connection(conf=conf,
                      master=master,
                      log_prefix=log_prefix,
                      request_queue_size=request_queue_size)
class Connection(hat.aio.group.Resource):
 77class Connection(aio.Resource):
 78
 79    def __init__(self,
 80                 conf: json.Data,
 81                 master: modbus.Master,
 82                 log_prefix: str,
 83                 request_queue_size: int):
 84        self._conf = conf
 85        self._log_prefix = log_prefix
 86        self._master = master
 87        self._request_queue = aio.Queue(request_queue_size)
 88
 89        self.async_group.spawn(self._request_loop)
 90
 91    @property
 92    def async_group(self) -> aio.Group:
 93        return self._master.async_group
 94
 95    async def read(self,
 96                   device_id: int,
 97                   data_type: modbus.DataType,
 98                   start_address: int,
 99                   quantity: int
100                   ) -> list[int] | Error:
101        self._log(logging.DEBUG, 'enqueuing read request')
102        return await self._request(self._master.read, device_id, data_type,
103                                   start_address, quantity)
104
105    async def write(self,
106                    device_id: int,
107                    data_type: modbus.DataType,
108                    start_address: int,
109                    values: list[int]
110                    ) -> Error | None:
111        self._log(logging.DEBUG, 'enqueuing write request')
112        return await self._request(self._master.write, device_id, data_type,
113                                   start_address, values)
114
115    async def write_mask(self,
116                         device_id: int,
117                         address: int,
118                         and_mask: int,
119                         or_mask: int
120                         ) -> Error | None:
121        self._log(logging.DEBUG, 'enqueuing write mask request')
122        return await self._request(self._master.write_mask, device_id,
123                                   address, and_mask, or_mask)
124
125    async def _request_loop(self):
126        future = None
127        result_t = None
128
129        try:
130            self._log(logging.DEBUG, 'starting request loop')
131            while True:
132                fn, args, delayed_count, future = await self._request_queue.get()  # NOQA
133                self._log(logging.DEBUG, 'dequed request')
134
135                if result_t is not None and self._conf['request_delay'] > 0:
136                    dt = time.monotonic() - result_t
137                    if dt < self._conf['request_delay']:
138                        await asyncio.sleep(self._conf['request_delay'] - dt)
139
140                if future.done():
141                    continue
142
143                delayed_count -= 1
144                immediate_count = (
145                    self._conf['request_retry_immediate_count'] + 1)
146
147                while True:
148                    try:
149                        immediate_count -= 1
150                        result = await fn(*args)
151                        result_t = time.monotonic()
152
153                        self._log(logging.DEBUG, 'received result %s', result)
154                        if isinstance(result, modbus.Error):
155                            result = Error[result.name]
156
157                        if not future.done():
158                            self._log(logging.DEBUG, 'setting request result')
159                            future.set_result(result)
160
161                        break
162
163                    except TimeoutError:
164                        self._log(logging.DEBUG, 'single request timeout')
165
166                        if immediate_count > 0:
167                            self._log(logging.DEBUG, 'immediate request retry')
168                            continue
169
170                        if delayed_count > 0:
171                            self._log(logging.DEBUG, 'delayed request retry')
172                            self.async_group.spawn(self._delay_request, fn,
173                                                   args, delayed_count, future)
174                            future = None
175
176                        elif not future.done():
177                            self._log(logging.DEBUG,
178                                      'request resulting in timeout')
179                            future.set_result(Error.TIMEOUT)
180
181                        break
182
183                    except Exception as e:
184                        self._log(logging.DEBUG, 'setting request exception')
185                        if not future.done():
186                            future.set_exception(e)
187                        raise
188
189        except ConnectionError:
190            self._log(logging.DEBUG, 'connection closed')
191
192        except Exception as e:
193            self._log(logging.ERROR, 'request loop error: %s', e, exc_info=e)
194
195        finally:
196            self._log(logging.DEBUG, 'closing request loop')
197            self.close()
198            self._request_queue.close()
199
200            while True:
201                if future and not future.done():
202                    future.set_exception(ConnectionError())
203                if self._request_queue.empty():
204                    break
205                _, __, ___, future = self._request_queue.get_nowait()
206
207    async def _request(self, fn, *args):
208        try:
209            future = asyncio.Future()
210            delayed_count = self._conf['request_retry_delayed_count'] + 1
211            await self._request_queue.put((fn, args, delayed_count, future))
212
213            return await future
214
215        except aio.QueueClosedError:
216            raise ConnectionError()
217
218    async def _delay_request(self, fn, args, delayed_count, future):
219        try:
220            await asyncio.sleep(self._conf['request_retry_delay'])
221            await self._request_queue.put((fn, args, delayed_count, future))
222            future = None
223
224        except aio.QueueClosedError:
225            pass
226
227        finally:
228            if future and not future.done():
229                future.set_exception(ConnectionError())
230
231    def _log(self, level, msg, *args, **kwargs):
232        if not mlog.isEnabledFor(level):
233            return
234
235        mlog.log(level, f"{self._log_prefix}: {msg}", *args, **kwargs)

Resource with lifetime control based on Group.

Connection( conf: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')], master: hat.drivers.modbus.master.Master, log_prefix: str, request_queue_size: int)
79    def __init__(self,
80                 conf: json.Data,
81                 master: modbus.Master,
82                 log_prefix: str,
83                 request_queue_size: int):
84        self._conf = conf
85        self._log_prefix = log_prefix
86        self._master = master
87        self._request_queue = aio.Queue(request_queue_size)
88
89        self.async_group.spawn(self._request_loop)
async_group: hat.aio.group.Group
91    @property
92    def async_group(self) -> aio.Group:
93        return self._master.async_group

Group controlling resource's lifetime.

async def read( self, device_id: int, data_type: hat.drivers.modbus.common.DataType, start_address: int, quantity: int) -> list[int] | Error:
 95    async def read(self,
 96                   device_id: int,
 97                   data_type: modbus.DataType,
 98                   start_address: int,
 99                   quantity: int
100                   ) -> list[int] | Error:
101        self._log(logging.DEBUG, 'enqueuing read request')
102        return await self._request(self._master.read, device_id, data_type,
103                                   start_address, quantity)
async def write( self, device_id: int, data_type: hat.drivers.modbus.common.DataType, start_address: int, values: list[int]) -> Error | None:
105    async def write(self,
106                    device_id: int,
107                    data_type: modbus.DataType,
108                    start_address: int,
109                    values: list[int]
110                    ) -> Error | None:
111        self._log(logging.DEBUG, 'enqueuing write request')
112        return await self._request(self._master.write, device_id, data_type,
113                                   start_address, values)
async def write_mask( self, device_id: int, address: int, and_mask: int, or_mask: int) -> Error | None:
115    async def write_mask(self,
116                         device_id: int,
117                         address: int,
118                         and_mask: int,
119                         or_mask: int
120                         ) -> Error | None:
121        self._log(logging.DEBUG, 'enqueuing write mask request')
122        return await self._request(self._master.write_mask, device_id,
123                                   address, and_mask, or_mask)