hat.gateway.runner
1from collections.abc import Iterable 2import asyncio 3import collections 4import logging 5 6from hat import aio 7from hat import json 8from hat.drivers import tcp 9import hat.event.component 10import hat.event.eventer 11 12from hat.gateway import common 13import hat.gateway.engine 14 15 16mlog: logging.Logger = logging.getLogger(__name__) 17"""Module logger""" 18 19 20class MainRunner(aio.Resource): 21 22 def __init__(self, conf: json.Data): 23 self._conf = conf 24 self._loop = asyncio.get_running_loop() 25 self._async_group = aio.Group() 26 self._eventer_component = None 27 self._eventer_client = None 28 self._eventer_runner = None 29 30 self.async_group.spawn(self._run) 31 32 @property 33 def async_group(self) -> aio.Group: 34 return self._async_group 35 36 async def _run(self): 37 try: 38 mlog.debug("starting main runner loop") 39 await self._start() 40 41 await self._loop.create_future() 42 43 except Exception as e: 44 mlog.error("main runner loop error: %s", e, exc_info=e) 45 46 finally: 47 mlog.debug("closing main runner loop") 48 self.close() 49 await aio.uncancellable(self._stop()) 50 51 async def _start(self): 52 event_server_conf = self._conf['event_server'] 53 54 subscriptions = collections.deque() 55 for device_conf in self._conf['devices']: 56 info = common.import_device_info(device_conf['module']) 57 subscriptions.append(('gateway', info.type, device_conf['name'], 58 'system', '*')) 59 60 if 'monitor_component' in event_server_conf: 61 monitor_component_conf = event_server_conf['monitor_component'] 62 63 mlog.debug("creating eventer component") 64 self._eventer_component = await hat.event.component.connect( 65 addr=tcp.Address(monitor_component_conf['host'], 66 monitor_component_conf['port']), 67 name=self._conf['name'], 68 group=monitor_component_conf['gateway_group'], 69 server_group=monitor_component_conf['event_server_group'], 70 client_name=f"gateway/{self._conf['name']}", 71 runner_cb=self._create_eventer_runner, 72 status_cb=self._on_component_status, 73 events_cb=self._on_component_events, 74 eventer_kwargs={'subscriptions': subscriptions}) 75 _bind_resource(self.async_group, self._eventer_component) 76 77 await self._eventer_component.set_ready(True) 78 79 elif 'eventer_server' in event_server_conf: 80 eventer_server_conf = event_server_conf['eventer_server'] 81 82 mlog.debug("creating eventer client") 83 self._eventer_client = await hat.event.eventer.connect( 84 addr=tcp.Address(eventer_server_conf['host'], 85 eventer_server_conf['port']), 86 client_name=f"gateway/{self._conf['name']}", 87 subscriptions=subscriptions, 88 status_cb=self._on_client_status, 89 events_cb=self._on_client_events) 90 _bind_resource(self.async_group, self._eventer_client) 91 92 mlog.debug("creating eventer runner") 93 self._eventer_runner = EventerRunner( 94 conf=self._conf, 95 eventer_client=self._eventer_client) 96 _bind_resource(self.async_group, self._eventer_runner) 97 98 else: 99 raise Exception('invalid configuration') 100 101 async def _stop(self): 102 if self._eventer_runner and not self._eventer_component: 103 await self._eventer_runner.async_close() 104 105 if self._eventer_client: 106 await self._eventer_client.async_close() 107 108 if self._eventer_component: 109 await self._eventer_component.async_close() 110 111 async def _create_eventer_runner(self, monitor_component, server_data, 112 eventer_client): 113 mlog.debug("creating eventer runner") 114 self._eventer_runner = EventerRunner(conf=self._conf, 115 eventer_client=eventer_client) 116 117 return self._eventer_runner 118 119 def _process_status(self, status): 120 if not self._eventer_runner: 121 return 122 123 self._eventer_runner.process_status(status) 124 125 async def _process_events(self, events): 126 if not self._eventer_runner: 127 return 128 129 await self._eventer_runner.process_events(events) 130 131 def _on_component_status(self, eventer_component, eventer_client, status): 132 self._process_status(status) 133 134 def _on_client_status(self, eventer_client, status): 135 self._process_status(status) 136 137 async def _on_component_events(self, eventer_component, eventer_client, 138 events): 139 await self._process_events(events) 140 141 async def _on_client_events(self, eventer_client, events): 142 await self._process_events(events) 143 144 145class EventerRunner(aio.Resource): 146 147 def __init__(self, 148 conf: json.Data, 149 eventer_client: hat.event.eventer.Client): 150 self._conf = conf 151 self._eventer_client = eventer_client 152 self._engine = None 153 self._status_event = asyncio.Event() 154 self._async_group = aio.Group() 155 156 self._status_event.set() 157 self.async_group.spawn(self._run) 158 159 @property 160 def async_group(self) -> aio.Group: 161 return self._async_group 162 163 def process_status(self, status: hat.event.common.Status): 164 self._status_event.set() 165 166 async def process_events(self, events: Iterable[hat.event.common.Event]): 167 if not self.is_open or not self._engine or not self._engine.is_open: 168 return 169 170 await self._engine.process_events(events) 171 172 async def _run(self): 173 try: 174 mlog.debug("starting eventer runner loop") 175 while True: 176 await self._status_event.wait() 177 178 self._status_event.clear() 179 if not self._is_active(): 180 continue 181 182 mlog.debug("creating engine") 183 self._engine = hat.gateway.engine.Engine( 184 conf=self._conf, 185 eventer_client=self._eventer_client) 186 187 async with self.async_group.create_subgroup() as subgroup: 188 engine_closing_task = subgroup.spawn( 189 self._engine.wait_closing) 190 191 while True: 192 if engine_closing_task.done(): 193 return 194 195 self._status_event.clear() 196 if not self._is_active(): 197 break 198 199 status_task = subgroup.spawn(self._status_event.wait) 200 201 await asyncio.wait( 202 [engine_closing_task, status_task], 203 return_when=asyncio.FIRST_COMPLETED) 204 205 await self._engine.async_close() 206 207 except Exception as e: 208 mlog.error("eventer runner loop error: %s", e, exc_info=e) 209 210 finally: 211 mlog.debug("closing eventer runner loop") 212 self.close() 213 214 if self._engine: 215 await aio.uncancellable(self._engine.async_close()) 216 217 def _is_active(self): 218 if not self._eventer_client.is_open: 219 return False 220 221 if self._conf['event_server'].get('require_operational'): 222 return self._eventer_client.status == hat.event.common.Status.OPERATIONAL # NOQA 223 224 return True 225 226 227def _bind_resource(async_group, resource): 228 async_group.spawn(aio.call_on_done, resource.wait_closing(), 229 async_group.close)
Module logger
class
MainRunner(hat.aio.group.Resource):
21class MainRunner(aio.Resource): 22 23 def __init__(self, conf: json.Data): 24 self._conf = conf 25 self._loop = asyncio.get_running_loop() 26 self._async_group = aio.Group() 27 self._eventer_component = None 28 self._eventer_client = None 29 self._eventer_runner = None 30 31 self.async_group.spawn(self._run) 32 33 @property 34 def async_group(self) -> aio.Group: 35 return self._async_group 36 37 async def _run(self): 38 try: 39 mlog.debug("starting main runner loop") 40 await self._start() 41 42 await self._loop.create_future() 43 44 except Exception as e: 45 mlog.error("main runner loop error: %s", e, exc_info=e) 46 47 finally: 48 mlog.debug("closing main runner loop") 49 self.close() 50 await aio.uncancellable(self._stop()) 51 52 async def _start(self): 53 event_server_conf = self._conf['event_server'] 54 55 subscriptions = collections.deque() 56 for device_conf in self._conf['devices']: 57 info = common.import_device_info(device_conf['module']) 58 subscriptions.append(('gateway', info.type, device_conf['name'], 59 'system', '*')) 60 61 if 'monitor_component' in event_server_conf: 62 monitor_component_conf = event_server_conf['monitor_component'] 63 64 mlog.debug("creating eventer component") 65 self._eventer_component = await hat.event.component.connect( 66 addr=tcp.Address(monitor_component_conf['host'], 67 monitor_component_conf['port']), 68 name=self._conf['name'], 69 group=monitor_component_conf['gateway_group'], 70 server_group=monitor_component_conf['event_server_group'], 71 client_name=f"gateway/{self._conf['name']}", 72 runner_cb=self._create_eventer_runner, 73 status_cb=self._on_component_status, 74 events_cb=self._on_component_events, 75 eventer_kwargs={'subscriptions': subscriptions}) 76 _bind_resource(self.async_group, self._eventer_component) 77 78 await self._eventer_component.set_ready(True) 79 80 elif 'eventer_server' in event_server_conf: 81 eventer_server_conf = event_server_conf['eventer_server'] 82 83 mlog.debug("creating eventer client") 84 self._eventer_client = await hat.event.eventer.connect( 85 addr=tcp.Address(eventer_server_conf['host'], 86 eventer_server_conf['port']), 87 client_name=f"gateway/{self._conf['name']}", 88 subscriptions=subscriptions, 89 status_cb=self._on_client_status, 90 events_cb=self._on_client_events) 91 _bind_resource(self.async_group, self._eventer_client) 92 93 mlog.debug("creating eventer runner") 94 self._eventer_runner = EventerRunner( 95 conf=self._conf, 96 eventer_client=self._eventer_client) 97 _bind_resource(self.async_group, self._eventer_runner) 98 99 else: 100 raise Exception('invalid configuration') 101 102 async def _stop(self): 103 if self._eventer_runner and not self._eventer_component: 104 await self._eventer_runner.async_close() 105 106 if self._eventer_client: 107 await self._eventer_client.async_close() 108 109 if self._eventer_component: 110 await self._eventer_component.async_close() 111 112 async def _create_eventer_runner(self, monitor_component, server_data, 113 eventer_client): 114 mlog.debug("creating eventer runner") 115 self._eventer_runner = EventerRunner(conf=self._conf, 116 eventer_client=eventer_client) 117 118 return self._eventer_runner 119 120 def _process_status(self, status): 121 if not self._eventer_runner: 122 return 123 124 self._eventer_runner.process_status(status) 125 126 async def _process_events(self, events): 127 if not self._eventer_runner: 128 return 129 130 await self._eventer_runner.process_events(events) 131 132 def _on_component_status(self, eventer_component, eventer_client, status): 133 self._process_status(status) 134 135 def _on_client_status(self, eventer_client, status): 136 self._process_status(status) 137 138 async def _on_component_events(self, eventer_component, eventer_client, 139 events): 140 await self._process_events(events) 141 142 async def _on_client_events(self, eventer_client, events): 143 await self._process_events(events)
Resource with lifetime control based on Group
.
MainRunner( conf: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')])
Inherited Members
- hat.aio.group.Resource
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close
class
EventerRunner(hat.aio.group.Resource):
146class EventerRunner(aio.Resource): 147 148 def __init__(self, 149 conf: json.Data, 150 eventer_client: hat.event.eventer.Client): 151 self._conf = conf 152 self._eventer_client = eventer_client 153 self._engine = None 154 self._status_event = asyncio.Event() 155 self._async_group = aio.Group() 156 157 self._status_event.set() 158 self.async_group.spawn(self._run) 159 160 @property 161 def async_group(self) -> aio.Group: 162 return self._async_group 163 164 def process_status(self, status: hat.event.common.Status): 165 self._status_event.set() 166 167 async def process_events(self, events: Iterable[hat.event.common.Event]): 168 if not self.is_open or not self._engine or not self._engine.is_open: 169 return 170 171 await self._engine.process_events(events) 172 173 async def _run(self): 174 try: 175 mlog.debug("starting eventer runner loop") 176 while True: 177 await self._status_event.wait() 178 179 self._status_event.clear() 180 if not self._is_active(): 181 continue 182 183 mlog.debug("creating engine") 184 self._engine = hat.gateway.engine.Engine( 185 conf=self._conf, 186 eventer_client=self._eventer_client) 187 188 async with self.async_group.create_subgroup() as subgroup: 189 engine_closing_task = subgroup.spawn( 190 self._engine.wait_closing) 191 192 while True: 193 if engine_closing_task.done(): 194 return 195 196 self._status_event.clear() 197 if not self._is_active(): 198 break 199 200 status_task = subgroup.spawn(self._status_event.wait) 201 202 await asyncio.wait( 203 [engine_closing_task, status_task], 204 return_when=asyncio.FIRST_COMPLETED) 205 206 await self._engine.async_close() 207 208 except Exception as e: 209 mlog.error("eventer runner loop error: %s", e, exc_info=e) 210 211 finally: 212 mlog.debug("closing eventer runner loop") 213 self.close() 214 215 if self._engine: 216 await aio.uncancellable(self._engine.async_close()) 217 218 def _is_active(self): 219 if not self._eventer_client.is_open: 220 return False 221 222 if self._conf['event_server'].get('require_operational'): 223 return self._eventer_client.status == hat.event.common.Status.OPERATIONAL # NOQA 224 225 return True
Resource with lifetime control based on Group
.
EventerRunner( conf: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')], eventer_client: hat.event.eventer.client.Client)
148 def __init__(self, 149 conf: json.Data, 150 eventer_client: hat.event.eventer.Client): 151 self._conf = conf 152 self._eventer_client = eventer_client 153 self._engine = None 154 self._status_event = asyncio.Event() 155 self._async_group = aio.Group() 156 157 self._status_event.set() 158 self.async_group.spawn(self._run)
Inherited Members
- hat.aio.group.Resource
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close