hat.gateway.engine
Gateway engine
"""Gateway engine"""

from collections.abc import Collection, Iterable
import asyncio
import collections
import contextlib
import importlib
import logging

from hat import aio
from hat import json
import hat.event.common
import hat.event.eventer

from hat.gateway import common


mlog: logging.Logger = logging.getLogger(__name__)
"""Module logger"""


def _is_enable_payload(event: hat.event.common.Event) -> bool:
    """Return ``True`` only if `event` carries a JSON payload equal to
    ``True``; any other payload kind or value counts as "disable"."""
    payload = event.payload
    return (isinstance(payload, hat.event.common.EventPayloadJson) and
            payload.data is True)


class Engine(aio.Resource):
    """Gateway engine

    Creates one device proxy per entry in ``conf['devices']`` and routes
    events to proxies based on the first four segments of the event type
    (``('gateway', <gateway_name>, <device type>, <device name>)``).

    Args:
        conf: configuration; keys read here are ``devices`` and
            ``gateway_name``, and for each device ``module`` and ``name``
        eventer_client: client used for querying and registering events
        events_queue_size: size passed to the engine queue and to each
            device proxy

    """

    def __init__(self,
                 conf: json.Data,
                 eventer_client: hat.event.eventer.Client,
                 events_queue_size: int = 1024):
        self._eventer_client = eventer_client
        self._async_group = aio.Group()
        self._events_queue = aio.Queue(events_queue_size)
        self._devices = {}

        try:
            for device_conf in conf['devices']:
                module = importlib.import_module(device_conf['module'])
                event_type_prefix = ('gateway',
                                     conf['gateway_name'],
                                     module.info.type,
                                     device_conf['name'])

                self._devices[event_type_prefix] = _DeviceProxy(
                    conf=device_conf,
                    eventer_client=eventer_client,
                    event_type_prefix=event_type_prefix,
                    async_group=self.async_group,
                    create_device=module.info.create,
                    events_queue_size=events_queue_size)

            self.async_group.spawn(self._run)

        except BaseException:
            # proxies created so far already spawned tasks in this group;
            # the caller never receives the instance, so close the group
            # here instead of leaving those tasks running
            self._async_group.close()
            raise

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    async def process_events(self, events: Iterable[hat.event.common.Event]):
        """Queue `events` for routing by the engine's run loop"""
        await self._events_queue.put(events)

    async def _run(self):
        try:
            # apply the latest known enable state of every configured device
            event_types = [(*event_type_prefix, 'system', 'enable')
                           for event_type_prefix in self._devices]
            params = hat.event.common.QueryLatestParams(event_types)
            result = await self._eventer_client.query(params)

            for event in result.events:
                device = self._devices.get(event.type[:4])
                if not device or event.type[4:] != ('system', 'enable'):
                    continue

                device.set_enable(_is_enable_payload(event))

            while True:
                events = await self._events_queue.get()

                # group per device so each proxy receives a single batch
                device_events = collections.defaultdict(collections.deque)

                for event in events:
                    event_type_prefix = event.type[:4]
                    device = self._devices.get(event_type_prefix)
                    if not device:
                        mlog.warning("received invalid event type prefix %s",
                                     event_type_prefix)
                        continue

                    if event.type[4:] == ('system', 'enable'):
                        device.set_enable(_is_enable_payload(event))

                    else:
                        device_events[device].append(event)

                for device, dev_events in device_events.items():
                    await device.process_events(dev_events)

        except Exception as e:
            mlog.error("engine run error: %s", e, exc_info=e)

        finally:
            self.close()
            self._events_queue.close()


class _DeviceProxy(aio.Resource):
    """Creates and closes a single device according to its enable state

    The proxy runs in the async group it is given (the engine passes its
    own), so ``close`` on the proxy closes that shared group.
    """

    def __init__(self,
                 conf: json.Data,
                 eventer_client: hat.event.eventer.Client,
                 event_type_prefix: common.EventTypePrefix,
                 async_group: aio.Group,
                 create_device: common.CreateDevice,
                 events_queue_size: int = 1024):
        self._conf = conf
        self._eventer_client = eventer_client
        self._event_type_prefix = event_type_prefix
        self._async_group = async_group
        self._create_device = create_device
        self._events_queue_size = events_queue_size
        # ``None`` means "disabled"; a queue instance means "enabled"
        self._events_queue = None
        self._enable_event = asyncio.Event()

        self.async_group.spawn(self._run)

    @property
    def async_group(self) -> aio.Group:
        return self._async_group

    def set_enable(self, enable: bool):
        """Switch the enable state; repeating the current state is a no-op"""
        if enable:
            if self._events_queue is not None:
                return

            self._events_queue = aio.Queue(self._events_queue_size)
            self._enable_event.set()

        else:
            if self._events_queue is None:
                return

            # closing the queue makes the run loop's pending ``get`` fail,
            # which is how the running device gets torn down
            self._events_queue.close()
            self._events_queue = None
            self._enable_event.set()

    async def process_events(self,
                             events: Collection[hat.event.common.Event]):
        """Queue `events` for the device; drop them (with a warning) while
        the device is not enabled"""
        if self._events_queue is None:
            mlog.warning("device not enabled - ignoring %s events",
                         len(events))
            return

        await self._events_queue.put(events)

    async def _run(self):
        try:
            while True:
                self._enable_event.clear()

                if self._events_queue is None:
                    await self._enable_event.wait()
                    continue

                # keep a local reference - ``set_enable`` may replace
                # ``self._events_queue`` while this device is running
                events_queue = self._events_queue
                device = await aio.call(self._create_device, self._conf,
                                        self._eventer_client,
                                        self._event_type_prefix)

                try:
                    # close the queue once the device's group cancels this
                    # task, so the ``get`` below does not wait forever
                    device.async_group.spawn(aio.call_on_cancel,
                                             events_queue.close)
                    await self._register_running(True)

                    while True:
                        events = await events_queue.get()

                        if not device.is_open:
                            raise Exception('device closed')

                        await aio.call(device.process_events, events)

                except aio.QueueClosedError:
                    # only a closed local queue is an expected way out
                    if not events_queue.is_closed:
                        raise

                    # queue closed because the device itself closed
                    if not device.is_open:
                        raise Exception('device closed')

                finally:
                    await aio.uncancellable(self._close_device(device))

        except Exception as e:
            mlog.error("device proxy run error: %s", e, exc_info=e)

        finally:
            self.close()

    async def _close_device(self, device):
        await device.async_close()

        # best effort - the eventer connection may already be gone
        with contextlib.suppress(ConnectionError):
            await self._register_running(False)

    async def _register_running(self, is_running):
        await self._eventer_client.register([
            hat.event.common.RegisterEvent(
                type=(*self._event_type_prefix, 'gateway', 'running'),
                source_timestamp=hat.event.common.now(),
                payload=hat.event.common.EventPayloadJson(is_running))])
Module logger
class Engine(hat.aio.group.Resource):
class Engine(aio.Resource):
    """Gateway engine

    Creates one device proxy per entry in ``conf['devices']`` and routes
    events to proxies based on the first four segments of the event type
    (``('gateway', <gateway_name>, <device type>, <device name>)``).

    Args:
        conf: configuration; keys read here are ``devices`` and
            ``gateway_name``, and for each device ``module`` and ``name``
        eventer_client: client used for querying and registering events
        events_queue_size: size passed to the engine queue and to each
            device proxy

    """

    def __init__(self,
                 conf: json.Data,
                 eventer_client: hat.event.eventer.Client,
                 events_queue_size: int = 1024):
        self._eventer_client = eventer_client
        self._async_group = aio.Group()
        self._events_queue = aio.Queue(events_queue_size)
        self._devices = {}

        try:
            for device_conf in conf['devices']:
                module = importlib.import_module(device_conf['module'])
                event_type_prefix = ('gateway',
                                     conf['gateway_name'],
                                     module.info.type,
                                     device_conf['name'])

                self._devices[event_type_prefix] = _DeviceProxy(
                    conf=device_conf,
                    eventer_client=eventer_client,
                    event_type_prefix=event_type_prefix,
                    async_group=self.async_group,
                    create_device=module.info.create,
                    events_queue_size=events_queue_size)

            self.async_group.spawn(self._run)

        except BaseException:
            # proxies created so far already spawned tasks in this group;
            # the caller never receives the instance, so close the group
            # here instead of leaving those tasks running
            self._async_group.close()
            raise

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    async def process_events(self, events: Iterable[hat.event.common.Event]):
        """Queue `events` for routing by the engine's run loop"""
        await self._events_queue.put(events)

    @staticmethod
    def _is_enable_payload(event: hat.event.common.Event) -> bool:
        """Return ``True`` only if `event` carries a JSON payload equal to
        ``True``; any other payload kind or value counts as "disable"."""
        payload = event.payload
        return (isinstance(payload, hat.event.common.EventPayloadJson) and
                payload.data is True)

    async def _run(self):
        try:
            # apply the latest known enable state of every configured device
            event_types = [(*event_type_prefix, 'system', 'enable')
                           for event_type_prefix in self._devices]
            params = hat.event.common.QueryLatestParams(event_types)
            result = await self._eventer_client.query(params)

            for event in result.events:
                device = self._devices.get(event.type[:4])
                if not device or event.type[4:] != ('system', 'enable'):
                    continue

                device.set_enable(self._is_enable_payload(event))

            while True:
                events = await self._events_queue.get()

                # group per device so each proxy receives a single batch
                device_events = collections.defaultdict(collections.deque)

                for event in events:
                    event_type_prefix = event.type[:4]
                    device = self._devices.get(event_type_prefix)
                    if not device:
                        mlog.warning("received invalid event type prefix %s",
                                     event_type_prefix)
                        continue

                    if event.type[4:] == ('system', 'enable'):
                        device.set_enable(self._is_enable_payload(event))

                    else:
                        device_events[device].append(event)

                for device, dev_events in device_events.items():
                    await device.process_events(dev_events)

        except Exception as e:
            mlog.error("engine run error: %s", e, exc_info=e)

        finally:
            self.close()
            self._events_queue.close()
Gateway engine
Engine( conf: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')], eventer_client: hat.event.eventer.client.Client, events_queue_size: int = 1024)
def __init__(self,
             conf: json.Data,
             eventer_client: hat.event.eventer.Client,
             events_queue_size: int = 1024):
    """Create a device proxy for each configured device and start the
    engine's run loop.

    Args:
        conf: configuration; keys read here are ``devices`` and
            ``gateway_name``, and for each device ``module`` and ``name``
        eventer_client: client handed to every device proxy
        events_queue_size: size of the engine queue, also passed to each
            device proxy

    Any exception raised while importing a device module or creating a
    proxy is re-raised after the async group is closed.
    """
    self._eventer_client = eventer_client
    self._async_group = aio.Group()
    self._events_queue = aio.Queue(events_queue_size)
    self._devices = {}

    try:
        for device_conf in conf['devices']:
            module = importlib.import_module(device_conf['module'])
            event_type_prefix = ('gateway',
                                 conf['gateway_name'],
                                 module.info.type,
                                 device_conf['name'])

            self._devices[event_type_prefix] = _DeviceProxy(
                conf=device_conf,
                eventer_client=eventer_client,
                event_type_prefix=event_type_prefix,
                async_group=self.async_group,
                create_device=module.info.create,
                events_queue_size=events_queue_size)

        self.async_group.spawn(self._run)

    except BaseException:
        # proxies created so far already spawned tasks in this group;
        # the caller never receives the instance, so close the group
        # here instead of leaving those tasks running
        self._async_group.close()
        raise
async_group: hat.aio.group.Group
@property
def async_group(self) -> aio.Group:
    """Async group backing this engine (read-only accessor)"""
    return self._async_group
Async group
Inherited Members
- hat.aio.group.Resource
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close