hat.gateway.engine

Gateway engine

  1"""Gateway engine"""
  2
  3import collections
  4import contextlib
  5import importlib
  6import logging
  7
  8from hat import aio
  9from hat import json
 10import hat.event.eventer
 11import hat.event.common
 12
 13from hat.gateway import common
 14
 15
# Logger shared by everything in this module; it is named after the module
# itself through ``__name__``.
mlog: logging.Logger = logging.getLogger(__name__)
"""Module logger"""
 18
 19
 20async def create_engine(conf: json.Data,
 21                        client: hat.event.eventer.Client
 22                        ) -> 'Engine':
 23    """Create gateway engine"""
 24    engine = Engine()
 25    engine._client = client
 26    engine._devices = {}
 27
 28    try:
 29        for device_conf in conf['devices']:
 30            device = _DeviceProxy(device_conf, client, conf['gateway_name'])
 31            if device.event_type_prefix in engine._devices:
 32                raise Exception(f'duplicate device identifier: '
 33                                f'{device.event_type_prefix}')
 34            engine._devices[device.event_type_prefix] = device
 35
 36        engine.async_group.spawn(engine._read_loop)
 37
 38    except BaseException:
 39        await aio.uncancellable(engine.async_close())
 40        raise
 41
 42    return engine
 43
 44
 45class Engine(aio.Resource):
 46    """Gateway engine"""
 47
 48    @property
 49    def async_group(self) -> aio.Group:
 50        """Async group"""
 51        return self._client.async_group
 52
 53    async def _read_loop(self):
 54        try:
 55            events = []
 56
 57            if self._devices:
 58                event_types = [(*device.event_type_prefix, 'system', 'enable')
 59                               for device in self._devices.values()]
 60                query = hat.event.common.QueryData(event_types=event_types,
 61                                                   unique_type=True)
 62                events = await self._client.query(query)
 63
 64            while True:
 65                device_events = {}
 66
 67                for event in events:
 68                    event_type_prefix = event.event_type[:4]
 69                    device = self._devices.get(event_type_prefix)
 70                    if not device:
 71                        continue
 72
 73                    if device not in device_events:
 74                        device_events[device] = collections.deque()
 75                    device_events[device].append(event)
 76
 77                for device, events in device_events.items():
 78                    device.receive_queue.put_nowait(events)
 79
 80                events = await self._client.receive()
 81
 82        except ConnectionError:
 83            pass
 84
 85        except Exception as e:
 86            mlog.error('read loop error: %s', e, exc_info=e)
 87
 88        finally:
 89            self.close()
 90
 91
 92class _DeviceProxy(aio.Resource):
 93
 94    def __init__(self, conf, client, gateway_name):
 95        self._conf = conf
 96        self._client = client
 97        self._receive_queue = aio.Queue()
 98        self._events_queues_queue = aio.Queue()
 99        self._device_module = importlib.import_module(conf['module'])
100        self._event_type_prefix = ('gateway', gateway_name,
101                                   self._device_module.device_type,
102                                   conf['name'])
103
104        self.async_group.spawn(self._read_loop)
105        self.async_group.spawn(self._device_loop)
106
107    @property
108    def async_group(self):
109        return self._client.async_group
110
111    @property
112    def event_type_prefix(self):
113        return self._event_type_prefix
114
115    @property
116    def receive_queue(self):
117        return self._receive_queue
118
119    async def _read_loop(self):
120        try:
121            events = collections.deque()
122            enabled = False
123
124            while True:
125                while True:
126                    while events and not enabled:
127                        event = events.popleft()
128                        if event.event_type[5:] == ('enable', ):
129                            enabled = _is_enable_event(event)
130
131                    if enabled:
132                        break
133
134                    events = await self._receive_queue.get()
135
136                events_queue = aio.Queue()
137                try:
138                    self._events_queues_queue.put_nowait(events_queue)
139
140                    while True:
141                        filtered_events = collections.deque()
142
143                        while events and enabled:
144                            event = events.popleft()
145                            if event.event_type[5:] == ('enable', ):
146                                enabled = _is_enable_event(event)
147                            else:
148                                filtered_events.append(event)
149
150                        if filtered_events and not events_queue.is_closed:
151                            events_queue.put_nowait(list(filtered_events))
152
153                        if not enabled:
154                            break
155
156                        events = await self._receive_queue.get()
157
158                finally:
159                    events_queue.close()
160
161        except Exception as e:
162            mlog.error('read loop error: %s', e, exc_info=e)
163
164        finally:
165            self.close()
166
167    async def _device_loop(self):
168        try:
169            self._register_running(False)
170            while True:
171                events_queue = await self._events_queues_queue.get()
172                if events_queue.is_closed:
173                    continue
174
175                async with self.async_group.create_subgroup() as subgroup:
176                    client = _DeviceEventClient(self._client)
177                    device = await aio.call(self._device_module.create,
178                                            self._conf, client,
179                                            self._event_type_prefix)
180
181                    subgroup.spawn(aio.call_on_cancel, device.async_close)
182                    subgroup.spawn(aio.call_on_cancel, events_queue.close)
183                    subgroup.spawn(aio.call_on_done, device.wait_closing(),
184                                   subgroup.close)
185
186                    try:
187                        self._register_running(True)
188                        while True:
189                            events = await events_queue.get()
190                            client.receive_queue.put_nowait(events)
191
192                    except aio.QueueClosedError:
193                        if not device.is_open:
194                            break
195
196                    finally:
197                        with contextlib.suppress(ConnectionError):
198                            self._register_running(False)
199
200        except Exception as e:
201            mlog.error('device loop error: %s', e, exc_info=e)
202
203        finally:
204            self.close()
205
206    def _register_running(self, is_running):
207        self._client.register([hat.event.common.RegisterEvent(
208            event_type=(*self._event_type_prefix, 'gateway', 'running'),
209            source_timestamp=hat.event.common.now(),
210            payload=hat.event.common.EventPayload(
211                type=hat.event.common.EventPayloadType.JSON,
212                data=is_running))])
213
214
class _DeviceEventClient(common.DeviceEventClient):
    """Event client handed over to a device instance.

    Received events come from the client's own receive queue, while
    registering and querying are delegated to the wrapped event client.

    """

    def __init__(self, client):
        queue = aio.Queue()
        self._client = client
        self._receive_queue = queue

        # close the receive queue once the async group is being closed
        self.async_group.spawn(aio.call_on_cancel, queue.close)

    @property
    def async_group(self):
        """Async group (the one owned by the wrapped client)"""
        return self._client.async_group

    @property
    def receive_queue(self):
        """Queue filled with event batches intended for the device"""
        return self._receive_queue

    async def receive(self):
        """Return next batch of events.

        Raises:
            ConnectionError: if the receive queue is closed

        """
        try:
            events = await self._receive_queue.get()

        except aio.QueueClosedError:
            raise ConnectionError()

        return events

    def register(self, events):
        """Register events through the wrapped client."""
        self._client.register(events)

    async def register_with_response(self, events):
        """Register events through the wrapped client and return result."""
        response = await self._client.register_with_response(events)
        return response

    async def query(self, data):
        """Query events through the wrapped client."""
        result = await self._client.query(data)
        return result
246
247
def _is_enable_event(event):
    """Return ``True`` if `event` enables a device, ``False`` otherwise.

    An event is enabling only when it has a payload of type
    ``EventPayloadType.JSON`` whose data is exactly ``True``.

    The result is always a ``bool``: an event without payload yields
    ``False`` instead of leaking the falsy payload value (e.g. ``None``)
    to the caller.

    """
    payload = event.payload
    if not payload:
        return False

    return bool(payload.type == hat.event.common.EventPayloadType.JSON and
                payload.data is True)
mlog: logging.Logger = <Logger hat.gateway.engine (WARNING)>

Module logger

async def create_engine( conf: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')], client: hat.event.eventer.client.Client) -> Engine:
async def create_engine(conf: json.Data,
                        client: hat.event.eventer.Client
                        ) -> 'Engine':
    """Create gateway engine

    A device proxy is created for every entry of ``conf['devices']`` and
    stored under its event type prefix.  Once all proxies exist, the
    engine's read loop is spawned in the engine's async group.

    Args:
        conf: gateway configuration; the keys ``devices`` and
            ``gateway_name`` are read here
        client: event client used by the engine and by all device proxies

    Raises:
        Exception: if two devices resolve to the same event type prefix

    On any failure (cancellation included) the engine is closed before the
    exception is propagated.

    """
    engine = Engine()
    engine._client = client
    engine._devices = {}

    try:
        for device_conf in conf['devices']:
            device = _DeviceProxy(device_conf, client, conf['gateway_name'])
            prefix = device.event_type_prefix

            # the prefix is the lookup key used when dispatching events, so
            # it has to be unique among configured devices
            if prefix in engine._devices:
                raise Exception(f'duplicate device identifier: {prefix}')

            engine._devices[prefix] = device

        engine.async_group.spawn(engine._read_loop)

    except BaseException:
        # closing must run to completion even if this task is cancelled
        await aio.uncancellable(engine.async_close())
        raise

    return engine

Create gateway engine

class Engine(hat.aio.group.Resource):
class Engine(aio.Resource):
    """Gateway engine

    Receives events from the event client and hands them over to the
    device proxy whose event type prefix matches the event type.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group (the one owned by the event client)"""
        return self._client.async_group

    async def _read_loop(self):
        """Dispatch queried and received events to device proxies."""
        try:
            if self._devices:
                # start with the last known ``system/enable`` event of each
                # configured device
                enable_types = [
                    (*device.event_type_prefix, 'system', 'enable')
                    for device in self._devices.values()]
                query = hat.event.common.QueryData(event_types=enable_types,
                                                   unique_type=True)
                events = await self._client.query(query)

            else:
                events = []

            while True:
                batches = {}

                for event in events:
                    # first four event type segments identify the device
                    device = self._devices.get(event.event_type[:4])
                    if not device:
                        continue

                    batch = batches.get(device)
                    if batch is None:
                        batch = batches[device] = collections.deque()
                    batch.append(event)

                for device, batch in batches.items():
                    device.receive_queue.put_nowait(batch)

                events = await self._client.receive()

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error('read loop error: %s', e, exc_info=e)

        finally:
            self.close()

Gateway engine

async_group: hat.aio.group.Group

Async group

Inherited Members
hat.aio.group.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close