hat.gateway.runner
"""Gateway runner wiring eventer connection, engine and adminer server"""

from collections.abc import Iterable
import asyncio
import collections
import logging

from hat import aio
from hat import json
from hat.drivers import tcp
import hat.event.component
import hat.event.eventer

from hat.gateway import common
from hat.gateway.adminer_server import create_adminer_server
import hat.gateway.engine


mlog: logging.Logger = logging.getLogger(__name__)
"""Module logger"""


class MainRunner(aio.Resource):
    """Top-level gateway runner

    On construction, a background task is spawned in the runner's own
    `aio.Group`. Depending on ``conf['event_server']``, that task creates
    either

    * an eventer component (``monitor_component`` key present), which
      receives `_create_eventer_runner` as its runner callback, or
    * an eventer client together with an `EventerRunner`
      (``eventer_server`` key present),

    and, when ``conf`` contains ``adminer_server``, an adminer server.
    Resources created by `_start` are bound to the group with
    `_bind_resource`. When the task ends, the runner is closed and the
    created resources are asynchronously closed.

    Args:
        conf: gateway configuration

    """

    def __init__(self, conf: json.Data):
        self._conf = conf

        # placeholders for resources that `_start` may create
        self._eventer_component = None
        self._eventer_client = None
        self._eventer_runner = None
        self._adminer_server = None

        self._loop = asyncio.get_running_loop()
        self._async_group = aio.Group()

        self.async_group.spawn(self._run)

    @property
    def async_group(self) -> aio.Group:
        """Async group controlling this runner's lifetime"""
        return self._async_group

    async def _run(self):
        """Main loop: start resources, wait, then clean up"""
        try:
            mlog.debug("starting main runner loop")
            await self._start()

            # this future is never resolved here - the task stays parked
            # on it until it is cancelled
            never_done = self._loop.create_future()
            await never_done

        except Exception as e:
            mlog.error("main runner loop error: %s", e, exc_info=e)

        finally:
            mlog.debug("closing main runner loop")
            self.close()
            await aio.uncancellable(self._stop())

    async def _start(self):
        """Create event server connection and optional adminer server

        Raises:
            Exception: ``conf['event_server']`` contains neither
                ``monitor_component`` nor ``eventer_server``

        """
        event_server_conf = self._conf['event_server']

        # one subscription tuple per configured device
        subscriptions = collections.deque(
            ('gateway',
             common.import_device_info(device_conf['module']).type,
             device_conf['name'],
             'system',
             '*')
            for device_conf in self._conf['devices'])

        if 'monitor_component' in event_server_conf:
            component_conf = event_server_conf['monitor_component']

            mlog.debug("creating eventer component")
            component_addr = tcp.Address(component_conf['host'],
                                         component_conf['port'])
            self._eventer_component = await hat.event.component.connect(
                addr=component_addr,
                name=self._conf['name'],
                group=component_conf['gateway_group'],
                server_group=component_conf['event_server_group'],
                client_name=f"gateway/{self._conf['name']}",
                runner_cb=self._create_eventer_runner,
                status_cb=self._on_component_status,
                events_cb=self._on_component_events,
                eventer_kwargs={'subscriptions': subscriptions})
            _bind_resource(self.async_group, self._eventer_component)

            await self._eventer_component.set_ready(True)

        elif 'eventer_server' in event_server_conf:
            server_conf = event_server_conf['eventer_server']

            mlog.debug("creating eventer client")
            server_addr = tcp.Address(server_conf['host'],
                                      server_conf['port'])
            self._eventer_client = await hat.event.eventer.connect(
                addr=server_addr,
                client_name=f"gateway/{self._conf['name']}",
                subscriptions=subscriptions,
                status_cb=self._on_client_status,
                events_cb=self._on_client_events)
            _bind_resource(self.async_group, self._eventer_client)

            mlog.debug("creating eventer runner")
            self._eventer_runner = EventerRunner(
                conf=self._conf,
                eventer_client=self._eventer_client)
            _bind_resource(self.async_group, self._eventer_runner)

        else:
            raise Exception('invalid configuration')

        if 'adminer_server' not in self._conf:
            return

        mlog.debug("creating adminer server")
        adminer_conf = self._conf['adminer_server']
        self._adminer_server = await create_adminer_server(
            addr=tcp.Address(adminer_conf['host'], adminer_conf['port']),
            log_conf=self._conf.get('log'))
        _bind_resource(self.async_group, self._adminer_server)

    async def _stop(self):
        """Asynchronously close resources created by `_start`

        The eventer runner is closed here only when no eventer component
        exists (presumably the component takes care of the runner it
        obtained through ``runner_cb`` - to be confirmed).

        """
        if self._adminer_server:
            await self._adminer_server.async_close()

        runner_is_ours = not self._eventer_component
        if self._eventer_runner and runner_is_ours:
            await self._eventer_runner.async_close()

        if self._eventer_client:
            await self._eventer_client.async_close()

        if self._eventer_component:
            await self._eventer_component.async_close()

    async def _create_eventer_runner(self, monitor_component, server_data,
                                     eventer_client):
        """Runner callback given to the eventer component

        Creates a new `EventerRunner` for the provided eventer client,
        remembers it and returns it. ``monitor_component`` and
        ``server_data`` are not used.

        """
        mlog.debug("creating eventer runner")
        runner = EventerRunner(conf=self._conf,
                               eventer_client=eventer_client)
        self._eventer_runner = runner

        return runner

    def _process_status(self, status):
        """Forward status to the eventer runner, if one exists"""
        runner = self._eventer_runner
        if runner:
            runner.process_status(status)

    async def _process_events(self, events):
        """Forward events to the eventer runner, if one exists"""
        runner = self._eventer_runner
        if runner:
            await runner.process_events(events)

    def _on_component_status(self, eventer_component, eventer_client, status):
        self._process_status(status)

    def _on_client_status(self, eventer_client, status):
        self._process_status(status)

    async def _on_component_events(self, eventer_component, eventer_client,
                                   events):
        await self._process_events(events)

    async def _on_client_events(self, eventer_client, events):
        await self._process_events(events)


class EventerRunner(aio.Resource):
    """Runs gateway engine while the eventer client is active

    The run loop waits on an internal status event (set on construction
    and by `process_status`). Whenever `_is_active` holds, a new
    `hat.gateway.engine.Engine` is created. That engine is closed once
    `_is_active` stops holding; the loop ends, and the runner closes,
    once the engine's ``wait_closing`` completes.

    Args:
        conf: gateway configuration
        eventer_client: eventer client passed to created engines

    """

    def __init__(self,
                 conf: json.Data,
                 eventer_client: hat.event.eventer.Client):
        self._conf = conf
        self._eventer_client = eventer_client
        self._engine = None

        # initially set, so the first wait in `_run` returns without any
        # status notification
        self._status_event = asyncio.Event()
        self._status_event.set()

        self._async_group = aio.Group()
        self.async_group.spawn(self._run)

    @property
    def async_group(self) -> aio.Group:
        """Async group controlling this runner's lifetime"""
        return self._async_group

    def process_status(self, status: hat.event.common.Status):
        """Notify run loop of a status change

        The ``status`` value itself is not used - the run loop reads the
        current state from the eventer client.

        """
        self._status_event.set()

    async def process_events(self, events: Iterable[hat.event.common.Event]):
        """Pass events to the engine

        Events are dropped when this runner is not open or when there is
        no open engine.

        """
        if self.is_open and self._engine and self._engine.is_open:
            await self._engine.process_events(events)

    async def _run(self):
        """Run loop creating and closing engines on status changes"""
        try:
            mlog.debug("starting eventer runner loop")
            while True:
                await self._status_event.wait()

                self._status_event.clear()
                if not self._is_active():
                    continue

                mlog.debug("creating engine")
                self._engine = hat.gateway.engine.Engine(
                    conf=self._conf,
                    eventer_client=self._eventer_client)

                async with self.async_group.create_subgroup() as tasks:
                    closing_task = tasks.spawn(self._engine.wait_closing)

                    while True:
                        # engine closing ends the whole runner
                        if closing_task.done():
                            return

                        # clear before the activity check, so a status
                        # change arriving afterwards is not lost
                        self._status_event.clear()
                        if not self._is_active():
                            break

                        status_task = tasks.spawn(self._status_event.wait)

                        await asyncio.wait(
                            [closing_task, status_task],
                            return_when=asyncio.FIRST_COMPLETED)

                # no longer active - drop engine and wait for next status
                await self._engine.async_close()

        except Exception as e:
            mlog.error("eventer runner loop error: %s", e, exc_info=e)

        finally:
            mlog.debug("closing eventer runner loop")
            self.close()

            if self._engine:
                await aio.uncancellable(self._engine.async_close())

    def _is_active(self):
        """Check whether an engine should currently be running

        Requires an open eventer client and, when
        ``conf['event_server']['require_operational']`` is truthy,
        client status ``OPERATIONAL``.

        """
        client = self._eventer_client
        if not client.is_open:
            return False

        if not self._conf['event_server'].get('require_operational'):
            return True

        return client.status == hat.event.common.Status.OPERATIONAL


def _bind_resource(async_group, resource):
    """Spawn task calling ``async_group.close`` once resource is closing

    ``aio.call_on_done`` is spawned in ``async_group`` with
    ``resource.wait_closing()`` and ``async_group.close`` as arguments.

    """
    closing = resource.wait_closing()
    async_group.spawn(aio.call_on_done, closing, async_group.close)
Module logger
class MainRunner(hat.aio.group.Resource):
class MainRunner(aio.Resource):
    """Top-level gateway runner

    On construction, a background task is spawned in the runner's own
    `aio.Group`. Depending on ``conf['event_server']``, that task creates
    either

    * an eventer component (``monitor_component`` key present), which
      receives `_create_eventer_runner` as its runner callback, or
    * an eventer client together with an `EventerRunner`
      (``eventer_server`` key present),

    and, when ``conf`` contains ``adminer_server``, an adminer server.
    Resources created by `_start` are bound to the group with
    `_bind_resource`. When the task ends, the runner is closed and the
    created resources are asynchronously closed.

    Args:
        conf: gateway configuration

    """

    def __init__(self, conf: json.Data):
        self._conf = conf

        # placeholders for resources that `_start` may create
        self._eventer_component = None
        self._eventer_client = None
        self._eventer_runner = None
        self._adminer_server = None

        self._loop = asyncio.get_running_loop()
        self._async_group = aio.Group()

        self.async_group.spawn(self._run)

    @property
    def async_group(self) -> aio.Group:
        """Async group controlling this runner's lifetime"""
        return self._async_group

    async def _run(self):
        """Main loop: start resources, wait, then clean up"""
        try:
            mlog.debug("starting main runner loop")
            await self._start()

            # this future is never resolved here - the task stays parked
            # on it until it is cancelled
            never_done = self._loop.create_future()
            await never_done

        except Exception as e:
            mlog.error("main runner loop error: %s", e, exc_info=e)

        finally:
            mlog.debug("closing main runner loop")
            self.close()
            await aio.uncancellable(self._stop())

    async def _start(self):
        """Create event server connection and optional adminer server

        Raises:
            Exception: ``conf['event_server']`` contains neither
                ``monitor_component`` nor ``eventer_server``

        """
        event_server_conf = self._conf['event_server']

        # one subscription tuple per configured device
        subscriptions = collections.deque(
            ('gateway',
             common.import_device_info(device_conf['module']).type,
             device_conf['name'],
             'system',
             '*')
            for device_conf in self._conf['devices'])

        if 'monitor_component' in event_server_conf:
            component_conf = event_server_conf['monitor_component']

            mlog.debug("creating eventer component")
            component_addr = tcp.Address(component_conf['host'],
                                         component_conf['port'])
            self._eventer_component = await hat.event.component.connect(
                addr=component_addr,
                name=self._conf['name'],
                group=component_conf['gateway_group'],
                server_group=component_conf['event_server_group'],
                client_name=f"gateway/{self._conf['name']}",
                runner_cb=self._create_eventer_runner,
                status_cb=self._on_component_status,
                events_cb=self._on_component_events,
                eventer_kwargs={'subscriptions': subscriptions})
            _bind_resource(self.async_group, self._eventer_component)

            await self._eventer_component.set_ready(True)

        elif 'eventer_server' in event_server_conf:
            server_conf = event_server_conf['eventer_server']

            mlog.debug("creating eventer client")
            server_addr = tcp.Address(server_conf['host'],
                                      server_conf['port'])
            self._eventer_client = await hat.event.eventer.connect(
                addr=server_addr,
                client_name=f"gateway/{self._conf['name']}",
                subscriptions=subscriptions,
                status_cb=self._on_client_status,
                events_cb=self._on_client_events)
            _bind_resource(self.async_group, self._eventer_client)

            mlog.debug("creating eventer runner")
            self._eventer_runner = EventerRunner(
                conf=self._conf,
                eventer_client=self._eventer_client)
            _bind_resource(self.async_group, self._eventer_runner)

        else:
            raise Exception('invalid configuration')

        if 'adminer_server' not in self._conf:
            return

        mlog.debug("creating adminer server")
        adminer_conf = self._conf['adminer_server']
        self._adminer_server = await create_adminer_server(
            addr=tcp.Address(adminer_conf['host'], adminer_conf['port']),
            log_conf=self._conf.get('log'))
        _bind_resource(self.async_group, self._adminer_server)

    async def _stop(self):
        """Asynchronously close resources created by `_start`

        The eventer runner is closed here only when no eventer component
        exists (presumably the component takes care of the runner it
        obtained through ``runner_cb`` - to be confirmed).

        """
        if self._adminer_server:
            await self._adminer_server.async_close()

        runner_is_ours = not self._eventer_component
        if self._eventer_runner and runner_is_ours:
            await self._eventer_runner.async_close()

        if self._eventer_client:
            await self._eventer_client.async_close()

        if self._eventer_component:
            await self._eventer_component.async_close()

    async def _create_eventer_runner(self, monitor_component, server_data,
                                     eventer_client):
        """Runner callback given to the eventer component

        Creates a new `EventerRunner` for the provided eventer client,
        remembers it and returns it. ``monitor_component`` and
        ``server_data`` are not used.

        """
        mlog.debug("creating eventer runner")
        runner = EventerRunner(conf=self._conf,
                               eventer_client=eventer_client)
        self._eventer_runner = runner

        return runner

    def _process_status(self, status):
        """Forward status to the eventer runner, if one exists"""
        runner = self._eventer_runner
        if runner:
            runner.process_status(status)

    async def _process_events(self, events):
        """Forward events to the eventer runner, if one exists"""
        runner = self._eventer_runner
        if runner:
            await runner.process_events(events)

    def _on_component_status(self, eventer_component, eventer_client, status):
        self._process_status(status)

    def _on_client_status(self, eventer_client, status):
        self._process_status(status)

    async def _on_component_events(self, eventer_component, eventer_client,
                                   events):
        await self._process_events(events)

    async def _on_client_events(self, eventer_client, events):
        await self._process_events(events)
Resource with lifetime control based on Group.
MainRunner( conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]])
def __init__(self, conf: json.Data):
    """Store configuration and spawn the main run loop

    Must be called while an event loop is running
    (`asyncio.get_running_loop` is used).

    Args:
        conf: gateway configuration

    """
    self._conf = conf

    # placeholders for resources that `_start` may create
    self._eventer_component = None
    self._eventer_client = None
    self._eventer_runner = None
    self._adminer_server = None

    self._loop = asyncio.get_running_loop()
    self._async_group = aio.Group()

    self.async_group.spawn(self._run)
class EventerRunner(hat.aio.group.Resource):
class EventerRunner(aio.Resource):
    """Runs gateway engine while the eventer client is active

    The run loop waits on an internal status event (set on construction
    and by `process_status`). Whenever `_is_active` holds, a new
    `hat.gateway.engine.Engine` is created. That engine is closed once
    `_is_active` stops holding; the loop ends, and the runner closes,
    once the engine's ``wait_closing`` completes.

    Args:
        conf: gateway configuration
        eventer_client: eventer client passed to created engines

    """

    def __init__(self,
                 conf: json.Data,
                 eventer_client: hat.event.eventer.Client):
        self._conf = conf
        self._eventer_client = eventer_client
        self._engine = None

        # initially set, so the first wait in `_run` returns without any
        # status notification
        self._status_event = asyncio.Event()
        self._status_event.set()

        self._async_group = aio.Group()
        self.async_group.spawn(self._run)

    @property
    def async_group(self) -> aio.Group:
        """Async group controlling this runner's lifetime"""
        return self._async_group

    def process_status(self, status: hat.event.common.Status):
        """Notify run loop of a status change

        The ``status`` value itself is not used - the run loop reads the
        current state from the eventer client.

        """
        self._status_event.set()

    async def process_events(self, events: Iterable[hat.event.common.Event]):
        """Pass events to the engine

        Events are dropped when this runner is not open or when there is
        no open engine.

        """
        if self.is_open and self._engine and self._engine.is_open:
            await self._engine.process_events(events)

    async def _run(self):
        """Run loop creating and closing engines on status changes"""
        try:
            mlog.debug("starting eventer runner loop")
            while True:
                await self._status_event.wait()

                self._status_event.clear()
                if not self._is_active():
                    continue

                mlog.debug("creating engine")
                self._engine = hat.gateway.engine.Engine(
                    conf=self._conf,
                    eventer_client=self._eventer_client)

                async with self.async_group.create_subgroup() as tasks:
                    closing_task = tasks.spawn(self._engine.wait_closing)

                    while True:
                        # engine closing ends the whole runner
                        if closing_task.done():
                            return

                        # clear before the activity check, so a status
                        # change arriving afterwards is not lost
                        self._status_event.clear()
                        if not self._is_active():
                            break

                        status_task = tasks.spawn(self._status_event.wait)

                        await asyncio.wait(
                            [closing_task, status_task],
                            return_when=asyncio.FIRST_COMPLETED)

                # no longer active - drop engine and wait for next status
                await self._engine.async_close()

        except Exception as e:
            mlog.error("eventer runner loop error: %s", e, exc_info=e)

        finally:
            mlog.debug("closing eventer runner loop")
            self.close()

            if self._engine:
                await aio.uncancellable(self._engine.async_close())

    def _is_active(self):
        """Check whether an engine should currently be running

        Requires an open eventer client and, when
        ``conf['event_server']['require_operational']`` is truthy,
        client status ``OPERATIONAL``.

        """
        client = self._eventer_client
        if not client.is_open:
            return False

        if not self._conf['event_server'].get('require_operational'):
            return True

        return client.status == hat.event.common.Status.OPERATIONAL
Resource with lifetime control based on Group.
EventerRunner( conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]], eventer_client: hat.event.eventer.client.Client)
def __init__(self,
             conf: json.Data,
             eventer_client: hat.event.eventer.Client):
    """Store arguments and spawn the run loop

    Args:
        conf: gateway configuration
        eventer_client: eventer client passed to created engines

    """
    self._conf = conf
    self._eventer_client = eventer_client
    self._engine = None

    # initially set, so the first wait in `_run` returns without any
    # status notification
    self._status_event = asyncio.Event()
    self._status_event.set()

    self._async_group = aio.Group()
    self.async_group.spawn(self._run)