hat.gateway.runner

  1from collections.abc import Iterable
  2import asyncio
  3import collections
  4import logging
  5
  6from hat import aio
  7from hat import json
  8from hat.drivers import tcp
  9import hat.event.component
 10import hat.event.eventer
 11
 12from hat.gateway import common
 13from hat.gateway.adminer_server import create_adminer_server
 14import hat.gateway.engine
 15
 16
 17mlog: logging.Logger = logging.getLogger(__name__)
 18"""Module logger"""
 19
 20
 21class MainRunner(aio.Resource):
 22
 23    def __init__(self, conf: json.Data):
 24        self._conf = conf
 25        self._loop = asyncio.get_running_loop()
 26        self._async_group = aio.Group()
 27        self._eventer_component = None
 28        self._eventer_client = None
 29        self._eventer_runner = None
 30        self._adminer_server = None
 31
 32        self.async_group.spawn(self._run)
 33
 34    @property
 35    def async_group(self) -> aio.Group:
 36        return self._async_group
 37
 38    async def _run(self):
 39        try:
 40            mlog.debug("starting main runner loop")
 41            await self._start()
 42
 43            await self._loop.create_future()
 44
 45        except Exception as e:
 46            mlog.error("main runner loop error: %s", e, exc_info=e)
 47
 48        finally:
 49            mlog.debug("closing main runner loop")
 50            self.close()
 51            await aio.uncancellable(self._stop())
 52
 53    async def _start(self):
 54        event_server_conf = self._conf['event_server']
 55
 56        subscriptions = collections.deque()
 57        for device_conf in self._conf['devices']:
 58            info = common.import_device_info(device_conf['module'])
 59            subscriptions.append(('gateway', info.type, device_conf['name'],
 60                                  'system', '*'))
 61
 62        if 'monitor_component' in event_server_conf:
 63            monitor_component_conf = event_server_conf['monitor_component']
 64
 65            mlog.debug("creating eventer component")
 66            self._eventer_component = await hat.event.component.connect(
 67                addr=tcp.Address(monitor_component_conf['host'],
 68                                 monitor_component_conf['port']),
 69                name=self._conf['name'],
 70                group=monitor_component_conf['gateway_group'],
 71                server_group=monitor_component_conf['event_server_group'],
 72                client_name=f"gateway/{self._conf['name']}",
 73                runner_cb=self._create_eventer_runner,
 74                status_cb=self._on_component_status,
 75                events_cb=self._on_component_events,
 76                eventer_kwargs={'subscriptions': subscriptions})
 77            _bind_resource(self.async_group, self._eventer_component)
 78
 79            await self._eventer_component.set_ready(True)
 80
 81        elif 'eventer_server' in event_server_conf:
 82            eventer_server_conf = event_server_conf['eventer_server']
 83
 84            mlog.debug("creating eventer client")
 85            self._eventer_client = await hat.event.eventer.connect(
 86                addr=tcp.Address(eventer_server_conf['host'],
 87                                 eventer_server_conf['port']),
 88                client_name=f"gateway/{self._conf['name']}",
 89                subscriptions=subscriptions,
 90                status_cb=self._on_client_status,
 91                events_cb=self._on_client_events)
 92            _bind_resource(self.async_group, self._eventer_client)
 93
 94            mlog.debug("creating eventer runner")
 95            self._eventer_runner = EventerRunner(
 96                conf=self._conf,
 97                eventer_client=self._eventer_client)
 98            _bind_resource(self.async_group, self._eventer_runner)
 99
100        else:
101            raise Exception('invalid configuration')
102
103        if 'adminer_server' in self._conf:
104            mlog.debug("creating adminer server")
105            self._adminer_server = await create_adminer_server(
106                addr=tcp.Address(self._conf['adminer_server']['host'],
107                                 self._conf['adminer_server']['port']),
108                log_conf=self._conf.get('log'))
109            _bind_resource(self.async_group, self._adminer_server)
110
111    async def _stop(self):
112        if self._adminer_server:
113            await self._adminer_server.async_close()
114
115        if self._eventer_runner and not self._eventer_component:
116            await self._eventer_runner.async_close()
117
118        if self._eventer_client:
119            await self._eventer_client.async_close()
120
121        if self._eventer_component:
122            await self._eventer_component.async_close()
123
124    async def _create_eventer_runner(self, monitor_component, server_data,
125                                     eventer_client):
126        mlog.debug("creating eventer runner")
127        self._eventer_runner = EventerRunner(conf=self._conf,
128                                             eventer_client=eventer_client)
129
130        return self._eventer_runner
131
132    def _process_status(self, status):
133        if not self._eventer_runner:
134            return
135
136        self._eventer_runner.process_status(status)
137
138    async def _process_events(self, events):
139        if not self._eventer_runner:
140            return
141
142        await self._eventer_runner.process_events(events)
143
144    def _on_component_status(self, eventer_component, eventer_client, status):
145        self._process_status(status)
146
147    def _on_client_status(self, eventer_client, status):
148        self._process_status(status)
149
150    async def _on_component_events(self, eventer_component, eventer_client,
151                                   events):
152        await self._process_events(events)
153
154    async def _on_client_events(self, eventer_client, events):
155        await self._process_events(events)
156
157
class EventerRunner(aio.Resource):
    """Keeps a gateway engine running while the eventer client is active.

    A new engine is created each time the client becomes active and is
    closed when the client stops being active. If the engine starts closing
    on its own, the runner closes as well.

    Args:
        conf: gateway configuration
        eventer_client: eventer client handed to created engines

    """

    def __init__(self,
                 conf: json.Data,
                 eventer_client: hat.event.eventer.Client):
        self._conf = conf
        self._eventer_client = eventer_client
        self._engine = None

        # starts signalled so that the run loop checks the client state
        # immediately instead of waiting for the first status notification
        self._status_event = asyncio.Event()
        self._async_group = aio.Group()
        self._status_event.set()

        self.async_group.spawn(self._run)

    @property
    def async_group(self) -> aio.Group:
        """Group controlling this runner's lifetime."""
        return self._async_group

    def process_status(self, status: hat.event.common.Status):
        """Wake the run loop; `status` itself is not inspected here."""
        self._status_event.set()

    async def process_events(self, events: Iterable[hat.event.common.Event]):
        """Pass events to the engine; dropped when no open engine exists."""
        engine = self._engine
        if self.is_open and engine and engine.is_open:
            await engine.process_events(events)

    async def _run(self):
        """Create and close engines according to eventer client activity."""
        try:
            mlog.debug("starting eventer runner loop")
            while True:
                await self._status_event.wait()

                self._status_event.clear()
                if not self._is_active():
                    continue

                mlog.debug("creating engine")
                self._engine = hat.gateway.engine.Engine(
                    conf=self._conf,
                    eventer_client=self._eventer_client)

                async with self.async_group.create_subgroup() as engine_group:
                    engine_closed = engine_group.spawn(
                        self._engine.wait_closing)

                    while not engine_closed.done():
                        self._status_event.clear()
                        if not self._is_active():
                            # client no longer active - drop this engine and
                            # wait for the next status change
                            break

                        status_changed = engine_group.spawn(
                            self._status_event.wait)

                        await asyncio.wait(
                            [engine_closed, status_changed],
                            return_when=asyncio.FIRST_COMPLETED)

                    else:
                        # engine started closing by itself - stop the runner
                        return

                await self._engine.async_close()

        except Exception as exc:
            mlog.error("eventer runner loop error: %s", exc, exc_info=exc)

        finally:
            mlog.debug("closing eventer runner loop")
            self.close()

            if self._engine:
                await aio.uncancellable(self._engine.async_close())

    def _is_active(self):
        """Return ``True`` if an engine should currently be running."""
        client = self._eventer_client
        if not client.is_open:
            return False

        if not self._conf['event_server'].get('require_operational'):
            return True

        return client.status == hat.event.common.Status.OPERATIONAL
238
239
def _bind_resource(async_group, resource):
    """Close `async_group` once `resource.wait_closing()` is done."""
    closing = resource.wait_closing()
    async_group.spawn(aio.call_on_done, closing, async_group.close)
mlog: logging.Logger = <Logger hat.gateway.runner (WARNING)>

Module logger

class MainRunner(hat.aio.group.Resource):
 22class MainRunner(aio.Resource):
 23
 24    def __init__(self, conf: json.Data):
 25        self._conf = conf
 26        self._loop = asyncio.get_running_loop()
 27        self._async_group = aio.Group()
 28        self._eventer_component = None
 29        self._eventer_client = None
 30        self._eventer_runner = None
 31        self._adminer_server = None
 32
 33        self.async_group.spawn(self._run)
 34
 35    @property
 36    def async_group(self) -> aio.Group:
 37        return self._async_group
 38
 39    async def _run(self):
 40        try:
 41            mlog.debug("starting main runner loop")
 42            await self._start()
 43
 44            await self._loop.create_future()
 45
 46        except Exception as e:
 47            mlog.error("main runner loop error: %s", e, exc_info=e)
 48
 49        finally:
 50            mlog.debug("closing main runner loop")
 51            self.close()
 52            await aio.uncancellable(self._stop())
 53
 54    async def _start(self):
 55        event_server_conf = self._conf['event_server']
 56
 57        subscriptions = collections.deque()
 58        for device_conf in self._conf['devices']:
 59            info = common.import_device_info(device_conf['module'])
 60            subscriptions.append(('gateway', info.type, device_conf['name'],
 61                                  'system', '*'))
 62
 63        if 'monitor_component' in event_server_conf:
 64            monitor_component_conf = event_server_conf['monitor_component']
 65
 66            mlog.debug("creating eventer component")
 67            self._eventer_component = await hat.event.component.connect(
 68                addr=tcp.Address(monitor_component_conf['host'],
 69                                 monitor_component_conf['port']),
 70                name=self._conf['name'],
 71                group=monitor_component_conf['gateway_group'],
 72                server_group=monitor_component_conf['event_server_group'],
 73                client_name=f"gateway/{self._conf['name']}",
 74                runner_cb=self._create_eventer_runner,
 75                status_cb=self._on_component_status,
 76                events_cb=self._on_component_events,
 77                eventer_kwargs={'subscriptions': subscriptions})
 78            _bind_resource(self.async_group, self._eventer_component)
 79
 80            await self._eventer_component.set_ready(True)
 81
 82        elif 'eventer_server' in event_server_conf:
 83            eventer_server_conf = event_server_conf['eventer_server']
 84
 85            mlog.debug("creating eventer client")
 86            self._eventer_client = await hat.event.eventer.connect(
 87                addr=tcp.Address(eventer_server_conf['host'],
 88                                 eventer_server_conf['port']),
 89                client_name=f"gateway/{self._conf['name']}",
 90                subscriptions=subscriptions,
 91                status_cb=self._on_client_status,
 92                events_cb=self._on_client_events)
 93            _bind_resource(self.async_group, self._eventer_client)
 94
 95            mlog.debug("creating eventer runner")
 96            self._eventer_runner = EventerRunner(
 97                conf=self._conf,
 98                eventer_client=self._eventer_client)
 99            _bind_resource(self.async_group, self._eventer_runner)
100
101        else:
102            raise Exception('invalid configuration')
103
104        if 'adminer_server' in self._conf:
105            mlog.debug("creating adminer server")
106            self._adminer_server = await create_adminer_server(
107                addr=tcp.Address(self._conf['adminer_server']['host'],
108                                 self._conf['adminer_server']['port']),
109                log_conf=self._conf.get('log'))
110            _bind_resource(self.async_group, self._adminer_server)
111
112    async def _stop(self):
113        if self._adminer_server:
114            await self._adminer_server.async_close()
115
116        if self._eventer_runner and not self._eventer_component:
117            await self._eventer_runner.async_close()
118
119        if self._eventer_client:
120            await self._eventer_client.async_close()
121
122        if self._eventer_component:
123            await self._eventer_component.async_close()
124
125    async def _create_eventer_runner(self, monitor_component, server_data,
126                                     eventer_client):
127        mlog.debug("creating eventer runner")
128        self._eventer_runner = EventerRunner(conf=self._conf,
129                                             eventer_client=eventer_client)
130
131        return self._eventer_runner
132
133    def _process_status(self, status):
134        if not self._eventer_runner:
135            return
136
137        self._eventer_runner.process_status(status)
138
139    async def _process_events(self, events):
140        if not self._eventer_runner:
141            return
142
143        await self._eventer_runner.process_events(events)
144
145    def _on_component_status(self, eventer_component, eventer_client, status):
146        self._process_status(status)
147
148    def _on_client_status(self, eventer_client, status):
149        self._process_status(status)
150
151    async def _on_component_events(self, eventer_component, eventer_client,
152                                   events):
153        await self._process_events(events)
154
155    async def _on_client_events(self, eventer_client, events):
156        await self._process_events(events)

Resource with lifetime control based on Group.

MainRunner( conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]])
24    def __init__(self, conf: json.Data):
25        self._conf = conf
26        self._loop = asyncio.get_running_loop()
27        self._async_group = aio.Group()
28        self._eventer_component = None
29        self._eventer_client = None
30        self._eventer_runner = None
31        self._adminer_server = None
32
33        self.async_group.spawn(self._run)
async_group: hat.aio.group.Group
35    @property
36    def async_group(self) -> aio.Group:
37        return self._async_group

Group controlling resource's lifetime.

class EventerRunner(hat.aio.group.Resource):
class EventerRunner(aio.Resource):
    """Keeps a gateway engine running while the eventer client is active.

    A new engine is created each time the client becomes active and is
    closed when the client stops being active. If the engine starts closing
    on its own, the runner closes as well.

    Args:
        conf: gateway configuration
        eventer_client: eventer client handed to created engines

    """

    def __init__(self,
                 conf: json.Data,
                 eventer_client: hat.event.eventer.Client):
        self._conf = conf
        self._eventer_client = eventer_client
        self._engine = None

        # starts signalled so that the run loop checks the client state
        # immediately instead of waiting for the first status notification
        self._status_event = asyncio.Event()
        self._async_group = aio.Group()
        self._status_event.set()

        self.async_group.spawn(self._run)

    @property
    def async_group(self) -> aio.Group:
        """Group controlling this runner's lifetime."""
        return self._async_group

    def process_status(self, status: hat.event.common.Status):
        """Wake the run loop; `status` itself is not inspected here."""
        self._status_event.set()

    async def process_events(self, events: Iterable[hat.event.common.Event]):
        """Pass events to the engine; dropped when no open engine exists."""
        engine = self._engine
        if self.is_open and engine and engine.is_open:
            await engine.process_events(events)

    async def _run(self):
        """Create and close engines according to eventer client activity."""
        try:
            mlog.debug("starting eventer runner loop")
            while True:
                await self._status_event.wait()

                self._status_event.clear()
                if not self._is_active():
                    continue

                mlog.debug("creating engine")
                self._engine = hat.gateway.engine.Engine(
                    conf=self._conf,
                    eventer_client=self._eventer_client)

                async with self.async_group.create_subgroup() as engine_group:
                    engine_closed = engine_group.spawn(
                        self._engine.wait_closing)

                    while not engine_closed.done():
                        self._status_event.clear()
                        if not self._is_active():
                            # client no longer active - drop this engine and
                            # wait for the next status change
                            break

                        status_changed = engine_group.spawn(
                            self._status_event.wait)

                        await asyncio.wait(
                            [engine_closed, status_changed],
                            return_when=asyncio.FIRST_COMPLETED)

                    else:
                        # engine started closing by itself - stop the runner
                        return

                await self._engine.async_close()

        except Exception as exc:
            mlog.error("eventer runner loop error: %s", exc, exc_info=exc)

        finally:
            mlog.debug("closing eventer runner loop")
            self.close()

            if self._engine:
                await aio.uncancellable(self._engine.async_close())

    def _is_active(self):
        """Return ``True`` if an engine should currently be running."""
        client = self._eventer_client
        if not client.is_open:
            return False

        if not self._conf['event_server'].get('require_operational'):
            return True

        return client.status == hat.event.common.Status.OPERATIONAL

Resource with lifetime control based on Group.

EventerRunner( conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]], eventer_client: hat.event.eventer.client.Client)
161    def __init__(self,
162                 conf: json.Data,
163                 eventer_client: hat.event.eventer.Client):
164        self._conf = conf
165        self._eventer_client = eventer_client
166        self._engine = None
167        self._status_event = asyncio.Event()
168        self._async_group = aio.Group()
169
170        self._status_event.set()
171        self.async_group.spawn(self._run)
async_group: hat.aio.group.Group
173    @property
174    def async_group(self) -> aio.Group:
175        return self._async_group

Group controlling resource's lifetime.

def process_status(self, status: hat.event.common.common.Status):
177    def process_status(self, status: hat.event.common.Status):
178        self._status_event.set()
async def process_events(self, events: Iterable[hat.event.common.common.Event]):
180    async def process_events(self, events: Iterable[hat.event.common.Event]):
181        if not self.is_open or not self._engine or not self._engine.is_open:
182            return
183
184        await self._engine.process_events(events)