hat.gateway.devices.modbus.master.connection

  1import asyncio
  2import enum
  3import logging
  4import time
  5import typing
  6
  7from hat import json
  8from hat import aio
  9from hat.drivers import modbus
 10from hat.drivers import serial
 11from hat.drivers import tcp
 12
 13from hat.gateway.devices.modbus.master import common
 14
 15
 16mlog = logging.getLogger(__name__)
 17
 18
 19DataType: typing.TypeAlias = modbus.DataType
 20
 21
 22Error = enum.Enum('Error', [
 23    'INVALID_FUNCTION_CODE',
 24    'INVALID_DATA_ADDRESS',
 25    'INVALID_DATA_VALUE',
 26    'FUNCTION_ERROR',
 27    'GATEWAY_PATH_UNAVAILABLE',
 28    'GATEWAY_TARGET_DEVICE_FAILED_TO_RESPOND',
 29    'TIMEOUT'])
 30
 31
 32async def connect(conf: json.Data,
 33                  name: str,
 34                  request_queue_size: int = 1024
 35                  ) -> 'Connection':
 36    transport_conf = conf['transport']
 37    modbus_type = modbus.ModbusType[conf['modbus_type']]
 38
 39    if transport_conf['type'] == 'TCP':
 40        addr = tcp.Address(transport_conf['host'], transport_conf['port'])
 41        master = await modbus.create_tcp_master(
 42            modbus_type=modbus_type,
 43            addr=addr,
 44            response_timeout=conf['request_timeout'],
 45            name=name)
 46
 47    elif transport_conf['type'] == 'SERIAL':
 48        port = transport_conf['port']
 49        baudrate = transport_conf['baudrate']
 50        bytesize = serial.ByteSize[transport_conf['bytesize']]
 51        parity = serial.Parity[transport_conf['parity']]
 52        stopbits = serial.StopBits[transport_conf['stopbits']]
 53        xonxoff = transport_conf['flow_control']['xonxoff']
 54        rtscts = transport_conf['flow_control']['rtscts']
 55        dsrdtr = transport_conf['flow_control']['dsrdtr']
 56        silent_interval = transport_conf['silent_interval']
 57        master = await modbus.create_serial_master(
 58            modbus_type=modbus_type,
 59            port=port,
 60            baudrate=baudrate,
 61            bytesize=bytesize,
 62            parity=parity,
 63            stopbits=stopbits,
 64            xonxoff=xonxoff,
 65            rtscts=rtscts,
 66            dsrdtr=dsrdtr,
 67            silent_interval=silent_interval,
 68            response_timeout=conf['request_timeout'],
 69            name=name)
 70
 71    else:
 72        raise ValueError('unsupported link type')
 73
 74    return Connection(conf=conf,
 75                      master=master,
 76                      name=name,
 77                      request_queue_size=request_queue_size)
 78
 79
 80class Connection(aio.Resource):
 81
 82    def __init__(self,
 83                 conf: json.Data,
 84                 master: modbus.Master,
 85                 name: str,
 86                 request_queue_size: int):
 87        self._conf = conf
 88        self._master = master
 89        self._name = name
 90        self._request_queue = aio.Queue(request_queue_size)
 91        self._log = common.create_device_logger_adapter(mlog, name)
 92
 93        self.async_group.spawn(self._request_loop)
 94
 95    @property
 96    def async_group(self) -> aio.Group:
 97        return self._master.async_group
 98
 99    async def read(self,
100                   device_id: int,
101                   data_type: modbus.DataType,
102                   start_address: int,
103                   quantity: int
104                   ) -> list[int] | Error:
105        self._log.debug('enqueuing read request')
106        return await self._request(self._master.read, device_id, data_type,
107                                   start_address, quantity)
108
109    async def write(self,
110                    device_id: int,
111                    data_type: modbus.DataType,
112                    start_address: int,
113                    values: list[int]
114                    ) -> Error | None:
115        self._log.debug('enqueuing write request')
116        return await self._request(self._master.write, device_id, data_type,
117                                   start_address, values)
118
119    async def write_mask(self,
120                         device_id: int,
121                         address: int,
122                         and_mask: int,
123                         or_mask: int
124                         ) -> Error | None:
125        self._log.debug('enqueuing write mask request')
126        return await self._request(self._master.write_mask, device_id,
127                                   address, and_mask, or_mask)
128
129    async def _request_loop(self):
130        future = None
131        result_t = None
132
133        try:
134            self._log.debug('starting request loop')
135            while True:
136                fn, args, delayed_count, future = await self._request_queue.get()  # NOQA
137                self._log.debug('dequed request')
138
139                if result_t is not None and self._conf['request_delay'] > 0:
140                    dt = time.monotonic() - result_t
141                    if dt < self._conf['request_delay']:
142                        await asyncio.sleep(self._conf['request_delay'] - dt)
143
144                if future.done():
145                    continue
146
147                delayed_count -= 1
148                immediate_count = (
149                    self._conf['request_retry_immediate_count'] + 1)
150
151                while True:
152                    try:
153                        immediate_count -= 1
154                        result = await fn(*args)
155                        result_t = time.monotonic()
156
157                        self._log.debug('received result %s', result)
158                        if isinstance(result, modbus.Error):
159                            result = Error[result.name]
160
161                        if not future.done():
162                            self._log.debug('setting request result')
163                            future.set_result(result)
164
165                        break
166
167                    except TimeoutError:
168                        self._log.debug('single request timeout')
169
170                        if immediate_count > 0:
171                            self._log.debug('immediate request retry')
172                            continue
173
174                        if delayed_count > 0:
175                            self._log.debug('delayed request retry')
176                            self.async_group.spawn(self._delay_request, fn,
177                                                   args, delayed_count, future)
178                            future = None
179
180                        elif not future.done():
181                            self._log.debug('request resulting in timeout')
182                            future.set_result(Error.TIMEOUT)
183
184                        break
185
186                    except Exception as e:
187                        self._log.debug('setting request exception')
188                        if not future.done():
189                            future.set_exception(e)
190                        raise
191
192        except ConnectionError:
193            self._log.debug('connection closed')
194
195        except Exception as e:
196            self._log.error('request loop error: %s', e, exc_info=e)
197
198        finally:
199            self._log.debug('closing request loop')
200            self.close()
201            self._request_queue.close()
202
203            while True:
204                if future and not future.done():
205                    future.set_exception(ConnectionError())
206                if self._request_queue.empty():
207                    break
208                _, __, ___, future = self._request_queue.get_nowait()
209
210    async def _request(self, fn, *args):
211        try:
212            future = asyncio.Future()
213            delayed_count = self._conf['request_retry_delayed_count'] + 1
214            await self._request_queue.put((fn, args, delayed_count, future))
215
216            return await future
217
218        except aio.QueueClosedError:
219            raise ConnectionError()
220
221    async def _delay_request(self, fn, args, delayed_count, future):
222        try:
223            await asyncio.sleep(self._conf['request_retry_delay'])
224            await self._request_queue.put((fn, args, delayed_count, future))
225            future = None
226
227        except aio.QueueClosedError:
228            pass
229
230        finally:
231            if future and not future.done():
232                future.set_exception(ConnectionError())
class DataType(enum.Enum):
@enum.unique
class DataType(enum.Enum):
    """Kind of data item addressed by a request"""

    COIL = 1
    DISCRETE_INPUT = 2
    HOLDING_REGISTER = 3
    INPUT_REGISTER = 4
    QUEUE = 5
COIL = <DataType.COIL: 1>
DISCRETE_INPUT = <DataType.DISCRETE_INPUT: 2>
HOLDING_REGISTER = <DataType.HOLDING_REGISTER: 3>
INPUT_REGISTER = <DataType.INPUT_REGISTER: 4>
QUEUE = <DataType.QUEUE: 5>
class Error(enum.Enum):
INVALID_FUNCTION_CODE = <Error.INVALID_FUNCTION_CODE: 1>
INVALID_DATA_ADDRESS = <Error.INVALID_DATA_ADDRESS: 2>
INVALID_DATA_VALUE = <Error.INVALID_DATA_VALUE: 3>
FUNCTION_ERROR = <Error.FUNCTION_ERROR: 4>
GATEWAY_PATH_UNAVAILABLE = <Error.GATEWAY_PATH_UNAVAILABLE: 5>
GATEWAY_TARGET_DEVICE_FAILED_TO_RESPOND = <Error.GATEWAY_TARGET_DEVICE_FAILED_TO_RESPOND: 6>
TIMEOUT = <Error.TIMEOUT: 7>
async def connect( conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]], name: str, request_queue_size: int = 1024) -> Connection:
async def connect(conf: json.Data,
                  name: str,
                  request_queue_size: int = 1024
                  ) -> 'Connection':
    """Create modbus master and wrap it in a `Connection`.

    Args:
        conf: device configuration; this function reads ``transport``
            (``type`` of ``'TCP'`` or ``'SERIAL'`` together with the
            matching transport parameters), ``modbus_type`` and
            ``request_timeout``; the whole configuration is handed over to
            the created connection
        name: name passed to the created master and to the connection
        request_queue_size: capacity of the connection's request queue

    Raises:
        ValueError: transport type is neither ``'TCP'`` nor ``'SERIAL'``
        KeyError: required configuration entry is missing or an enumeration
            name is not recognized

    """
    transport_conf = conf['transport']
    modbus_type = modbus.ModbusType[conf['modbus_type']]
    transport_type = transport_conf['type']

    if transport_type == 'TCP':
        addr = tcp.Address(transport_conf['host'], transport_conf['port'])
        master = await modbus.create_tcp_master(
            modbus_type=modbus_type,
            addr=addr,
            response_timeout=conf['request_timeout'],
            name=name)

    elif transport_type == 'SERIAL':
        flow_control = transport_conf['flow_control']
        master = await modbus.create_serial_master(
            modbus_type=modbus_type,
            port=transport_conf['port'],
            baudrate=transport_conf['baudrate'],
            bytesize=serial.ByteSize[transport_conf['bytesize']],
            parity=serial.Parity[transport_conf['parity']],
            stopbits=serial.StopBits[transport_conf['stopbits']],
            xonxoff=flow_control['xonxoff'],
            rtscts=flow_control['rtscts'],
            dsrdtr=flow_control['dsrdtr'],
            silent_interval=transport_conf['silent_interval'],
            response_timeout=conf['request_timeout'],
            name=name)

    else:
        raise ValueError('unsupported link type')

    try:
        return Connection(conf=conf,
                          master=master,
                          name=name,
                          request_queue_size=request_queue_size)

    except BaseException:
        # master is already open and nobody else owns it yet - release the
        # underlying link instead of leaking it
        await aio.uncancellable(master.async_close())
        raise
class Connection(hat.aio.group.Resource):
 81class Connection(aio.Resource):
 82
 83    def __init__(self,
 84                 conf: json.Data,
 85                 master: modbus.Master,
 86                 name: str,
 87                 request_queue_size: int):
 88        self._conf = conf
 89        self._master = master
 90        self._name = name
 91        self._request_queue = aio.Queue(request_queue_size)
 92        self._log = common.create_device_logger_adapter(mlog, name)
 93
 94        self.async_group.spawn(self._request_loop)
 95
 96    @property
 97    def async_group(self) -> aio.Group:
 98        return self._master.async_group
 99
100    async def read(self,
101                   device_id: int,
102                   data_type: modbus.DataType,
103                   start_address: int,
104                   quantity: int
105                   ) -> list[int] | Error:
106        self._log.debug('enqueuing read request')
107        return await self._request(self._master.read, device_id, data_type,
108                                   start_address, quantity)
109
110    async def write(self,
111                    device_id: int,
112                    data_type: modbus.DataType,
113                    start_address: int,
114                    values: list[int]
115                    ) -> Error | None:
116        self._log.debug('enqueuing write request')
117        return await self._request(self._master.write, device_id, data_type,
118                                   start_address, values)
119
120    async def write_mask(self,
121                         device_id: int,
122                         address: int,
123                         and_mask: int,
124                         or_mask: int
125                         ) -> Error | None:
126        self._log.debug('enqueuing write mask request')
127        return await self._request(self._master.write_mask, device_id,
128                                   address, and_mask, or_mask)
129
130    async def _request_loop(self):
131        future = None
132        result_t = None
133
134        try:
135            self._log.debug('starting request loop')
136            while True:
137                fn, args, delayed_count, future = await self._request_queue.get()  # NOQA
138                self._log.debug('dequed request')
139
140                if result_t is not None and self._conf['request_delay'] > 0:
141                    dt = time.monotonic() - result_t
142                    if dt < self._conf['request_delay']:
143                        await asyncio.sleep(self._conf['request_delay'] - dt)
144
145                if future.done():
146                    continue
147
148                delayed_count -= 1
149                immediate_count = (
150                    self._conf['request_retry_immediate_count'] + 1)
151
152                while True:
153                    try:
154                        immediate_count -= 1
155                        result = await fn(*args)
156                        result_t = time.monotonic()
157
158                        self._log.debug('received result %s', result)
159                        if isinstance(result, modbus.Error):
160                            result = Error[result.name]
161
162                        if not future.done():
163                            self._log.debug('setting request result')
164                            future.set_result(result)
165
166                        break
167
168                    except TimeoutError:
169                        self._log.debug('single request timeout')
170
171                        if immediate_count > 0:
172                            self._log.debug('immediate request retry')
173                            continue
174
175                        if delayed_count > 0:
176                            self._log.debug('delayed request retry')
177                            self.async_group.spawn(self._delay_request, fn,
178                                                   args, delayed_count, future)
179                            future = None
180
181                        elif not future.done():
182                            self._log.debug('request resulting in timeout')
183                            future.set_result(Error.TIMEOUT)
184
185                        break
186
187                    except Exception as e:
188                        self._log.debug('setting request exception')
189                        if not future.done():
190                            future.set_exception(e)
191                        raise
192
193        except ConnectionError:
194            self._log.debug('connection closed')
195
196        except Exception as e:
197            self._log.error('request loop error: %s', e, exc_info=e)
198
199        finally:
200            self._log.debug('closing request loop')
201            self.close()
202            self._request_queue.close()
203
204            while True:
205                if future and not future.done():
206                    future.set_exception(ConnectionError())
207                if self._request_queue.empty():
208                    break
209                _, __, ___, future = self._request_queue.get_nowait()
210
211    async def _request(self, fn, *args):
212        try:
213            future = asyncio.Future()
214            delayed_count = self._conf['request_retry_delayed_count'] + 1
215            await self._request_queue.put((fn, args, delayed_count, future))
216
217            return await future
218
219        except aio.QueueClosedError:
220            raise ConnectionError()
221
222    async def _delay_request(self, fn, args, delayed_count, future):
223        try:
224            await asyncio.sleep(self._conf['request_retry_delay'])
225            await self._request_queue.put((fn, args, delayed_count, future))
226            future = None
227
228        except aio.QueueClosedError:
229            pass
230
231        finally:
232            if future and not future.done():
233                future.set_exception(ConnectionError())

Resource with lifetime control based on Group.

Connection( conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]], master: hat.drivers.modbus.master.Master, name: str, request_queue_size: int)
83    def __init__(self,
84                 conf: json.Data,
85                 master: modbus.Master,
86                 name: str,
87                 request_queue_size: int):
88        self._conf = conf
89        self._master = master
90        self._name = name
91        self._request_queue = aio.Queue(request_queue_size)
92        self._log = common.create_device_logger_adapter(mlog, name)
93
94        self.async_group.spawn(self._request_loop)
async_group: hat.aio.group.Group
96    @property
97    def async_group(self) -> aio.Group:
98        return self._master.async_group

Group controlling resource's lifetime.

async def read( self, device_id: int, data_type: hat.drivers.modbus.common.DataType, start_address: int, quantity: int) -> list[int] | Error:
100    async def read(self,
101                   device_id: int,
102                   data_type: modbus.DataType,
103                   start_address: int,
104                   quantity: int
105                   ) -> list[int] | Error:
106        self._log.debug('enqueuing read request')
107        return await self._request(self._master.read, device_id, data_type,
108                                   start_address, quantity)
async def write( self, device_id: int, data_type: hat.drivers.modbus.common.DataType, start_address: int, values: list[int]) -> Error | None:
110    async def write(self,
111                    device_id: int,
112                    data_type: modbus.DataType,
113                    start_address: int,
114                    values: list[int]
115                    ) -> Error | None:
116        self._log.debug('enqueuing write request')
117        return await self._request(self._master.write, device_id, data_type,
118                                   start_address, values)
async def write_mask( self, device_id: int, address: int, and_mask: int, or_mask: int) -> Error | None:
120    async def write_mask(self,
121                         device_id: int,
122                         address: int,
123                         and_mask: int,
124                         or_mask: int
125                         ) -> Error | None:
126        self._log.debug('enqueuing write mask request')
127        return await self._request(self._master.write_mask, device_id,
128                                   address, and_mask, or_mask)