hat.gateway.devices.modbus.master.connection
import asyncio
import enum
import logging
import time
import typing

from hat import json
from hat import aio
from hat.drivers import modbus
from hat.drivers import serial
from hat.drivers import tcp


mlog = logging.getLogger(__name__)


DataType: typing.TypeAlias = modbus.DataType


Error = enum.Enum('Error', [
    'INVALID_FUNCTION_CODE',
    'INVALID_DATA_ADDRESS',
    'INVALID_DATA_VALUE',
    'FUNCTION_ERROR',
    'GATEWAY_PATH_UNAVAILABLE',
    'GATEWAY_TARGET_DEVICE_FAILED_TO_RESPOND',
    'TIMEOUT'])


async def connect(conf: json.Data,
                  log_prefix: str,
                  request_queue_size: int = 1024
                  ) -> 'Connection':
    transport_conf = conf['transport']
    modbus_type = modbus.ModbusType[conf['modbus_type']]

    if transport_conf['type'] == 'TCP':
        addr = tcp.Address(transport_conf['host'], transport_conf['port'])
        master = await modbus.create_tcp_master(
            modbus_type=modbus_type,
            addr=addr,
            response_timeout=conf['request_timeout'])

    elif transport_conf['type'] == 'SERIAL':
        port = transport_conf['port']
        baudrate = transport_conf['baudrate']
        bytesize = serial.ByteSize[transport_conf['bytesize']]
        parity = serial.Parity[transport_conf['parity']]
        stopbits = serial.StopBits[transport_conf['stopbits']]
        xonxoff = transport_conf['flow_control']['xonxoff']
        rtscts = transport_conf['flow_control']['rtscts']
        dsrdtr = transport_conf['flow_control']['dsrdtr']
        silent_interval = transport_conf['silent_interval']
        master = await modbus.create_serial_master(
            modbus_type=modbus_type,
            port=port,
            baudrate=baudrate,
            bytesize=bytesize,
            parity=parity,
            stopbits=stopbits,
            xonxoff=xonxoff,
            rtscts=rtscts,
            dsrdtr=dsrdtr,
            silent_interval=silent_interval,
            response_timeout=conf['request_timeout'])

    else:
        raise ValueError('unsupported link type')

    return Connection(conf=conf,
                      master=master,
                      log_prefix=log_prefix,
                      request_queue_size=request_queue_size)


class Connection(aio.Resource):

    def __init__(self,
                 conf: json.Data,
                 master: modbus.Master,
                 log_prefix: str,
                 request_queue_size: int):
        self._conf = conf
        self._log_prefix = log_prefix
        self._master = master
        self._request_queue = aio.Queue(request_queue_size)

        self.async_group.spawn(self._request_loop)

    @property
    def async_group(self) -> aio.Group:
        return self._master.async_group

    async def read(self,
                   device_id: int,
                   data_type: modbus.DataType,
                   start_address: int,
                   quantity: int
                   ) -> list[int] | Error:
        self._log(logging.DEBUG, 'enqueuing read request')
        return await self._request(self._master.read, device_id, data_type,
                                   start_address, quantity)

    async def write(self,
                    device_id: int,
                    data_type: modbus.DataType,
                    start_address: int,
                    values: list[int]
                    ) -> Error | None:
        self._log(logging.DEBUG, 'enqueuing write request')
        return await self._request(self._master.write, device_id, data_type,
                                   start_address, values)

    async def write_mask(self,
                         device_id: int,
                         address: int,
                         and_mask: int,
                         or_mask: int
                         ) -> Error | None:
        self._log(logging.DEBUG, 'enqueuing write mask request')
        return await self._request(self._master.write_mask, device_id,
                                   address, and_mask, or_mask)

    async def _request_loop(self):
        future = None
        result_t = None

        try:
            self._log(logging.DEBUG, 'starting request loop')
            while True:
                fn, args, delayed_count, future = await self._request_queue.get()  # NOQA
                self._log(logging.DEBUG, 'dequed request')

                if result_t is not None and self._conf['request_delay'] > 0:
                    dt = time.monotonic() - result_t
                    if dt < self._conf['request_delay']:
                        await asyncio.sleep(self._conf['request_delay'] - dt)

                if future.done():
                    continue

                delayed_count -= 1
                immediate_count = (
                    self._conf['request_retry_immediate_count'] + 1)

                while True:
                    try:
                        immediate_count -= 1
                        result = await fn(*args)
                        result_t = time.monotonic()

                        self._log(logging.DEBUG, 'received result %s', result)
                        if isinstance(result, modbus.Error):
                            result = Error[result.name]

                        if not future.done():
                            self._log(logging.DEBUG, 'setting request result')
                            future.set_result(result)

                        break

                    except TimeoutError:
                        self._log(logging.DEBUG, 'single request timeout')

                        if immediate_count > 0:
                            self._log(logging.DEBUG, 'immediate request retry')
                            continue

                        if delayed_count > 0:
                            self._log(logging.DEBUG, 'delayed request retry')
                            self.async_group.spawn(self._delay_request, fn,
                                                   args, delayed_count, future)
                            future = None

                        elif not future.done():
                            self._log(logging.DEBUG,
                                      'request resulting in timeout')
                            future.set_result(Error.TIMEOUT)

                        break

                    except Exception as e:
                        self._log(logging.DEBUG, 'setting request exception')
                        if not future.done():
                            future.set_exception(e)
                        raise

        except ConnectionError:
            self._log(logging.DEBUG, 'connection closed')

        except Exception as e:
            self._log(logging.ERROR, 'request loop error: %s', e, exc_info=e)

        finally:
            self._log(logging.DEBUG, 'closing request loop')
            self.close()
            self._request_queue.close()

            while True:
                if future and not future.done():
                    future.set_exception(ConnectionError())
                if self._request_queue.empty():
                    break
                _, __, ___, future = self._request_queue.get_nowait()

    async def _request(self, fn, *args):
        try:
            future = asyncio.Future()
            delayed_count = self._conf['request_retry_delayed_count'] + 1
            await self._request_queue.put((fn, args, delayed_count, future))

            return await future

        except aio.QueueClosedError:
            raise ConnectionError()

    async def _delay_request(self, fn, args, delayed_count, future):
        try:
            await asyncio.sleep(self._conf['request_retry_delay'])
            await self._request_queue.put((fn, args, delayed_count, future))
            future = None

        except aio.QueueClosedError:
            pass

        finally:
            if future and not future.done():
                future.set_exception(ConnectionError())

    def _log(self, level, msg, *args, **kwargs):
        if not mlog.isEnabledFor(level):
            return

        mlog.log(level, f"{self._log_prefix}: {msg}", *args, **kwargs)
mlog = <Logger hat.gateway.devices.modbus.master.connection (WARNING)>
class DataType(enum.Enum):
    COIL = 1
    DISCRETE_INPUT = 2
    HOLDING_REGISTER = 3
    INPUT_REGISTER = 4
    QUEUE = 5
An enumeration.
COIL = <DataType.COIL: 1>
DISCRETE_INPUT = <DataType.DISCRETE_INPUT: 2>
HOLDING_REGISTER = <DataType.HOLDING_REGISTER: 3>
INPUT_REGISTER = <DataType.INPUT_REGISTER: 4>
QUEUE = <DataType.QUEUE: 5>
class Error(enum.Enum):
An enumeration.
INVALID_FUNCTION_CODE = <Error.INVALID_FUNCTION_CODE: 1>
INVALID_DATA_ADDRESS = <Error.INVALID_DATA_ADDRESS: 2>
INVALID_DATA_VALUE = <Error.INVALID_DATA_VALUE: 3>
FUNCTION_ERROR = <Error.FUNCTION_ERROR: 4>
GATEWAY_PATH_UNAVAILABLE = <Error.GATEWAY_PATH_UNAVAILABLE: 5>
GATEWAY_TARGET_DEVICE_FAILED_TO_RESPOND = <Error.GATEWAY_TARGET_DEVICE_FAILED_TO_RESPOND: 6>
TIMEOUT = <Error.TIMEOUT: 7>
async def connect(conf: json.Data, log_prefix: str, request_queue_size: int = 1024) -> Connection:
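A minimal usage sketch follows. The configuration keys are exactly the ones this module reads (transport, modbus_type, request_timeout, request_delay and the retry settings); the host, port, numeric values and log prefix are illustrative only, and 'TCP' is assumed to be a valid modbus.ModbusType name.

import asyncio

from hat.gateway.devices.modbus.master import connection


async def main():
    # illustrative configuration; values are assumptions, keys match the module source
    conf = {
        'modbus_type': 'TCP',                # looked up in modbus.ModbusType
        'transport': {'type': 'TCP',
                      'host': '127.0.0.1',
                      'port': 502},
        'request_timeout': 2,                # passed to the master as response_timeout
        'request_delay': 0.1,                # minimal pause between consecutive requests
        'request_retry_immediate_count': 1,  # immediate retries after a timeout
        'request_retry_delayed_count': 2,    # delayed retries after a timeout
        'request_retry_delay': 5}            # pause before each delayed retry

    conn = await connection.connect(conf, log_prefix='plc1')
    try:
        ...  # issue read/write/write_mask requests here
    finally:
        conn.close()


asyncio.run(main())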
class Connection(hat.aio.group.Resource):
Resource with lifetime control based on Group.
Connection(conf: json.Data, master: hat.drivers.modbus.master.Master, log_prefix: str, request_queue_size: int)
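Because async_group is taken directly from the underlying master, the Connection's lifetime tracks the transport: closing either side closes both, and queued requests fail with ConnectionError. A short sketch, assuming conn was returned by connect() and that the usual hat.aio.Resource helpers (is_open, wait_closed) are available, as on other hat.aio resources:

assert conn.is_open        # master link still alive, request loop running
conn.close()               # start shutdown; pending requests get ConnectionError
await conn.wait_closed()   # wait until the request loop has finished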
async def read(self, device_id: int, data_type: hat.drivers.modbus.common.DataType, start_address: int, quantity: int) -> list[int] | Error:
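On success read() resolves to the raw coil or register values; on a Modbus exception or exhausted retries it resolves to an Error member instead (the request loop maps modbus.Error by name and uses Error.TIMEOUT when all retries fail). A sketch, run inside a coroutine and assuming conn from the connect() example above and a device with unit id 1:

result = await conn.read(device_id=1,
                         data_type=DataType.HOLDING_REGISTER,
                         start_address=0,
                         quantity=4)

if isinstance(result, Error):
    mlog.warning('read failed: %s', result.name)  # e.g. TIMEOUT, INVALID_DATA_ADDRESS
else:
    mlog.info('register values: %s', result)      # list[int], one entry per register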
async def write(self, device_id: int, data_type: hat.drivers.modbus.common.DataType, start_address: int, values: list[int]) -> Error | None:
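write() returns None when the request succeeds and an Error member otherwise. A sketch, again run inside a coroutine with the assumed conn and device id 1, writing two holding registers and a single coil:

error = await conn.write(device_id=1,
                         data_type=DataType.HOLDING_REGISTER,
                         start_address=100,
                         values=[1234, 5678])
if error is not None:
    mlog.warning('register write failed: %s', error.name)

error = await conn.write(device_id=1,
                         data_type=DataType.COIL,
                         start_address=10,
                         values=[1])
if error is not None:
    mlog.warning('coil write failed: %s', error.name)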
async def write_mask(self, device_id: int, address: int, and_mask: int, or_mask: int) -> Error | None:
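write_mask() wraps the standard Modbus mask-write-register function, where the device updates the register as (current & and_mask) | (or_mask & ~and_mask); like write() it returns None on success or an Error member. A sketch, inside a coroutine with the assumed conn, setting bit 0 and clearing bit 1 of holding register 100 on device 1 (masks are illustrative):

error = await conn.write_mask(device_id=1,
                              address=100,
                              and_mask=0xFFFC,  # keep every bit except bits 0 and 1
                              or_mask=0x0001)   # force bit 0 to 1, bit 1 to 0
if error is not None:
    mlog.warning('mask write failed: %s', error.name)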