hat.gateway.adminer_server
"""Adminer server that stores and applies logging configuration"""

import logging
# `logging.config` is a submodule that `import logging` alone does not load;
# import it explicitly so `logging.config.dictConfig` is always resolvable.
import logging.config

from hat import aio
from hat import json
from hat.drivers import tcp

from hat.gateway import adminer


async def create_adminer_server(addr: tcp.Address,
                                log_conf: json.Data,
                                **kwargs
                                ) -> 'AdminerServer':
    """Create adminer server

    Args:
        addr: address passed to `adminer.listen`
        log_conf: initial logging configuration, returned by the
            get-configuration callback until a new one is set
        kwargs: additional arguments forwarded to `adminer.listen`

    """
    server = AdminerServer()
    server._log_conf = log_conf

    server._srv = await adminer.listen(
        addr,
        get_log_conf_cb=server._on_get_log_conf,
        set_log_conf_cb=server._on_set_log_conf,
        **kwargs)

    return server


class AdminerServer(aio.Resource):
    """Adminer server

    For creating new server see `create_adminer_server` coroutine.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        # the group of the underlying listening server is reused as is
        return self._srv.async_group

    def _on_get_log_conf(self):
        """Return the currently stored logging configuration"""
        return self._log_conf

    def _on_set_log_conf(self, log_conf):
        """Apply `log_conf` to the `logging` module and store it

        The configuration is stored only after `dictConfig` succeeds, so a
        rejected configuration does not replace the previous one.

        """
        logging.config.dictConfig(log_conf)
        self._log_conf = log_conf
async def
create_adminer_server( addr: hat.drivers.tcp.Address, log_conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]], **kwargs) -> AdminerServer:
async def create_adminer_server(addr: tcp.Address,
                                log_conf: json.Data,
                                **kwargs
                                ) -> 'AdminerServer':
    """Create adminer server

    Args:
        addr: address passed to `adminer.listen`
        log_conf: initial logging configuration kept by the created server
        kwargs: additional arguments forwarded to `adminer.listen`

    Returns:
        server instance wrapping the result of `adminer.listen`

    """
    instance = AdminerServer()
    instance._log_conf = log_conf

    # callbacks are bound to the new instance before listening starts
    listener = await adminer.listen(
        addr,
        get_log_conf_cb=instance._on_get_log_conf,
        set_log_conf_cb=instance._on_set_log_conf,
        **kwargs)
    instance._srv = listener

    return instance
Create adminer server
class
AdminerServer(hat.aio.group.Resource):
class AdminerServer(aio.Resource):
    """Adminer server

    For creating new server see `create_adminer_server` coroutine.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        # the group of the underlying listening server is reused as is
        return self._srv.async_group

    def _on_get_log_conf(self):
        """Return the currently stored logging configuration"""
        return self._log_conf

    def _on_set_log_conf(self, log_conf):
        """Apply `log_conf` to the `logging` module and store it

        The configuration is stored only after `dictConfig` succeeds, so a
        rejected configuration does not replace the previous one.

        """
        # `import logging` does not load the `logging.config` submodule;
        # import it here so the attribute lookup cannot fail.
        import logging.config

        logging.config.dictConfig(log_conf)
        self._log_conf = log_conf
Adminer server
To create a new server, use the create_adminer_server coroutine.