hat.gateway.engine
Gateway engine
"""Gateway engine"""

from collections.abc import Iterable
import asyncio
import contextlib
import logging

from hat import aio
from hat import json
import hat.event.common
import hat.event.eventer

from hat.gateway import common


mlog: logging.Logger = logging.getLogger(__name__)
"""Module logger"""


class Engine(aio.Resource):
    """Gateway engine

    Creates one device proxy for each entry of ``conf['devices']`` and
    forwards received events to the proxy registered under the event's
    type prefix (first three segments of the event type).

    """

    def __init__(self,
                 conf: json.Data,
                 eventer_client: hat.event.eventer.Client,
                 event_queue_size: int = 1024):
        """Create engine

        Args:
            conf: gateway configuration; ``conf['devices']`` is iterated and
                each item is used as a single device configuration
            eventer_client: eventer client passed to every device proxy
            event_queue_size: size of each proxy's event queue

        Raises:
            Exception: any error raised while creating a device proxy; the
                engine is closed before the error is propagated

        """
        self._async_group = aio.Group()
        self._devices = {}

        try:
            for device_conf in conf['devices']:
                device = _DeviceProxy(conf=device_conf,
                                      async_group=self.async_group,
                                      eventer_client=eventer_client,
                                      event_queue_size=event_queue_size)
                self._devices[device.event_type_prefix] = device

        except Exception:
            # every proxy spawns its loop in the shared group as soon as it
            # is constructed - without closing here, loops of the proxies
            # created before the failure would keep running unsupervised
            self.close()
            raise

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    async def process_events(self, events: Iterable[hat.event.common.Event]):
        """Process received events

        Each event is passed to the device proxy registered under the first
        three segments of the event's type. Events without a matching proxy
        are logged at debug level and skipped.

        """
        for event in events:
            event_type_prefix = event.type[:3]

            device = self._devices.get(event_type_prefix)
            if not device:
                mlog.debug("no device - ignorring event with type %s",
                           event.type)
                continue

            await device.process_event(event)


class _DeviceProxy(aio.Resource):
    """Supervisor of a single configured device

    Tracks the device's enabled state, creates the device while it is
    enabled and feeds it with events through a queue.

    """

    def __init__(self,
                 conf: common.DeviceConf,
                 async_group: aio.Group,
                 eventer_client: hat.event.eventer.Client,
                 event_queue_size: int = 1024):
        self._conf = conf
        self._async_group = async_group
        self._eventer_client = eventer_client
        self._event_queue_size = event_queue_size
        self._info = common.import_device_info(conf['module'])
        self._event_type_prefix = ('gateway', self._info.type, conf['name'])
        # None until the first enable event is processed
        self._enabled = None
        self._enabled_event = asyncio.Event()
        # queue exists only while the device is running
        self._event_queue = None
        self._log = _create_device_proxy_logger_adapter(conf['name'])

        self.async_group.spawn(self._device_loop)

    @property
    def async_group(self) -> aio.Group:
        """Async group provided at construction"""
        return self._async_group

    @property
    def event_type_prefix(self) -> hat.event.common.EventType:
        """Event type prefix ``('gateway', <device type>, <device name>)``"""
        return self._event_type_prefix

    async def process_event(self, event: hat.event.common.Event):
        """Process single event addressed to this device

        Events whose type continues with ``('system', 'enable')`` after the
        three prefix segments update the enabled state. All other events are
        put into the event queue while the queue exists, otherwise they are
        logged at debug level and dropped.

        """
        if event.type[3:] == ('system', 'enable'):
            self._process_enable_event(event)
            return

        if self._event_queue is not None:
            # queue can get closed while waiting for free space
            with contextlib.suppress(aio.QueueClosedError):
                await self._event_queue.put(event)
            return

        self._log.debug("device not running - ignoring event with type %s",
                        event.type)

    def _process_enable_event(self, event):
        # enabled only when payload is JSON payload with data exactly `True`
        self._enabled = (
            isinstance(event.payload, hat.event.common.EventPayloadJson) and
            event.payload.data is True)
        # wake up device loop so that it re-evaluates enabled state
        self._enabled_event.set()

    async def _device_loop(self):
        try:
            result = await self._eventer_client.query(
                hat.event.common.QueryLatestParams(
                    [(*self._event_type_prefix, 'system', 'enable')]))

            # enable event processed while query was pending takes
            # precedence over queried state
            if self._enabled is None:
                for event in result.events:
                    self._process_enable_event(event)

            while True:
                # wait until device is enabled
                while not self._enabled:
                    self._enabled_event.clear()
                    await self._enabled_event.wait()

                if not self.is_open:
                    break

                # leaving subgroup context ends spawned enabled loop
                async with self.async_group.create_subgroup() as subgroup:
                    subgroup.spawn(self._enabled_loop)

                    # wait until device is disabled
                    while self._enabled:
                        self._enabled_event.clear()
                        await self._enabled_event.wait()

        except Exception as e:
            self._log.error("device loop error: %s", e, exc_info=e)

        finally:
            self.close()

    async def _enabled_loop(self):
        try:
            device = await aio.call(self._info.create,
                                    self._conf,
                                    self._eventer_client,
                                    self._event_type_prefix)

        except Exception as e:
            self._log.error("error creating device: %s", e, exc_info=e)
            return

        async def cleanup():
            await device.async_close()

            # best effort - connection can already be lost
            with contextlib.suppress(ConnectionError):
                await self._register_running(False)

        try:
            self._event_queue = aio.Queue(self._event_queue_size)
            # presumably closes queue once device's group is cancelled so
            # that pending `get` below is released - confirm in hat.aio docs
            device.async_group.spawn(aio.call_on_cancel,
                                     self._event_queue.close)

            await self._register_running(True)

            while True:
                event = await self._event_queue.get()

                await aio.call(device.process_event, event)

        except aio.QueueClosedError:
            pass

        except Exception as e:
            self._log.error("enabled loop error: %s", e, exc_info=e)

        finally:
            # reset queue first so that new events are no longer enqueued
            self._event_queue = None
            await aio.uncancellable(cleanup())

    async def _register_running(self, is_running):
        # register `(<prefix>, 'gateway', 'running')` event with JSON
        # payload `is_running`
        await self._eventer_client.register([
            hat.event.common.RegisterEvent(
                type=(*self._event_type_prefix, 'gateway', 'running'),
                source_timestamp=hat.event.common.now(),
                payload=hat.event.common.EventPayloadJson(is_running))])


def _create_device_proxy_logger_adapter(name):
    """Create module logger adapter carrying device proxy metadata"""
    extra = {'meta': {'type': 'DeviceProxy',
                      'name': name}}

    return logging.LoggerAdapter(mlog, extra)
Module logger
class
Engine(hat.aio.group.Resource):
class Engine(aio.Resource):
    """Gateway engine

    Creates one device proxy for each entry of ``conf['devices']`` and
    forwards received events to the proxy registered under the event's
    type prefix (first three segments of the event type).

    """

    def __init__(self,
                 conf: json.Data,
                 eventer_client: hat.event.eventer.Client,
                 event_queue_size: int = 1024):
        """Create engine

        Args:
            conf: gateway configuration; ``conf['devices']`` is iterated and
                each item is used as a single device configuration
            eventer_client: eventer client passed to every device proxy
            event_queue_size: size of each proxy's event queue

        Raises:
            Exception: any error raised while creating a device proxy; the
                engine is closed before the error is propagated

        """
        self._async_group = aio.Group()
        self._devices = {}

        try:
            for device_conf in conf['devices']:
                device = _DeviceProxy(conf=device_conf,
                                      async_group=self.async_group,
                                      eventer_client=eventer_client,
                                      event_queue_size=event_queue_size)
                self._devices[device.event_type_prefix] = device

        except Exception:
            # proxies share this engine's group - closing the engine makes
            # sure that a failure while creating one proxy does not leave
            # previously created proxies behind in a group nobody references
            self.close()
            raise

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    async def process_events(self, events: Iterable[hat.event.common.Event]):
        """Process received events

        Each event is passed to the device proxy registered under the first
        three segments of the event's type. Events without a matching proxy
        are logged at debug level and skipped.

        """
        for event in events:
            event_type_prefix = event.type[:3]

            device = self._devices.get(event_type_prefix)
            if not device:
                mlog.debug("no device - ignorring event with type %s",
                           event.type)
                continue

            await device.process_event(event)
Gateway engine
Engine( conf: None | bool | int | float | str | List[ForwardRef('Data')] | Dict[str, ForwardRef('Data')], eventer_client: hat.event.eventer.client.Client, event_queue_size: int = 1024)
def __init__(self,
             conf: json.Data,
             eventer_client: hat.event.eventer.Client,
             event_queue_size: int = 1024):
    """Create engine

    Args:
        conf: gateway configuration; ``conf['devices']`` is iterated and
            each item is used as a single device configuration
        eventer_client: eventer client passed to every device proxy
        event_queue_size: event queue size passed to every device proxy

    Raises:
        Exception: any error raised while creating a device proxy; the
            engine is closed before the error is propagated

    """
    self._async_group = aio.Group()
    self._devices = {}

    try:
        for device_conf in conf['devices']:
            device = _DeviceProxy(conf=device_conf,
                                  async_group=self.async_group,
                                  eventer_client=eventer_client,
                                  event_queue_size=event_queue_size)
            self._devices[device.event_type_prefix] = device

    except Exception:
        # proxies share this engine's group - closing the engine makes sure
        # that a failure while creating one proxy does not leave previously
        # created proxies behind in a group nobody references
        self.close()
        raise
async_group: hat.aio.group.Group
@property
def async_group(self) -> aio.Group:
    """Async group held by this engine instance"""
    return self._async_group
Async group
async def
process_events(self, events: Iterable[hat.event.common.common.Event]):
async def process_events(self, events: Iterable[hat.event.common.Event]):
    """Dispatch received events to device proxies

    The first three segments of an event's type select the device proxy.
    Events without a matching proxy are logged at debug level and dropped.

    """
    for received in events:
        proxy = self._devices.get(received.type[:3])

        if proxy:
            await proxy.process_event(received)
            continue

        mlog.debug("no device - ignorring event with type %s",
                   received.type)
Process received events