hat.gateway.runner

  1import asyncio
  2import logging
  3
  4from hat import aio
  5from hat import json
  6from hat.drivers import tcp
  7
  8import hat.event.eventer
  9import hat.event.component
 10
 11import hat.gateway.engine
 12
 13
 14mlog: logging.Logger = logging.getLogger(__name__)
 15"""Module logger"""
 16
 17
 18class MainRunner(aio.Resource):
 19
 20    def __init__(self, conf: json.Data):
 21        self._conf = conf
 22        self._loop = asyncio.get_running_loop()
 23        self._async_group = aio.Group()
 24        self._eventer_component = None
 25        self._eventer_client = None
 26        self._engine = None
 27
 28        self.async_group.spawn(self._run)
 29
 30    @property
 31    def async_group(self) -> aio.Group:
 32        return self._async_group
 33
 34    async def _run(self):
 35        try:
 36            await self._start()
 37            await self._loop.create_future()
 38
 39        except Exception as e:
 40            mlog.error("main runner loop error: %s", e, exc_info=e)
 41
 42        finally:
 43            self.close()
 44            await aio.uncancellable(self._stop())
 45
 46    async def _start(self):
 47        event_server_conf = self._conf['event_server']
 48        subscriptions = [('gateway', self._conf['gateway_name'], '?', '?',
 49                          'system', '*')]
 50
 51        if 'monitor_component' in event_server_conf:
 52            monitor_component_conf = event_server_conf['monitor_component']
 53
 54            self._eventer_component = await hat.event.component.connect(
 55                addr=tcp.Address(monitor_component_conf['host'],
 56                                 monitor_component_conf['port']),
 57                name=self._conf['gateway_name'],
 58                group=monitor_component_conf['gateway_group'],
 59                server_group=monitor_component_conf['event_server_group'],
 60                runner_cb=self._create_eventer_runner,
 61                events_cb=self._on_component_events,
 62                eventer_kwargs={'subscriptions': subscriptions})
 63            _bind_resource(self.async_group, self._eventer_component)
 64
 65            await self._eventer_component.set_ready(True)
 66
 67        elif 'eventer_server' in event_server_conf:
 68            eventer_server_conf = event_server_conf['eventer_server']
 69
 70            self._eventer_client = await hat.event.eventer.connect(
 71                addr=tcp.Address(eventer_server_conf['host'],
 72                                 eventer_server_conf['port']),
 73                client_name=self._conf['gateway_name'],
 74                subscriptions=subscriptions,
 75                events_cb=self._on_client_events)
 76            _bind_resource(self.async_group, self._eventer_client)
 77
 78            self._engine = hat.gateway.engine.Engine(
 79                conf=self._conf,
 80                eventer_client=self._eventer_client)
 81            _bind_resource(self.async_group, self._engine)
 82
 83        else:
 84            raise Exception('invalid configuration')
 85
 86    async def _stop(self):
 87        if self._engine and not self._eventer_component:
 88            await self._engine.async_close()
 89
 90        if self._eventer_client:
 91            await self._eventer_client.async_close()
 92
 93        if self._eventer_component:
 94            await self._eventer_component.async_close()
 95
 96    async def _create_eventer_runner(self, monitor_component, server_data,
 97                                     eventer_client):
 98        self._engine = hat.gateway.engine.Engine(
 99            conf=self._conf,
100            eventer_client=eventer_client)
101
102        return self._engine
103
104    async def _process_events(self, events):
105        if not self._engine:
106            return
107
108        await self._engine.process_events(events)
109
110    async def _on_component_events(self, monitor_component, eventer_client,
111                                   events):
112        await self._process_events(events)
113
114    async def _on_client_events(self, eventer_client, events):
115        await self._process_events(events)
116
117
118def _bind_resource(async_group, resource):
119    async_group.spawn(aio.call_on_done, resource.wait_closing(),
120                      async_group.close)
mlog: logging.Logger = <Logger hat.gateway.runner (WARNING)>

Module logger

class MainRunner(hat.aio.group.Resource):
 19class MainRunner(aio.Resource):
 20
 21    def __init__(self, conf: json.Data):
 22        self._conf = conf
 23        self._loop = asyncio.get_running_loop()
 24        self._async_group = aio.Group()
 25        self._eventer_component = None
 26        self._eventer_client = None
 27        self._engine = None
 28
 29        self.async_group.spawn(self._run)
 30
 31    @property
 32    def async_group(self) -> aio.Group:
 33        return self._async_group
 34
 35    async def _run(self):
 36        try:
 37            await self._start()
 38            await self._loop.create_future()
 39
 40        except Exception as e:
 41            mlog.error("main runner loop error: %s", e, exc_info=e)
 42
 43        finally:
 44            self.close()
 45            await aio.uncancellable(self._stop())
 46
 47    async def _start(self):
 48        event_server_conf = self._conf['event_server']
 49        subscriptions = [('gateway', self._conf['gateway_name'], '?', '?',
 50                          'system', '*')]
 51
 52        if 'monitor_component' in event_server_conf:
 53            monitor_component_conf = event_server_conf['monitor_component']
 54
 55            self._eventer_component = await hat.event.component.connect(
 56                addr=tcp.Address(monitor_component_conf['host'],
 57                                 monitor_component_conf['port']),
 58                name=self._conf['gateway_name'],
 59                group=monitor_component_conf['gateway_group'],
 60                server_group=monitor_component_conf['event_server_group'],
 61                runner_cb=self._create_eventer_runner,
 62                events_cb=self._on_component_events,
 63                eventer_kwargs={'subscriptions': subscriptions})
 64            _bind_resource(self.async_group, self._eventer_component)
 65
 66            await self._eventer_component.set_ready(True)
 67
 68        elif 'eventer_server' in event_server_conf:
 69            eventer_server_conf = event_server_conf['eventer_server']
 70
 71            self._eventer_client = await hat.event.eventer.connect(
 72                addr=tcp.Address(eventer_server_conf['host'],
 73                                 eventer_server_conf['port']),
 74                client_name=self._conf['gateway_name'],
 75                subscriptions=subscriptions,
 76                events_cb=self._on_client_events)
 77            _bind_resource(self.async_group, self._eventer_client)
 78
 79            self._engine = hat.gateway.engine.Engine(
 80                conf=self._conf,
 81                eventer_client=self._eventer_client)
 82            _bind_resource(self.async_group, self._engine)
 83
 84        else:
 85            raise Exception('invalid configuration')
 86
 87    async def _stop(self):
 88        if self._engine and not self._eventer_component:
 89            await self._engine.async_close()
 90
 91        if self._eventer_client:
 92            await self._eventer_client.async_close()
 93
 94        if self._eventer_component:
 95            await self._eventer_component.async_close()
 96
 97    async def _create_eventer_runner(self, monitor_component, server_data,
 98                                     eventer_client):
 99        self._engine = hat.gateway.engine.Engine(
100            conf=self._conf,
101            eventer_client=eventer_client)
102
103        return self._engine
104
105    async def _process_events(self, events):
106        if not self._engine:
107            return
108
109        await self._engine.process_events(events)
110
111    async def _on_component_events(self, monitor_component, eventer_client,
112                                   events):
113        await self._process_events(events)
114
115    async def _on_client_events(self, eventer_client, events):
116        await self._process_events(events)

Resource with lifetime control based on Group.

MainRunner( conf: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')])
21    def __init__(self, conf: json.Data):
22        self._conf = conf
23        self._loop = asyncio.get_running_loop()
24        self._async_group = aio.Group()
25        self._eventer_component = None
26        self._eventer_client = None
27        self._engine = None
28
29        self.async_group.spawn(self._run)
async_group: hat.aio.group.Group
31    @property
32    def async_group(self) -> aio.Group:
33        return self._async_group

Group controlling resource's lifetime.

Inherited Members
hat.aio.group.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close