hat.gateway.runner
1import asyncio 2import logging 3 4from hat import aio 5from hat import json 6from hat.drivers import tcp 7 8import hat.event.eventer 9import hat.event.component 10 11import hat.gateway.engine 12 13 14mlog: logging.Logger = logging.getLogger(__name__) 15"""Module logger""" 16 17 18class MainRunner(aio.Resource): 19 20 def __init__(self, conf: json.Data): 21 self._conf = conf 22 self._loop = asyncio.get_running_loop() 23 self._async_group = aio.Group() 24 self._eventer_component = None 25 self._eventer_client = None 26 self._engine = None 27 28 self.async_group.spawn(self._run) 29 30 @property 31 def async_group(self) -> aio.Group: 32 return self._async_group 33 34 async def _run(self): 35 try: 36 await self._start() 37 await self._loop.create_future() 38 39 except Exception as e: 40 mlog.error("main runner loop error: %s", e, exc_info=e) 41 42 finally: 43 self.close() 44 await aio.uncancellable(self._stop()) 45 46 async def _start(self): 47 event_server_conf = self._conf['event_server'] 48 subscriptions = [('gateway', self._conf['gateway_name'], '?', '?', 49 'system', '*')] 50 51 if 'monitor_component' in event_server_conf: 52 monitor_component_conf = event_server_conf['monitor_component'] 53 54 self._eventer_component = await hat.event.component.connect( 55 addr=tcp.Address(monitor_component_conf['host'], 56 monitor_component_conf['port']), 57 name=self._conf['gateway_name'], 58 group=monitor_component_conf['gateway_group'], 59 server_group=monitor_component_conf['event_server_group'], 60 runner_cb=self._create_eventer_runner, 61 events_cb=self._on_component_events, 62 eventer_kwargs={'subscriptions': subscriptions}) 63 _bind_resource(self.async_group, self._eventer_component) 64 65 await self._eventer_component.set_ready(True) 66 67 elif 'eventer_server' in event_server_conf: 68 eventer_server_conf = event_server_conf['eventer_server'] 69 70 self._eventer_client = await hat.event.eventer.connect( 71 addr=tcp.Address(eventer_server_conf['host'], 72 eventer_server_conf['port']), 73 client_name=self._conf['gateway_name'], 74 subscriptions=subscriptions, 75 events_cb=self._on_client_events) 76 _bind_resource(self.async_group, self._eventer_client) 77 78 self._engine = hat.gateway.engine.Engine( 79 conf=self._conf, 80 eventer_client=self._eventer_client) 81 _bind_resource(self.async_group, self._engine) 82 83 else: 84 raise Exception('invalid configuration') 85 86 async def _stop(self): 87 if self._engine and not self._eventer_component: 88 await self._engine.async_close() 89 90 if self._eventer_client: 91 await self._eventer_client.async_close() 92 93 if self._eventer_component: 94 await self._eventer_component.async_close() 95 96 async def _create_eventer_runner(self, monitor_component, server_data, 97 eventer_client): 98 self._engine = hat.gateway.engine.Engine( 99 conf=self._conf, 100 eventer_client=eventer_client) 101 102 return self._engine 103 104 async def _process_events(self, events): 105 if not self._engine: 106 return 107 108 await self._engine.process_events(events) 109 110 async def _on_component_events(self, monitor_component, eventer_client, 111 events): 112 await self._process_events(events) 113 114 async def _on_client_events(self, eventer_client, events): 115 await self._process_events(events) 116 117 118def _bind_resource(async_group, resource): 119 async_group.spawn(aio.call_on_done, resource.wait_closing(), 120 async_group.close)
Module logger
class
MainRunner(hat.aio.group.Resource):
19class MainRunner(aio.Resource): 20 21 def __init__(self, conf: json.Data): 22 self._conf = conf 23 self._loop = asyncio.get_running_loop() 24 self._async_group = aio.Group() 25 self._eventer_component = None 26 self._eventer_client = None 27 self._engine = None 28 29 self.async_group.spawn(self._run) 30 31 @property 32 def async_group(self) -> aio.Group: 33 return self._async_group 34 35 async def _run(self): 36 try: 37 await self._start() 38 await self._loop.create_future() 39 40 except Exception as e: 41 mlog.error("main runner loop error: %s", e, exc_info=e) 42 43 finally: 44 self.close() 45 await aio.uncancellable(self._stop()) 46 47 async def _start(self): 48 event_server_conf = self._conf['event_server'] 49 subscriptions = [('gateway', self._conf['gateway_name'], '?', '?', 50 'system', '*')] 51 52 if 'monitor_component' in event_server_conf: 53 monitor_component_conf = event_server_conf['monitor_component'] 54 55 self._eventer_component = await hat.event.component.connect( 56 addr=tcp.Address(monitor_component_conf['host'], 57 monitor_component_conf['port']), 58 name=self._conf['gateway_name'], 59 group=monitor_component_conf['gateway_group'], 60 server_group=monitor_component_conf['event_server_group'], 61 runner_cb=self._create_eventer_runner, 62 events_cb=self._on_component_events, 63 eventer_kwargs={'subscriptions': subscriptions}) 64 _bind_resource(self.async_group, self._eventer_component) 65 66 await self._eventer_component.set_ready(True) 67 68 elif 'eventer_server' in event_server_conf: 69 eventer_server_conf = event_server_conf['eventer_server'] 70 71 self._eventer_client = await hat.event.eventer.connect( 72 addr=tcp.Address(eventer_server_conf['host'], 73 eventer_server_conf['port']), 74 client_name=self._conf['gateway_name'], 75 subscriptions=subscriptions, 76 events_cb=self._on_client_events) 77 _bind_resource(self.async_group, self._eventer_client) 78 79 self._engine = hat.gateway.engine.Engine( 80 conf=self._conf, 81 eventer_client=self._eventer_client) 82 _bind_resource(self.async_group, self._engine) 83 84 else: 85 raise Exception('invalid configuration') 86 87 async def _stop(self): 88 if self._engine and not self._eventer_component: 89 await self._engine.async_close() 90 91 if self._eventer_client: 92 await self._eventer_client.async_close() 93 94 if self._eventer_component: 95 await self._eventer_component.async_close() 96 97 async def _create_eventer_runner(self, monitor_component, server_data, 98 eventer_client): 99 self._engine = hat.gateway.engine.Engine( 100 conf=self._conf, 101 eventer_client=eventer_client) 102 103 return self._engine 104 105 async def _process_events(self, events): 106 if not self._engine: 107 return 108 109 await self._engine.process_events(events) 110 111 async def _on_component_events(self, monitor_component, eventer_client, 112 events): 113 await self._process_events(events) 114 115 async def _on_client_events(self, eventer_client, events): 116 await self._process_events(events)
Resource with lifetime control based on Group
.
MainRunner( conf: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')])
Inherited Members
- hat.aio.group.Resource
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close