hat.gateway.runner

  1from collections.abc import Iterable
  2import asyncio
  3import collections
  4import logging
  5
  6from hat import aio
  7from hat import json
  8from hat.drivers import tcp
  9import hat.event.component
 10import hat.event.eventer
 11
 12from hat.gateway import common
 13import hat.gateway.engine
 14
 15
 16mlog: logging.Logger = logging.getLogger(__name__)
 17"""Module logger"""
 18
 19
 20class MainRunner(aio.Resource):
 21
 22    def __init__(self, conf: json.Data):
 23        self._conf = conf
 24        self._loop = asyncio.get_running_loop()
 25        self._async_group = aio.Group()
 26        self._eventer_component = None
 27        self._eventer_client = None
 28        self._eventer_runner = None
 29
 30        self.async_group.spawn(self._run)
 31
 32    @property
 33    def async_group(self) -> aio.Group:
 34        return self._async_group
 35
 36    async def _run(self):
 37        try:
 38            mlog.debug("starting main runner loop")
 39            await self._start()
 40
 41            await self._loop.create_future()
 42
 43        except Exception as e:
 44            mlog.error("main runner loop error: %s", e, exc_info=e)
 45
 46        finally:
 47            mlog.debug("closing main runner loop")
 48            self.close()
 49            await aio.uncancellable(self._stop())
 50
 51    async def _start(self):
 52        event_server_conf = self._conf['event_server']
 53
 54        subscriptions = collections.deque()
 55        for device_conf in self._conf['devices']:
 56            info = common.import_device_info(device_conf['module'])
 57            subscriptions.append(('gateway', info.type, device_conf['name'],
 58                                  'system', '*'))
 59
 60        if 'monitor_component' in event_server_conf:
 61            monitor_component_conf = event_server_conf['monitor_component']
 62
 63            mlog.debug("creating eventer component")
 64            self._eventer_component = await hat.event.component.connect(
 65                addr=tcp.Address(monitor_component_conf['host'],
 66                                 monitor_component_conf['port']),
 67                name=self._conf['name'],
 68                group=monitor_component_conf['gateway_group'],
 69                server_group=monitor_component_conf['event_server_group'],
 70                client_name=f"gateway/{self._conf['name']}",
 71                runner_cb=self._create_eventer_runner,
 72                status_cb=self._on_component_status,
 73                events_cb=self._on_component_events,
 74                eventer_kwargs={'subscriptions': subscriptions})
 75            _bind_resource(self.async_group, self._eventer_component)
 76
 77            await self._eventer_component.set_ready(True)
 78
 79        elif 'eventer_server' in event_server_conf:
 80            eventer_server_conf = event_server_conf['eventer_server']
 81
 82            mlog.debug("creating eventer client")
 83            self._eventer_client = await hat.event.eventer.connect(
 84                addr=tcp.Address(eventer_server_conf['host'],
 85                                 eventer_server_conf['port']),
 86                client_name=f"gateway/{self._conf['name']}",
 87                subscriptions=subscriptions,
 88                status_cb=self._on_client_status,
 89                events_cb=self._on_client_events)
 90            _bind_resource(self.async_group, self._eventer_client)
 91
 92            mlog.debug("creating eventer runner")
 93            self._eventer_runner = EventerRunner(
 94                conf=self._conf,
 95                eventer_client=self._eventer_client)
 96            _bind_resource(self.async_group, self._eventer_runner)
 97
 98        else:
 99            raise Exception('invalid configuration')
100
101    async def _stop(self):
102        if self._eventer_runner and not self._eventer_component:
103            await self._eventer_runner.async_close()
104
105        if self._eventer_client:
106            await self._eventer_client.async_close()
107
108        if self._eventer_component:
109            await self._eventer_component.async_close()
110
111    async def _create_eventer_runner(self, monitor_component, server_data,
112                                     eventer_client):
113        mlog.debug("creating eventer runner")
114        self._eventer_runner = EventerRunner(conf=self._conf,
115                                             eventer_client=eventer_client)
116
117        return self._eventer_runner
118
119    def _process_status(self, status):
120        if not self._eventer_runner:
121            return
122
123        self._eventer_runner.process_status(status)
124
125    async def _process_events(self, events):
126        if not self._eventer_runner:
127            return
128
129        await self._eventer_runner.process_events(events)
130
131    def _on_component_status(self, eventer_component, eventer_client, status):
132        self._process_status(status)
133
134    def _on_client_status(self, eventer_client, status):
135        self._process_status(status)
136
137    async def _on_component_events(self, eventer_component, eventer_client,
138                                   events):
139        await self._process_events(events)
140
141    async def _on_client_events(self, eventer_client, events):
142        await self._process_events(events)
143
144
class EventerRunner(aio.Resource):
    """Runs a gateway engine for as long as the eventer client is active.

    The client counts as active while it is open and, when configuration
    sets ``event_server.require_operational``, while its status is
    ``OPERATIONAL``.  Once the client stops being active the engine is
    closed and the runner waits for the next status notification.  If the
    engine starts closing by itself, the runner closes as well.

    Args:
        conf: gateway configuration
        eventer_client: client passed to every created engine

    """

    def __init__(self,
                 conf: json.Data,
                 eventer_client: hat.event.eventer.Client):
        self._conf = conf
        self._eventer_client = eventer_client
        self._engine = None
        self._async_group = aio.Group()

        # created already set, so the first loop pass checks activity
        # without waiting for a status notification
        self._status_event = asyncio.Event()
        self._status_event.set()

        self.async_group.spawn(self._run)

    @property
    def async_group(self) -> aio.Group:
        """Group controlling this runner's lifetime"""
        return self._async_group

    def process_status(self, status: hat.event.common.Status):
        """Wake the runner loop; the `status` value itself is not used."""
        self._status_event.set()

    async def process_events(self, events: Iterable[hat.event.common.Event]):
        """Pass `events` to the engine; dropped unless both runner and
        engine are open."""
        engine = self._engine
        if self.is_open and engine and engine.is_open:
            await engine.process_events(events)

    async def _run(self):
        """Create an engine whenever the client is active.

        Errors are logged, not propagated.  On every exit path the runner is
        closed and the last created engine (if any) is closed shielded from
        cancellation.

        """
        try:
            mlog.debug("starting eventer runner loop")
            while True:
                await self._status_event.wait()
                self._status_event.clear()

                if self._is_active():
                    mlog.debug("creating engine")
                    self._engine = hat.gateway.engine.Engine(
                        conf=self._conf,
                        eventer_client=self._eventer_client)

                    if await self._watch_engine():
                        # engine is closing on its own - stop the runner
                        return

                    await self._engine.async_close()

        except Exception as e:
            mlog.error("eventer runner loop error: %s", e, exc_info=e)

        finally:
            mlog.debug("closing eventer runner loop")
            self.close()

            if self._engine:
                await aio.uncancellable(self._engine.async_close())

    async def _watch_engine(self):
        """Wait until the engine is closing or the client is inactive.

        Returns:
            ``True`` if the engine closing task is done, ``False`` if the
            client stopped being active first

        """
        async with self.async_group.create_subgroup() as subgroup:
            engine_closing_task = subgroup.spawn(self._engine.wait_closing)

            while not engine_closing_task.done():
                self._status_event.clear()
                if not self._is_active():
                    return False

                # fresh waiter each pass - the previous one has finished
                status_task = subgroup.spawn(self._status_event.wait)

                await asyncio.wait([engine_closing_task, status_task],
                                   return_when=asyncio.FIRST_COMPLETED)

            return True

    def _is_active(self):
        """Check whether an engine should currently be running."""
        client = self._eventer_client
        if not client.is_open:
            return False

        event_server_conf = self._conf['event_server']
        if not event_server_conf.get('require_operational'):
            return True

        return client.status == hat.event.common.Status.OPERATIONAL
225
226
def _bind_resource(async_group, resource):
    """Tie the lifetime of `async_group` to `resource`.

    Spawns ``aio.call_on_done`` inside `async_group` so that
    ``async_group.close`` is called once ``resource.wait_closing()`` is done.

    """
    resource_closing = resource.wait_closing()
    async_group.spawn(aio.call_on_done, resource_closing, async_group.close)
mlog: logging.Logger = <Logger hat.gateway.runner (WARNING)>

Module logger

class MainRunner(hat.aio.group.Resource):
 21class MainRunner(aio.Resource):
 22
 23    def __init__(self, conf: json.Data):
 24        self._conf = conf
 25        self._loop = asyncio.get_running_loop()
 26        self._async_group = aio.Group()
 27        self._eventer_component = None
 28        self._eventer_client = None
 29        self._eventer_runner = None
 30
 31        self.async_group.spawn(self._run)
 32
 33    @property
 34    def async_group(self) -> aio.Group:
 35        return self._async_group
 36
 37    async def _run(self):
 38        try:
 39            mlog.debug("starting main runner loop")
 40            await self._start()
 41
 42            await self._loop.create_future()
 43
 44        except Exception as e:
 45            mlog.error("main runner loop error: %s", e, exc_info=e)
 46
 47        finally:
 48            mlog.debug("closing main runner loop")
 49            self.close()
 50            await aio.uncancellable(self._stop())
 51
 52    async def _start(self):
 53        event_server_conf = self._conf['event_server']
 54
 55        subscriptions = collections.deque()
 56        for device_conf in self._conf['devices']:
 57            info = common.import_device_info(device_conf['module'])
 58            subscriptions.append(('gateway', info.type, device_conf['name'],
 59                                  'system', '*'))
 60
 61        if 'monitor_component' in event_server_conf:
 62            monitor_component_conf = event_server_conf['monitor_component']
 63
 64            mlog.debug("creating eventer component")
 65            self._eventer_component = await hat.event.component.connect(
 66                addr=tcp.Address(monitor_component_conf['host'],
 67                                 monitor_component_conf['port']),
 68                name=self._conf['name'],
 69                group=monitor_component_conf['gateway_group'],
 70                server_group=monitor_component_conf['event_server_group'],
 71                client_name=f"gateway/{self._conf['name']}",
 72                runner_cb=self._create_eventer_runner,
 73                status_cb=self._on_component_status,
 74                events_cb=self._on_component_events,
 75                eventer_kwargs={'subscriptions': subscriptions})
 76            _bind_resource(self.async_group, self._eventer_component)
 77
 78            await self._eventer_component.set_ready(True)
 79
 80        elif 'eventer_server' in event_server_conf:
 81            eventer_server_conf = event_server_conf['eventer_server']
 82
 83            mlog.debug("creating eventer client")
 84            self._eventer_client = await hat.event.eventer.connect(
 85                addr=tcp.Address(eventer_server_conf['host'],
 86                                 eventer_server_conf['port']),
 87                client_name=f"gateway/{self._conf['name']}",
 88                subscriptions=subscriptions,
 89                status_cb=self._on_client_status,
 90                events_cb=self._on_client_events)
 91            _bind_resource(self.async_group, self._eventer_client)
 92
 93            mlog.debug("creating eventer runner")
 94            self._eventer_runner = EventerRunner(
 95                conf=self._conf,
 96                eventer_client=self._eventer_client)
 97            _bind_resource(self.async_group, self._eventer_runner)
 98
 99        else:
100            raise Exception('invalid configuration')
101
102    async def _stop(self):
103        if self._eventer_runner and not self._eventer_component:
104            await self._eventer_runner.async_close()
105
106        if self._eventer_client:
107            await self._eventer_client.async_close()
108
109        if self._eventer_component:
110            await self._eventer_component.async_close()
111
112    async def _create_eventer_runner(self, monitor_component, server_data,
113                                     eventer_client):
114        mlog.debug("creating eventer runner")
115        self._eventer_runner = EventerRunner(conf=self._conf,
116                                             eventer_client=eventer_client)
117
118        return self._eventer_runner
119
120    def _process_status(self, status):
121        if not self._eventer_runner:
122            return
123
124        self._eventer_runner.process_status(status)
125
126    async def _process_events(self, events):
127        if not self._eventer_runner:
128            return
129
130        await self._eventer_runner.process_events(events)
131
132    def _on_component_status(self, eventer_component, eventer_client, status):
133        self._process_status(status)
134
135    def _on_client_status(self, eventer_client, status):
136        self._process_status(status)
137
138    async def _on_component_events(self, eventer_component, eventer_client,
139                                   events):
140        await self._process_events(events)
141
142    async def _on_client_events(self, eventer_client, events):
143        await self._process_events(events)

Resource with lifetime control based on Group.

MainRunner( conf: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')])
23    def __init__(self, conf: json.Data):
24        self._conf = conf
25        self._loop = asyncio.get_running_loop()
26        self._async_group = aio.Group()
27        self._eventer_component = None
28        self._eventer_client = None
29        self._eventer_runner = None
30
31        self.async_group.spawn(self._run)
async_group: hat.aio.group.Group
33    @property
34    def async_group(self) -> aio.Group:
35        return self._async_group

Group controlling resource's lifetime.

Inherited Members
hat.aio.group.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close
class EventerRunner(hat.aio.group.Resource):
146class EventerRunner(aio.Resource):
147
148    def __init__(self,
149                 conf: json.Data,
150                 eventer_client: hat.event.eventer.Client):
151        self._conf = conf
152        self._eventer_client = eventer_client
153        self._engine = None
154        self._status_event = asyncio.Event()
155        self._async_group = aio.Group()
156
157        self._status_event.set()
158        self.async_group.spawn(self._run)
159
160    @property
161    def async_group(self) -> aio.Group:
162        return self._async_group
163
164    def process_status(self, status: hat.event.common.Status):
165        self._status_event.set()
166
167    async def process_events(self, events: Iterable[hat.event.common.Event]):
168        if not self.is_open or not self._engine or not self._engine.is_open:
169            return
170
171        await self._engine.process_events(events)
172
173    async def _run(self):
174        try:
175            mlog.debug("starting eventer runner loop")
176            while True:
177                await self._status_event.wait()
178
179                self._status_event.clear()
180                if not self._is_active():
181                    continue
182
183                mlog.debug("creating engine")
184                self._engine = hat.gateway.engine.Engine(
185                    conf=self._conf,
186                    eventer_client=self._eventer_client)
187
188                async with self.async_group.create_subgroup() as subgroup:
189                    engine_closing_task = subgroup.spawn(
190                        self._engine.wait_closing)
191
192                    while True:
193                        if engine_closing_task.done():
194                            return
195
196                        self._status_event.clear()
197                        if not self._is_active():
198                            break
199
200                        status_task = subgroup.spawn(self._status_event.wait)
201
202                        await asyncio.wait(
203                            [engine_closing_task, status_task],
204                            return_when=asyncio.FIRST_COMPLETED)
205
206                await self._engine.async_close()
207
208        except Exception as e:
209            mlog.error("eventer runner loop error: %s", e, exc_info=e)
210
211        finally:
212            mlog.debug("closing eventer runner loop")
213            self.close()
214
215            if self._engine:
216                await aio.uncancellable(self._engine.async_close())
217
218    def _is_active(self):
219        if not self._eventer_client.is_open:
220            return False
221
222        if self._conf['event_server'].get('require_operational'):
223            return self._eventer_client.status == hat.event.common.Status.OPERATIONAL  # NOQA
224
225        return True

Resource with lifetime control based on Group.

EventerRunner( conf: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')], eventer_client: hat.event.eventer.client.Client)
148    def __init__(self,
149                 conf: json.Data,
150                 eventer_client: hat.event.eventer.Client):
151        self._conf = conf
152        self._eventer_client = eventer_client
153        self._engine = None
154        self._status_event = asyncio.Event()
155        self._async_group = aio.Group()
156
157        self._status_event.set()
158        self.async_group.spawn(self._run)
async_group: hat.aio.group.Group
160    @property
161    def async_group(self) -> aio.Group:
162        return self._async_group

Group controlling resource's lifetime.

def process_status(self, status: hat.event.common.common.Status):
164    def process_status(self, status: hat.event.common.Status):
165        self._status_event.set()
async def process_events(self, events: Iterable[hat.event.common.common.Event]):
167    async def process_events(self, events: Iterable[hat.event.common.Event]):
168        if not self.is_open or not self._engine or not self._engine.is_open:
169            return
170
171        await self._engine.process_events(events)
Inherited Members
hat.aio.group.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close