hat.gateway.devices.modbus.master.device
from collections.abc import Collection
import asyncio
import contextlib
import logging

from hat import aio
import hat.event.common
import hat.event.eventer

from hat.gateway import common
from hat.gateway.devices.modbus.master.connection import connect
from hat.gateway.devices.modbus.master.eventer_client import (RemoteDeviceEnableReq,  # NOQA
                                                              RemoteDeviceWriteReq,  # NOQA
                                                              StatusRes,
                                                              RemoteDeviceStatusRes,  # NOQA
                                                              RemoteDeviceWriteRes,  # NOQA
                                                              EventerClientProxy)  # NOQA
from hat.gateway.devices.modbus.master.remote_device import RemoteDevice


mlog = logging.getLogger(__name__)


class ModbusMasterDevice(aio.Resource):
    """Modbus master gateway device.

    On construction a connection loop is spawned in the device's own async
    group. The loop connects using ``conf['connection']``, creates a
    `RemoteDevice` for every entry of ``conf['remote_devices']`` and reports
    status changes through an `EventerClientProxy`.
    """

    def __init__(self,
                 conf: common.DeviceConf,
                 eventer_client: hat.event.eventer.Client,
                 event_type_prefix: common.EventTypePrefix):
        """Initialize state and spawn the connection loop.

        Args:
            conf: device configuration; this class reads ``conf['name']``,
                ``conf['connection']`` (including ``connect_timeout`` and
                ``connect_delay``) and ``conf['remote_devices']``
            eventer_client: eventer client wrapped by `EventerClientProxy`
            event_type_prefix: prefix passed to `EventerClientProxy`

        """
        self._conf = conf
        self._log_prefix = f"gateway device {conf['name']}"
        self._eventer_client = EventerClientProxy(eventer_client,
                                                  event_type_prefix,
                                                  self._log_prefix)
        # ids of remote devices requested to be enabled
        self._enabled_devices = set()
        # last reported status (None until the first report)
        self._status = None
        # most recently created connection (None until the first connect)
        self._conn = None
        # device_id -> RemoteDevice, rebuilt after each successful connect
        self._devices = {}
        # device_id -> reader returned by RemoteDevice.create_reader
        self._readers = {}
        self._async_group = aio.Group()

        self.async_group.spawn(self._connection_loop)

    @property
    def async_group(self) -> aio.Group:
        """Async group controlling the lifetime of this device."""
        return self._async_group

    async def process_events(self, events: Collection[hat.event.common.Event]):
        """Process requests which the eventer client proxy derives from events.

        Enable requests enable or disable the referenced remote device.
        Write requests are spawned on the connection's async group, but only
        while a connection exists and is open; otherwise they are skipped.
        Any exception, including an unrecognized request type, is logged and
        closes the device.
        """
        try:
            for request in self._eventer_client.process_events(events):
                if isinstance(request, RemoteDeviceEnableReq):
                    self._log(logging.DEBUG,
                              'received remote device enable request')
                    if request.enable:
                        self._enable_remote_device(request.device_id)
                    else:
                        await self._disable_remote_device(request.device_id)

                elif isinstance(request, RemoteDeviceWriteReq):
                    self._log(logging.DEBUG,
                              'received remote device write request')
                    if self._conn and self._conn.is_open:
                        # run the write in the connection's group so that it
                        # is bound to that connection's lifetime
                        self._conn.async_group.spawn(
                            self._write, request.device_id, request.data_name,
                            request.request_id, request.value)

                else:
                    raise ValueError('invalid request')

        except Exception as e:
            self._log(logging.ERROR, 'process events error: %s', e, exc_info=e)
            self.close()

    async def _connection_loop(self):
        """Connect, create remote devices, wait for disconnect and repeat.

        Runs until cancelled or until an unexpected exception occurs; in
        both cases the device is closed, the current connection (if any) is
        closed and ``DISCONNECTED`` status is reported on a best-effort
        basis.
        """

        async def cleanup():
            if self._conn:
                await self._conn.async_close()

            # status notification failures must not break the cleanup
            with contextlib.suppress(Exception):
                await self._set_status('DISCONNECTED')

        try:
            self._log(logging.DEBUG, 'starting connection loop')

            enabled_devices = await self._eventer_client.query_enabled_devices()  # NOQA
            self._enabled_devices.update(enabled_devices)

            while True:
                await self._set_status('CONNECTING')

                try:
                    self._conn = await aio.wait_for(
                        connect(self._conf['connection'],
                                self._log_prefix),
                        self._conf['connection']['connect_timeout'])

                except aio.CancelledWithResultError as e:
                    # keep the result carried by the exception (presumably a
                    # connection created while being cancelled) so that
                    # cleanup can close it
                    self._conn = e.result
                    raise

                except Exception as e:
                    self._log(logging.INFO, 'connecting error: %s', e,
                              exc_info=e)
                    await self._set_status('DISCONNECTED')
                    await asyncio.sleep(
                        self._conf['connection']['connect_delay'])
                    continue

                await self._set_status('CONNECTED')
                # devices and readers are bound to a single connection
                self._devices = {}
                self._readers = {}

                self._log(logging.DEBUG, 'creating remote devices')
                for device_conf in self._conf['remote_devices']:
                    device = RemoteDevice(device_conf, self._conn,
                                          self._log_prefix)
                    self._devices[device.device_id] = device

                    if device.device_id in self._enabled_devices:
                        self._enable_remote_device(device.device_id)

                    else:
                        await self._notify_response(RemoteDeviceStatusRes(
                            device_id=device.device_id,
                            status='DISABLED'))

                await self._conn.wait_closing()
                await self._conn.async_close()

                await self._set_status('DISCONNECTED')

        except Exception as e:
            self._log(logging.ERROR, 'connection loop error: %s', e,
                      exc_info=e)

        finally:
            self._log(logging.DEBUG, 'closing connection loop')
            self.close()
            await aio.uncancellable(cleanup())

    async def _notify_response(self, response):
        """Write a single response through the eventer client proxy."""
        await self._eventer_client.write([response])

    async def _set_status(self, status):
        """Store `status` and report it, unless it equals the current one."""
        if self._status == status:
            return

        self._log(logging.DEBUG, 'changing status: %s -> %s',
                  self._status, status)
        self._status = status
        await self._notify_response(StatusRes(status))

    def _enable_remote_device(self, device_id):
        """Mark `device_id` as enabled and create its reader if possible.

        The reader is created only when the remote device exists, its
        connection is open and no open reader is already registered for it.
        """
        self._log(logging.DEBUG, 'enabling device %s', device_id)
        self._enabled_devices.add(device_id)

        device = self._devices.get(device_id)
        if not device:
            self._log(logging.DEBUG, 'device %s is not available', device_id)
            return
        if not device.conn.is_open:
            self._log(logging.DEBUG, 'connection is not available')
            return

        reader = self._readers.get(device_id)
        if reader and reader.is_open:
            self._log(logging.DEBUG, 'reader %s is already running', device_id)
            return

        self._log(logging.DEBUG, 'creating reader for device %s', device_id)
        reader = device.create_reader(self._notify_response)
        self._readers[device.device_id] = reader

    async def _disable_remote_device(self, device_id):
        """Mark `device_id` as disabled and close its reader, if any."""
        self._log(logging.DEBUG, 'disabling device %s', device_id)
        self._enabled_devices.discard(device_id)

        reader = self._readers.pop(device_id, None)
        if not reader:
            self._log(logging.DEBUG, 'device reader %s is not available',
                      device_id)
            return

        await reader.async_close()

    async def _write(self, device_id, data_name, request_id, value):
        """Write `value` to a remote device's data and report the response.

        Nothing is written when the device is not known. A response is
        reported only when the device's write returns a truthy one.
        """
        self._log(logging.DEBUG,
                  'writing (device_id: %s; data_name: %s; value: %s)',
                  device_id, data_name, value)

        device = self._devices.get(device_id)
        if not device:
            self._log(logging.DEBUG, 'device %s is not available', device_id)
            return

        response = await device.write(data_name, request_id, value)
        if response:
            self._log(logging.DEBUG, 'writing result: %s', response.result)
            await self._notify_response(response)

    def _log(self, level, msg, *args, **kwargs):
        """Log `msg` on the module logger, prefixed with the device prefix.

        `args` and `kwargs` are forwarded to `Logger.log`.
        """
        if not mlog.isEnabledFor(level):
            return

        # logging applies `msg % args` only when args are supplied; in that
        # case a literal '%' in the prefix (it embeds the configured device
        # name) has to be escaped, otherwise formatting of the record fails
        prefix = self._log_prefix
        if args:
            prefix = prefix.replace('%', '%%')

        mlog.log(level, f"{prefix}: {msg}", *args, **kwargs)
mlog =
<Logger hat.gateway.devices.modbus.master.device (WARNING)>
class
ModbusMasterDevice(hat.aio.group.Resource):
class ModbusMasterDevice(aio.Resource):
    """Modbus master gateway device.

    On construction a connection loop is spawned in the device's own async
    group. The loop connects using ``conf['connection']``, creates a
    `RemoteDevice` for every entry of ``conf['remote_devices']`` and reports
    status changes through an `EventerClientProxy`.
    """

    def __init__(self,
                 conf: common.DeviceConf,
                 eventer_client: hat.event.eventer.Client,
                 event_type_prefix: common.EventTypePrefix):
        """Initialize state and spawn the connection loop.

        Args:
            conf: device configuration; this class reads ``conf['name']``,
                ``conf['connection']`` (including ``connect_timeout`` and
                ``connect_delay``) and ``conf['remote_devices']``
            eventer_client: eventer client wrapped by `EventerClientProxy`
            event_type_prefix: prefix passed to `EventerClientProxy`

        """
        self._conf = conf
        self._log_prefix = f"gateway device {conf['name']}"
        self._eventer_client = EventerClientProxy(eventer_client,
                                                  event_type_prefix,
                                                  self._log_prefix)
        # ids of remote devices requested to be enabled
        self._enabled_devices = set()
        # last reported status (None until the first report)
        self._status = None
        # most recently created connection (None until the first connect)
        self._conn = None
        # device_id -> RemoteDevice, rebuilt after each successful connect
        self._devices = {}
        # device_id -> reader returned by RemoteDevice.create_reader
        self._readers = {}
        self._async_group = aio.Group()

        self.async_group.spawn(self._connection_loop)

    @property
    def async_group(self) -> aio.Group:
        """Async group controlling the lifetime of this device."""
        return self._async_group

    async def process_events(self, events: Collection[hat.event.common.Event]):
        """Process requests which the eventer client proxy derives from events.

        Enable requests enable or disable the referenced remote device.
        Write requests are spawned on the connection's async group, but only
        while a connection exists and is open; otherwise they are skipped.
        Any exception, including an unrecognized request type, is logged and
        closes the device.
        """
        try:
            for request in self._eventer_client.process_events(events):
                if isinstance(request, RemoteDeviceEnableReq):
                    self._log(logging.DEBUG,
                              'received remote device enable request')
                    if request.enable:
                        self._enable_remote_device(request.device_id)
                    else:
                        await self._disable_remote_device(request.device_id)

                elif isinstance(request, RemoteDeviceWriteReq):
                    self._log(logging.DEBUG,
                              'received remote device write request')
                    if self._conn and self._conn.is_open:
                        # run the write in the connection's group so that it
                        # is bound to that connection's lifetime
                        self._conn.async_group.spawn(
                            self._write, request.device_id, request.data_name,
                            request.request_id, request.value)

                else:
                    raise ValueError('invalid request')

        except Exception as e:
            self._log(logging.ERROR, 'process events error: %s', e, exc_info=e)
            self.close()

    async def _connection_loop(self):
        """Connect, create remote devices, wait for disconnect and repeat.

        Runs until cancelled or until an unexpected exception occurs; in
        both cases the device is closed, the current connection (if any) is
        closed and ``DISCONNECTED`` status is reported on a best-effort
        basis.
        """

        async def cleanup():
            if self._conn:
                await self._conn.async_close()

            # status notification failures must not break the cleanup
            with contextlib.suppress(Exception):
                await self._set_status('DISCONNECTED')

        try:
            self._log(logging.DEBUG, 'starting connection loop')

            enabled_devices = await self._eventer_client.query_enabled_devices()  # NOQA
            self._enabled_devices.update(enabled_devices)

            while True:
                await self._set_status('CONNECTING')

                try:
                    self._conn = await aio.wait_for(
                        connect(self._conf['connection'],
                                self._log_prefix),
                        self._conf['connection']['connect_timeout'])

                except aio.CancelledWithResultError as e:
                    # keep the result carried by the exception (presumably a
                    # connection created while being cancelled) so that
                    # cleanup can close it
                    self._conn = e.result
                    raise

                except Exception as e:
                    self._log(logging.INFO, 'connecting error: %s', e,
                              exc_info=e)
                    await self._set_status('DISCONNECTED')
                    await asyncio.sleep(
                        self._conf['connection']['connect_delay'])
                    continue

                await self._set_status('CONNECTED')
                # devices and readers are bound to a single connection
                self._devices = {}
                self._readers = {}

                self._log(logging.DEBUG, 'creating remote devices')
                for device_conf in self._conf['remote_devices']:
                    device = RemoteDevice(device_conf, self._conn,
                                          self._log_prefix)
                    self._devices[device.device_id] = device

                    if device.device_id in self._enabled_devices:
                        self._enable_remote_device(device.device_id)

                    else:
                        await self._notify_response(RemoteDeviceStatusRes(
                            device_id=device.device_id,
                            status='DISABLED'))

                await self._conn.wait_closing()
                await self._conn.async_close()

                await self._set_status('DISCONNECTED')

        except Exception as e:
            self._log(logging.ERROR, 'connection loop error: %s', e,
                      exc_info=e)

        finally:
            self._log(logging.DEBUG, 'closing connection loop')
            self.close()
            await aio.uncancellable(cleanup())

    async def _notify_response(self, response):
        """Write a single response through the eventer client proxy."""
        await self._eventer_client.write([response])

    async def _set_status(self, status):
        """Store `status` and report it, unless it equals the current one."""
        if self._status == status:
            return

        self._log(logging.DEBUG, 'changing status: %s -> %s',
                  self._status, status)
        self._status = status
        await self._notify_response(StatusRes(status))

    def _enable_remote_device(self, device_id):
        """Mark `device_id` as enabled and create its reader if possible.

        The reader is created only when the remote device exists, its
        connection is open and no open reader is already registered for it.
        """
        self._log(logging.DEBUG, 'enabling device %s', device_id)
        self._enabled_devices.add(device_id)

        device = self._devices.get(device_id)
        if not device:
            self._log(logging.DEBUG, 'device %s is not available', device_id)
            return
        if not device.conn.is_open:
            self._log(logging.DEBUG, 'connection is not available')
            return

        reader = self._readers.get(device_id)
        if reader and reader.is_open:
            self._log(logging.DEBUG, 'reader %s is already running', device_id)
            return

        self._log(logging.DEBUG, 'creating reader for device %s', device_id)
        reader = device.create_reader(self._notify_response)
        self._readers[device.device_id] = reader

    async def _disable_remote_device(self, device_id):
        """Mark `device_id` as disabled and close its reader, if any."""
        self._log(logging.DEBUG, 'disabling device %s', device_id)
        self._enabled_devices.discard(device_id)

        reader = self._readers.pop(device_id, None)
        if not reader:
            self._log(logging.DEBUG, 'device reader %s is not available',
                      device_id)
            return

        await reader.async_close()

    async def _write(self, device_id, data_name, request_id, value):
        """Write `value` to a remote device's data and report the response.

        Nothing is written when the device is not known. A response is
        reported only when the device's write returns a truthy one.
        """
        self._log(logging.DEBUG,
                  'writing (device_id: %s; data_name: %s; value: %s)',
                  device_id, data_name, value)

        device = self._devices.get(device_id)
        if not device:
            self._log(logging.DEBUG, 'device %s is not available', device_id)
            return

        response = await device.write(data_name, request_id, value)
        if response:
            self._log(logging.DEBUG, 'writing result: %s', response.result)
            await self._notify_response(response)

    def _log(self, level, msg, *args, **kwargs):
        """Log `msg` on the module logger, prefixed with the device prefix.

        `args` and `kwargs` are forwarded to `Logger.log`.
        """
        if not mlog.isEnabledFor(level):
            return

        # logging applies `msg % args` only when args are supplied; in that
        # case a literal '%' in the prefix (it embeds the configured device
        # name) has to be escaped, otherwise formatting of the record fails
        prefix = self._log_prefix
        if args:
            prefix = prefix.replace('%', '%%')

        mlog.log(level, f"{prefix}: {msg}", *args, **kwargs)
Resource with lifetime control based on Group.
ModbusMasterDevice( conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]], eventer_client: hat.event.eventer.client.Client, event_type_prefix: tuple[str, str, str])
def __init__(self,
             conf: common.DeviceConf,
             eventer_client: hat.event.eventer.Client,
             event_type_prefix: common.EventTypePrefix):
    """Set up device state and start the connection loop.

    The log prefix is derived from ``conf['name']`` and shared with the
    `EventerClientProxy` wrapping `eventer_client`. The connection loop is
    spawned in a newly created async group.
    """
    self._conf = conf

    log_prefix = f"gateway device {conf['name']}"
    self._log_prefix = log_prefix
    self._eventer_client = EventerClientProxy(
        eventer_client, event_type_prefix, log_prefix)

    # runtime state, populated by the connection loop and by requests
    self._enabled_devices = set()
    self._status = None
    self._conn = None
    self._devices = {}
    self._readers = {}

    self._async_group = aio.Group()
    self.async_group.spawn(self._connection_loop)
async def
process_events(self, events: Collection[hat.event.common.common.Event]):
async def process_events(self, events: Collection[hat.event.common.Event]):
    """Dispatch requests obtained from `events` by the eventer client proxy.

    Enable requests turn the referenced remote device on or off. Write
    requests are spawned on the connection's async group when a connection
    exists and is open, and are skipped otherwise. Any exception, including
    an unrecognized request type, is logged and closes the device.
    """
    try:
        for req in self._eventer_client.process_events(events):
            if isinstance(req, RemoteDeviceEnableReq):
                self._log(logging.DEBUG,
                          'received remote device enable request')
                if not req.enable:
                    await self._disable_remote_device(req.device_id)
                    continue

                self._enable_remote_device(req.device_id)
                continue

            if isinstance(req, RemoteDeviceWriteReq):
                self._log(logging.DEBUG,
                          'received remote device write request')
                conn = self._conn
                if not conn or not conn.is_open:
                    continue

                conn.async_group.spawn(
                    self._write, req.device_id, req.data_name,
                    req.request_id, req.value)
                continue

            raise ValueError('invalid request')

    except Exception as exc:
        self._log(logging.ERROR, 'process events error: %s', exc,
                  exc_info=exc)
        self.close()