hat.gateway.devices.modbus.master.connection
1import asyncio 2import logging 3import time 4 5from hat import json 6from hat import aio 7from hat.drivers import modbus 8from hat.drivers import serial 9from hat.drivers import tcp 10 11from hat.gateway.devices.modbus.master import common 12 13 14mlog = logging.getLogger(__name__) 15 16 17async def connect(conf: json.Data, 18 name: str, 19 request_queue_size: int = 1024 20 ) -> 'Connection': 21 transport_conf = conf['transport'] 22 modbus_type = modbus.ModbusType[conf['modbus_type']] 23 24 if transport_conf['type'] == 'TCP': 25 addr = tcp.Address(transport_conf['host'], transport_conf['port']) 26 master = await modbus.create_tcp_master( 27 modbus_type=modbus_type, 28 addr=addr, 29 response_timeout=conf['request_timeout'], 30 name=name) 31 32 elif transport_conf['type'] == 'SERIAL': 33 port = transport_conf['port'] 34 baudrate = transport_conf['baudrate'] 35 bytesize = serial.ByteSize[transport_conf['bytesize']] 36 parity = serial.Parity[transport_conf['parity']] 37 stopbits = serial.StopBits[transport_conf['stopbits']] 38 xonxoff = transport_conf['flow_control']['xonxoff'] 39 rtscts = transport_conf['flow_control']['rtscts'] 40 dsrdtr = transport_conf['flow_control']['dsrdtr'] 41 silent_interval = transport_conf['silent_interval'] 42 master = await modbus.create_serial_master( 43 modbus_type=modbus_type, 44 port=port, 45 baudrate=baudrate, 46 bytesize=bytesize, 47 parity=parity, 48 stopbits=stopbits, 49 xonxoff=xonxoff, 50 rtscts=rtscts, 51 dsrdtr=dsrdtr, 52 silent_interval=silent_interval, 53 response_timeout=conf['request_timeout'], 54 name=name) 55 56 else: 57 raise ValueError('unsupported link type') 58 59 return Connection(conf=conf, 60 master=master, 61 name=name, 62 request_queue_size=request_queue_size) 63 64 65class Connection(aio.Resource): 66 67 def __init__(self, 68 conf: json.Data, 69 master: modbus.Master, 70 name: str, 71 request_queue_size: int): 72 self._conf = conf 73 self._master = master 74 self._name = name 75 self._request_queue = aio.Queue(request_queue_size) 76 self._loop = asyncio.get_running_loop() 77 self._log = common.create_device_logger_adapter(mlog, name) 78 79 self.async_group.spawn(self._request_loop) 80 81 @property 82 def async_group(self) -> aio.Group: 83 return self._master.async_group 84 85 async def send(self, 86 req: modbus.Request 87 ) -> modbus.Response | common.Timeout: 88 self._log.debug('enqueuing request') 89 90 try: 91 future = self._loop.create_future() 92 delayed_count = self._conf['request_retry_delayed_count'] + 1 93 await self._request_queue.put((req, delayed_count, future)) 94 95 return await future 96 97 except aio.QueueClosedError: 98 raise ConnectionError() 99 100 async def _request_loop(self): 101 future = None 102 res_t = None 103 104 try: 105 self._log.debug('starting request loop') 106 while True: 107 req, delayed_count, future = await self._request_queue.get() # NOQA 108 self._log.debug('dequed request') 109 110 if res_t is not None and self._conf['request_delay'] > 0: 111 dt = time.monotonic() - res_t 112 if dt < self._conf['request_delay']: 113 await asyncio.sleep(self._conf['request_delay'] - dt) 114 115 if future.done(): 116 continue 117 118 delayed_count -= 1 119 immediate_count = ( 120 self._conf['request_retry_immediate_count'] + 1) 121 122 while True: 123 try: 124 immediate_count -= 1 125 res = await self._master.send(req) 126 res_t = time.monotonic() 127 128 self._log.debug('received response %s', res) 129 if not future.done(): 130 future.set_result(res) 131 132 break 133 134 except TimeoutError: 135 self._log.debug('single request timeout') 136 137 if immediate_count > 0: 138 self._log.debug('immediate request retry') 139 continue 140 141 if delayed_count > 0: 142 self._log.debug('delayed request retry') 143 self.async_group.spawn(self._delay_request, req, 144 delayed_count, future) 145 future = None 146 147 elif not future.done(): 148 self._log.debug('request resulting in timeout') 149 future.set_result(common.Timeout()) 150 151 break 152 153 except Exception as e: 154 self._log.debug('setting request exception') 155 if not future.done(): 156 future.set_exception(e) 157 raise 158 159 except ConnectionError: 160 self._log.debug('connection closed') 161 162 except Exception as e: 163 self._log.error('request loop error: %s', e, exc_info=e) 164 165 finally: 166 self._log.debug('closing request loop') 167 self.close() 168 self._request_queue.close() 169 170 while True: 171 if future and not future.done(): 172 future.set_exception(ConnectionError()) 173 if self._request_queue.empty(): 174 break 175 _, __, future = self._request_queue.get_nowait() 176 177 async def _delay_request(self, req, delayed_count, future): 178 try: 179 await asyncio.sleep(self._conf['request_retry_delay']) 180 await self._request_queue.put((req, delayed_count, future)) 181 future = None 182 183 except aio.QueueClosedError: 184 pass 185 186 finally: 187 if future and not future.done(): 188 future.set_exception(ConnectionError())
mlog =
<Logger hat.gateway.devices.modbus.master.connection (WARNING)>
async def
connect( conf: None | bool | int | float | str | List[ForwardRef('Data')] | Dict[str, ForwardRef('Data')], name: str, request_queue_size: int = 1024) -> Connection:
18async def connect(conf: json.Data, 19 name: str, 20 request_queue_size: int = 1024 21 ) -> 'Connection': 22 transport_conf = conf['transport'] 23 modbus_type = modbus.ModbusType[conf['modbus_type']] 24 25 if transport_conf['type'] == 'TCP': 26 addr = tcp.Address(transport_conf['host'], transport_conf['port']) 27 master = await modbus.create_tcp_master( 28 modbus_type=modbus_type, 29 addr=addr, 30 response_timeout=conf['request_timeout'], 31 name=name) 32 33 elif transport_conf['type'] == 'SERIAL': 34 port = transport_conf['port'] 35 baudrate = transport_conf['baudrate'] 36 bytesize = serial.ByteSize[transport_conf['bytesize']] 37 parity = serial.Parity[transport_conf['parity']] 38 stopbits = serial.StopBits[transport_conf['stopbits']] 39 xonxoff = transport_conf['flow_control']['xonxoff'] 40 rtscts = transport_conf['flow_control']['rtscts'] 41 dsrdtr = transport_conf['flow_control']['dsrdtr'] 42 silent_interval = transport_conf['silent_interval'] 43 master = await modbus.create_serial_master( 44 modbus_type=modbus_type, 45 port=port, 46 baudrate=baudrate, 47 bytesize=bytesize, 48 parity=parity, 49 stopbits=stopbits, 50 xonxoff=xonxoff, 51 rtscts=rtscts, 52 dsrdtr=dsrdtr, 53 silent_interval=silent_interval, 54 response_timeout=conf['request_timeout'], 55 name=name) 56 57 else: 58 raise ValueError('unsupported link type') 59 60 return Connection(conf=conf, 61 master=master, 62 name=name, 63 request_queue_size=request_queue_size)
class
Connection(hat.aio.group.Resource):
66class Connection(aio.Resource): 67 68 def __init__(self, 69 conf: json.Data, 70 master: modbus.Master, 71 name: str, 72 request_queue_size: int): 73 self._conf = conf 74 self._master = master 75 self._name = name 76 self._request_queue = aio.Queue(request_queue_size) 77 self._loop = asyncio.get_running_loop() 78 self._log = common.create_device_logger_adapter(mlog, name) 79 80 self.async_group.spawn(self._request_loop) 81 82 @property 83 def async_group(self) -> aio.Group: 84 return self._master.async_group 85 86 async def send(self, 87 req: modbus.Request 88 ) -> modbus.Response | common.Timeout: 89 self._log.debug('enqueuing request') 90 91 try: 92 future = self._loop.create_future() 93 delayed_count = self._conf['request_retry_delayed_count'] + 1 94 await self._request_queue.put((req, delayed_count, future)) 95 96 return await future 97 98 except aio.QueueClosedError: 99 raise ConnectionError() 100 101 async def _request_loop(self): 102 future = None 103 res_t = None 104 105 try: 106 self._log.debug('starting request loop') 107 while True: 108 req, delayed_count, future = await self._request_queue.get() # NOQA 109 self._log.debug('dequed request') 110 111 if res_t is not None and self._conf['request_delay'] > 0: 112 dt = time.monotonic() - res_t 113 if dt < self._conf['request_delay']: 114 await asyncio.sleep(self._conf['request_delay'] - dt) 115 116 if future.done(): 117 continue 118 119 delayed_count -= 1 120 immediate_count = ( 121 self._conf['request_retry_immediate_count'] + 1) 122 123 while True: 124 try: 125 immediate_count -= 1 126 res = await self._master.send(req) 127 res_t = time.monotonic() 128 129 self._log.debug('received response %s', res) 130 if not future.done(): 131 future.set_result(res) 132 133 break 134 135 except TimeoutError: 136 self._log.debug('single request timeout') 137 138 if immediate_count > 0: 139 self._log.debug('immediate request retry') 140 continue 141 142 if delayed_count > 0: 143 self._log.debug('delayed request retry') 144 self.async_group.spawn(self._delay_request, req, 145 delayed_count, future) 146 future = None 147 148 elif not future.done(): 149 self._log.debug('request resulting in timeout') 150 future.set_result(common.Timeout()) 151 152 break 153 154 except Exception as e: 155 self._log.debug('setting request exception') 156 if not future.done(): 157 future.set_exception(e) 158 raise 159 160 except ConnectionError: 161 self._log.debug('connection closed') 162 163 except Exception as e: 164 self._log.error('request loop error: %s', e, exc_info=e) 165 166 finally: 167 self._log.debug('closing request loop') 168 self.close() 169 self._request_queue.close() 170 171 while True: 172 if future and not future.done(): 173 future.set_exception(ConnectionError()) 174 if self._request_queue.empty(): 175 break 176 _, __, future = self._request_queue.get_nowait() 177 178 async def _delay_request(self, req, delayed_count, future): 179 try: 180 await asyncio.sleep(self._conf['request_retry_delay']) 181 await self._request_queue.put((req, delayed_count, future)) 182 future = None 183 184 except aio.QueueClosedError: 185 pass 186 187 finally: 188 if future and not future.done(): 189 future.set_exception(ConnectionError())
Resource with lifetime control based on Group.
Connection( conf: None | bool | int | float | str | List[ForwardRef('Data')] | Dict[str, ForwardRef('Data')], master: hat.drivers.modbus.master.Master, name: str, request_queue_size: int)
68 def __init__(self, 69 conf: json.Data, 70 master: modbus.Master, 71 name: str, 72 request_queue_size: int): 73 self._conf = conf 74 self._master = master 75 self._name = name 76 self._request_queue = aio.Queue(request_queue_size) 77 self._loop = asyncio.get_running_loop() 78 self._log = common.create_device_logger_adapter(mlog, name) 79 80 self.async_group.spawn(self._request_loop)
async def
send( self, req: hat.drivers.modbus.common.ReadReq | hat.drivers.modbus.common.WriteReq | hat.drivers.modbus.common.WriteMaskReq) -> Sequence[int] | hat.drivers.modbus.common.Error | hat.drivers.modbus.common.Success | hat.gateway.devices.modbus.master.common.Timeout:
86 async def send(self, 87 req: modbus.Request 88 ) -> modbus.Response | common.Timeout: 89 self._log.debug('enqueuing request') 90 91 try: 92 future = self._loop.create_future() 93 delayed_count = self._conf['request_retry_delayed_count'] + 1 94 await self._request_queue.put((req, delayed_count, future)) 95 96 return await future 97 98 except aio.QueueClosedError: 99 raise ConnectionError()