hat.gateway.engine

Gateway engine

  1"""Gateway engine"""
  2
  3from collections.abc import Iterable
  4import asyncio
  5import contextlib
  6import logging
  7
  8from hat import aio
  9from hat import json
 10import hat.event.common
 11import hat.event.eventer
 12
 13from hat.gateway import common
 14
 15
 16mlog: logging.Logger = logging.getLogger(__name__)
 17"""Module logger"""
 18
 19
 20class Engine(aio.Resource):
 21    """Gateway engine"""
 22
 23    def __init__(self,
 24                 conf: json.Data,
 25                 eventer_client: hat.event.eventer.Client,
 26                 event_queue_size: int = 1024):
 27        self._async_group = aio.Group()
 28
 29        self._devices = {
 30            device.event_type_prefix: device
 31            for device in (_DeviceProxy(conf=device_conf,
 32                                        async_group=self.async_group,
 33                                        eventer_client=eventer_client,
 34                                        event_queue_size=event_queue_size)
 35                           for device_conf in conf['devices'])}
 36
 37    @property
 38    def async_group(self) -> aio.Group:
 39        """Async group"""
 40        return self._async_group
 41
 42    async def process_events(self, events: Iterable[hat.event.common.Event]):
 43        """Process received events"""
 44        for event in events:
 45            event_type_prefix = event.type[:3]
 46
 47            device = self._devices.get(event_type_prefix)
 48            if not device:
 49                mlog.debug("no device - ignorring event with type %s",
 50                           event.type)
 51                continue
 52
 53            await device.process_event(event)
 54
 55
 56class _DeviceProxy(aio.Resource):
 57
 58    def __init__(self,
 59                 conf: common.DeviceConf,
 60                 async_group: aio.Group,
 61                 eventer_client: hat.event.eventer.Client,
 62                 event_queue_size: int = 1024):
 63        self._conf = conf
 64        self._async_group = async_group
 65        self._eventer_client = eventer_client
 66        self._event_queue_size = event_queue_size
 67        self._info = common.import_device_info(conf['module'])
 68        self._event_type_prefix = ('gateway', self._info.type, conf['name'])
 69        self._enabled = None
 70        self._enabled_event = asyncio.Event()
 71        self._event_queue = None
 72        self._log = _create_device_proxy_logger_adapter(conf['name'])
 73
 74        self.async_group.spawn(self._device_loop)
 75
 76    @property
 77    def async_group(self) -> aio.Group:
 78        return self._async_group
 79
 80    @property
 81    def event_type_prefix(self) -> hat.event.common.EventType:
 82        return self._event_type_prefix
 83
 84    async def process_event(self, event: hat.event.common.Event):
 85        if event.type[3:] == ('system', 'enable'):
 86            self._process_enable_event(event)
 87            return
 88
 89        if self._event_queue is not None:
 90            with contextlib.suppress(aio.QueueClosedError):
 91                await self._event_queue.put(event)
 92                return
 93
 94        self._log.debug("device not running - ignoring event with type %s",
 95                        event.type)
 96
 97    def _process_enable_event(self, event):
 98        self._enabled = (
 99            isinstance(event.payload, hat.event.common.EventPayloadJson) and
100            event.payload.data is True)
101        self._enabled_event.set()
102
103    async def _device_loop(self):
104        try:
105            result = await self._eventer_client.query(
106                hat.event.common.QueryLatestParams(
107                    [(*self._event_type_prefix, 'system', 'enable')]))
108
109            if self._enabled is None:
110                for event in result.events:
111                    self._process_enable_event(event)
112
113            while True:
114                while not self._enabled:
115                    self._enabled_event.clear()
116                    await self._enabled_event.wait()
117
118                if not self.is_open:
119                    break
120
121                async with self.async_group.create_subgroup() as subgroup:
122                    subgroup.spawn(self._enabled_loop)
123
124                    while self._enabled:
125                        self._enabled_event.clear()
126                        await self._enabled_event.wait()
127
128        except Exception as e:
129            self._log.error("device loop error: %s", e, exc_info=e)
130
131        finally:
132            self.close()
133
134    async def _enabled_loop(self):
135        try:
136            device = await aio.call(self._info.create,
137                                    self._conf,
138                                    self._eventer_client,
139                                    self._event_type_prefix)
140
141        except Exception as e:
142            self._log.error("error creating device: %s", e, exc_info=e)
143            return
144
145        async def cleanup():
146            await device.async_close()
147
148            with contextlib.suppress(ConnectionError):
149                await self._register_running(False)
150
151        try:
152            self._event_queue = aio.Queue(self._event_queue_size)
153            device.async_group.spawn(aio.call_on_cancel,
154                                     self._event_queue.close)
155
156            await self._register_running(True)
157
158            while True:
159                event = await self._event_queue.get()
160
161                await aio.call(device.process_event, event)
162
163        except aio.QueueClosedError:
164            pass
165
166        except Exception as e:
167            self._log.error("enabled loop error: %s", e, exc_info=e)
168
169        finally:
170            self._event_queue = None
171            await aio.uncancellable(cleanup())
172
173    async def _register_running(self, is_running):
174        await self._eventer_client.register([
175            hat.event.common.RegisterEvent(
176                type=(*self._event_type_prefix, 'gateway', 'running'),
177                source_timestamp=hat.event.common.now(),
178                payload=hat.event.common.EventPayloadJson(is_running))])
179
180
181def _create_device_proxy_logger_adapter(name):
182    extra = {'meta': {'type': 'DeviceProxy',
183                      'name': name}}
184
185    return logging.LoggerAdapter(mlog, extra)
mlog: logging.Logger = <Logger hat.gateway.engine (WARNING)>

Module logger

class Engine(hat.aio.group.Resource):
21class Engine(aio.Resource):
22    """Gateway engine"""
23
24    def __init__(self,
25                 conf: json.Data,
26                 eventer_client: hat.event.eventer.Client,
27                 event_queue_size: int = 1024):
28        self._async_group = aio.Group()
29
30        self._devices = {
31            device.event_type_prefix: device
32            for device in (_DeviceProxy(conf=device_conf,
33                                        async_group=self.async_group,
34                                        eventer_client=eventer_client,
35                                        event_queue_size=event_queue_size)
36                           for device_conf in conf['devices'])}
37
38    @property
39    def async_group(self) -> aio.Group:
40        """Async group"""
41        return self._async_group
42
43    async def process_events(self, events: Iterable[hat.event.common.Event]):
44        """Process received events"""
45        for event in events:
46            event_type_prefix = event.type[:3]
47
48            device = self._devices.get(event_type_prefix)
49            if not device:
50                mlog.debug("no device - ignorring event with type %s",
51                           event.type)
52                continue
53
54            await device.process_event(event)

Gateway engine

Engine( conf: None | bool | int | float | str | List[ForwardRef('Data')] | Dict[str, ForwardRef('Data')], eventer_client: hat.event.eventer.client.Client, event_queue_size: int = 1024)
24    def __init__(self,
25                 conf: json.Data,
26                 eventer_client: hat.event.eventer.Client,
27                 event_queue_size: int = 1024):
28        self._async_group = aio.Group()
29
30        self._devices = {
31            device.event_type_prefix: device
32            for device in (_DeviceProxy(conf=device_conf,
33                                        async_group=self.async_group,
34                                        eventer_client=eventer_client,
35                                        event_queue_size=event_queue_size)
36                           for device_conf in conf['devices'])}
async_group: hat.aio.group.Group
38    @property
39    def async_group(self) -> aio.Group:
40        """Async group"""
41        return self._async_group

Async group

async def process_events(self, events: Iterable[hat.event.common.common.Event]):
43    async def process_events(self, events: Iterable[hat.event.common.Event]):
44        """Process received events"""
45        for event in events:
46            event_type_prefix = event.type[:3]
47
48            device = self._devices.get(event_type_prefix)
49            if not device:
50                mlog.debug("no device - ignorring event with type %s",
51                           event.type)
52                continue
53
54            await device.process_event(event)

Process received events