hat.gateway.engine

Gateway engine

  1"""Gateway engine"""
  2
  3from collections.abc import Collection, Iterable
  4import asyncio
  5import collections
  6import contextlib
  7import logging
  8
  9from hat import aio
 10from hat import json
 11import hat.event.common
 12import hat.event.eventer
 13
 14from hat.gateway import common
 15
 16
 17mlog: logging.Logger = logging.getLogger(__name__)
 18"""Module logger"""
 19
 20
 21class Engine(aio.Resource):
 22    """Gateway engine"""
 23
 24    def __init__(self,
 25                 conf: json.Data,
 26                 eventer_client: hat.event.eventer.Client,
 27                 events_queue_size: int = 1024):
 28        self._eventer_client = eventer_client
 29        self._async_group = aio.Group()
 30        self._events_queue = aio.Queue(events_queue_size)
 31        self._devices = {}
 32
 33        for device_conf in conf['devices']:
 34            info = common.import_device_info(device_conf['module'])
 35            event_type_prefix = ('gateway', info.type, device_conf['name'])
 36
 37            self._devices[event_type_prefix] = _DeviceProxy(
 38                conf=device_conf,
 39                eventer_client=eventer_client,
 40                event_type_prefix=event_type_prefix,
 41                async_group=self.async_group,
 42                create_device=info.create,
 43                events_queue_size=events_queue_size)
 44
 45        self.async_group.spawn(self._run)
 46
 47    @property
 48    def async_group(self) -> aio.Group:
 49        """Async group"""
 50        return self._async_group
 51
 52    async def process_events(self, events: Iterable[hat.event.common.Event]):
 53        await self._events_queue.put(events)
 54
 55    async def _run(self):
 56        try:
 57            event_types = [(*event_type_prefix, 'system', 'enable')
 58                           for event_type_prefix in self._devices.keys()]
 59            params = hat.event.common.QueryLatestParams(event_types)
 60            result = await self._eventer_client.query(params)
 61
 62            for event in result.events:
 63                event_type_prefix = event.type[:3]
 64                device = self._devices.get(event_type_prefix)
 65                if not device or event.type[3:] != ('system', 'enable'):
 66                    continue
 67
 68                device.set_enable(
 69                    isinstance(event.payload,
 70                               hat.event.common.EventPayloadJson) and
 71                    event.payload.data is True)
 72
 73            while True:
 74                events = await self._events_queue.get()
 75
 76                device_events = collections.defaultdict(collections.deque)
 77
 78                for event in events:
 79                    event_type_prefix = event.type[:3]
 80                    device = self._devices.get(event_type_prefix)
 81                    if not device:
 82                        mlog.warning("received invalid event type prefix %s",
 83                                     event_type_prefix)
 84                        continue
 85
 86                    if event.type[3:] == ('system', 'enable'):
 87                        device.set_enable(
 88                            isinstance(event.payload,
 89                                       hat.event.common.EventPayloadJson) and
 90                            event.payload.data is True)
 91
 92                    else:
 93                        device_events[device].append(event)
 94
 95                for device, dev_events in device_events.items():
 96                    await device.process_events(dev_events)
 97
 98        except Exception as e:
 99            mlog.error("engine run error: %s", e, exc_info=e)
100
101        finally:
102            self.close()
103            self._events_queue.close()
104
105
class _DeviceProxy(aio.Resource):
    """Proxy controlling the lifetime of a single device

    While enabled (see `set_enable`), the proxy's run loop creates a device
    with `create_device`, registers a ``(..., 'gateway', 'running')`` event
    with payload ``True`` and forwards queued events to the device. Once
    disabled, the device is closed and a running event with payload
    ``False`` is registered.

    The proxy does not create its own async group - it spawns its run loop
    in the `async_group` provided by the caller.

    Args:
        conf: device configuration (passed to `create_device`)
        eventer_client: eventer client used for registering running events
            and passed to `create_device`
        event_type_prefix: event type prefix associated with this device
        async_group: async group in which the run loop is spawned
        create_device: callable creating a new device instance
        events_queue_size: size of the queue created on each enable

    """

    def __init__(self,
                 conf: common.DeviceConf,
                 eventer_client: hat.event.eventer.Client,
                 event_type_prefix: common.EventTypePrefix,
                 async_group: aio.Group,
                 create_device: common.CreateDevice,
                 events_queue_size: int = 1024):
        self._conf = conf
        self._eventer_client = eventer_client
        self._event_type_prefix = event_type_prefix
        self._async_group = async_group
        self._create_device = create_device
        self._events_queue_size = events_queue_size
        # events queue doubles as enabled state: None means disabled
        self._events_queue = None
        # set on every enable state change to wake up the run loop
        self._enable_event = asyncio.Event()
        self._log = _create_device_proxy_logger_adapter(conf['name'])

        self.async_group.spawn(self._run)

    @property
    def async_group(self) -> aio.Group:
        """Async group (the one provided to the constructor)"""
        return self._async_group

    def set_enable(self, enable: bool):
        """Enable or disable the device

        Enabling an already enabled device, or disabling an already
        disabled device, has no effect.

        """
        if enable:
            if self._events_queue is not None:
                return

            self._events_queue = aio.Queue(self._events_queue_size)
            self._enable_event.set()

        else:
            if self._events_queue is None:
                return

            # closing the queue is what interrupts the run loop's pending
            # ``get`` on the currently running device
            self._events_queue.close()
            self._events_queue = None
            self._enable_event.set()

    async def process_events(self, events: Collection[hat.event.common.Event]):
        """Queue events for the running device

        If the device is not enabled, the events are dropped and a warning
        is logged.

        """
        if self._events_queue is None:
            self._log.warning("device not enabled - ignoring %s events",
                              len(events))
            return

        await self._events_queue.put(events)

    async def _run(self):
        """Device proxy run loop

        Repeatedly waits for enable, creates a device and forwards queued
        events to it until the device is disabled. Any other exit closes
        this proxy.

        """
        try:
            while True:
                # clear before inspecting state - there is no await between
                # the clear and the check, so a later ``set_enable`` is
                # guaranteed to wake up the wait below
                self._enable_event.clear()

                if self._events_queue is None:
                    await self._enable_event.wait()
                    continue

                # keep a local reference - ``set_enable`` may replace the
                # attribute while the device is running
                events_queue = self._events_queue
                device = await aio.call(self._create_device, self._conf,
                                        self._eventer_client,
                                        self._event_type_prefix)

                try:
                    # NOTE(review): presumably closes the queue once the
                    # device's async group is cancelled, unblocking the
                    # ``get`` below - confirm against hat.aio docs
                    device.async_group.spawn(aio.call_on_cancel,
                                             events_queue.close)
                    await self._register_running(True)

                    while True:
                        events = await events_queue.get()

                        if not device.is_open:
                            raise Exception('device closed')

                        await aio.call(device.process_events, events)

                except aio.QueueClosedError:
                    # error not caused by this device's events queue
                    if not events_queue.is_closed:
                        raise

                    # queue closed because the device itself closed, not
                    # because of a disable request
                    if not device.is_open:
                        raise Exception('device closed')

                    # otherwise the device was disabled - fall through to
                    # cleanup and wait for the next enable

                finally:
                    # device cleanup must complete even if this task is
                    # being cancelled
                    await aio.uncancellable(self._close_device(device))

        except Exception as e:
            self._log.error("device proxy run error: %s", e, exc_info=e)

        finally:
            # NOTE(review): ``async_group`` is the group provided by the
            # caller, so this presumably closes that shared group - confirm
            # against ``aio.Resource.close``
            self.close()

    async def _close_device(self, device):
        """Close device and register running event with payload ``False``

        `ConnectionError` raised while registering the event is suppressed.

        """
        await device.async_close()

        with contextlib.suppress(ConnectionError):
            await self._register_running(False)

    async def _register_running(self, is_running):
        """Register ``(*event_type_prefix, 'gateway', 'running')`` event

        Args:
            is_running: value used as the event's JSON payload

        """
        await self._eventer_client.register([
            hat.event.common.RegisterEvent(
                type=(*self._event_type_prefix, 'gateway', 'running'),
                source_timestamp=hat.event.common.now(),
                payload=hat.event.common.EventPayloadJson(is_running))])
210
211
def _create_device_proxy_logger_adapter(name):
    """Create module logger adapter carrying device proxy metadata

    Args:
        name: device name stored under ``extra['meta']['name']``

    """
    meta = {'type': 'DeviceProxy',
            'name': name}
    return logging.LoggerAdapter(mlog, {'meta': meta})
mlog: logging.Logger = <Logger hat.gateway.engine (WARNING)>

Module logger

class Engine(hat.aio.group.Resource):
 22class Engine(aio.Resource):
 23    """Gateway engine"""
 24
 25    def __init__(self,
 26                 conf: json.Data,
 27                 eventer_client: hat.event.eventer.Client,
 28                 events_queue_size: int = 1024):
 29        self._eventer_client = eventer_client
 30        self._async_group = aio.Group()
 31        self._events_queue = aio.Queue(events_queue_size)
 32        self._devices = {}
 33
 34        for device_conf in conf['devices']:
 35            info = common.import_device_info(device_conf['module'])
 36            event_type_prefix = ('gateway', info.type, device_conf['name'])
 37
 38            self._devices[event_type_prefix] = _DeviceProxy(
 39                conf=device_conf,
 40                eventer_client=eventer_client,
 41                event_type_prefix=event_type_prefix,
 42                async_group=self.async_group,
 43                create_device=info.create,
 44                events_queue_size=events_queue_size)
 45
 46        self.async_group.spawn(self._run)
 47
 48    @property
 49    def async_group(self) -> aio.Group:
 50        """Async group"""
 51        return self._async_group
 52
 53    async def process_events(self, events: Iterable[hat.event.common.Event]):
 54        await self._events_queue.put(events)
 55
 56    async def _run(self):
 57        try:
 58            event_types = [(*event_type_prefix, 'system', 'enable')
 59                           for event_type_prefix in self._devices.keys()]
 60            params = hat.event.common.QueryLatestParams(event_types)
 61            result = await self._eventer_client.query(params)
 62
 63            for event in result.events:
 64                event_type_prefix = event.type[:3]
 65                device = self._devices.get(event_type_prefix)
 66                if not device or event.type[3:] != ('system', 'enable'):
 67                    continue
 68
 69                device.set_enable(
 70                    isinstance(event.payload,
 71                               hat.event.common.EventPayloadJson) and
 72                    event.payload.data is True)
 73
 74            while True:
 75                events = await self._events_queue.get()
 76
 77                device_events = collections.defaultdict(collections.deque)
 78
 79                for event in events:
 80                    event_type_prefix = event.type[:3]
 81                    device = self._devices.get(event_type_prefix)
 82                    if not device:
 83                        mlog.warning("received invalid event type prefix %s",
 84                                     event_type_prefix)
 85                        continue
 86
 87                    if event.type[3:] == ('system', 'enable'):
 88                        device.set_enable(
 89                            isinstance(event.payload,
 90                                       hat.event.common.EventPayloadJson) and
 91                            event.payload.data is True)
 92
 93                    else:
 94                        device_events[device].append(event)
 95
 96                for device, dev_events in device_events.items():
 97                    await device.process_events(dev_events)
 98
 99        except Exception as e:
100            mlog.error("engine run error: %s", e, exc_info=e)
101
102        finally:
103            self.close()
104            self._events_queue.close()

Gateway engine

Engine( conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]], eventer_client: hat.event.eventer.client.Client, events_queue_size: int = 1024)
25    def __init__(self,
26                 conf: json.Data,
27                 eventer_client: hat.event.eventer.Client,
28                 events_queue_size: int = 1024):
29        self._eventer_client = eventer_client
30        self._async_group = aio.Group()
31        self._events_queue = aio.Queue(events_queue_size)
32        self._devices = {}
33
34        for device_conf in conf['devices']:
35            info = common.import_device_info(device_conf['module'])
36            event_type_prefix = ('gateway', info.type, device_conf['name'])
37
38            self._devices[event_type_prefix] = _DeviceProxy(
39                conf=device_conf,
40                eventer_client=eventer_client,
41                event_type_prefix=event_type_prefix,
42                async_group=self.async_group,
43                create_device=info.create,
44                events_queue_size=events_queue_size)
45
46        self.async_group.spawn(self._run)
async_group: hat.aio.group.Group
    @property
    def async_group(self) -> aio.Group:
        """Async group created by the constructor and shared with device proxies"""
        return self._async_group

Async group

async def process_events(self, events: Iterable[hat.event.common.common.Event]):
    async def process_events(self, events: Iterable[hat.event.common.Event]):
        """Queue events for processing by the engine's run loop

        The provided iterable is put on the engine's events queue as a
        single item.

        """
        await self._events_queue.put(events)