hat.gateway.adminer_server

 1import logging
 2
 3from hat import aio
 4from hat import json
 5from hat.drivers import tcp
 6
 7from hat.gateway import adminer
 8
 9
async def create_adminer_server(addr: tcp.Address,
                                log_conf: json.Data,
                                **kwargs
                                ) -> 'AdminerServer':
    """Create adminer server

    New `AdminerServer` instance keeps `log_conf` as its current logging
    configuration. `addr` and all additional keyword arguments are passed
    to `adminer.listen`, together with instance's callbacks for getting
    and setting logging configuration.

    """
    instance = AdminerServer()
    instance._log_conf = log_conf

    # callbacks are bound to the instance so that the listener reads and
    # updates instance's stored configuration
    on_get = instance._on_get_log_conf
    on_set = instance._on_set_log_conf

    instance._srv = await adminer.listen(addr,
                                         get_log_conf_cb=on_get,
                                         set_log_conf_cb=on_set,
                                         **kwargs)
    return instance
24
25
class AdminerServer(aio.Resource):
    """Adminer server

    For creating new server see `create_adminer_server` coroutine.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._srv.async_group

    def _on_get_log_conf(self):
        """Return currently stored logging configuration"""
        return self._log_conf

    def _on_set_log_conf(self, log_conf):
        """Apply logging configuration and store it as the current one"""
        # `logging.config` is a submodule which is not loaded by plain
        # `import logging` - import it explicitly instead of relying on
        # some other module having imported it
        import logging.config

        logging.config.dictConfig(log_conf)

        # stored only after `dictConfig` returned without raising, so a
        # rejected configuration does not replace the stored one
        self._log_conf = log_conf
async def create_adminer_server(addr: hat.drivers.tcp.Address, log_conf: None | bool | int | float | str | list[Data] | dict[str, Data], **kwargs) -> AdminerServer:
async def create_adminer_server(addr: tcp.Address,
                                log_conf: json.Data,
                                **kwargs
                                ) -> 'AdminerServer':
    """Create adminer server

    New `AdminerServer` instance keeps `log_conf` as its current logging
    configuration. `addr` and all additional keyword arguments are passed
    to `adminer.listen`, together with instance's callbacks for getting
    and setting logging configuration.

    """
    instance = AdminerServer()
    instance._log_conf = log_conf

    # callbacks are bound to the instance so that the listener reads and
    # updates instance's stored configuration
    on_get = instance._on_get_log_conf
    on_set = instance._on_set_log_conf

    instance._srv = await adminer.listen(addr,
                                         get_log_conf_cb=on_get,
                                         set_log_conf_cb=on_set,
                                         **kwargs)
    return instance

Create adminer server

class AdminerServer(hat.aio.group.Resource):
class AdminerServer(aio.Resource):
    """Adminer server

    For creating new server see `create_adminer_server` coroutine.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._srv.async_group

    def _on_get_log_conf(self):
        """Return currently stored logging configuration"""
        return self._log_conf

    def _on_set_log_conf(self, log_conf):
        """Apply logging configuration and store it as the current one"""
        # `logging.config` is a submodule which is not loaded by plain
        # `import logging` - import it explicitly instead of relying on
        # some other module having imported it
        import logging.config

        logging.config.dictConfig(log_conf)

        # stored only after `dictConfig` returned without raising, so a
        # rejected configuration does not replace the stored one
        self._log_conf = log_conf

Adminer server

To create a new server, see the `create_adminer_server` coroutine.

async_group: hat.aio.group.Group
34    @property
35    def async_group(self) -> aio.Group:
36        """Async group"""
37        return self._srv.async_group

Async group