hat.gateway.engine
Gateway engine
"""Gateway engine"""

from collections.abc import Collection, Iterable
import asyncio
import collections
import contextlib
import logging

from hat import aio
from hat import json
import hat.event.common
import hat.event.eventer

from hat.gateway import common


mlog: logging.Logger = logging.getLogger(__name__)
"""Module logger"""


class Engine(aio.Resource):
    """Gateway engine

    Creates one `_DeviceProxy` for each entry of ``conf['devices']`` and runs
    a background task which routes received events to those proxies.

    Proxies are stored by event type prefix
    ``('gateway', <device type>, <device name>)``.

    """

    def __init__(self,
                 conf: json.Data,
                 eventer_client: hat.event.eventer.Client,
                 events_queue_size: int = 1024):
        self._eventer_client = eventer_client
        self._async_group = aio.Group()
        self._events_queue = aio.Queue(events_queue_size)
        self._devices = {}

        for device_conf in conf['devices']:
            # device module provides device type and device factory
            info = common.import_device_info(device_conf['module'])
            event_type_prefix = ('gateway', info.type, device_conf['name'])

            # NOTE(review): each proxy receives the engine's own async group
            # (not a subgroup) - confirm this sharing is intended
            self._devices[event_type_prefix] = _DeviceProxy(
                conf=device_conf,
                eventer_client=eventer_client,
                event_type_prefix=event_type_prefix,
                async_group=self.async_group,
                create_device=info.create,
                events_queue_size=events_queue_size)

        self.async_group.spawn(self._run)

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    async def process_events(self, events: Iterable[hat.event.common.Event]):
        """Put events on the engine events queue

        Queued events are consumed by the engine run loop, one `events`
        collection at a time.

        """
        await self._events_queue.put(events)

    async def _run(self):
        # Engine run loop: apply latest enable state to all devices and then
        # keep routing queued events until failure or cancellation
        try:
            # query latest `system/enable` event of each configured device
            event_types = [(*event_type_prefix, 'system', 'enable')
                           for event_type_prefix in self._devices.keys()]
            params = hat.event.common.QueryLatestParams(event_types)
            result = await self._eventer_client.query(params)

            for event in result.events:
                event_type_prefix = event.type[:3]
                device = self._devices.get(event_type_prefix)
                if not device or event.type[3:] != ('system', 'enable'):
                    continue

                # device is enabled only by JSON payload with data that is
                # exactly `True` - everything else disables it
                device.set_enable(
                    isinstance(event.payload,
                               hat.event.common.EventPayloadJson) and
                    event.payload.data is True)

            while True:
                events = await self._events_queue.get()

                # events of a single batch grouped by device proxy
                device_events = collections.defaultdict(collections.deque)

                for event in events:
                    event_type_prefix = event.type[:3]
                    device = self._devices.get(event_type_prefix)
                    if not device:
                        mlog.warning("received invalid event type prefix %s",
                                     event_type_prefix)
                        continue

                    if event.type[3:] == ('system', 'enable'):
                        # enable changes are applied immediately, before
                        # other events of this batch are forwarded
                        device.set_enable(
                            isinstance(event.payload,
                                       hat.event.common.EventPayloadJson) and
                            event.payload.data is True)

                    else:
                        device_events[device].append(event)

                for device, dev_events in device_events.items():
                    await device.process_events(dev_events)

        except Exception as e:
            mlog.error("engine run error: %s", e, exc_info=e)

        finally:
            # any exit from the run loop closes the engine and its queue
            self.close()
            self._events_queue.close()


class _DeviceProxy(aio.Resource):
    """Device proxy

    Creates device instance while device is enabled, forwards events to it
    and registers `gateway/running` events with the device's event type
    prefix.

    Uses the async group provided as constructor argument.

    """

    def __init__(self,
                 conf: common.DeviceConf,
                 eventer_client: hat.event.eventer.Client,
                 event_type_prefix: common.EventTypePrefix,
                 async_group: aio.Group,
                 create_device: common.CreateDevice,
                 events_queue_size: int = 1024):
        self._conf = conf
        self._eventer_client = eventer_client
        self._event_type_prefix = event_type_prefix
        self._async_group = async_group
        self._create_device = create_device
        self._events_queue_size = events_queue_size
        # `None` while device is disabled; queue instance while enabled
        self._events_queue = None
        # set on each enable state change to wake up the run loop
        self._enable_event = asyncio.Event()
        self._log = _create_device_proxy_logger_adapter(conf['name'])

        self.async_group.spawn(self._run)

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    def set_enable(self, enable: bool):
        """Enable or disable device

        Enabling creates new events queue and disabling closes and discards
        the current one. Call which does not change the current state is
        ignored. Each state change sets the enable event.

        """
        if enable:
            if self._events_queue is not None:
                return

            self._events_queue = aio.Queue(self._events_queue_size)
            self._enable_event.set()

        else:
            if self._events_queue is None:
                return

            self._events_queue.close()
            self._events_queue = None
            self._enable_event.set()

    async def process_events(self, events: Collection[hat.event.common.Event]):
        """Put events on the current events queue

        If device is not enabled, events are dropped and warning is logged.

        """
        if self._events_queue is None:
            self._log.warning("device not enabled - ignoring %s events",
                              len(events))
            return

        await self._events_queue.put(events)

    async def _run(self):
        # Proxy run loop: wait until enabled, create device, forward queued
        # events until disabled or device closed, then close device
        try:
            while True:
                # cleared before state check so that a later `set_enable`
                # is observed by the `wait` below
                self._enable_event.clear()

                if self._events_queue is None:
                    await self._enable_event.wait()
                    continue

                # local reference - `set_enable` can replace or discard
                # `self._events_queue` while this task is awaiting
                events_queue = self._events_queue
                device = await aio.call(self._create_device, self._conf,
                                        self._eventer_client,
                                        self._event_type_prefix)

                try:
                    # presumably closes the queue once device's async group
                    # is closed, unblocking `get` below - confirm against
                    # `aio.call_on_cancel`
                    device.async_group.spawn(aio.call_on_cancel,
                                             events_queue.close)
                    await self._register_running(True)

                    while True:
                        events = await events_queue.get()

                        if not device.is_open:
                            raise Exception('device closed')

                        await aio.call(device.process_events, events)

                except aio.QueueClosedError:
                    # error not caused by this proxy's queue being closed
                    if not events_queue.is_closed:
                        raise

                    # queue closed while device is no longer open is
                    # failure; otherwise device was disabled and outer loop
                    # continues
                    if not device.is_open:
                        raise Exception('device closed')

                finally:
                    # NOTE(review): wrapped with `aio.uncancellable`,
                    # presumably so that device closing completes even when
                    # this task is cancelled - confirm
                    await aio.uncancellable(self._close_device(device))

        except Exception as e:
            self._log.error("device proxy run error: %s", e, exc_info=e)

        finally:
            # NOTE(review): closes async group provided to constructor -
            # `Engine` passes its own group, so this presumably closes the
            # engine and all other proxies as well
            self.close()

    async def _close_device(self, device):
        # close device and register that it is not running
        await device.async_close()

        # connection errors during registration are ignored
        with contextlib.suppress(ConnectionError):
            await self._register_running(False)

    async def _register_running(self, is_running):
        # register `(*prefix, 'gateway', 'running')` event with JSON payload
        # `is_running`
        await self._eventer_client.register([
            hat.event.common.RegisterEvent(
                type=(*self._event_type_prefix, 'gateway', 'running'),
                source_timestamp=hat.event.common.now(),
                payload=hat.event.common.EventPayloadJson(is_running))])


def _create_device_proxy_logger_adapter(name):
    # logger adapter over module logger with `meta` extra identifying the
    # device proxy by name
    extra = {'meta': {'type': 'DeviceProxy',
                      'name': name}}

    return logging.LoggerAdapter(mlog, extra)
Module logger
class
Engine(hat.aio.group.Resource):
class Engine(aio.Resource):
    """Gateway engine

    Holds one device proxy per entry of ``conf['devices']`` and dispatches
    received events to those proxies from a single background task.

    Args:
        conf: gateway configuration; each ``conf['devices']`` entry provides
            ``'module'`` and ``'name'``
        eventer_client: eventer client used for querying latest enable events
            and passed to each device proxy
        events_queue_size: size of engine events queue, also passed to each
            device proxy

    """

    def __init__(self,
                 conf: json.Data,
                 eventer_client: hat.event.eventer.Client,
                 events_queue_size: int = 1024):
        self._eventer_client = eventer_client
        self._async_group = aio.Group()
        self._events_queue = aio.Queue(events_queue_size)
        self._devices = {}

        for dev_conf in conf['devices']:
            dev_info = common.import_device_info(dev_conf['module'])
            prefix = ('gateway', dev_info.type, dev_conf['name'])

            proxy = _DeviceProxy(conf=dev_conf,
                                 eventer_client=eventer_client,
                                 event_type_prefix=prefix,
                                 async_group=self.async_group,
                                 create_device=dev_info.create,
                                 events_queue_size=events_queue_size)
            self._devices[prefix] = proxy

        self.async_group.spawn(self._run)

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    async def process_events(self, events: Iterable[hat.event.common.Event]):
        """Put events on the queue consumed by the engine run loop"""
        await self._events_queue.put(events)

    @staticmethod
    def _is_enable_payload(payload) -> bool:
        # only JSON payload with data that is exactly `True` enables device
        return (isinstance(payload, hat.event.common.EventPayloadJson) and
                payload.data is True)

    async def _run(self):
        enable_suffix = ('system', 'enable')

        try:
            # restore enable state from latest registered enable events
            query_params = hat.event.common.QueryLatestParams(
                [(*prefix, 'system', 'enable') for prefix in self._devices])
            query_result = await self._eventer_client.query(query_params)

            for event in query_result.events:
                proxy = self._devices.get(event.type[:3])
                if proxy and event.type[3:] == enable_suffix:
                    proxy.set_enable(self._is_enable_payload(event.payload))

            while True:
                batch = await self._events_queue.get()
                grouped = collections.defaultdict(collections.deque)

                for event in batch:
                    prefix = event.type[:3]
                    proxy = self._devices.get(prefix)

                    if not proxy:
                        mlog.warning("received invalid event type prefix %s",
                                     prefix)

                    elif event.type[3:] == enable_suffix:
                        # applied immediately, before forwarding other events
                        proxy.set_enable(
                            self._is_enable_payload(event.payload))

                    else:
                        grouped[proxy].append(event)

                for proxy, proxy_events in grouped.items():
                    await proxy.process_events(proxy_events)

        except Exception as e:
            mlog.error("engine run error: %s", e, exc_info=e)

        finally:
            self.close()
            self._events_queue.close()
Gateway engine
Engine( conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]], eventer_client: hat.event.eventer.client.Client, events_queue_size: int = 1024)
def __init__(self,
             conf: json.Data,
             eventer_client: hat.event.eventer.Client,
             events_queue_size: int = 1024):
    """Create engine, its device proxies and spawn the engine run loop

    Args:
        conf: gateway configuration; each ``conf['devices']`` entry provides
            ``'module'`` and ``'name'``
        eventer_client: eventer client stored by the engine and passed to
            each device proxy
        events_queue_size: size of engine events queue, also passed to each
            device proxy

    """
    self._eventer_client = eventer_client
    self._async_group = aio.Group()
    self._events_queue = aio.Queue(events_queue_size)
    self._devices = {}

    for dev_conf in conf['devices']:
        # device module provides device type and device factory
        dev_info = common.import_device_info(dev_conf['module'])
        prefix = ('gateway', dev_info.type, dev_conf['name'])

        proxy = _DeviceProxy(conf=dev_conf,
                             eventer_client=eventer_client,
                             event_type_prefix=prefix,
                             async_group=self.async_group,
                             create_device=dev_info.create,
                             events_queue_size=events_queue_size)
        self._devices[prefix] = proxy

    self.async_group.spawn(self._run)