hat.gateway.devices.modbus.master.eventer_client

  1from collections.abc import Collection, Iterable
  2import contextlib
  3import logging
  4import typing
  5
  6from hat import aio
  7import hat.event.common
  8import hat.event.eventer
  9
 10from hat.gateway import common
 11
 12
 13mlog = logging.getLogger(__name__)
 14
 15
 16class RemoteDeviceEnableReq(typing.NamedTuple):
 17    device_id: int
 18    enable: bool
 19
 20
 21class RemoteDeviceWriteReq(typing.NamedTuple):
 22    device_id: int
 23    data_name: str
 24    request_id: str
 25    value: int
 26
 27
 28Request: typing.TypeAlias = RemoteDeviceEnableReq | RemoteDeviceWriteReq
 29
 30
 31class StatusRes(typing.NamedTuple):
 32    status: str
 33
 34
 35class RemoteDeviceStatusRes(typing.NamedTuple):
 36    device_id: int
 37    status: str
 38
 39
 40class RemoteDeviceReadRes(typing.NamedTuple):
 41    device_id: int
 42    data_name: str
 43    result: str
 44    value: int | None
 45    cause: str | None
 46
 47
 48class RemoteDeviceWriteRes(typing.NamedTuple):
 49    device_id: int
 50    data_name: str
 51    request_id: str
 52    result: str
 53
 54
 55Response: typing.TypeAlias = (StatusRes |
 56                              RemoteDeviceStatusRes |
 57                              RemoteDeviceReadRes |
 58                              RemoteDeviceWriteRes)
 59
 60
 61class EventerClientProxy(aio.Resource):
 62
 63    def __init__(self,
 64                 eventer_client: hat.event.eventer.Client,
 65                 event_type_prefix: common.EventTypePrefix,
 66                 log_prefix: str):
 67        self._eventer_client = eventer_client
 68        self._event_type_prefix = event_type_prefix
 69        self._log_prefix = log_prefix
 70
 71    @property
 72    def async_group(self) -> aio.Group:
 73        return self._eventer_client.async_group
 74
 75    def process_events(self,
 76                       events: Collection[hat.event.common.Event]
 77                       ) -> Iterable[Request]:
 78        self._log(logging.DEBUG, 'received %s events', len(events))
 79        for event in events:
 80            try:
 81                yield _request_from_event(self._event_type_prefix, event)
 82
 83            except Exception as e:
 84                self._log(logging.INFO, 'received invalid event: %s', e,
 85                          exc_info=e)
 86
 87    async def write(self, responses: Iterable[Response]):
 88        events = [_response_to_register_event(self._event_type_prefix, i)
 89                  for i in responses]
 90        await self._eventer_client.register(events)
 91
 92    async def query_enabled_devices(self) -> set[int]:
 93        self._log(logging.DEBUG, 'querying enabled devices')
 94        enabled_devices = set()
 95
 96        event_type = (*self._event_type_prefix, 'system', 'remote_device',
 97                      '?', 'enable')
 98        params = hat.event.common.QueryLatestParams([event_type])
 99        result = await self._eventer_client.query(params)
100        self._log(logging.DEBUG, 'received %s events', len(result.events))
101
102        for event in result.events:
103            if not event.payload or not bool(event.payload.data):
104                continue
105
106            device_id_str = event.type[len(self._event_type_prefix) + 2]
107            with contextlib.suppress(ValueError):
108                enabled_devices.add(int(device_id_str))
109
110        self._log(logging.DEBUG, 'detected %s enabled devices',
111                  len(enabled_devices))
112        return enabled_devices
113
114    def _log(self, level, msg, *args, **kwargs):
115        if not mlog.isEnabledFor(level):
116            return
117
118        mlog.log(level, f"{self._log_prefix}: {msg}", *args, **kwargs)
119
120
121def _request_from_event(event_type_prefix, event):
122    event_type_suffix = event.type[len(event_type_prefix):]
123
124    if event_type_suffix[:2] != ('system', 'remote_device'):
125        raise Exception('unsupported event type')
126
127    device_id = int(event_type_suffix[2])
128
129    if event_type_suffix[3] == 'enable':
130        enable = bool(event.payload.data)
131        return RemoteDeviceEnableReq(device_id=device_id,
132                                     enable=enable)
133
134    if event_type_suffix[3] == 'write':
135        data_name = event_type_suffix[4]
136        request_id = event.payload.data['request_id']
137        value = event.payload.data['value']
138        return RemoteDeviceWriteReq(device_id=device_id,
139                                    data_name=data_name,
140                                    request_id=request_id,
141                                    value=value)
142
143    raise Exception('unsupported event type')
144
145
146def _response_to_register_event(event_type_prefix, res):
147    if isinstance(res, StatusRes):
148        event_type = (*event_type_prefix, 'gateway', 'status')
149        payload = res.status
150
151    elif isinstance(res, RemoteDeviceStatusRes):
152        event_type = (*event_type_prefix, 'gateway', 'remote_device',
153                      str(res.device_id), 'status')
154        payload = res.status
155
156    elif isinstance(res, RemoteDeviceReadRes):
157        event_type = (*event_type_prefix, 'gateway', 'remote_device',
158                      str(res.device_id), 'read', res.data_name)
159        payload = {'result': res.result}
160        if res.value is not None:
161            payload['value'] = res.value
162        if res.cause is not None:
163            payload['cause'] = res.cause
164
165    elif isinstance(res, RemoteDeviceWriteRes):
166        event_type = (*event_type_prefix, 'gateway', 'remote_device',
167                      str(res.device_id), 'write', res.data_name)
168        payload = {'request_id': res.request_id,
169                   'result': res.result}
170
171    else:
172        raise ValueError('invalid response type')
173
174    return hat.event.common.RegisterEvent(
175        type=event_type,
176        source_timestamp=None,
177        payload=hat.event.common.EventPayloadJson(payload))
class RemoteDeviceEnableReq(typing.NamedTuple):
17class RemoteDeviceEnableReq(typing.NamedTuple):
18    device_id: int
19    enable: bool

RemoteDeviceEnableReq(device_id, enable)

RemoteDeviceEnableReq(device_id: int, enable: bool)

Create new instance of RemoteDeviceEnableReq(device_id, enable)

device_id: int

Alias for field number 0

enable: bool

Alias for field number 1

class RemoteDeviceWriteReq(typing.NamedTuple):
22class RemoteDeviceWriteReq(typing.NamedTuple):
23    device_id: int
24    data_name: str
25    request_id: str
26    value: int

RemoteDeviceWriteReq(device_id, data_name, request_id, value)

RemoteDeviceWriteReq(device_id: int, data_name: str, request_id: str, value: int)

Create new instance of RemoteDeviceWriteReq(device_id, data_name, request_id, value)

device_id: int

Alias for field number 0

data_name: str

Alias for field number 1

request_id: str

Alias for field number 2

value: int

Alias for field number 3

class StatusRes(typing.NamedTuple):
32class StatusRes(typing.NamedTuple):
33    status: str

StatusRes(status,)

StatusRes(status: str)

Create new instance of StatusRes(status,)

status: str

Alias for field number 0

class RemoteDeviceStatusRes(typing.NamedTuple):
36class RemoteDeviceStatusRes(typing.NamedTuple):
37    device_id: int
38    status: str

RemoteDeviceStatusRes(device_id, status)

RemoteDeviceStatusRes(device_id: int, status: str)

Create new instance of RemoteDeviceStatusRes(device_id, status)

device_id: int

Alias for field number 0

status: str

Alias for field number 1

class RemoteDeviceReadRes(typing.NamedTuple):
41class RemoteDeviceReadRes(typing.NamedTuple):
42    device_id: int
43    data_name: str
44    result: str
45    value: int | None
46    cause: str | None

RemoteDeviceReadRes(device_id, data_name, result, value, cause)

RemoteDeviceReadRes( device_id: int, data_name: str, result: str, value: int | None, cause: str | None)

Create new instance of RemoteDeviceReadRes(device_id, data_name, result, value, cause)

device_id: int

Alias for field number 0

data_name: str

Alias for field number 1

result: str

Alias for field number 2

value: int | None

Alias for field number 3

cause: str | None

Alias for field number 4

class RemoteDeviceWriteRes(typing.NamedTuple):
49class RemoteDeviceWriteRes(typing.NamedTuple):
50    device_id: int
51    data_name: str
52    request_id: str
53    result: str

RemoteDeviceWriteRes(device_id, data_name, request_id, result)

RemoteDeviceWriteRes(device_id: int, data_name: str, request_id: str, result: str)

Create new instance of RemoteDeviceWriteRes(device_id, data_name, request_id, result)

device_id: int

Alias for field number 0

data_name: str

Alias for field number 1

request_id: str

Alias for field number 2

result: str

Alias for field number 3

class EventerClientProxy(hat.aio.group.Resource):
 62class EventerClientProxy(aio.Resource):
 63
 64    def __init__(self,
 65                 eventer_client: hat.event.eventer.Client,
 66                 event_type_prefix: common.EventTypePrefix,
 67                 log_prefix: str):
 68        self._eventer_client = eventer_client
 69        self._event_type_prefix = event_type_prefix
 70        self._log_prefix = log_prefix
 71
 72    @property
 73    def async_group(self) -> aio.Group:
 74        return self._eventer_client.async_group
 75
 76    def process_events(self,
 77                       events: Collection[hat.event.common.Event]
 78                       ) -> Iterable[Request]:
 79        self._log(logging.DEBUG, 'received %s events', len(events))
 80        for event in events:
 81            try:
 82                yield _request_from_event(self._event_type_prefix, event)
 83
 84            except Exception as e:
 85                self._log(logging.INFO, 'received invalid event: %s', e,
 86                          exc_info=e)
 87
 88    async def write(self, responses: Iterable[Response]):
 89        events = [_response_to_register_event(self._event_type_prefix, i)
 90                  for i in responses]
 91        await self._eventer_client.register(events)
 92
 93    async def query_enabled_devices(self) -> set[int]:
 94        self._log(logging.DEBUG, 'querying enabled devices')
 95        enabled_devices = set()
 96
 97        event_type = (*self._event_type_prefix, 'system', 'remote_device',
 98                      '?', 'enable')
 99        params = hat.event.common.QueryLatestParams([event_type])
100        result = await self._eventer_client.query(params)
101        self._log(logging.DEBUG, 'received %s events', len(result.events))
102
103        for event in result.events:
104            if not event.payload or not bool(event.payload.data):
105                continue
106
107            device_id_str = event.type[len(self._event_type_prefix) + 2]
108            with contextlib.suppress(ValueError):
109                enabled_devices.add(int(device_id_str))
110
111        self._log(logging.DEBUG, 'detected %s enabled devices',
112                  len(enabled_devices))
113        return enabled_devices
114
115    def _log(self, level, msg, *args, **kwargs):
116        if not mlog.isEnabledFor(level):
117            return
118
119        mlog.log(level, f"{self._log_prefix}: {msg}", *args, **kwargs)

Resource with lifetime control based on Group.

EventerClientProxy( eventer_client: hat.event.eventer.client.Client, event_type_prefix: tuple[str, str, str], log_prefix: str)
64    def __init__(self,
65                 eventer_client: hat.event.eventer.Client,
66                 event_type_prefix: common.EventTypePrefix,
67                 log_prefix: str):
68        self._eventer_client = eventer_client
69        self._event_type_prefix = event_type_prefix
70        self._log_prefix = log_prefix
async_group: hat.aio.group.Group
72    @property
73    def async_group(self) -> aio.Group:
74        return self._eventer_client.async_group

Group controlling resource's lifetime.

def process_events( self, events: Collection[hat.event.common.common.Event]) -> Iterable[RemoteDeviceEnableReq | RemoteDeviceWriteReq]:
76    def process_events(self,
77                       events: Collection[hat.event.common.Event]
78                       ) -> Iterable[Request]:
79        self._log(logging.DEBUG, 'received %s events', len(events))
80        for event in events:
81            try:
82                yield _request_from_event(self._event_type_prefix, event)
83
84            except Exception as e:
85                self._log(logging.INFO, 'received invalid event: %s', e,
86                          exc_info=e)
async def write( self, responses: Iterable[StatusRes | RemoteDeviceStatusRes | RemoteDeviceReadRes | RemoteDeviceWriteRes]):
88    async def write(self, responses: Iterable[Response]):
89        events = [_response_to_register_event(self._event_type_prefix, i)
90                  for i in responses]
91        await self._eventer_client.register(events)
async def query_enabled_devices(self) -> set[int]:
 93    async def query_enabled_devices(self) -> set[int]:
 94        self._log(logging.DEBUG, 'querying enabled devices')
 95        enabled_devices = set()
 96
 97        event_type = (*self._event_type_prefix, 'system', 'remote_device',
 98                      '?', 'enable')
 99        params = hat.event.common.QueryLatestParams([event_type])
100        result = await self._eventer_client.query(params)
101        self._log(logging.DEBUG, 'received %s events', len(result.events))
102
103        for event in result.events:
104            if not event.payload or not bool(event.payload.data):
105                continue
106
107            device_id_str = event.type[len(self._event_type_prefix) + 2]
108            with contextlib.suppress(ValueError):
109                enabled_devices.add(int(device_id_str))
110
111        self._log(logging.DEBUG, 'detected %s enabled devices',
112                  len(enabled_devices))
113        return enabled_devices