hat.gateway.devices.modbus.master.device
1from collections.abc import Collection 2import asyncio 3import contextlib 4import logging 5 6from hat import aio 7import hat.event.common 8import hat.event.eventer 9 10from hat.gateway.devices.modbus.master import common 11from hat.gateway.devices.modbus.master.connection import connect 12from hat.gateway.devices.modbus.master.eventer_client import (RemoteDeviceEnableReq, # NOQA 13 RemoteDeviceWriteReq, # NOQA 14 StatusRes, 15 RemoteDeviceStatusRes, # NOQA 16 RemoteDeviceWriteRes, # NOQA 17 EventerClientProxy) # NOQA 18from hat.gateway.devices.modbus.master.remote_device import RemoteDevice 19 20 21mlog = logging.getLogger(__name__) 22 23 24class ModbusMasterDevice(aio.Resource): 25 26 def __init__(self, 27 conf: common.DeviceConf, 28 eventer_client: hat.event.eventer.Client, 29 event_type_prefix: common.EventTypePrefix): 30 self._conf = conf 31 self._eventer_client = EventerClientProxy(eventer_client, 32 event_type_prefix, 33 conf['name']) 34 self._enabled_devices = set() 35 self._status = None 36 self._conn = None 37 self._devices = {} 38 self._readers = {} 39 self._async_group = aio.Group() 40 self._log = common.create_device_logger_adapter(mlog, conf['name']) 41 42 self.async_group.spawn(self._connection_loop) 43 44 @property 45 def async_group(self) -> aio.Group: 46 return self._async_group 47 48 async def process_events(self, events: Collection[hat.event.common.Event]): 49 try: 50 for request in self._eventer_client.process_events(events): 51 if isinstance(request, RemoteDeviceEnableReq): 52 self._log.debug('received remote device enable request') 53 if request.enable: 54 self._enable_remote_device(request.device_id) 55 else: 56 await self._disable_remote_device(request.device_id) 57 58 elif isinstance(request, RemoteDeviceWriteReq): 59 self._log.debug('received remote device write request') 60 if self._conn and self._conn.is_open: 61 self._conn.async_group.spawn( 62 self._write, request.device_id, request.data_name, 63 request.request_id, request.value) 64 65 else: 66 raise ValueError('invalid request') 67 68 except Exception as e: 69 self._log.error('process events error: %s', e, exc_info=e) 70 self.close() 71 72 async def _connection_loop(self): 73 74 async def cleanup(): 75 if self._conn: 76 await self._conn.async_close() 77 78 with contextlib.suppress(Exception): 79 await self._set_status('DISCONNECTED') 80 81 try: 82 self._log.debug('starting connection loop') 83 84 enabled_devices = await self._eventer_client.query_enabled_devices() # NOQA 85 self._enabled_devices.update(enabled_devices) 86 87 while True: 88 await self._set_status('CONNECTING') 89 90 try: 91 self._conn = await aio.wait_for( 92 connect(self._conf['connection'], self._conf['name']), 93 self._conf['connection']['connect_timeout']) 94 95 except aio.CancelledWithResultError as e: 96 self._conn = e.result 97 raise 98 99 except Exception as e: 100 self._log.info('connecting error: %s', e, exc_info=e) 101 await self._set_status('DISCONNECTED') 102 await asyncio.sleep( 103 self._conf['connection']['connect_delay']) 104 continue 105 106 await self._set_status('CONNECTED') 107 self._devices = {} 108 self._readers = {} 109 110 self._log.debug('creating remote devices') 111 for device_conf in self._conf['remote_devices']: 112 device = RemoteDevice(device_conf, self._conn, 113 self._conf['name']) 114 self._devices[device.device_id] = device 115 116 if device.device_id in self._enabled_devices: 117 self._enable_remote_device(device.device_id) 118 119 else: 120 await self._notify_response(RemoteDeviceStatusRes( 121 device_id=device.device_id, 122 status='DISABLED')) 123 124 await self._conn.wait_closing() 125 await self._conn.async_close() 126 127 await self._set_status('DISCONNECTED') 128 129 except Exception as e: 130 self._log.error('connection loop error: %s', e, exc_info=e) 131 132 finally: 133 self._log.debug('closing connection loop') 134 self.close() 135 await aio.uncancellable(cleanup()) 136 137 async def _notify_response(self, response): 138 await self._eventer_client.write([response]) 139 140 async def _set_status(self, status): 141 if self._status == status: 142 return 143 144 self._log.debug('changing status: %s -> %s', self._status, status) 145 self._status = status 146 await self._notify_response(StatusRes(status)) 147 148 def _enable_remote_device(self, device_id): 149 self._log.debug('enabling device %s', device_id) 150 self._enabled_devices.add(device_id) 151 152 device = self._devices.get(device_id) 153 if not device: 154 self._log.debug('device %s is not available', device_id) 155 return 156 if not device.conn.is_open: 157 self._log.debug('connection is not available') 158 return 159 160 reader = self._readers.get(device_id) 161 if reader and reader.is_open: 162 self._log.debug('reader %s is already running', device_id) 163 return 164 165 self._log.debug('creating reader for device %s', device_id) 166 reader = device.create_reader(self._notify_response) 167 self._readers[device.device_id] = reader 168 169 async def _disable_remote_device(self, device_id): 170 self._log.debug('disabling device %s', device_id) 171 self._enabled_devices.discard(device_id) 172 173 reader = self._readers.pop(device_id, None) 174 if not reader: 175 self._log.debug('device reader %s is not available', device_id) 176 return 177 178 await reader.async_close() 179 180 async def _write(self, device_id, data_name, request_id, value): 181 self._log.debug('writing (device_id: %s; data_name: %s; value: %s)', 182 device_id, data_name, value) 183 184 device = self._devices.get(device_id) 185 if not device: 186 self._log.debug('device %s is not available', device_id) 187 return 188 189 response = await device.write(data_name, request_id, value) 190 if response: 191 self._log.debug('writing result: %s', response.result) 192 await self._notify_response(response)
mlog =
<Logger hat.gateway.devices.modbus.master.device (WARNING)>
class
ModbusMasterDevice(hat.aio.group.Resource):
25class ModbusMasterDevice(aio.Resource): 26 27 def __init__(self, 28 conf: common.DeviceConf, 29 eventer_client: hat.event.eventer.Client, 30 event_type_prefix: common.EventTypePrefix): 31 self._conf = conf 32 self._eventer_client = EventerClientProxy(eventer_client, 33 event_type_prefix, 34 conf['name']) 35 self._enabled_devices = set() 36 self._status = None 37 self._conn = None 38 self._devices = {} 39 self._readers = {} 40 self._async_group = aio.Group() 41 self._log = common.create_device_logger_adapter(mlog, conf['name']) 42 43 self.async_group.spawn(self._connection_loop) 44 45 @property 46 def async_group(self) -> aio.Group: 47 return self._async_group 48 49 async def process_events(self, events: Collection[hat.event.common.Event]): 50 try: 51 for request in self._eventer_client.process_events(events): 52 if isinstance(request, RemoteDeviceEnableReq): 53 self._log.debug('received remote device enable request') 54 if request.enable: 55 self._enable_remote_device(request.device_id) 56 else: 57 await self._disable_remote_device(request.device_id) 58 59 elif isinstance(request, RemoteDeviceWriteReq): 60 self._log.debug('received remote device write request') 61 if self._conn and self._conn.is_open: 62 self._conn.async_group.spawn( 63 self._write, request.device_id, request.data_name, 64 request.request_id, request.value) 65 66 else: 67 raise ValueError('invalid request') 68 69 except Exception as e: 70 self._log.error('process events error: %s', e, exc_info=e) 71 self.close() 72 73 async def _connection_loop(self): 74 75 async def cleanup(): 76 if self._conn: 77 await self._conn.async_close() 78 79 with contextlib.suppress(Exception): 80 await self._set_status('DISCONNECTED') 81 82 try: 83 self._log.debug('starting connection loop') 84 85 enabled_devices = await self._eventer_client.query_enabled_devices() # NOQA 86 self._enabled_devices.update(enabled_devices) 87 88 while True: 89 await self._set_status('CONNECTING') 90 91 try: 92 self._conn = await aio.wait_for( 93 connect(self._conf['connection'], self._conf['name']), 94 self._conf['connection']['connect_timeout']) 95 96 except aio.CancelledWithResultError as e: 97 self._conn = e.result 98 raise 99 100 except Exception as e: 101 self._log.info('connecting error: %s', e, exc_info=e) 102 await self._set_status('DISCONNECTED') 103 await asyncio.sleep( 104 self._conf['connection']['connect_delay']) 105 continue 106 107 await self._set_status('CONNECTED') 108 self._devices = {} 109 self._readers = {} 110 111 self._log.debug('creating remote devices') 112 for device_conf in self._conf['remote_devices']: 113 device = RemoteDevice(device_conf, self._conn, 114 self._conf['name']) 115 self._devices[device.device_id] = device 116 117 if device.device_id in self._enabled_devices: 118 self._enable_remote_device(device.device_id) 119 120 else: 121 await self._notify_response(RemoteDeviceStatusRes( 122 device_id=device.device_id, 123 status='DISABLED')) 124 125 await self._conn.wait_closing() 126 await self._conn.async_close() 127 128 await self._set_status('DISCONNECTED') 129 130 except Exception as e: 131 self._log.error('connection loop error: %s', e, exc_info=e) 132 133 finally: 134 self._log.debug('closing connection loop') 135 self.close() 136 await aio.uncancellable(cleanup()) 137 138 async def _notify_response(self, response): 139 await self._eventer_client.write([response]) 140 141 async def _set_status(self, status): 142 if self._status == status: 143 return 144 145 self._log.debug('changing status: %s -> %s', self._status, status) 146 self._status = status 147 await self._notify_response(StatusRes(status)) 148 149 def _enable_remote_device(self, device_id): 150 self._log.debug('enabling device %s', device_id) 151 self._enabled_devices.add(device_id) 152 153 device = self._devices.get(device_id) 154 if not device: 155 self._log.debug('device %s is not available', device_id) 156 return 157 if not device.conn.is_open: 158 self._log.debug('connection is not available') 159 return 160 161 reader = self._readers.get(device_id) 162 if reader and reader.is_open: 163 self._log.debug('reader %s is already running', device_id) 164 return 165 166 self._log.debug('creating reader for device %s', device_id) 167 reader = device.create_reader(self._notify_response) 168 self._readers[device.device_id] = reader 169 170 async def _disable_remote_device(self, device_id): 171 self._log.debug('disabling device %s', device_id) 172 self._enabled_devices.discard(device_id) 173 174 reader = self._readers.pop(device_id, None) 175 if not reader: 176 self._log.debug('device reader %s is not available', device_id) 177 return 178 179 await reader.async_close() 180 181 async def _write(self, device_id, data_name, request_id, value): 182 self._log.debug('writing (device_id: %s; data_name: %s; value: %s)', 183 device_id, data_name, value) 184 185 device = self._devices.get(device_id) 186 if not device: 187 self._log.debug('device %s is not available', device_id) 188 return 189 190 response = await device.write(data_name, request_id, value) 191 if response: 192 self._log.debug('writing result: %s', response.result) 193 await self._notify_response(response)
Resource with lifetime control based on Group.
ModbusMasterDevice( conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]], eventer_client: hat.event.eventer.client.Client, event_type_prefix: tuple[str, str, str])
27 def __init__(self, 28 conf: common.DeviceConf, 29 eventer_client: hat.event.eventer.Client, 30 event_type_prefix: common.EventTypePrefix): 31 self._conf = conf 32 self._eventer_client = EventerClientProxy(eventer_client, 33 event_type_prefix, 34 conf['name']) 35 self._enabled_devices = set() 36 self._status = None 37 self._conn = None 38 self._devices = {} 39 self._readers = {} 40 self._async_group = aio.Group() 41 self._log = common.create_device_logger_adapter(mlog, conf['name']) 42 43 self.async_group.spawn(self._connection_loop)
async def
process_events(self, events: Collection[hat.event.common.common.Event]):
49 async def process_events(self, events: Collection[hat.event.common.Event]): 50 try: 51 for request in self._eventer_client.process_events(events): 52 if isinstance(request, RemoteDeviceEnableReq): 53 self._log.debug('received remote device enable request') 54 if request.enable: 55 self._enable_remote_device(request.device_id) 56 else: 57 await self._disable_remote_device(request.device_id) 58 59 elif isinstance(request, RemoteDeviceWriteReq): 60 self._log.debug('received remote device write request') 61 if self._conn and self._conn.is_open: 62 self._conn.async_group.spawn( 63 self._write, request.device_id, request.data_name, 64 request.request_id, request.value) 65 66 else: 67 raise ValueError('invalid request') 68 69 except Exception as e: 70 self._log.error('process events error: %s', e, exc_info=e) 71 self.close()