hat.gateway.devices.modbus.master.connection

  1import asyncio
  2import logging
  3import time
  4
  5from hat import json
  6from hat import aio
  7from hat.drivers import modbus
  8from hat.drivers import serial
  9from hat.drivers import tcp
 10
 11from hat.gateway.devices.modbus.master import common
 12
 13
 14mlog = logging.getLogger(__name__)
 15
 16
 17async def connect(conf: json.Data,
 18                  name: str,
 19                  request_queue_size: int = 1024
 20                  ) -> 'Connection':
 21    transport_conf = conf['transport']
 22    modbus_type = modbus.ModbusType[conf['modbus_type']]
 23
 24    if transport_conf['type'] == 'TCP':
 25        addr = tcp.Address(transport_conf['host'], transport_conf['port'])
 26        master = await modbus.create_tcp_master(
 27            modbus_type=modbus_type,
 28            addr=addr,
 29            response_timeout=conf['request_timeout'],
 30            name=name)
 31
 32    elif transport_conf['type'] == 'SERIAL':
 33        port = transport_conf['port']
 34        baudrate = transport_conf['baudrate']
 35        bytesize = serial.ByteSize[transport_conf['bytesize']]
 36        parity = serial.Parity[transport_conf['parity']]
 37        stopbits = serial.StopBits[transport_conf['stopbits']]
 38        xonxoff = transport_conf['flow_control']['xonxoff']
 39        rtscts = transport_conf['flow_control']['rtscts']
 40        dsrdtr = transport_conf['flow_control']['dsrdtr']
 41        silent_interval = transport_conf['silent_interval']
 42        master = await modbus.create_serial_master(
 43            modbus_type=modbus_type,
 44            port=port,
 45            baudrate=baudrate,
 46            bytesize=bytesize,
 47            parity=parity,
 48            stopbits=stopbits,
 49            xonxoff=xonxoff,
 50            rtscts=rtscts,
 51            dsrdtr=dsrdtr,
 52            silent_interval=silent_interval,
 53            response_timeout=conf['request_timeout'],
 54            name=name)
 55
 56    else:
 57        raise ValueError('unsupported link type')
 58
 59    return Connection(conf=conf,
 60                      master=master,
 61                      name=name,
 62                      request_queue_size=request_queue_size)
 63
 64
class Connection(aio.Resource):
    """Modbus master wrapped with a serialized request queue and retry
    handling.

    Requests submitted via `send` are queued and processed one at a time
    by a background request loop spawned in `__init__`.  Lifetime is
    delegated to the underlying master's async group.
    """

    def __init__(self,
                 conf: json.Data,
                 master: modbus.Master,
                 name: str,
                 request_queue_size: int):
        self._conf = conf
        self._master = master
        self._name = name
        # queue of (request, remaining delayed retries, result future)
        self._request_queue = aio.Queue(request_queue_size)
        self._loop = asyncio.get_running_loop()
        self._log = common.create_device_logger_adapter(mlog, name)

        self.async_group.spawn(self._request_loop)

    @property
    def async_group(self) -> aio.Group:
        # lifetime is bound to the underlying master
        return self._master.async_group

    async def send(self,
                   req: modbus.Request
                   ) -> modbus.Response | common.Timeout:
        """Enqueue *req* and wait for its result.

        Returns the master's response, or `common.Timeout` once all
        immediate and delayed retries have timed out.

        Raises:
            ConnectionError: if the request queue is closed

        """
        self._log.debug('enqueuing request')

        try:
            future = self._loop.create_future()
            # total delayed attempt budget: initial try + configured retries
            delayed_count = self._conf['request_retry_delayed_count'] + 1
            await self._request_queue.put((req, delayed_count, future))

            return await future

        except aio.QueueClosedError:
            raise ConnectionError()

    async def _request_loop(self):
        # Single consumer of the request queue: sends requests to the
        # master one at a time, enforcing an inter-request delay and
        # applying immediate/delayed retries on timeout.
        future = None
        res_t = None  # monotonic timestamp of the last received response

        try:
            self._log.debug('starting request loop')
            while True:
                req, delayed_count, future = await self._request_queue.get()  # NOQA
                self._log.debug('dequed request')

                # enforce minimal spacing between consecutive requests
                if res_t is not None and self._conf['request_delay'] > 0:
                    dt = time.monotonic() - res_t
                    if dt < self._conf['request_delay']:
                        await asyncio.sleep(self._conf['request_delay'] - dt)

                # the caller's future may already be done (e.g. cancelled)
                if future.done():
                    continue

                delayed_count -= 1
                immediate_count = (
                    self._conf['request_retry_immediate_count'] + 1)

                while True:
                    try:
                        immediate_count -= 1
                        res = await self._master.send(req)
                        res_t = time.monotonic()

                        self._log.debug('received response %s', res)
                        if not future.done():
                            future.set_result(res)

                        break

                    except TimeoutError:
                        self._log.debug('single request timeout')

                        # first burn the immediate-retry budget
                        if immediate_count > 0:
                            self._log.debug('immediate request retry')
                            continue

                        # then re-enqueue after a delay while budget remains;
                        # ownership of the future moves to _delay_request
                        if delayed_count > 0:
                            self._log.debug('delayed request retry')
                            self.async_group.spawn(self._delay_request, req,
                                                   delayed_count, future)
                            future = None

                        elif not future.done():
                            self._log.debug('request resulting in timeout')
                            future.set_result(common.Timeout())

                        break

                    except Exception as e:
                        self._log.debug('setting request exception')
                        if not future.done():
                            future.set_exception(e)
                        raise

        except ConnectionError:
            self._log.debug('connection closed')

        except Exception as e:
            self._log.error('request loop error: %s', e, exc_info=e)

        finally:
            self._log.debug('closing request loop')
            self.close()
            self._request_queue.close()

            # fail the in-flight future and every future still queued
            while True:
                if future and not future.done():
                    future.set_exception(ConnectionError())
                if self._request_queue.empty():
                    break
                _, __, future = self._request_queue.get_nowait()

    async def _delay_request(self, req, delayed_count, future):
        # Wait request_retry_delay, then put the request back on the
        # queue; if the queue closed meanwhile, fail the caller's future.
        try:
            await asyncio.sleep(self._conf['request_retry_delay'])
            await self._request_queue.put((req, delayed_count, future))
            future = None  # ownership returned to the request loop

        except aio.QueueClosedError:
            pass

        finally:
            if future and not future.done():
                future.set_exception(ConnectionError())
async def connect(conf: json.Data, name: str, request_queue_size: int = 1024) -> Connection
async def connect(conf: json.Data,
                  name: str,
                  request_queue_size: int = 1024
                  ) -> 'Connection':
    """Create a TCP or serial modbus master according to
    ``conf['transport']`` and wrap it in a `Connection`.

    Raises:
        ValueError: if ``conf['transport']['type']`` is neither ``'TCP'``
            nor ``'SERIAL'``

    """
    transport_conf = conf['transport']
    modbus_type = modbus.ModbusType[conf['modbus_type']]

    if transport_conf['type'] == 'TCP':
        addr = tcp.Address(transport_conf['host'], transport_conf['port'])
        master = await modbus.create_tcp_master(
            modbus_type=modbus_type,
            addr=addr,
            response_timeout=conf['request_timeout'],
            name=name)

    elif transport_conf['type'] == 'SERIAL':
        port = transport_conf['port']
        baudrate = transport_conf['baudrate']
        bytesize = serial.ByteSize[transport_conf['bytesize']]
        parity = serial.Parity[transport_conf['parity']]
        stopbits = serial.StopBits[transport_conf['stopbits']]
        xonxoff = transport_conf['flow_control']['xonxoff']
        rtscts = transport_conf['flow_control']['rtscts']
        dsrdtr = transport_conf['flow_control']['dsrdtr']
        silent_interval = transport_conf['silent_interval']
        master = await modbus.create_serial_master(
            modbus_type=modbus_type,
            port=port,
            baudrate=baudrate,
            bytesize=bytesize,
            parity=parity,
            stopbits=stopbits,
            xonxoff=xonxoff,
            rtscts=rtscts,
            dsrdtr=dsrdtr,
            silent_interval=silent_interval,
            response_timeout=conf['request_timeout'],
            name=name)

    else:
        raise ValueError('unsupported link type')

    return Connection(conf=conf,
                      master=master,
                      name=name,
                      request_queue_size=request_queue_size)
class Connection(hat.aio.group.Resource):
class Connection(aio.Resource):
    """Modbus master wrapped with a serialized request queue and retry
    handling.

    Requests submitted via `send` are queued and processed one at a time
    by a background request loop spawned in `__init__`.  Lifetime is
    delegated to the underlying master's async group.
    """

    def __init__(self,
                 conf: json.Data,
                 master: modbus.Master,
                 name: str,
                 request_queue_size: int):
        self._conf = conf
        self._master = master
        self._name = name
        # queue of (request, remaining delayed retries, result future)
        self._request_queue = aio.Queue(request_queue_size)
        self._loop = asyncio.get_running_loop()
        self._log = common.create_device_logger_adapter(mlog, name)

        self.async_group.spawn(self._request_loop)

    @property
    def async_group(self) -> aio.Group:
        # lifetime is bound to the underlying master
        return self._master.async_group

    async def send(self,
                   req: modbus.Request
                   ) -> modbus.Response | common.Timeout:
        """Enqueue *req* and wait for its result.

        Returns the master's response, or `common.Timeout` once all
        immediate and delayed retries have timed out.

        Raises:
            ConnectionError: if the request queue is closed

        """
        self._log.debug('enqueuing request')

        try:
            future = self._loop.create_future()
            # total delayed attempt budget: initial try + configured retries
            delayed_count = self._conf['request_retry_delayed_count'] + 1
            await self._request_queue.put((req, delayed_count, future))

            return await future

        except aio.QueueClosedError:
            raise ConnectionError()

    async def _request_loop(self):
        # Single consumer of the request queue: sends requests to the
        # master one at a time, enforcing an inter-request delay and
        # applying immediate/delayed retries on timeout.
        future = None
        res_t = None  # monotonic timestamp of the last received response

        try:
            self._log.debug('starting request loop')
            while True:
                req, delayed_count, future = await self._request_queue.get()  # NOQA
                self._log.debug('dequed request')

                # enforce minimal spacing between consecutive requests
                if res_t is not None and self._conf['request_delay'] > 0:
                    dt = time.monotonic() - res_t
                    if dt < self._conf['request_delay']:
                        await asyncio.sleep(self._conf['request_delay'] - dt)

                # the caller's future may already be done (e.g. cancelled)
                if future.done():
                    continue

                delayed_count -= 1
                immediate_count = (
                    self._conf['request_retry_immediate_count'] + 1)

                while True:
                    try:
                        immediate_count -= 1
                        res = await self._master.send(req)
                        res_t = time.monotonic()

                        self._log.debug('received response %s', res)
                        if not future.done():
                            future.set_result(res)

                        break

                    except TimeoutError:
                        self._log.debug('single request timeout')

                        # first burn the immediate-retry budget
                        if immediate_count > 0:
                            self._log.debug('immediate request retry')
                            continue

                        # then re-enqueue after a delay while budget remains;
                        # ownership of the future moves to _delay_request
                        if delayed_count > 0:
                            self._log.debug('delayed request retry')
                            self.async_group.spawn(self._delay_request, req,
                                                   delayed_count, future)
                            future = None

                        elif not future.done():
                            self._log.debug('request resulting in timeout')
                            future.set_result(common.Timeout())

                        break

                    except Exception as e:
                        self._log.debug('setting request exception')
                        if not future.done():
                            future.set_exception(e)
                        raise

        except ConnectionError:
            self._log.debug('connection closed')

        except Exception as e:
            self._log.error('request loop error: %s', e, exc_info=e)

        finally:
            self._log.debug('closing request loop')
            self.close()
            self._request_queue.close()

            # fail the in-flight future and every future still queued
            while True:
                if future and not future.done():
                    future.set_exception(ConnectionError())
                if self._request_queue.empty():
                    break
                _, __, future = self._request_queue.get_nowait()

    async def _delay_request(self, req, delayed_count, future):
        # Wait request_retry_delay, then put the request back on the
        # queue; if the queue closed meanwhile, fail the caller's future.
        try:
            await asyncio.sleep(self._conf['request_retry_delay'])
            await self._request_queue.put((req, delayed_count, future))
            future = None  # ownership returned to the request loop

        except aio.QueueClosedError:
            pass

        finally:
            if future and not future.done():
                future.set_exception(ConnectionError())

Resource with lifetime control based on Group.

Connection(conf: json.Data, master: hat.drivers.modbus.master.Master, name: str, request_queue_size: int)
    def __init__(self,
                 conf: json.Data,
                 master: modbus.Master,
                 name: str,
                 request_queue_size: int):
        self._conf = conf
        self._master = master
        self._name = name
        # queue of (request, remaining delayed retries, result future)
        self._request_queue = aio.Queue(request_queue_size)
        self._loop = asyncio.get_running_loop()
        self._log = common.create_device_logger_adapter(mlog, name)

        self.async_group.spawn(self._request_loop)
async_group: hat.aio.group.Group
    @property
    def async_group(self) -> aio.Group:
        # lifetime is bound to the underlying master
        return self._master.async_group

Group controlling resource's lifetime.

async def send(self, req: modbus.Request) -> modbus.Response | common.Timeout
    async def send(self,
                   req: modbus.Request
                   ) -> modbus.Response | common.Timeout:
        """Enqueue *req* and wait for its result.

        Returns the master's response, or `common.Timeout` once all
        retries have timed out.

        Raises:
            ConnectionError: if the request queue is closed

        """
        self._log.debug('enqueuing request')

        try:
            future = self._loop.create_future()
            # total delayed attempt budget: initial try + configured retries
            delayed_count = self._conf['request_retry_delayed_count'] + 1
            await self._request_queue.put((req, delayed_count, future))

            return await future

        except aio.QueueClosedError:
            raise ConnectionError()