hat.gateway.engine

Gateway engine

  1"""Gateway engine"""
  2
  3from collections.abc import Collection, Iterable
  4import asyncio
  5import collections
  6import contextlib
  7import importlib
  8import logging
  9
 10from hat import aio
 11from hat import json
 12import hat.event.common
 13import hat.event.eventer
 14
 15from hat.gateway import common
 16
 17
 18mlog: logging.Logger = logging.getLogger(__name__)
 19"""Module logger"""
 20
 21
 22class Engine(aio.Resource):
 23    """Gateway engine"""
 24
 25    def __init__(self,
 26                 conf: json.Data,
 27                 eventer_client: hat.event.eventer.Client,
 28                 events_queue_size: int = 1024):
 29        self._eventer_client = eventer_client
 30        self._async_group = aio.Group()
 31        self._events_queue = aio.Queue(events_queue_size)
 32        self._devices = {}
 33
 34        for device_conf in conf['devices']:
 35            module = importlib.import_module(device_conf['module'])
 36            event_type_prefix = ('gateway',
 37                                 conf['gateway_name'],
 38                                 module.info.type,
 39                                 device_conf['name'])
 40
 41            self._devices[event_type_prefix] = _DeviceProxy(
 42                conf=device_conf,
 43                eventer_client=eventer_client,
 44                event_type_prefix=event_type_prefix,
 45                async_group=self.async_group,
 46                create_device=module.info.create,
 47                events_queue_size=events_queue_size)
 48
 49        self.async_group.spawn(self._run)
 50
 51    @property
 52    def async_group(self) -> aio.Group:
 53        """Async group"""
 54        return self._async_group
 55
 56    async def process_events(self, events: Iterable[hat.event.common.Event]):
 57        await self._events_queue.put(events)
 58
 59    async def _run(self):
 60        try:
 61            event_types = [(*event_type_prefix, 'system', 'enable')
 62                           for event_type_prefix in self._devices.keys()]
 63            params = hat.event.common.QueryLatestParams(event_types)
 64            result = await self._eventer_client.query(params)
 65
 66            for event in result.events:
 67                event_type_prefix = event.type[:4]
 68                device = self._devices.get(event_type_prefix)
 69                if not device or event.type[4:] != ('system', 'enable'):
 70                    continue
 71
 72                device.set_enable(
 73                    isinstance(event.payload,
 74                               hat.event.common.EventPayloadJson) and
 75                    event.payload.data is True)
 76
 77            while True:
 78                events = await self._events_queue.get()
 79
 80                device_events = collections.defaultdict(collections.deque)
 81
 82                for event in events:
 83                    event_type_prefix = event.type[:4]
 84                    device = self._devices.get(event_type_prefix)
 85                    if not device:
 86                        mlog.warning("received invalid event type prefix %s",
 87                                     event_type_prefix)
 88                        continue
 89
 90                    if event.type[4:] == ('system', 'enable'):
 91                        device.set_enable(
 92                            isinstance(event.payload,
 93                                       hat.event.common.EventPayloadJson) and
 94                            event.payload.data is True)
 95
 96                    else:
 97                        device_events[device].append(event)
 98
 99                for device, dev_events in device_events.items():
100                    await device.process_events(dev_events)
101
102        except Exception as e:
103            mlog.error("engine run error: %s", e, exc_info=e)
104
105        finally:
106            self.close()
107            self._events_queue.close()
108
109
class _DeviceProxy(aio.Resource):
    """Runs a single device according to its enable state

    While enabled, a device is created with `create_device` and receives
    events put on the proxy queue. When disabled, the queue is closed and the
    device is closed. A ``(*event_type_prefix, 'gateway', 'running')`` event
    is registered with ``True`` after device creation and with ``False``
    after the device is closed.

    Args:
        conf: device configuration passed to `create_device`
        eventer_client: client passed to `create_device` and used for
            registering running events
        event_type_prefix: event type prefix passed to `create_device`
        async_group: group in which the proxy run loop is spawned
        create_device: callable creating the device
        events_queue_size: size argument of each newly created events queue

    """

    def __init__(self,
                 conf: json.Data,
                 eventer_client: hat.event.eventer.Client,
                 event_type_prefix: common.EventTypePrefix,
                 async_group: aio.Group,
                 create_device: common.CreateDevice,
                 events_queue_size: int = 1024):
        self._async_group = async_group
        self._conf = conf
        self._eventer_client = eventer_client
        self._event_type_prefix = event_type_prefix
        self._create_device = create_device
        self._events_queue_size = events_queue_size

        # queue exists only while the device is enabled
        self._events_queue = None
        self._enable_event = asyncio.Event()

        self.async_group.spawn(self._run)

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    def set_enable(self, enable: bool):
        """Change enable state; no-op if the state is already as requested"""
        is_enabled = self._events_queue is not None
        if bool(enable) == is_enabled:
            return

        if enable:
            self._events_queue = aio.Queue(self._events_queue_size)

        else:
            self._events_queue.close()
            self._events_queue = None

        # wake the run loop so it notices the state change
        self._enable_event.set()

    async def process_events(self, events: Collection[hat.event.common.Event]):
        """Queue `events` for the device; ignored (with a warning) while the
        device is not enabled"""
        queue = self._events_queue

        if queue is not None:
            await queue.put(events)
            return

        mlog.warning("device not enabled - ignoring %s events",
                     len(events))

    async def _run(self):
        """Proxy run loop

        Any exception is logged; on exit the proxy is closed.

        """
        try:
            while True:
                self._enable_event.clear()

                if self._events_queue is None:
                    await self._enable_event.wait()
                    continue

                # keep a reference to the queue this device instance serves -
                # `self._events_queue` may be replaced by `set_enable`
                queue = self._events_queue
                device = await aio.call(self._create_device, self._conf,
                                        self._eventer_client,
                                        self._event_type_prefix)

                try:
                    # close the queue once the device's group is cancelled
                    device.async_group.spawn(aio.call_on_cancel,
                                             queue.close)
                    await self._register_running(True)

                    while True:
                        events = await queue.get()

                        if not device.is_open:
                            raise Exception('device closed')

                        await aio.call(device.process_events, events)

                except aio.QueueClosedError:
                    # error did not originate from this proxy's queue
                    if not queue.is_closed:
                        raise

                    # queue closed while the device is still open is treated
                    # as a regular disable - continue with the outer loop
                    if not device.is_open:
                        raise Exception('device closed')

                finally:
                    await aio.uncancellable(self._close_device(device))

        except Exception as e:
            mlog.error("device proxy run error: %s", e, exc_info=e)

        finally:
            self.close()

    async def _close_device(self, device):
        """Close `device` and register the not-running event"""
        await device.async_close()

        try:
            await self._register_running(False)

        except ConnectionError:
            # best effort - connection errors are ignored here
            pass

    async def _register_running(self, is_running):
        """Register the ``gateway/running`` event with payload `is_running`"""
        running_event = hat.event.common.RegisterEvent(
            type=(*self._event_type_prefix, 'gateway', 'running'),
            source_timestamp=hat.event.common.now(),
            payload=hat.event.common.EventPayloadJson(is_running))

        await self._eventer_client.register([running_event])
mlog: logging.Logger = <Logger hat.gateway.engine (WARNING)>

Module logger

class Engine(hat.aio.group.Resource):
 23class Engine(aio.Resource):
 24    """Gateway engine"""
 25
 26    def __init__(self,
 27                 conf: json.Data,
 28                 eventer_client: hat.event.eventer.Client,
 29                 events_queue_size: int = 1024):
 30        self._eventer_client = eventer_client
 31        self._async_group = aio.Group()
 32        self._events_queue = aio.Queue(events_queue_size)
 33        self._devices = {}
 34
 35        for device_conf in conf['devices']:
 36            module = importlib.import_module(device_conf['module'])
 37            event_type_prefix = ('gateway',
 38                                 conf['gateway_name'],
 39                                 module.info.type,
 40                                 device_conf['name'])
 41
 42            self._devices[event_type_prefix] = _DeviceProxy(
 43                conf=device_conf,
 44                eventer_client=eventer_client,
 45                event_type_prefix=event_type_prefix,
 46                async_group=self.async_group,
 47                create_device=module.info.create,
 48                events_queue_size=events_queue_size)
 49
 50        self.async_group.spawn(self._run)
 51
 52    @property
 53    def async_group(self) -> aio.Group:
 54        """Async group"""
 55        return self._async_group
 56
 57    async def process_events(self, events: Iterable[hat.event.common.Event]):
 58        await self._events_queue.put(events)
 59
 60    async def _run(self):
 61        try:
 62            event_types = [(*event_type_prefix, 'system', 'enable')
 63                           for event_type_prefix in self._devices.keys()]
 64            params = hat.event.common.QueryLatestParams(event_types)
 65            result = await self._eventer_client.query(params)
 66
 67            for event in result.events:
 68                event_type_prefix = event.type[:4]
 69                device = self._devices.get(event_type_prefix)
 70                if not device or event.type[4:] != ('system', 'enable'):
 71                    continue
 72
 73                device.set_enable(
 74                    isinstance(event.payload,
 75                               hat.event.common.EventPayloadJson) and
 76                    event.payload.data is True)
 77
 78            while True:
 79                events = await self._events_queue.get()
 80
 81                device_events = collections.defaultdict(collections.deque)
 82
 83                for event in events:
 84                    event_type_prefix = event.type[:4]
 85                    device = self._devices.get(event_type_prefix)
 86                    if not device:
 87                        mlog.warning("received invalid event type prefix %s",
 88                                     event_type_prefix)
 89                        continue
 90
 91                    if event.type[4:] == ('system', 'enable'):
 92                        device.set_enable(
 93                            isinstance(event.payload,
 94                                       hat.event.common.EventPayloadJson) and
 95                            event.payload.data is True)
 96
 97                    else:
 98                        device_events[device].append(event)
 99
100                for device, dev_events in device_events.items():
101                    await device.process_events(dev_events)
102
103        except Exception as e:
104            mlog.error("engine run error: %s", e, exc_info=e)
105
106        finally:
107            self.close()
108            self._events_queue.close()

Gateway engine

Engine( conf: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')], eventer_client: hat.event.eventer.client.Client, events_queue_size: int = 1024)
26    def __init__(self,
27                 conf: json.Data,
28                 eventer_client: hat.event.eventer.Client,
29                 events_queue_size: int = 1024):
30        self._eventer_client = eventer_client
31        self._async_group = aio.Group()
32        self._events_queue = aio.Queue(events_queue_size)
33        self._devices = {}
34
35        for device_conf in conf['devices']:
36            module = importlib.import_module(device_conf['module'])
37            event_type_prefix = ('gateway',
38                                 conf['gateway_name'],
39                                 module.info.type,
40                                 device_conf['name'])
41
42            self._devices[event_type_prefix] = _DeviceProxy(
43                conf=device_conf,
44                eventer_client=eventer_client,
45                event_type_prefix=event_type_prefix,
46                async_group=self.async_group,
47                create_device=module.info.create,
48                events_queue_size=events_queue_size)
49
50        self.async_group.spawn(self._run)
async_group: hat.aio.group.Group
52    @property
53    def async_group(self) -> aio.Group:
54        """Async group"""
55        return self._async_group

Async group

async def process_events( self, events: collections.abc.Iterable[hat.event.common.common.Event]):
57    async def process_events(self, events: Iterable[hat.event.common.Event]):
58        await self._events_queue.put(events)
Inherited Members
hat.aio.group.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close