hat.gateway.devices.modbus.master.connection
1import asyncio 2import enum 3import logging 4import time 5import typing 6 7from hat import json 8from hat import aio 9from hat.drivers import modbus 10from hat.drivers import serial 11from hat.drivers import tcp 12 13from hat.gateway.devices.modbus.master import common 14 15 16mlog = logging.getLogger(__name__) 17 18 19DataType: typing.TypeAlias = modbus.DataType 20 21 22Error = enum.Enum('Error', [ 23 'INVALID_FUNCTION_CODE', 24 'INVALID_DATA_ADDRESS', 25 'INVALID_DATA_VALUE', 26 'FUNCTION_ERROR', 27 'GATEWAY_PATH_UNAVAILABLE', 28 'GATEWAY_TARGET_DEVICE_FAILED_TO_RESPOND', 29 'TIMEOUT']) 30 31 32async def connect(conf: json.Data, 33 name: str, 34 request_queue_size: int = 1024 35 ) -> 'Connection': 36 transport_conf = conf['transport'] 37 modbus_type = modbus.ModbusType[conf['modbus_type']] 38 39 if transport_conf['type'] == 'TCP': 40 addr = tcp.Address(transport_conf['host'], transport_conf['port']) 41 master = await modbus.create_tcp_master( 42 modbus_type=modbus_type, 43 addr=addr, 44 response_timeout=conf['request_timeout'], 45 name=name) 46 47 elif transport_conf['type'] == 'SERIAL': 48 port = transport_conf['port'] 49 baudrate = transport_conf['baudrate'] 50 bytesize = serial.ByteSize[transport_conf['bytesize']] 51 parity = serial.Parity[transport_conf['parity']] 52 stopbits = serial.StopBits[transport_conf['stopbits']] 53 xonxoff = transport_conf['flow_control']['xonxoff'] 54 rtscts = transport_conf['flow_control']['rtscts'] 55 dsrdtr = transport_conf['flow_control']['dsrdtr'] 56 silent_interval = transport_conf['silent_interval'] 57 master = await modbus.create_serial_master( 58 modbus_type=modbus_type, 59 port=port, 60 baudrate=baudrate, 61 bytesize=bytesize, 62 parity=parity, 63 stopbits=stopbits, 64 xonxoff=xonxoff, 65 rtscts=rtscts, 66 dsrdtr=dsrdtr, 67 silent_interval=silent_interval, 68 response_timeout=conf['request_timeout'], 69 name=name) 70 71 else: 72 raise ValueError('unsupported link type') 73 74 return Connection(conf=conf, 75 master=master, 76 name=name, 77 request_queue_size=request_queue_size) 78 79 80class Connection(aio.Resource): 81 82 def __init__(self, 83 conf: json.Data, 84 master: modbus.Master, 85 name: str, 86 request_queue_size: int): 87 self._conf = conf 88 self._master = master 89 self._name = name 90 self._request_queue = aio.Queue(request_queue_size) 91 self._log = common.create_device_logger_adapter(mlog, name) 92 93 self.async_group.spawn(self._request_loop) 94 95 @property 96 def async_group(self) -> aio.Group: 97 return self._master.async_group 98 99 async def read(self, 100 device_id: int, 101 data_type: modbus.DataType, 102 start_address: int, 103 quantity: int 104 ) -> list[int] | Error: 105 self._log.debug('enqueuing read request') 106 return await self._request(self._master.read, device_id, data_type, 107 start_address, quantity) 108 109 async def write(self, 110 device_id: int, 111 data_type: modbus.DataType, 112 start_address: int, 113 values: list[int] 114 ) -> Error | None: 115 self._log.debug('enqueuing write request') 116 return await self._request(self._master.write, device_id, data_type, 117 start_address, values) 118 119 async def write_mask(self, 120 device_id: int, 121 address: int, 122 and_mask: int, 123 or_mask: int 124 ) -> Error | None: 125 self._log.debug('enqueuing write mask request') 126 return await self._request(self._master.write_mask, device_id, 127 address, and_mask, or_mask) 128 129 async def _request_loop(self): 130 future = None 131 result_t = None 132 133 try: 134 self._log.debug('starting request loop') 135 while True: 136 fn, args, delayed_count, future = await self._request_queue.get() # NOQA 137 self._log.debug('dequed request') 138 139 if result_t is not None and self._conf['request_delay'] > 0: 140 dt = time.monotonic() - result_t 141 if dt < self._conf['request_delay']: 142 await asyncio.sleep(self._conf['request_delay'] - dt) 143 144 if future.done(): 145 continue 146 147 delayed_count -= 1 148 immediate_count = ( 149 self._conf['request_retry_immediate_count'] + 1) 150 151 while True: 152 try: 153 immediate_count -= 1 154 result = await fn(*args) 155 result_t = time.monotonic() 156 157 self._log.debug('received result %s', result) 158 if isinstance(result, modbus.Error): 159 result = Error[result.name] 160 161 if not future.done(): 162 self._log.debug('setting request result') 163 future.set_result(result) 164 165 break 166 167 except TimeoutError: 168 self._log.debug('single request timeout') 169 170 if immediate_count > 0: 171 self._log.debug('immediate request retry') 172 continue 173 174 if delayed_count > 0: 175 self._log.debug('delayed request retry') 176 self.async_group.spawn(self._delay_request, fn, 177 args, delayed_count, future) 178 future = None 179 180 elif not future.done(): 181 self._log.debug('request resulting in timeout') 182 future.set_result(Error.TIMEOUT) 183 184 break 185 186 except Exception as e: 187 self._log.debug('setting request exception') 188 if not future.done(): 189 future.set_exception(e) 190 raise 191 192 except ConnectionError: 193 self._log.debug('connection closed') 194 195 except Exception as e: 196 self._log.error('request loop error: %s', e, exc_info=e) 197 198 finally: 199 self._log.debug('closing request loop') 200 self.close() 201 self._request_queue.close() 202 203 while True: 204 if future and not future.done(): 205 future.set_exception(ConnectionError()) 206 if self._request_queue.empty(): 207 break 208 _, __, ___, future = self._request_queue.get_nowait() 209 210 async def _request(self, fn, *args): 211 try: 212 future = asyncio.Future() 213 delayed_count = self._conf['request_retry_delayed_count'] + 1 214 await self._request_queue.put((fn, args, delayed_count, future)) 215 216 return await future 217 218 except aio.QueueClosedError: 219 raise ConnectionError() 220 221 async def _delay_request(self, fn, args, delayed_count, future): 222 try: 223 await asyncio.sleep(self._conf['request_retry_delay']) 224 await self._request_queue.put((fn, args, delayed_count, future)) 225 future = None 226 227 except aio.QueueClosedError: 228 pass 229 230 finally: 231 if future and not future.done(): 232 future.set_exception(ConnectionError())
mlog =
<Logger hat.gateway.devices.modbus.master.connection (WARNING)>
class
DataType(enum.Enum):
13class DataType(enum.Enum): 14 COIL = 1 15 DISCRETE_INPUT = 2 16 HOLDING_REGISTER = 3 17 INPUT_REGISTER = 4 18 QUEUE = 5
COIL =
<DataType.COIL: 1>
DISCRETE_INPUT =
<DataType.DISCRETE_INPUT: 2>
HOLDING_REGISTER =
<DataType.HOLDING_REGISTER: 3>
INPUT_REGISTER =
<DataType.INPUT_REGISTER: 4>
QUEUE =
<DataType.QUEUE: 5>
class
Error(enum.Enum):
INVALID_FUNCTION_CODE =
<Error.INVALID_FUNCTION_CODE: 1>
INVALID_DATA_ADDRESS =
<Error.INVALID_DATA_ADDRESS: 2>
INVALID_DATA_VALUE =
<Error.INVALID_DATA_VALUE: 3>
FUNCTION_ERROR =
<Error.FUNCTION_ERROR: 4>
GATEWAY_PATH_UNAVAILABLE =
<Error.GATEWAY_PATH_UNAVAILABLE: 5>
GATEWAY_TARGET_DEVICE_FAILED_TO_RESPOND =
<Error.GATEWAY_TARGET_DEVICE_FAILED_TO_RESPOND: 6>
TIMEOUT =
<Error.TIMEOUT: 7>
async def
connect( conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]], name: str, request_queue_size: int = 1024) -> Connection:
33async def connect(conf: json.Data, 34 name: str, 35 request_queue_size: int = 1024 36 ) -> 'Connection': 37 transport_conf = conf['transport'] 38 modbus_type = modbus.ModbusType[conf['modbus_type']] 39 40 if transport_conf['type'] == 'TCP': 41 addr = tcp.Address(transport_conf['host'], transport_conf['port']) 42 master = await modbus.create_tcp_master( 43 modbus_type=modbus_type, 44 addr=addr, 45 response_timeout=conf['request_timeout'], 46 name=name) 47 48 elif transport_conf['type'] == 'SERIAL': 49 port = transport_conf['port'] 50 baudrate = transport_conf['baudrate'] 51 bytesize = serial.ByteSize[transport_conf['bytesize']] 52 parity = serial.Parity[transport_conf['parity']] 53 stopbits = serial.StopBits[transport_conf['stopbits']] 54 xonxoff = transport_conf['flow_control']['xonxoff'] 55 rtscts = transport_conf['flow_control']['rtscts'] 56 dsrdtr = transport_conf['flow_control']['dsrdtr'] 57 silent_interval = transport_conf['silent_interval'] 58 master = await modbus.create_serial_master( 59 modbus_type=modbus_type, 60 port=port, 61 baudrate=baudrate, 62 bytesize=bytesize, 63 parity=parity, 64 stopbits=stopbits, 65 xonxoff=xonxoff, 66 rtscts=rtscts, 67 dsrdtr=dsrdtr, 68 silent_interval=silent_interval, 69 response_timeout=conf['request_timeout'], 70 name=name) 71 72 else: 73 raise ValueError('unsupported link type') 74 75 return Connection(conf=conf, 76 master=master, 77 name=name, 78 request_queue_size=request_queue_size)
class
Connection(hat.aio.group.Resource):
81class Connection(aio.Resource): 82 83 def __init__(self, 84 conf: json.Data, 85 master: modbus.Master, 86 name: str, 87 request_queue_size: int): 88 self._conf = conf 89 self._master = master 90 self._name = name 91 self._request_queue = aio.Queue(request_queue_size) 92 self._log = common.create_device_logger_adapter(mlog, name) 93 94 self.async_group.spawn(self._request_loop) 95 96 @property 97 def async_group(self) -> aio.Group: 98 return self._master.async_group 99 100 async def read(self, 101 device_id: int, 102 data_type: modbus.DataType, 103 start_address: int, 104 quantity: int 105 ) -> list[int] | Error: 106 self._log.debug('enqueuing read request') 107 return await self._request(self._master.read, device_id, data_type, 108 start_address, quantity) 109 110 async def write(self, 111 device_id: int, 112 data_type: modbus.DataType, 113 start_address: int, 114 values: list[int] 115 ) -> Error | None: 116 self._log.debug('enqueuing write request') 117 return await self._request(self._master.write, device_id, data_type, 118 start_address, values) 119 120 async def write_mask(self, 121 device_id: int, 122 address: int, 123 and_mask: int, 124 or_mask: int 125 ) -> Error | None: 126 self._log.debug('enqueuing write mask request') 127 return await self._request(self._master.write_mask, device_id, 128 address, and_mask, or_mask) 129 130 async def _request_loop(self): 131 future = None 132 result_t = None 133 134 try: 135 self._log.debug('starting request loop') 136 while True: 137 fn, args, delayed_count, future = await self._request_queue.get() # NOQA 138 self._log.debug('dequed request') 139 140 if result_t is not None and self._conf['request_delay'] > 0: 141 dt = time.monotonic() - result_t 142 if dt < self._conf['request_delay']: 143 await asyncio.sleep(self._conf['request_delay'] - dt) 144 145 if future.done(): 146 continue 147 148 delayed_count -= 1 149 immediate_count = ( 150 self._conf['request_retry_immediate_count'] + 1) 151 152 while True: 153 try: 154 immediate_count -= 1 155 result = await fn(*args) 156 result_t = time.monotonic() 157 158 self._log.debug('received result %s', result) 159 if isinstance(result, modbus.Error): 160 result = Error[result.name] 161 162 if not future.done(): 163 self._log.debug('setting request result') 164 future.set_result(result) 165 166 break 167 168 except TimeoutError: 169 self._log.debug('single request timeout') 170 171 if immediate_count > 0: 172 self._log.debug('immediate request retry') 173 continue 174 175 if delayed_count > 0: 176 self._log.debug('delayed request retry') 177 self.async_group.spawn(self._delay_request, fn, 178 args, delayed_count, future) 179 future = None 180 181 elif not future.done(): 182 self._log.debug('request resulting in timeout') 183 future.set_result(Error.TIMEOUT) 184 185 break 186 187 except Exception as e: 188 self._log.debug('setting request exception') 189 if not future.done(): 190 future.set_exception(e) 191 raise 192 193 except ConnectionError: 194 self._log.debug('connection closed') 195 196 except Exception as e: 197 self._log.error('request loop error: %s', e, exc_info=e) 198 199 finally: 200 self._log.debug('closing request loop') 201 self.close() 202 self._request_queue.close() 203 204 while True: 205 if future and not future.done(): 206 future.set_exception(ConnectionError()) 207 if self._request_queue.empty(): 208 break 209 _, __, ___, future = self._request_queue.get_nowait() 210 211 async def _request(self, fn, *args): 212 try: 213 future = asyncio.Future() 214 delayed_count = self._conf['request_retry_delayed_count'] + 1 215 await self._request_queue.put((fn, args, delayed_count, future)) 216 217 return await future 218 219 except aio.QueueClosedError: 220 raise ConnectionError() 221 222 async def _delay_request(self, fn, args, delayed_count, future): 223 try: 224 await asyncio.sleep(self._conf['request_retry_delay']) 225 await self._request_queue.put((fn, args, delayed_count, future)) 226 future = None 227 228 except aio.QueueClosedError: 229 pass 230 231 finally: 232 if future and not future.done(): 233 future.set_exception(ConnectionError())
Resource with lifetime control based on Group.
Connection( conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]], master: hat.drivers.modbus.master.Master, name: str, request_queue_size: int)
83 def __init__(self, 84 conf: json.Data, 85 master: modbus.Master, 86 name: str, 87 request_queue_size: int): 88 self._conf = conf 89 self._master = master 90 self._name = name 91 self._request_queue = aio.Queue(request_queue_size) 92 self._log = common.create_device_logger_adapter(mlog, name) 93 94 self.async_group.spawn(self._request_loop)
async def
read( self, device_id: int, data_type: hat.drivers.modbus.common.DataType, start_address: int, quantity: int) -> list[int] | Error:
async def
write( self, device_id: int, data_type: hat.drivers.modbus.common.DataType, start_address: int, values: list[int]) -> Error | None:
async def
write_mask( self, device_id: int, address: int, and_mask: int, or_mask: int) -> Error | None: