hat.gateway.engine

Gateway engine

  1"""Gateway engine"""
  2
  3from collections.abc import Collection, Iterable
  4import asyncio
  5import collections
  6import contextlib
  7import logging
  8
  9from hat import aio
 10from hat import json
 11import hat.event.common
 12import hat.event.eventer
 13
 14from hat.gateway import common
 15
 16
 17mlog: logging.Logger = logging.getLogger(__name__)
 18"""Module logger"""
 19
 20
 21class Engine(aio.Resource):
 22    """Gateway engine"""
 23
 24    def __init__(self,
 25                 conf: json.Data,
 26                 eventer_client: hat.event.eventer.Client,
 27                 events_queue_size: int = 1024):
 28        self._eventer_client = eventer_client
 29        self._async_group = aio.Group()
 30        self._events_queue = aio.Queue(events_queue_size)
 31        self._devices = {}
 32
 33        for device_conf in conf['devices']:
 34            info = common.import_device_info(device_conf['module'])
 35            event_type_prefix = ('gateway', info.type, device_conf['name'])
 36
 37            self._devices[event_type_prefix] = _DeviceProxy(
 38                conf=device_conf,
 39                eventer_client=eventer_client,
 40                event_type_prefix=event_type_prefix,
 41                async_group=self.async_group,
 42                create_device=info.create,
 43                events_queue_size=events_queue_size)
 44
 45        self.async_group.spawn(self._run)
 46
 47    @property
 48    def async_group(self) -> aio.Group:
 49        """Async group"""
 50        return self._async_group
 51
 52    async def process_events(self, events: Iterable[hat.event.common.Event]):
 53        await self._events_queue.put(events)
 54
 55    async def _run(self):
 56        try:
 57            event_types = [(*event_type_prefix, 'system', 'enable')
 58                           for event_type_prefix in self._devices.keys()]
 59            params = hat.event.common.QueryLatestParams(event_types)
 60            result = await self._eventer_client.query(params)
 61
 62            for event in result.events:
 63                event_type_prefix = event.type[:3]
 64                device = self._devices.get(event_type_prefix)
 65                if not device or event.type[3:] != ('system', 'enable'):
 66                    continue
 67
 68                device.set_enable(
 69                    isinstance(event.payload,
 70                               hat.event.common.EventPayloadJson) and
 71                    event.payload.data is True)
 72
 73            while True:
 74                events = await self._events_queue.get()
 75
 76                device_events = collections.defaultdict(collections.deque)
 77
 78                for event in events:
 79                    event_type_prefix = event.type[:3]
 80                    device = self._devices.get(event_type_prefix)
 81                    if not device:
 82                        mlog.warning("received invalid event type prefix %s",
 83                                     event_type_prefix)
 84                        continue
 85
 86                    if event.type[3:] == ('system', 'enable'):
 87                        device.set_enable(
 88                            isinstance(event.payload,
 89                                       hat.event.common.EventPayloadJson) and
 90                            event.payload.data is True)
 91
 92                    else:
 93                        device_events[device].append(event)
 94
 95                for device, dev_events in device_events.items():
 96                    await device.process_events(dev_events)
 97
 98        except Exception as e:
 99            mlog.error("engine run error: %s", e, exc_info=e)
100
101        finally:
102            self.close()
103            self._events_queue.close()
104
105
class _DeviceProxy(aio.Resource):
    """Proxy managing creation and closing of a single device

    While enabled, the proxy holds an events queue and its run loop keeps a
    device instance (created with `create_device`) that receives the queued
    events. While disabled, no queue exists and received events are ignored.

    Args:
        conf: device configuration passed to `create_device`
        eventer_client: eventer client passed to `create_device` and used
            for registering ``(..., 'gateway', 'running')`` events
        event_type_prefix: event type prefix of this device
        async_group: async group in which the run loop is spawned
        create_device: callable (called through ``aio.call``) creating
            the device
        events_queue_size: size of the events queue created on enable

    """

    def __init__(self,
                 conf: json.Data,
                 eventer_client: hat.event.eventer.Client,
                 event_type_prefix: common.EventTypePrefix,
                 async_group: aio.Group,
                 create_device: common.CreateDevice,
                 events_queue_size: int = 1024):
        self._async_group = async_group
        self._conf = conf
        self._eventer_client = eventer_client
        self._event_type_prefix = event_type_prefix
        self._create_device = create_device
        self._events_queue_size = events_queue_size

        # queue exists only while the device is enabled
        self._events_queue = None
        self._enable_event = asyncio.Event()

        self.async_group.spawn(self._run)

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    def set_enable(self, enable: bool):
        """Enable or disable the device

        Requests matching the current state are ignored. Enabling creates
        a new events queue; disabling closes the existing queue. In both
        cases the run loop is notified through the enable event.

        """
        currently_enabled = self._events_queue is not None
        if bool(enable) == currently_enabled:
            return

        if enable:
            self._events_queue = aio.Queue(self._events_queue_size)

        else:
            self._events_queue.close()
            self._events_queue = None

        self._enable_event.set()

    async def process_events(self, events: Collection[hat.event.common.Event]):
        """Queue events for the device (ignored while not enabled)"""
        queue = self._events_queue

        if queue is None:
            mlog.warning("device not enabled - ignoring %s events",
                         len(events))
            return

        await queue.put(events)

    async def _run(self):
        try:
            while True:
                self._enable_event.clear()

                queue = self._events_queue
                if queue is None:
                    # disabled - wait for the next state change
                    await self._enable_event.wait()
                    continue

                device = await aio.call(self._create_device, self._conf,
                                        self._eventer_client,
                                        self._event_type_prefix)

                try:
                    # NOTE(review): presumably closes the queue once the
                    # device's async group is cancelled - confirm against
                    # hat.aio documentation
                    device.async_group.spawn(aio.call_on_cancel, queue.close)
                    await self._register_running(True)

                    while True:
                        events = await queue.get()

                        if not device.is_open:
                            raise Exception('device closed')

                        await aio.call(device.process_events, events)

                except aio.QueueClosedError:
                    # error not related to this proxy's queue
                    if not queue.is_closed:
                        raise

                    # queue closed while device is not open
                    if not device.is_open:
                        raise Exception('device closed')

                    # otherwise device was disabled - continue with main loop

                finally:
                    await aio.uncancellable(self._close_device(device))

        except Exception as e:
            mlog.error("device proxy run error: %s", e, exc_info=e)

        finally:
            self.close()

    async def _close_device(self, device):
        """Close device and register that it is no longer running"""
        await device.async_close()

        try:
            await self._register_running(False)

        except ConnectionError:
            # best effort - connection errors are ignored here
            pass

    async def _register_running(self, is_running):
        """Register ``(*prefix, 'gateway', 'running')`` event"""
        running_event = hat.event.common.RegisterEvent(
            type=(*self._event_type_prefix, 'gateway', 'running'),
            source_timestamp=hat.event.common.now(),
            payload=hat.event.common.EventPayloadJson(is_running))

        await self._eventer_client.register([running_event])
mlog: logging.Logger = <Logger hat.gateway.engine (WARNING)>

Module logger

class Engine(hat.aio.group.Resource):
 22class Engine(aio.Resource):
 23    """Gateway engine"""
 24
 25    def __init__(self,
 26                 conf: json.Data,
 27                 eventer_client: hat.event.eventer.Client,
 28                 events_queue_size: int = 1024):
 29        self._eventer_client = eventer_client
 30        self._async_group = aio.Group()
 31        self._events_queue = aio.Queue(events_queue_size)
 32        self._devices = {}
 33
 34        for device_conf in conf['devices']:
 35            info = common.import_device_info(device_conf['module'])
 36            event_type_prefix = ('gateway', info.type, device_conf['name'])
 37
 38            self._devices[event_type_prefix] = _DeviceProxy(
 39                conf=device_conf,
 40                eventer_client=eventer_client,
 41                event_type_prefix=event_type_prefix,
 42                async_group=self.async_group,
 43                create_device=info.create,
 44                events_queue_size=events_queue_size)
 45
 46        self.async_group.spawn(self._run)
 47
 48    @property
 49    def async_group(self) -> aio.Group:
 50        """Async group"""
 51        return self._async_group
 52
 53    async def process_events(self, events: Iterable[hat.event.common.Event]):
 54        await self._events_queue.put(events)
 55
 56    async def _run(self):
 57        try:
 58            event_types = [(*event_type_prefix, 'system', 'enable')
 59                           for event_type_prefix in self._devices.keys()]
 60            params = hat.event.common.QueryLatestParams(event_types)
 61            result = await self._eventer_client.query(params)
 62
 63            for event in result.events:
 64                event_type_prefix = event.type[:3]
 65                device = self._devices.get(event_type_prefix)
 66                if not device or event.type[3:] != ('system', 'enable'):
 67                    continue
 68
 69                device.set_enable(
 70                    isinstance(event.payload,
 71                               hat.event.common.EventPayloadJson) and
 72                    event.payload.data is True)
 73
 74            while True:
 75                events = await self._events_queue.get()
 76
 77                device_events = collections.defaultdict(collections.deque)
 78
 79                for event in events:
 80                    event_type_prefix = event.type[:3]
 81                    device = self._devices.get(event_type_prefix)
 82                    if not device:
 83                        mlog.warning("received invalid event type prefix %s",
 84                                     event_type_prefix)
 85                        continue
 86
 87                    if event.type[3:] == ('system', 'enable'):
 88                        device.set_enable(
 89                            isinstance(event.payload,
 90                                       hat.event.common.EventPayloadJson) and
 91                            event.payload.data is True)
 92
 93                    else:
 94                        device_events[device].append(event)
 95
 96                for device, dev_events in device_events.items():
 97                    await device.process_events(dev_events)
 98
 99        except Exception as e:
100            mlog.error("engine run error: %s", e, exc_info=e)
101
102        finally:
103            self.close()
104            self._events_queue.close()

Gateway engine

Engine( conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]], eventer_client: hat.event.eventer.client.Client, events_queue_size: int = 1024)
25    def __init__(self,
26                 conf: json.Data,
27                 eventer_client: hat.event.eventer.Client,
28                 events_queue_size: int = 1024):
29        self._eventer_client = eventer_client
30        self._async_group = aio.Group()
31        self._events_queue = aio.Queue(events_queue_size)
32        self._devices = {}
33
34        for device_conf in conf['devices']:
35            info = common.import_device_info(device_conf['module'])
36            event_type_prefix = ('gateway', info.type, device_conf['name'])
37
38            self._devices[event_type_prefix] = _DeviceProxy(
39                conf=device_conf,
40                eventer_client=eventer_client,
41                event_type_prefix=event_type_prefix,
42                async_group=self.async_group,
43                create_device=info.create,
44                events_queue_size=events_queue_size)
45
46        self.async_group.spawn(self._run)
async_group: hat.aio.group.Group
48    @property
49    def async_group(self) -> aio.Group:
50        """Async group"""
51        return self._async_group

Async group

async def process_events(self, events: Iterable[hat.event.common.common.Event]):
53    async def process_events(self, events: Iterable[hat.event.common.Event]):
54        await self._events_queue.put(events)