hat.gateway.devices.modbus.master.eventer_client
1from collections.abc import Iterable 2import contextlib 3import logging 4 5from hat import aio 6import hat.event.common 7import hat.event.eventer 8 9from hat.gateway.devices.modbus.master import common 10 11 12mlog = logging.getLogger(__name__) 13 14 15class EventerClientProxy(aio.Resource): 16 17 def __init__(self, 18 eventer_client: hat.event.eventer.Client, 19 event_type_prefix: common.EventTypePrefix, 20 name: str): 21 self._eventer_client = eventer_client 22 self._event_type_prefix = event_type_prefix 23 self._log = common.create_device_logger_adapter(mlog, name) 24 25 @property 26 def async_group(self) -> aio.Group: 27 return self._eventer_client.async_group 28 29 def process_event(self, event: hat.event.common.Event) -> common.Request: 30 return _request_from_event(self._event_type_prefix, event) 31 32 async def write(self, responses: Iterable[common.Response]): 33 events = [_response_to_register_event(self._event_type_prefix, i) 34 for i in responses] 35 await self._eventer_client.register(events) 36 37 async def query_enabled_devices(self) -> set[int]: 38 self._log.debug('querying enabled devices') 39 enabled_devices = set() 40 41 event_type = (*self._event_type_prefix, 'system', 'remote_device', 42 '?', 'enable') 43 params = hat.event.common.QueryLatestParams([event_type]) 44 result = await self._eventer_client.query(params) 45 self._log.debug('received %s events', len(result.events)) 46 47 for event in result.events: 48 if not event.payload or not bool(event.payload.data): 49 continue 50 51 device_id_str = event.type[len(self._event_type_prefix) + 2] 52 with contextlib.suppress(ValueError): 53 enabled_devices.add(int(device_id_str)) 54 55 self._log.debug('detected %s enabled devices', len(enabled_devices)) 56 return enabled_devices 57 58 59def _request_from_event(event_type_prefix, event): 60 event_type_suffix = event.type[len(event_type_prefix):] 61 62 if event_type_suffix[:2] != ('system', 'remote_device'): 63 raise Exception('unsupported event type') 64 65 device_id = int(event_type_suffix[2]) 66 67 if event_type_suffix[3] == 'enable': 68 enable = bool(event.payload.data) 69 return common.RemoteDeviceEnableReq(device_id=device_id, 70 enable=enable) 71 72 if event_type_suffix[3] == 'write': 73 data_name = event_type_suffix[4] 74 request_id = event.payload.data['request_id'] 75 value = event.payload.data['value'] 76 return common.RemoteDeviceWriteReq(device_id=device_id, 77 data_name=data_name, 78 request_id=request_id, 79 value=value) 80 81 raise Exception('unsupported event type') 82 83 84def _response_to_register_event(event_type_prefix, res): 85 if isinstance(res, common.StatusRes): 86 event_type = (*event_type_prefix, 'gateway', 'status') 87 payload = res.status 88 89 elif isinstance(res, common.RemoteDeviceStatusRes): 90 event_type = (*event_type_prefix, 'gateway', 'remote_device', 91 str(res.device_id), 'status') 92 payload = res.status 93 94 elif isinstance(res, common.RemoteDeviceReadRes): 95 event_type = (*event_type_prefix, 'gateway', 'remote_device', 96 str(res.device_id), 'read', res.data_name) 97 payload = {'result': res.result} 98 if res.value is not None: 99 payload['value'] = res.value 100 if res.cause is not None: 101 payload['cause'] = res.cause 102 103 elif isinstance(res, common.RemoteDeviceWriteRes): 104 event_type = (*event_type_prefix, 'gateway', 'remote_device', 105 str(res.device_id), 'write', res.data_name) 106 payload = {'request_id': res.request_id, 107 'result': res.result} 108 109 else: 110 raise ValueError('invalid response type') 111 112 return hat.event.common.RegisterEvent( 113 type=event_type, 114 source_timestamp=None, 115 payload=hat.event.common.EventPayloadJson(payload))
mlog =
<Logger hat.gateway.devices.modbus.master.eventer_client (WARNING)>
class
EventerClientProxy(hat.aio.group.Resource):
16class EventerClientProxy(aio.Resource): 17 18 def __init__(self, 19 eventer_client: hat.event.eventer.Client, 20 event_type_prefix: common.EventTypePrefix, 21 name: str): 22 self._eventer_client = eventer_client 23 self._event_type_prefix = event_type_prefix 24 self._log = common.create_device_logger_adapter(mlog, name) 25 26 @property 27 def async_group(self) -> aio.Group: 28 return self._eventer_client.async_group 29 30 def process_event(self, event: hat.event.common.Event) -> common.Request: 31 return _request_from_event(self._event_type_prefix, event) 32 33 async def write(self, responses: Iterable[common.Response]): 34 events = [_response_to_register_event(self._event_type_prefix, i) 35 for i in responses] 36 await self._eventer_client.register(events) 37 38 async def query_enabled_devices(self) -> set[int]: 39 self._log.debug('querying enabled devices') 40 enabled_devices = set() 41 42 event_type = (*self._event_type_prefix, 'system', 'remote_device', 43 '?', 'enable') 44 params = hat.event.common.QueryLatestParams([event_type]) 45 result = await self._eventer_client.query(params) 46 self._log.debug('received %s events', len(result.events)) 47 48 for event in result.events: 49 if not event.payload or not bool(event.payload.data): 50 continue 51 52 device_id_str = event.type[len(self._event_type_prefix) + 2] 53 with contextlib.suppress(ValueError): 54 enabled_devices.add(int(device_id_str)) 55 56 self._log.debug('detected %s enabled devices', len(enabled_devices)) 57 return enabled_devices
Resource with lifetime control based on Group.
EventerClientProxy( eventer_client: hat.event.eventer.client.Client, event_type_prefix: tuple[str, str, str], name: str)
def
process_event( self, event: hat.event.common.common.Event) -> hat.gateway.devices.modbus.master.common.RemoteDeviceEnableReq | hat.gateway.devices.modbus.master.common.RemoteDeviceWriteReq:
async def
write( self, responses: Iterable[hat.gateway.devices.modbus.master.common.StatusRes | hat.gateway.devices.modbus.master.common.RemoteDeviceStatusRes | hat.gateway.devices.modbus.master.common.RemoteDeviceReadRes | hat.gateway.devices.modbus.master.common.RemoteDeviceWriteRes]):
async def
query_enabled_devices(self) -> set[int]:
38 async def query_enabled_devices(self) -> set[int]: 39 self._log.debug('querying enabled devices') 40 enabled_devices = set() 41 42 event_type = (*self._event_type_prefix, 'system', 'remote_device', 43 '?', 'enable') 44 params = hat.event.common.QueryLatestParams([event_type]) 45 result = await self._eventer_client.query(params) 46 self._log.debug('received %s events', len(result.events)) 47 48 for event in result.events: 49 if not event.payload or not bool(event.payload.data): 50 continue 51 52 device_id_str = event.type[len(self._event_type_prefix) + 2] 53 with contextlib.suppress(ValueError): 54 enabled_devices.add(int(device_id_str)) 55 56 self._log.debug('detected %s enabled devices', len(enabled_devices)) 57 return enabled_devices