hat.gateway.engine
Gateway engine
1"""Gateway engine""" 2 3from collections.abc import Collection, Iterable 4import asyncio 5import collections 6import contextlib 7import logging 8 9from hat import aio 10from hat import json 11import hat.event.common 12import hat.event.eventer 13 14from hat.gateway import common 15 16 17mlog: logging.Logger = logging.getLogger(__name__) 18"""Module logger""" 19 20 21class Engine(aio.Resource): 22 """Gateway engine""" 23 24 def __init__(self, 25 conf: json.Data, 26 eventer_client: hat.event.eventer.Client, 27 events_queue_size: int = 1024): 28 self._eventer_client = eventer_client 29 self._async_group = aio.Group() 30 self._events_queue = aio.Queue(events_queue_size) 31 self._devices = {} 32 33 for device_conf in conf['devices']: 34 info = common.import_device_info(device_conf['module']) 35 event_type_prefix = ('gateway', info.type, device_conf['name']) 36 37 self._devices[event_type_prefix] = _DeviceProxy( 38 conf=device_conf, 39 eventer_client=eventer_client, 40 event_type_prefix=event_type_prefix, 41 async_group=self.async_group, 42 create_device=info.create, 43 events_queue_size=events_queue_size) 44 45 self.async_group.spawn(self._run) 46 47 @property 48 def async_group(self) -> aio.Group: 49 """Async group""" 50 return self._async_group 51 52 async def process_events(self, events: Iterable[hat.event.common.Event]): 53 await self._events_queue.put(events) 54 55 async def _run(self): 56 try: 57 event_types = [(*event_type_prefix, 'system', 'enable') 58 for event_type_prefix in self._devices.keys()] 59 params = hat.event.common.QueryLatestParams(event_types) 60 result = await self._eventer_client.query(params) 61 62 for event in result.events: 63 event_type_prefix = event.type[:3] 64 device = self._devices.get(event_type_prefix) 65 if not device or event.type[3:] != ('system', 'enable'): 66 continue 67 68 device.set_enable( 69 isinstance(event.payload, 70 hat.event.common.EventPayloadJson) and 71 event.payload.data is True) 72 73 while True: 74 events = await self._events_queue.get() 75 76 device_events = collections.defaultdict(collections.deque) 77 78 for event in events: 79 event_type_prefix = event.type[:3] 80 device = self._devices.get(event_type_prefix) 81 if not device: 82 mlog.warning("received invalid event type prefix %s", 83 event_type_prefix) 84 continue 85 86 if event.type[3:] == ('system', 'enable'): 87 device.set_enable( 88 isinstance(event.payload, 89 hat.event.common.EventPayloadJson) and 90 event.payload.data is True) 91 92 else: 93 device_events[device].append(event) 94 95 for device, dev_events in device_events.items(): 96 await device.process_events(dev_events) 97 98 except Exception as e: 99 mlog.error("engine run error: %s", e, exc_info=e) 100 101 finally: 102 self.close() 103 self._events_queue.close() 104 105 106class _DeviceProxy(aio.Resource): 107 108 def __init__(self, 109 conf: json.Data, 110 eventer_client: hat.event.eventer.Client, 111 event_type_prefix: common.EventTypePrefix, 112 async_group: aio.Group, 113 create_device: common.CreateDevice, 114 events_queue_size: int = 1024): 115 self._conf = conf 116 self._eventer_client = eventer_client 117 self._event_type_prefix = event_type_prefix 118 self._async_group = async_group 119 self._create_device = create_device 120 self._events_queue_size = events_queue_size 121 self._events_queue = None 122 self._enable_event = asyncio.Event() 123 124 self.async_group.spawn(self._run) 125 126 @property 127 def async_group(self) -> aio.Group: 128 return self._async_group 129 130 def set_enable(self, enable: bool): 131 if enable: 132 if self._events_queue is not None: 133 return 134 135 self._events_queue = aio.Queue(self._events_queue_size) 136 self._enable_event.set() 137 138 else: 139 if self._events_queue is None: 140 return 141 142 self._events_queue.close() 143 self._events_queue = None 144 self._enable_event.set() 145 146 async def process_events(self, events: Collection[hat.event.common.Event]): 147 if self._events_queue is None: 148 mlog.warning("device not enabled - ignoring %s events", 149 len(events)) 150 return 151 152 await self._events_queue.put(events) 153 154 async def _run(self): 155 try: 156 while True: 157 self._enable_event.clear() 158 159 if self._events_queue is None: 160 await self._enable_event.wait() 161 continue 162 163 events_queue = self._events_queue 164 device = await aio.call(self._create_device, self._conf, 165 self._eventer_client, 166 self._event_type_prefix) 167 168 try: 169 device.async_group.spawn(aio.call_on_cancel, 170 events_queue.close) 171 await self._register_running(True) 172 173 while True: 174 events = await events_queue.get() 175 176 if not device.is_open: 177 raise Exception('device closed') 178 179 await aio.call(device.process_events, events) 180 181 except aio.QueueClosedError: 182 if not events_queue.is_closed: 183 raise 184 185 if not device.is_open: 186 raise Exception('device closed') 187 188 finally: 189 await aio.uncancellable(self._close_device(device)) 190 191 except Exception as e: 192 mlog.error("device proxy run error: %s", e, exc_info=e) 193 194 finally: 195 self.close() 196 197 async def _close_device(self, device): 198 await device.async_close() 199 200 with contextlib.suppress(ConnectionError): 201 await self._register_running(False) 202 203 async def _register_running(self, is_running): 204 await self._eventer_client.register([ 205 hat.event.common.RegisterEvent( 206 type=(*self._event_type_prefix, 'gateway', 'running'), 207 source_timestamp=hat.event.common.now(), 208 payload=hat.event.common.EventPayloadJson(is_running))])
Module logger
class
Engine(hat.aio.group.Resource):
22class Engine(aio.Resource): 23 """Gateway engine""" 24 25 def __init__(self, 26 conf: json.Data, 27 eventer_client: hat.event.eventer.Client, 28 events_queue_size: int = 1024): 29 self._eventer_client = eventer_client 30 self._async_group = aio.Group() 31 self._events_queue = aio.Queue(events_queue_size) 32 self._devices = {} 33 34 for device_conf in conf['devices']: 35 info = common.import_device_info(device_conf['module']) 36 event_type_prefix = ('gateway', info.type, device_conf['name']) 37 38 self._devices[event_type_prefix] = _DeviceProxy( 39 conf=device_conf, 40 eventer_client=eventer_client, 41 event_type_prefix=event_type_prefix, 42 async_group=self.async_group, 43 create_device=info.create, 44 events_queue_size=events_queue_size) 45 46 self.async_group.spawn(self._run) 47 48 @property 49 def async_group(self) -> aio.Group: 50 """Async group""" 51 return self._async_group 52 53 async def process_events(self, events: Iterable[hat.event.common.Event]): 54 await self._events_queue.put(events) 55 56 async def _run(self): 57 try: 58 event_types = [(*event_type_prefix, 'system', 'enable') 59 for event_type_prefix in self._devices.keys()] 60 params = hat.event.common.QueryLatestParams(event_types) 61 result = await self._eventer_client.query(params) 62 63 for event in result.events: 64 event_type_prefix = event.type[:3] 65 device = self._devices.get(event_type_prefix) 66 if not device or event.type[3:] != ('system', 'enable'): 67 continue 68 69 device.set_enable( 70 isinstance(event.payload, 71 hat.event.common.EventPayloadJson) and 72 event.payload.data is True) 73 74 while True: 75 events = await self._events_queue.get() 76 77 device_events = collections.defaultdict(collections.deque) 78 79 for event in events: 80 event_type_prefix = event.type[:3] 81 device = self._devices.get(event_type_prefix) 82 if not device: 83 mlog.warning("received invalid event type prefix %s", 84 event_type_prefix) 85 continue 86 87 if event.type[3:] == ('system', 'enable'): 88 device.set_enable( 89 isinstance(event.payload, 90 hat.event.common.EventPayloadJson) and 91 event.payload.data is True) 92 93 else: 94 device_events[device].append(event) 95 96 for device, dev_events in device_events.items(): 97 await device.process_events(dev_events) 98 99 except Exception as e: 100 mlog.error("engine run error: %s", e, exc_info=e) 101 102 finally: 103 self.close() 104 self._events_queue.close()
Gateway engine
Engine( conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]], eventer_client: hat.event.eventer.client.Client, events_queue_size: int = 1024)
25 def __init__(self, 26 conf: json.Data, 27 eventer_client: hat.event.eventer.Client, 28 events_queue_size: int = 1024): 29 self._eventer_client = eventer_client 30 self._async_group = aio.Group() 31 self._events_queue = aio.Queue(events_queue_size) 32 self._devices = {} 33 34 for device_conf in conf['devices']: 35 info = common.import_device_info(device_conf['module']) 36 event_type_prefix = ('gateway', info.type, device_conf['name']) 37 38 self._devices[event_type_prefix] = _DeviceProxy( 39 conf=device_conf, 40 eventer_client=eventer_client, 41 event_type_prefix=event_type_prefix, 42 async_group=self.async_group, 43 create_device=info.create, 44 events_queue_size=events_queue_size) 45 46 self.async_group.spawn(self._run)