hat.gateway.devices.modbus.master.eventer_client

  1from collections.abc import Iterable
  2import contextlib
  3import logging
  4
  5from hat import aio
  6import hat.event.common
  7import hat.event.eventer
  8
  9from hat.gateway.devices.modbus.master import common
 10
 11
 12mlog = logging.getLogger(__name__)
 13
 14
 15class EventerClientProxy(aio.Resource):
 16
 17    def __init__(self,
 18                 eventer_client: hat.event.eventer.Client,
 19                 event_type_prefix: common.EventTypePrefix,
 20                 name: str):
 21        self._eventer_client = eventer_client
 22        self._event_type_prefix = event_type_prefix
 23        self._log = common.create_device_logger_adapter(mlog, name)
 24
 25    @property
 26    def async_group(self) -> aio.Group:
 27        return self._eventer_client.async_group
 28
 29    def process_event(self, event: hat.event.common.Event) -> common.Request:
 30        return _request_from_event(self._event_type_prefix, event)
 31
 32    async def write(self, responses: Iterable[common.Response]):
 33        events = [_response_to_register_event(self._event_type_prefix, i)
 34                  for i in responses]
 35        await self._eventer_client.register(events)
 36
 37    async def query_enabled_devices(self) -> set[int]:
 38        self._log.debug('querying enabled devices')
 39        enabled_devices = set()
 40
 41        event_type = (*self._event_type_prefix, 'system', 'remote_device',
 42                      '?', 'enable')
 43        params = hat.event.common.QueryLatestParams([event_type])
 44        result = await self._eventer_client.query(params)
 45        self._log.debug('received %s events', len(result.events))
 46
 47        for event in result.events:
 48            if not event.payload or not bool(event.payload.data):
 49                continue
 50
 51            device_id_str = event.type[len(self._event_type_prefix) + 2]
 52            with contextlib.suppress(ValueError):
 53                enabled_devices.add(int(device_id_str))
 54
 55        self._log.debug('detected %s enabled devices', len(enabled_devices))
 56        return enabled_devices
 57
 58
 59def _request_from_event(event_type_prefix, event):
 60    event_type_suffix = event.type[len(event_type_prefix):]
 61
 62    if event_type_suffix[:2] != ('system', 'remote_device'):
 63        raise Exception('unsupported event type')
 64
 65    device_id = int(event_type_suffix[2])
 66
 67    if event_type_suffix[3] == 'enable':
 68        enable = bool(event.payload.data)
 69        return common.RemoteDeviceEnableReq(device_id=device_id,
 70                                            enable=enable)
 71
 72    if event_type_suffix[3] == 'write':
 73        data_name = event_type_suffix[4]
 74        request_id = event.payload.data['request_id']
 75        value = event.payload.data['value']
 76        return common.RemoteDeviceWriteReq(device_id=device_id,
 77                                           data_name=data_name,
 78                                           request_id=request_id,
 79                                           value=value)
 80
 81    raise Exception('unsupported event type')
 82
 83
 84def _response_to_register_event(event_type_prefix, res):
 85    if isinstance(res, common.StatusRes):
 86        event_type = (*event_type_prefix, 'gateway', 'status')
 87        payload = res.status
 88
 89    elif isinstance(res, common.RemoteDeviceStatusRes):
 90        event_type = (*event_type_prefix, 'gateway', 'remote_device',
 91                      str(res.device_id), 'status')
 92        payload = res.status
 93
 94    elif isinstance(res, common.RemoteDeviceReadRes):
 95        event_type = (*event_type_prefix, 'gateway', 'remote_device',
 96                      str(res.device_id), 'read', res.data_name)
 97        payload = {'result': res.result}
 98        if res.value is not None:
 99            payload['value'] = res.value
100        if res.cause is not None:
101            payload['cause'] = res.cause
102
103    elif isinstance(res, common.RemoteDeviceWriteRes):
104        event_type = (*event_type_prefix, 'gateway', 'remote_device',
105                      str(res.device_id), 'write', res.data_name)
106        payload = {'request_id': res.request_id,
107                   'result': res.result}
108
109    else:
110        raise ValueError('invalid response type')
111
112    return hat.event.common.RegisterEvent(
113        type=event_type,
114        source_timestamp=None,
115        payload=hat.event.common.EventPayloadJson(payload))
class EventerClientProxy(hat.aio.group.Resource):
16class EventerClientProxy(aio.Resource):
17
18    def __init__(self,
19                 eventer_client: hat.event.eventer.Client,
20                 event_type_prefix: common.EventTypePrefix,
21                 name: str):
22        self._eventer_client = eventer_client
23        self._event_type_prefix = event_type_prefix
24        self._log = common.create_device_logger_adapter(mlog, name)
25
26    @property
27    def async_group(self) -> aio.Group:
28        return self._eventer_client.async_group
29
30    def process_event(self, event: hat.event.common.Event) -> common.Request:
31        return _request_from_event(self._event_type_prefix, event)
32
33    async def write(self, responses: Iterable[common.Response]):
34        events = [_response_to_register_event(self._event_type_prefix, i)
35                  for i in responses]
36        await self._eventer_client.register(events)
37
38    async def query_enabled_devices(self) -> set[int]:
39        self._log.debug('querying enabled devices')
40        enabled_devices = set()
41
42        event_type = (*self._event_type_prefix, 'system', 'remote_device',
43                      '?', 'enable')
44        params = hat.event.common.QueryLatestParams([event_type])
45        result = await self._eventer_client.query(params)
46        self._log.debug('received %s events', len(result.events))
47
48        for event in result.events:
49            if not event.payload or not bool(event.payload.data):
50                continue
51
52            device_id_str = event.type[len(self._event_type_prefix) + 2]
53            with contextlib.suppress(ValueError):
54                enabled_devices.add(int(device_id_str))
55
56        self._log.debug('detected %s enabled devices', len(enabled_devices))
57        return enabled_devices

Resource with lifetime control based on Group.

EventerClientProxy(eventer_client: hat.event.eventer.client.Client, event_type_prefix: tuple[str, str, str], name: str)
18    def __init__(self,
19                 eventer_client: hat.event.eventer.Client,
20                 event_type_prefix: common.EventTypePrefix,
21                 name: str):
22        self._eventer_client = eventer_client
23        self._event_type_prefix = event_type_prefix
24        self._log = common.create_device_logger_adapter(mlog, name)
async_group: hat.aio.group.Group
26    @property
27    def async_group(self) -> aio.Group:
28        return self._eventer_client.async_group

Group controlling resource's lifetime.

def process_event(self, event: hat.event.common.common.Event) -> hat.gateway.devices.modbus.master.common.RemoteDeviceEnableReq | hat.gateway.devices.modbus.master.common.RemoteDeviceWriteReq:
30    def process_event(self, event: hat.event.common.Event) -> common.Request:
31        return _request_from_event(self._event_type_prefix, event)
33    async def write(self, responses: Iterable[common.Response]):
34        events = [_response_to_register_event(self._event_type_prefix, i)
35                  for i in responses]
36        await self._eventer_client.register(events)
async def query_enabled_devices(self) -> set[int]:
38    async def query_enabled_devices(self) -> set[int]:
39        self._log.debug('querying enabled devices')
40        enabled_devices = set()
41
42        event_type = (*self._event_type_prefix, 'system', 'remote_device',
43                      '?', 'enable')
44        params = hat.event.common.QueryLatestParams([event_type])
45        result = await self._eventer_client.query(params)
46        self._log.debug('received %s events', len(result.events))
47
48        for event in result.events:
49            if not event.payload or not bool(event.payload.data):
50                continue
51
52            device_id_str = event.type[len(self._event_type_prefix) + 2]
53            with contextlib.suppress(ValueError):
54                enabled_devices.add(int(device_id_str))
55
56        self._log.debug('detected %s enabled devices', len(enabled_devices))
57        return enabled_devices