hat.gateway.devices.modbus.master.eventer_client
1from collections.abc import Collection, Iterable 2import contextlib 3import logging 4import typing 5 6from hat import aio 7import hat.event.common 8import hat.event.eventer 9 10from hat.gateway import common 11 12 13mlog = logging.getLogger(__name__) 14 15 16class RemoteDeviceEnableReq(typing.NamedTuple): 17 device_id: int 18 enable: bool 19 20 21class RemoteDeviceWriteReq(typing.NamedTuple): 22 device_id: int 23 data_name: str 24 request_id: str 25 value: int 26 27 28Request: typing.TypeAlias = RemoteDeviceEnableReq | RemoteDeviceWriteReq 29 30 31class StatusRes(typing.NamedTuple): 32 status: str 33 34 35class RemoteDeviceStatusRes(typing.NamedTuple): 36 device_id: int 37 status: str 38 39 40class RemoteDeviceReadRes(typing.NamedTuple): 41 device_id: int 42 data_name: str 43 result: str 44 value: int | None 45 cause: str | None 46 47 48class RemoteDeviceWriteRes(typing.NamedTuple): 49 device_id: int 50 data_name: str 51 request_id: str 52 result: str 53 54 55Response: typing.TypeAlias = (StatusRes | 56 RemoteDeviceStatusRes | 57 RemoteDeviceReadRes | 58 RemoteDeviceWriteRes) 59 60 61class EventerClientProxy(aio.Resource): 62 63 def __init__(self, 64 eventer_client: hat.event.eventer.Client, 65 event_type_prefix: common.EventTypePrefix, 66 log_prefix: str): 67 self._eventer_client = eventer_client 68 self._event_type_prefix = event_type_prefix 69 self._log_prefix = log_prefix 70 71 @property 72 def async_group(self) -> aio.Group: 73 return self._eventer_client.async_group 74 75 def process_events(self, 76 events: Collection[hat.event.common.Event] 77 ) -> Iterable[Request]: 78 self._log(logging.DEBUG, 'received %s events', len(events)) 79 for event in events: 80 try: 81 yield _request_from_event(self._event_type_prefix, event) 82 83 except Exception as e: 84 self._log(logging.INFO, 'received invalid event: %s', e, 85 exc_info=e) 86 87 async def write(self, responses: Iterable[Response]): 88 events = [_response_to_register_event(self._event_type_prefix, i) 89 for i in responses] 90 await self._eventer_client.register(events) 91 92 async def query_enabled_devices(self) -> set[int]: 93 self._log(logging.DEBUG, 'querying enabled devices') 94 enabled_devices = set() 95 96 event_type = (*self._event_type_prefix, 'system', 'remote_device', 97 '?', 'enable') 98 params = hat.event.common.QueryLatestParams([event_type]) 99 result = await self._eventer_client.query(params) 100 self._log(logging.DEBUG, 'received %s events', len(result.events)) 101 102 for event in result.events: 103 if not event.payload or not bool(event.payload.data): 104 continue 105 106 device_id_str = event.type[len(self._event_type_prefix) + 2] 107 with contextlib.suppress(ValueError): 108 enabled_devices.add(int(device_id_str)) 109 110 self._log(logging.DEBUG, 'detected %s enabled devices', 111 len(enabled_devices)) 112 return enabled_devices 113 114 def _log(self, level, msg, *args, **kwargs): 115 if not mlog.isEnabledFor(level): 116 return 117 118 mlog.log(level, f"{self._log_prefix}: {msg}", *args, **kwargs) 119 120 121def _request_from_event(event_type_prefix, event): 122 event_type_suffix = event.type[len(event_type_prefix):] 123 124 if event_type_suffix[:2] != ('system', 'remote_device'): 125 raise Exception('unsupported event type') 126 127 device_id = int(event_type_suffix[2]) 128 129 if event_type_suffix[3] == 'enable': 130 enable = bool(event.payload.data) 131 return RemoteDeviceEnableReq(device_id=device_id, 132 enable=enable) 133 134 if event_type_suffix[3] == 'write': 135 data_name = event_type_suffix[4] 136 request_id = event.payload.data['request_id'] 137 value = event.payload.data['value'] 138 return RemoteDeviceWriteReq(device_id=device_id, 139 data_name=data_name, 140 request_id=request_id, 141 value=value) 142 143 raise Exception('unsupported event type') 144 145 146def _response_to_register_event(event_type_prefix, res): 147 if isinstance(res, StatusRes): 148 event_type = (*event_type_prefix, 'gateway', 'status') 149 payload = res.status 150 151 elif isinstance(res, RemoteDeviceStatusRes): 152 event_type = (*event_type_prefix, 'gateway', 'remote_device', 153 str(res.device_id), 'status') 154 payload = res.status 155 156 elif isinstance(res, RemoteDeviceReadRes): 157 event_type = (*event_type_prefix, 'gateway', 'remote_device', 158 str(res.device_id), 'read', res.data_name) 159 payload = {'result': res.result} 160 if res.value is not None: 161 payload['value'] = res.value 162 if res.cause is not None: 163 payload['cause'] = res.cause 164 165 elif isinstance(res, RemoteDeviceWriteRes): 166 event_type = (*event_type_prefix, 'gateway', 'remote_device', 167 str(res.device_id), 'write', res.data_name) 168 payload = {'request_id': res.request_id, 169 'result': res.result} 170 171 else: 172 raise ValueError('invalid response type') 173 174 return hat.event.common.RegisterEvent( 175 type=event_type, 176 source_timestamp=None, 177 payload=hat.event.common.EventPayloadJson(payload))
mlog =
<Logger hat.gateway.devices.modbus.master.eventer_client (WARNING)>
class
RemoteDeviceEnableReq(typing.NamedTuple):
RemoteDeviceEnableReq(device_id, enable)
class
RemoteDeviceWriteReq(typing.NamedTuple):
22class RemoteDeviceWriteReq(typing.NamedTuple): 23 device_id: int 24 data_name: str 25 request_id: str 26 value: int
RemoteDeviceWriteReq(device_id, data_name, request_id, value)
class
StatusRes(typing.NamedTuple):
StatusRes(status,)
class
RemoteDeviceStatusRes(typing.NamedTuple):
RemoteDeviceStatusRes(device_id, status)
class
RemoteDeviceReadRes(typing.NamedTuple):
41class RemoteDeviceReadRes(typing.NamedTuple): 42 device_id: int 43 data_name: str 44 result: str 45 value: int | None 46 cause: str | None
RemoteDeviceReadRes(device_id, data_name, result, value, cause)
class
RemoteDeviceWriteRes(typing.NamedTuple):
49class RemoteDeviceWriteRes(typing.NamedTuple): 50 device_id: int 51 data_name: str 52 request_id: str 53 result: str
RemoteDeviceWriteRes(device_id, data_name, request_id, result)
Response: TypeAlias =
StatusRes | RemoteDeviceStatusRes | RemoteDeviceReadRes | RemoteDeviceWriteRes
class
EventerClientProxy(hat.aio.group.Resource):
62class EventerClientProxy(aio.Resource): 63 64 def __init__(self, 65 eventer_client: hat.event.eventer.Client, 66 event_type_prefix: common.EventTypePrefix, 67 log_prefix: str): 68 self._eventer_client = eventer_client 69 self._event_type_prefix = event_type_prefix 70 self._log_prefix = log_prefix 71 72 @property 73 def async_group(self) -> aio.Group: 74 return self._eventer_client.async_group 75 76 def process_events(self, 77 events: Collection[hat.event.common.Event] 78 ) -> Iterable[Request]: 79 self._log(logging.DEBUG, 'received %s events', len(events)) 80 for event in events: 81 try: 82 yield _request_from_event(self._event_type_prefix, event) 83 84 except Exception as e: 85 self._log(logging.INFO, 'received invalid event: %s', e, 86 exc_info=e) 87 88 async def write(self, responses: Iterable[Response]): 89 events = [_response_to_register_event(self._event_type_prefix, i) 90 for i in responses] 91 await self._eventer_client.register(events) 92 93 async def query_enabled_devices(self) -> set[int]: 94 self._log(logging.DEBUG, 'querying enabled devices') 95 enabled_devices = set() 96 97 event_type = (*self._event_type_prefix, 'system', 'remote_device', 98 '?', 'enable') 99 params = hat.event.common.QueryLatestParams([event_type]) 100 result = await self._eventer_client.query(params) 101 self._log(logging.DEBUG, 'received %s events', len(result.events)) 102 103 for event in result.events: 104 if not event.payload or not bool(event.payload.data): 105 continue 106 107 device_id_str = event.type[len(self._event_type_prefix) + 2] 108 with contextlib.suppress(ValueError): 109 enabled_devices.add(int(device_id_str)) 110 111 self._log(logging.DEBUG, 'detected %s enabled devices', 112 len(enabled_devices)) 113 return enabled_devices 114 115 def _log(self, level, msg, *args, **kwargs): 116 if not mlog.isEnabledFor(level): 117 return 118 119 mlog.log(level, f"{self._log_prefix}: {msg}", *args, **kwargs)
Resource with lifetime control based on Group
.
EventerClientProxy( eventer_client: hat.event.eventer.client.Client, event_type_prefix: tuple[str, str, str], log_prefix: str)
def
process_events( self, events: Collection[hat.event.common.common.Event]) -> Iterable[RemoteDeviceEnableReq | RemoteDeviceWriteReq]:
76 def process_events(self, 77 events: Collection[hat.event.common.Event] 78 ) -> Iterable[Request]: 79 self._log(logging.DEBUG, 'received %s events', len(events)) 80 for event in events: 81 try: 82 yield _request_from_event(self._event_type_prefix, event) 83 84 except Exception as e: 85 self._log(logging.INFO, 'received invalid event: %s', e, 86 exc_info=e)
async def
write( self, responses: Iterable[StatusRes | RemoteDeviceStatusRes | RemoteDeviceReadRes | RemoteDeviceWriteRes]):
async def
query_enabled_devices(self) -> set[int]:
93 async def query_enabled_devices(self) -> set[int]: 94 self._log(logging.DEBUG, 'querying enabled devices') 95 enabled_devices = set() 96 97 event_type = (*self._event_type_prefix, 'system', 'remote_device', 98 '?', 'enable') 99 params = hat.event.common.QueryLatestParams([event_type]) 100 result = await self._eventer_client.query(params) 101 self._log(logging.DEBUG, 'received %s events', len(result.events)) 102 103 for event in result.events: 104 if not event.payload or not bool(event.payload.data): 105 continue 106 107 device_id_str = event.type[len(self._event_type_prefix) + 2] 108 with contextlib.suppress(ValueError): 109 enabled_devices.add(int(device_id_str)) 110 111 self._log(logging.DEBUG, 'detected %s enabled devices', 112 len(enabled_devices)) 113 return enabled_devices