hat.gateway.adminer      
                        Gateway adminer communication protocol
1"""Gateway adminer communication protocol""" 2 3from hat.gateway.adminer.client import (AdminerError, 4 connect, 5 Client) 6from hat.gateway.adminer.server import (GetLogConfCb, 7 SetLogConfCb, 8 listen, 9 Server) 10 11 12__all__ = ['AdminerError', 13 'connect', 14 'Client', 15 'GetLogConfCb', 16 'SetLogConfCb', 17 'listen', 18 'Server']
            
    class
    AdminerError(builtins.Exception):
                
     
    
            
            Errors reported by Gateway Adminer Server
21async def connect(addr: tcp.Address, 22 **kwargs 23 ) -> 'Client': 24 """Connect to Gateway Adminer Server 25 26 Additional arguments are passed to `hat.chatter.connect` coroutine. 27 28 """ 29 client = Client() 30 client._loop = asyncio.get_running_loop() 31 client._conv_msg_type_futures = {} 32 33 client._conn = await chatter.connect(addr, **kwargs) 34 35 try: 36 client.async_group.spawn(client._receive_loop) 37 38 except Exception: 39 await aio.uncancellable(client.async_close()) 40 raise 41 42 return client
Connect to Gateway Adminer Server
Additional arguments are passed to hat.chatter.connect coroutine.
            
    class
    Client(hat.aio.group.Resource):
                
     
    
            45class Client(aio.Resource): 46 """Gateway adminer client 47 48 For creating new client see `connect` coroutine. 49 50 """ 51 52 @property 53 def async_group(self) -> aio.Group: 54 """Async group""" 55 return self._conn.async_group 56 57 async def get_log_conf(self) -> json.Data: 58 """Get logging configuration""" 59 data = await self._send( 60 req_msg_type='HatGatewayAdminer.MsgGetLogConfReq', 61 req_msg_data=None, 62 res_msg_type='HatGatewayAdminer.MsgGetLogConfRes') 63 64 return json.decode(data) 65 66 async def set_log_conf(self, conf: json.Data): 67 """Set logging configuration""" 68 await self._send(req_msg_type='HatGatewayAdminer.MsgSetLogConfReq', 69 req_msg_data=json.encode(conf), 70 res_msg_type='HatGatewayAdminer.MsgSetLogConfRes') 71 72 async def _send(self, req_msg_type, req_msg_data, res_msg_type): 73 conv = await common.send_msg( 74 conn=self._conn, 75 msg_type=req_msg_type, 76 msg_data=req_msg_data, 77 last=False) 78 79 if not self.is_open: 80 raise ConnectionError() 81 82 future = self._loop.create_future() 83 self._conv_msg_type_futures[conv] = res_msg_type, future 84 85 try: 86 return await future 87 88 finally: 89 self._conv_msg_type_futures.pop(conv, None) 90 91 async def _receive_loop(self): 92 mlog.debug("starting receive loop") 93 try: 94 while True: 95 mlog.debug("waiting for incoming message") 96 msg, msg_type, msg_data = await common.receive_msg(self._conn) 97 98 mlog.debug(f"received message {msg_type}") 99 100 res_msg_type, future = self._conv_msg_type_futures.get( 101 msg.conv, (None, None)) 102 if not future or future.done(): 103 return 104 105 if res_msg_type != msg_type: 106 raise Exception('invalid response message type') 107 108 if msg_data[0] == 'error': 109 future.set_exception(AdminerError(msg_data[1])) 110 continue 111 112 future.set_result(msg_data[1]) 113 114 except ConnectionError: 115 pass 116 117 except Exception as e: 118 mlog.error("read loop error: %s", e, exc_info=e) 119 120 finally: 121 mlog.debug("stopping receive loop") 122 self.close() 123 124 for _, future in self._conv_msg_type_futures.values(): 125 if not future.done(): 126 future.set_exception(ConnectionError())
Gateway adminer client
For creating new client see connect coroutine.
            async_group: hat.aio.group.Group
                
    
    
            52 @property 53 def async_group(self) -> aio.Group: 54 """Async group""" 55 return self._conn.async_group
Async group
            
        async def
        get_log_conf(	self) -> Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]:
                
    
    
            57 async def get_log_conf(self) -> json.Data: 58 """Get logging configuration""" 59 data = await self._send( 60 req_msg_type='HatGatewayAdminer.MsgGetLogConfReq', 61 req_msg_data=None, 62 res_msg_type='HatGatewayAdminer.MsgGetLogConfRes') 63 64 return json.decode(data)
Get logging configuration
            
        async def
        set_log_conf(	self,	conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]):
                
    
    
            66 async def set_log_conf(self, conf: json.Data): 67 """Set logging configuration""" 68 await self._send(req_msg_type='HatGatewayAdminer.MsgSetLogConfReq', 69 req_msg_data=json.encode(conf), 70 res_msg_type='HatGatewayAdminer.MsgSetLogConfRes')
Set logging configuration
            GetLogConfCb        =
            typing.Callable[[NoneType], typing.Union[NoneType, bool, int, float, str, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')], collections.abc.Awaitable[typing.Union[NoneType, bool, int, float, str, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')]]]]]
        
    
    
    
    
                
            SetLogConfCb        =
            typing.Callable[[typing.Union[NoneType, bool, int, float, str, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')]]], None | collections.abc.Awaitable[None]]
        
    
    
    
    
                
            
        async def
        listen(	addr: hat.drivers.tcp.Address,	*,	get_log_conf_cb: Optional[Callable[[NoneType], Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], Awaitable[Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]]]]] = None,	set_log_conf_cb: Optional[Callable[[Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]], None | Awaitable[None]]] = None,	**kwargs) -> Server:
                
    
    
            23async def listen(addr: tcp.Address, 24 *, 25 get_log_conf_cb: GetLogConfCb | None = None, 26 set_log_conf_cb: SetLogConfCb | None = None, 27 **kwargs 28 ) -> 'Server': 29 """Create listening Gateway Adminer Server instance""" 30 server = Server() 31 server._get_log_conf_cb = get_log_conf_cb 32 server._set_log_conf_cb = set_log_conf_cb 33 34 server._srv = await chatter.listen(server._connection_loop, addr, **kwargs) 35 mlog.debug("listening on %s", addr) 36 37 return server
Create listening Gateway Adminer Server instance
            
    class
    Server(hat.aio.group.Resource):
                
     
    
            40class Server(aio.Resource): 41 42 @property 43 def async_group(self) -> aio.Group: 44 """Async group""" 45 return self._srv.async_group 46 47 async def _connection_loop(self, conn): 48 mlog.debug("starting connection loop") 49 try: 50 while True: 51 mlog.debug("waiting for incomming messages") 52 msg, msg_type, msg_data = await common.receive_msg(conn) 53 54 mlog.debug(f"received message {msg_type}") 55 56 if msg_type == 'HatGatewayAdminer.MsgGetLogConfReq': 57 await self._process_msg_get_log_conf( 58 conn=conn, 59 conv=msg.conv, 60 req_msg_data=msg_data) 61 62 elif msg_type == 'HatGatewayAdminer.MsgSetLogConfReq': 63 await self._process_msg_set_log_conf( 64 conn=conn, 65 conv=msg.conv, 66 req_msg_data=msg_data) 67 68 else: 69 raise Exception('unsupported message type') 70 71 except ConnectionError: 72 pass 73 74 except Exception as e: 75 mlog.error("on connection error: %s", e, exc_info=e) 76 77 finally: 78 mlog.debug("stopping connection loop") 79 conn.close() 80 81 async def _process_msg_get_log_conf(self, conn, conv, req_msg_data): 82 try: 83 if not self._get_log_conf_cb: 84 raise Exception('not implemented') 85 86 result = await aio.call(self._get_log_conf_cb) 87 88 res_msg_data = 'success', json.encode(result) 89 90 except Exception as e: 91 res_msg_data = 'error', str(e) 92 93 await common.send_msg( 94 conn, 'HatGatewayAdminer.MsgGetLogConfRes', res_msg_data, 95 conv=conv) 96 97 async def _process_msg_set_log_conf(self, conn, conv, req_msg_data): 98 try: 99 if not self._set_log_conf_cb: 100 raise Exception('not implemented') 101 102 conf = json.decode(req_msg_data) 103 await aio.call(self._set_log_conf_cb, conf) 104 105 res_msg_data = 'success', None 106 107 except Exception as e: 108 res_msg_data = 'error', str(e) 109 110 await common.send_msg( 111 conn, 'HatGatewayAdminer.MsgSetLogConfRes', res_msg_data, 112 conv=conv)
Resource with lifetime control based on Group.