hat.gateway.main

Gateway main

  1"""Gateway main"""
  2
  3from pathlib import Path
  4import argparse
  5import asyncio
  6import contextlib
  7import functools
  8import importlib
  9import logging.config
 10import sys
 11import typing
 12
 13import appdirs
 14
 15from hat import aio
 16from hat import json
 17import hat.event.common
 18import hat.event.eventer
 19import hat.monitor.client
 20
 21from hat.gateway import common
 22from hat.gateway.engine import create_engine
 23
 24
 25mlog: logging.Logger = logging.getLogger(__name__)
 26"""Module logger"""
 27
 28user_conf_dir: Path = Path(appdirs.user_config_dir('hat'))
 29"""User configuration directory path"""
 30
 31
 32def create_argument_parser() -> argparse.ArgumentParser:
 33    """Create argument parser"""
 34    parser = argparse.ArgumentParser()
 35    parser.add_argument(
 36        '--conf', metavar='PATH', type=Path, default=None,
 37        help="configuration defined by hat-gateway://main.yaml# "
 38             "(default $XDG_CONFIG_HOME/hat/gateway.{yaml|yml|json})")
 39    return parser
 40
 41
 42def main():
 43    """Gateway"""
 44    parser = create_argument_parser()
 45    args = parser.parse_args()
 46
 47    conf_path = args.conf
 48    if not conf_path:
 49        for suffix in ('.yaml', '.yml', '.json'):
 50            conf_path = (user_conf_dir / 'gateway').with_suffix(suffix)
 51            if conf_path.exists():
 52                break
 53
 54    if conf_path == Path('-'):
 55        conf = json.decode_stream(sys.stdin)
 56    else:
 57        conf = json.decode_file(conf_path)
 58
 59    sync_main(conf)
 60
 61
 62def sync_main(conf: json.Data):
 63    """Sync main entry point"""
 64    aio.init_asyncio()
 65
 66    common.json_schema_repo.validate('hat-gateway://main.yaml#', conf)
 67
 68    for device_conf in conf['devices']:
 69        module = importlib.import_module(device_conf['module'])
 70        if module.json_schema_repo and module.json_schema_id:
 71            module.json_schema_repo.validate(module.json_schema_id,
 72                                             device_conf)
 73
 74    logging.config.dictConfig(conf['log'])
 75
 76    with contextlib.suppress(asyncio.CancelledError):
 77        aio.run_asyncio(async_main(conf))
 78
 79
 80async def async_main(conf: json.Data):
 81    """Async main entry point"""
 82    async_group = aio.Group()
 83
 84    try:
 85        subscriptions = [('gateway', conf['gateway_name'], '?', '?',
 86                          'system', '*')]
 87
 88        if 'monitor' in conf:
 89            monitor_client = await hat.monitor.client.connect(conf['monitor'])
 90            _bind_resource(async_group, monitor_client)
 91
 92            monitor_component = hat.monitor.client.Component(
 93                monitor_client, run_with_monitor, conf, monitor_client,
 94                subscriptions)
 95            monitor_component.set_ready(True)
 96            _bind_resource(async_group, monitor_component)
 97
 98        else:
 99            eventer_client = await hat.event.eventer.connect(
100                conf['event_server_address'], subscriptions)
101            _bind_resource(async_group, eventer_client)
102
103            eventer_runner = EventerRunner(conf, eventer_client)
104            _bind_resource(async_group, eventer_runner)
105
106        await async_group.wait_closing()
107
108    finally:
109        await aio.uncancellable(async_group.async_close())
110
111
112async def run_with_monitor(monitor_component: hat.monitor.client.Component,
113                           conf: json.Data,
114                           monitor_client: hat.monitor.client.Client,
115                           subscriptions: typing.List[hat.event.common.EventType]):  # NOQA
116    """Run monitor component"""
117    component_cb = functools.partial(EventerRunner, conf)
118    eventer_component = hat.event.eventer.Component(
119        monitor_client, conf['event_server_group'], component_cb,
120        subscriptions)
121
122    try:
123        await eventer_component.wait_closing()
124
125    finally:
126        await aio.uncancellable(eventer_component.async_close())
127
128
129class EventerRunner(aio.Resource):
130
131    def __init__(self,
132                 conf: json.Data,
133                 eventer_client: hat.event.eventer.Client):
134        self._async_group = aio.Group()
135
136        self.async_group.spawn(self._run, conf, eventer_client)
137
138    @property
139    def async_group(self):
140        return self._async_group
141
142    async def _run(self, conf, eventer_client):
143        try:
144            engine = await create_engine(conf, eventer_client)
145
146            try:
147                mlog.debug('engine created')
148                await engine.wait_closing()
149
150            finally:
151                mlog.debug('engine closing')
152                await aio.uncancellable(engine.async_close())
153
154        except Exception as e:
155            mlog.error('eventer runner error: %s', e, exc_info=e)
156
157        finally:
158            self.close()
159
160
161def _bind_resource(async_group, resource):
162    async_group.spawn(aio.call_on_cancel, resource.async_close)
163    async_group.spawn(aio.call_on_done, resource.wait_closing(),
164                      async_group.close)
165
166
167if __name__ == '__main__':
168    sys.argv[0] = 'hat-gateway'
169    sys.exit(main())
mlog: logging.Logger = <Logger hat.gateway.main (WARNING)>

Module logger

user_conf_dir: pathlib.Path = PosixPath('/home/runner/.config/hat')

User configuration directory path

def create_argument_parser() -> argparse.ArgumentParser:
33def create_argument_parser() -> argparse.ArgumentParser:
34    """Create argument parser"""
35    parser = argparse.ArgumentParser()
36    parser.add_argument(
37        '--conf', metavar='PATH', type=Path, default=None,
38        help="configuration defined by hat-gateway://main.yaml# "
39             "(default $XDG_CONFIG_HOME/hat/gateway.{yaml|yml|json})")
40    return parser

Create argument parser

def main():
43def main():
44    """Gateway"""
45    parser = create_argument_parser()
46    args = parser.parse_args()
47
48    conf_path = args.conf
49    if not conf_path:
50        for suffix in ('.yaml', '.yml', '.json'):
51            conf_path = (user_conf_dir / 'gateway').with_suffix(suffix)
52            if conf_path.exists():
53                break
54
55    if conf_path == Path('-'):
56        conf = json.decode_stream(sys.stdin)
57    else:
58        conf = json.decode_file(conf_path)
59
60    sync_main(conf)

Gateway

def sync_main( conf: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')]):
63def sync_main(conf: json.Data):
64    """Sync main entry point"""
65    aio.init_asyncio()
66
67    common.json_schema_repo.validate('hat-gateway://main.yaml#', conf)
68
69    for device_conf in conf['devices']:
70        module = importlib.import_module(device_conf['module'])
71        if module.json_schema_repo and module.json_schema_id:
72            module.json_schema_repo.validate(module.json_schema_id,
73                                             device_conf)
74
75    logging.config.dictConfig(conf['log'])
76
77    with contextlib.suppress(asyncio.CancelledError):
78        aio.run_asyncio(async_main(conf))

Sync main entry point

async def async_main( conf: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')]):
 81async def async_main(conf: json.Data):
 82    """Async main entry point"""
 83    async_group = aio.Group()
 84
 85    try:
 86        subscriptions = [('gateway', conf['gateway_name'], '?', '?',
 87                          'system', '*')]
 88
 89        if 'monitor' in conf:
 90            monitor_client = await hat.monitor.client.connect(conf['monitor'])
 91            _bind_resource(async_group, monitor_client)
 92
 93            monitor_component = hat.monitor.client.Component(
 94                monitor_client, run_with_monitor, conf, monitor_client,
 95                subscriptions)
 96            monitor_component.set_ready(True)
 97            _bind_resource(async_group, monitor_component)
 98
 99        else:
100            eventer_client = await hat.event.eventer.connect(
101                conf['event_server_address'], subscriptions)
102            _bind_resource(async_group, eventer_client)
103
104            eventer_runner = EventerRunner(conf, eventer_client)
105            _bind_resource(async_group, eventer_runner)
106
107        await async_group.wait_closing()
108
109    finally:
110        await aio.uncancellable(async_group.async_close())

Async main entry point

async def run_with_monitor( monitor_component: hat.monitor.client.Component, conf: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')], monitor_client: hat.monitor.client.Client, subscriptions: List[Tuple[str, ...]]):
113async def run_with_monitor(monitor_component: hat.monitor.client.Component,
114                           conf: json.Data,
115                           monitor_client: hat.monitor.client.Client,
116                           subscriptions: typing.List[hat.event.common.EventType]):  # NOQA
117    """Run monitor component"""
118    component_cb = functools.partial(EventerRunner, conf)
119    eventer_component = hat.event.eventer.Component(
120        monitor_client, conf['event_server_group'], component_cb,
121        subscriptions)
122
123    try:
124        await eventer_component.wait_closing()
125
126    finally:
127        await aio.uncancellable(eventer_component.async_close())

Run monitor component

class EventerRunner(hat.aio.group.Resource):
130class EventerRunner(aio.Resource):
131
132    def __init__(self,
133                 conf: json.Data,
134                 eventer_client: hat.event.eventer.Client):
135        self._async_group = aio.Group()
136
137        self.async_group.spawn(self._run, conf, eventer_client)
138
139    @property
140    def async_group(self):
141        return self._async_group
142
143    async def _run(self, conf, eventer_client):
144        try:
145            engine = await create_engine(conf, eventer_client)
146
147            try:
148                mlog.debug('engine created')
149                await engine.wait_closing()
150
151            finally:
152                mlog.debug('engine closing')
153                await aio.uncancellable(engine.async_close())
154
155        except Exception as e:
156            mlog.error('eventer runner error: %s', e, exc_info=e)
157
158        finally:
159            self.close()

Resource with lifetime control based on Group.

EventerRunner( conf: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')], eventer_client: hat.event.eventer.client.Client)
132    def __init__(self,
133                 conf: json.Data,
134                 eventer_client: hat.event.eventer.Client):
135        self._async_group = aio.Group()
136
137        self.async_group.spawn(self._run, conf, eventer_client)
async_group

Group controlling resource's lifetime.

Inherited Members
hat.aio.group.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close