hat.gateway.devices.modbus.master.device
1import asyncio 2import contextlib 3import logging 4 5from hat import aio 6import hat.event.common 7import hat.event.eventer 8 9from hat.gateway.devices.modbus.master import common 10from hat.gateway.devices.modbus.master.connection import connect 11from hat.gateway.devices.modbus.master.eventer_client import EventerClientProxy 12from hat.gateway.devices.modbus.master.remote_device import RemoteDevice 13 14 15mlog = logging.getLogger(__name__) 16 17 18class ModbusMasterDevice(aio.Resource): 19 20 def __init__(self, 21 conf: common.DeviceConf, 22 eventer_client: hat.event.eventer.Client, 23 event_type_prefix: common.EventTypePrefix): 24 self._conf = conf 25 self._eventer_client = EventerClientProxy(eventer_client, 26 event_type_prefix, 27 conf['name']) 28 self._enabled_devices = set() 29 self._status = None 30 self._conn = None 31 self._devices = {} 32 self._readers = {} 33 self._async_group = aio.Group() 34 self._log = common.create_device_logger_adapter(mlog, conf['name']) 35 36 self.async_group.spawn(self._connection_loop) 37 38 @property 39 def async_group(self) -> aio.Group: 40 return self._async_group 41 42 async def process_event(self, event: hat.event.common.Event): 43 try: 44 request = self._eventer_client.process_event(event) 45 46 if isinstance(request, common.RemoteDeviceEnableReq): 47 self._log.debug('received remote device enable request') 48 49 if request.enable: 50 self._enable_remote_device(request.device_id) 51 52 else: 53 await self._disable_remote_device(request.device_id) 54 55 elif isinstance(request, common.RemoteDeviceWriteReq): 56 self._log.debug('received remote device write request') 57 58 if self._conn and self._conn.is_open: 59 self._conn.async_group.spawn( 60 self._write, request.device_id, request.data_name, 61 request.request_id, request.value) 62 63 else: 64 raise ValueError('invalid request') 65 66 except Exception as e: 67 self._log.warning('error processing event: %s', e, exc_info=e) 68 69 async def _connection_loop(self): 70 71 async def cleanup(): 72 if self._conn: 73 await self._conn.async_close() 74 75 with contextlib.suppress(Exception): 76 await self._set_status('DISCONNECTED') 77 78 try: 79 self._log.debug('starting connection loop') 80 81 enabled_devices = await self._eventer_client.query_enabled_devices() # NOQA 82 self._enabled_devices.update(enabled_devices) 83 84 while True: 85 await self._set_status('CONNECTING') 86 87 try: 88 self._conn = await aio.wait_for( 89 connect(self._conf['connection'], self._conf['name']), 90 self._conf['connection']['connect_timeout']) 91 92 except aio.CancelledWithResultError as e: 93 self._conn = e.result 94 raise 95 96 except Exception as e: 97 self._log.info('connecting error: %s', e, exc_info=e) 98 await self._set_status('DISCONNECTED') 99 await asyncio.sleep( 100 self._conf['connection']['connect_delay']) 101 continue 102 103 await self._set_status('CONNECTED') 104 self._devices = {} 105 self._readers = {} 106 107 self._log.debug('creating remote devices') 108 for device_conf in self._conf['remote_devices']: 109 device = RemoteDevice(device_conf, self._conn, 110 self._conf['name']) 111 self._devices[device.device_id] = device 112 113 if device.device_id in self._enabled_devices: 114 self._enable_remote_device(device.device_id) 115 116 else: 117 await self._notify_response( 118 common.RemoteDeviceStatusRes( 119 device_id=device.device_id, 120 status='DISABLED')) 121 122 await self._conn.wait_closing() 123 await self._conn.async_close() 124 125 await self._set_status('DISCONNECTED') 126 127 except Exception as e: 128 self._log.error('connection loop error: %s', e, exc_info=e) 129 130 finally: 131 self._log.debug('closing connection loop') 132 self.close() 133 await aio.uncancellable(cleanup()) 134 135 async def _notify_response(self, response): 136 await self._eventer_client.write([response]) 137 138 async def _set_status(self, status): 139 if self._status == status: 140 return 141 142 self._log.debug('changing status: %s -> %s', self._status, status) 143 self._status = status 144 await self._notify_response(common.StatusRes(status)) 145 146 def _enable_remote_device(self, device_id): 147 self._log.debug('enabling device %s', device_id) 148 self._enabled_devices.add(device_id) 149 150 device = self._devices.get(device_id) 151 if not device: 152 self._log.debug('device %s is not available', device_id) 153 return 154 if not device.conn.is_open: 155 self._log.debug('connection is not available') 156 return 157 158 reader = self._readers.get(device_id) 159 if reader and reader.is_open: 160 self._log.debug('reader %s is already running', device_id) 161 return 162 163 self._log.debug('creating reader for device %s', device_id) 164 reader = device.create_reader(self._notify_response) 165 self._readers[device.device_id] = reader 166 167 async def _disable_remote_device(self, device_id): 168 self._log.debug('disabling device %s', device_id) 169 self._enabled_devices.discard(device_id) 170 171 reader = self._readers.pop(device_id, None) 172 if not reader: 173 self._log.debug('device reader %s is not available', device_id) 174 return 175 176 await reader.async_close() 177 178 async def _write(self, device_id, data_name, request_id, value): 179 self._log.debug('writing (device_id: %s; data_name: %s; value: %s)', 180 device_id, data_name, value) 181 182 device = self._devices.get(device_id) 183 if not device: 184 self._log.debug('device %s is not available', device_id) 185 return 186 187 response = await device.write(data_name, request_id, value) 188 if response: 189 self._log.debug('writing result: %s', response.result) 190 await self._notify_response(response)
mlog =
<Logger hat.gateway.devices.modbus.master.device (WARNING)>
class
ModbusMasterDevice(hat.aio.group.Resource):
19class ModbusMasterDevice(aio.Resource): 20 21 def __init__(self, 22 conf: common.DeviceConf, 23 eventer_client: hat.event.eventer.Client, 24 event_type_prefix: common.EventTypePrefix): 25 self._conf = conf 26 self._eventer_client = EventerClientProxy(eventer_client, 27 event_type_prefix, 28 conf['name']) 29 self._enabled_devices = set() 30 self._status = None 31 self._conn = None 32 self._devices = {} 33 self._readers = {} 34 self._async_group = aio.Group() 35 self._log = common.create_device_logger_adapter(mlog, conf['name']) 36 37 self.async_group.spawn(self._connection_loop) 38 39 @property 40 def async_group(self) -> aio.Group: 41 return self._async_group 42 43 async def process_event(self, event: hat.event.common.Event): 44 try: 45 request = self._eventer_client.process_event(event) 46 47 if isinstance(request, common.RemoteDeviceEnableReq): 48 self._log.debug('received remote device enable request') 49 50 if request.enable: 51 self._enable_remote_device(request.device_id) 52 53 else: 54 await self._disable_remote_device(request.device_id) 55 56 elif isinstance(request, common.RemoteDeviceWriteReq): 57 self._log.debug('received remote device write request') 58 59 if self._conn and self._conn.is_open: 60 self._conn.async_group.spawn( 61 self._write, request.device_id, request.data_name, 62 request.request_id, request.value) 63 64 else: 65 raise ValueError('invalid request') 66 67 except Exception as e: 68 self._log.warning('error processing event: %s', e, exc_info=e) 69 70 async def _connection_loop(self): 71 72 async def cleanup(): 73 if self._conn: 74 await self._conn.async_close() 75 76 with contextlib.suppress(Exception): 77 await self._set_status('DISCONNECTED') 78 79 try: 80 self._log.debug('starting connection loop') 81 82 enabled_devices = await self._eventer_client.query_enabled_devices() # NOQA 83 self._enabled_devices.update(enabled_devices) 84 85 while True: 86 await self._set_status('CONNECTING') 87 88 try: 89 self._conn = await aio.wait_for( 90 connect(self._conf['connection'], self._conf['name']), 91 self._conf['connection']['connect_timeout']) 92 93 except aio.CancelledWithResultError as e: 94 self._conn = e.result 95 raise 96 97 except Exception as e: 98 self._log.info('connecting error: %s', e, exc_info=e) 99 await self._set_status('DISCONNECTED') 100 await asyncio.sleep( 101 self._conf['connection']['connect_delay']) 102 continue 103 104 await self._set_status('CONNECTED') 105 self._devices = {} 106 self._readers = {} 107 108 self._log.debug('creating remote devices') 109 for device_conf in self._conf['remote_devices']: 110 device = RemoteDevice(device_conf, self._conn, 111 self._conf['name']) 112 self._devices[device.device_id] = device 113 114 if device.device_id in self._enabled_devices: 115 self._enable_remote_device(device.device_id) 116 117 else: 118 await self._notify_response( 119 common.RemoteDeviceStatusRes( 120 device_id=device.device_id, 121 status='DISABLED')) 122 123 await self._conn.wait_closing() 124 await self._conn.async_close() 125 126 await self._set_status('DISCONNECTED') 127 128 except Exception as e: 129 self._log.error('connection loop error: %s', e, exc_info=e) 130 131 finally: 132 self._log.debug('closing connection loop') 133 self.close() 134 await aio.uncancellable(cleanup()) 135 136 async def _notify_response(self, response): 137 await self._eventer_client.write([response]) 138 139 async def _set_status(self, status): 140 if self._status == status: 141 return 142 143 self._log.debug('changing status: %s -> %s', self._status, status) 144 self._status = status 145 await self._notify_response(common.StatusRes(status)) 146 147 def _enable_remote_device(self, device_id): 148 self._log.debug('enabling device %s', device_id) 149 self._enabled_devices.add(device_id) 150 151 device = self._devices.get(device_id) 152 if not device: 153 self._log.debug('device %s is not available', device_id) 154 return 155 if not device.conn.is_open: 156 self._log.debug('connection is not available') 157 return 158 159 reader = self._readers.get(device_id) 160 if reader and reader.is_open: 161 self._log.debug('reader %s is already running', device_id) 162 return 163 164 self._log.debug('creating reader for device %s', device_id) 165 reader = device.create_reader(self._notify_response) 166 self._readers[device.device_id] = reader 167 168 async def _disable_remote_device(self, device_id): 169 self._log.debug('disabling device %s', device_id) 170 self._enabled_devices.discard(device_id) 171 172 reader = self._readers.pop(device_id, None) 173 if not reader: 174 self._log.debug('device reader %s is not available', device_id) 175 return 176 177 await reader.async_close() 178 179 async def _write(self, device_id, data_name, request_id, value): 180 self._log.debug('writing (device_id: %s; data_name: %s; value: %s)', 181 device_id, data_name, value) 182 183 device = self._devices.get(device_id) 184 if not device: 185 self._log.debug('device %s is not available', device_id) 186 return 187 188 response = await device.write(data_name, request_id, value) 189 if response: 190 self._log.debug('writing result: %s', response.result) 191 await self._notify_response(response)
Resource with lifetime control based on Group.
ModbusMasterDevice( conf: None | bool | int | float | str | List[ForwardRef('Data')] | Dict[str, ForwardRef('Data')], eventer_client: hat.event.eventer.client.Client, event_type_prefix: tuple[str, str, str])
21 def __init__(self, 22 conf: common.DeviceConf, 23 eventer_client: hat.event.eventer.Client, 24 event_type_prefix: common.EventTypePrefix): 25 self._conf = conf 26 self._eventer_client = EventerClientProxy(eventer_client, 27 event_type_prefix, 28 conf['name']) 29 self._enabled_devices = set() 30 self._status = None 31 self._conn = None 32 self._devices = {} 33 self._readers = {} 34 self._async_group = aio.Group() 35 self._log = common.create_device_logger_adapter(mlog, conf['name']) 36 37 self.async_group.spawn(self._connection_loop)
async def
process_event(self, event: hat.event.common.common.Event):
43 async def process_event(self, event: hat.event.common.Event): 44 try: 45 request = self._eventer_client.process_event(event) 46 47 if isinstance(request, common.RemoteDeviceEnableReq): 48 self._log.debug('received remote device enable request') 49 50 if request.enable: 51 self._enable_remote_device(request.device_id) 52 53 else: 54 await self._disable_remote_device(request.device_id) 55 56 elif isinstance(request, common.RemoteDeviceWriteReq): 57 self._log.debug('received remote device write request') 58 59 if self._conn and self._conn.is_open: 60 self._conn.async_group.spawn( 61 self._write, request.device_id, request.data_name, 62 request.request_id, request.value) 63 64 else: 65 raise ValueError('invalid request') 66 67 except Exception as e: 68 self._log.warning('error processing event: %s', e, exc_info=e)