hat.gateway.devices.modbus.master.eventer_client         
                
                        
                        
                        """Eventer client proxy used by the modbus master gateway device.

                        Converts received events to request tuples and response tuples to
                        register events.
                        """

                        from collections.abc import Collection, Iterable
                        import contextlib
                        import logging
                        import typing

                        from hat import aio
                        import hat.event.common
                        import hat.event.eventer

                        from hat.gateway.devices.modbus.master import common


                        mlog = logging.getLogger(__name__)


                        class RemoteDeviceEnableReq(typing.NamedTuple):
                            """Request built from a ``system/remote_device/<id>/enable`` event."""
                            device_id: int  # integer parsed from the event type
                            enable: bool  # truthiness of the event payload data


                        class RemoteDeviceWriteReq(typing.NamedTuple):
                            """Request built from a ``system/remote_device/<id>/write/<name>``
                            event."""
                            device_id: int  # integer parsed from the event type
                            data_name: str  # event type segment following 'write'
                            request_id: str  # payload data entry 'request_id'
                            value: int  # payload data entry 'value' (taken as is, not validated)


                        Request: typing.TypeAlias = RemoteDeviceEnableReq | RemoteDeviceWriteReq


                        class StatusRes(typing.NamedTuple):
                            """Response registered as a ``gateway/status`` event."""
                            status: str  # used directly as the event payload


                        class RemoteDeviceStatusRes(typing.NamedTuple):
                            """Response registered as a ``gateway/remote_device/<id>/status``
                            event."""
                            device_id: int  # stringified into the event type
                            status: str  # used directly as the event payload


                        class RemoteDeviceReadRes(typing.NamedTuple):
                            """Response registered as a ``gateway/remote_device/<id>/read/<name>``
                            event."""
                            device_id: int  # stringified into the event type
                            data_name: str  # last event type segment
                            result: str  # payload entry 'result'
                            value: int | None  # payload entry 'value', omitted when None
                            cause: str | None  # payload entry 'cause', omitted when None


                        class RemoteDeviceWriteRes(typing.NamedTuple):
                            """Response registered as a ``gateway/remote_device/<id>/write/<name>``
                            event."""
                            device_id: int  # stringified into the event type
                            data_name: str  # last event type segment
                            request_id: str  # payload entry 'request_id'
                            result: str  # payload entry 'result'


                        Response: typing.TypeAlias = (StatusRes |
                                                      RemoteDeviceStatusRes |
                                                      RemoteDeviceReadRes |
                                                      RemoteDeviceWriteRes)


                        class EventerClientProxy(aio.Resource):
                            """Bridge between an eventer client and request/response tuples.

                            Received events are converted to ``Request`` tuples and ``Response``
                            tuples are converted to register events. The proxy exposes the async
                            group of the wrapped eventer client as its own.
                            """

                            def __init__(self,
                                         eventer_client: hat.event.eventer.Client,
                                         event_type_prefix: common.EventTypePrefix,
                                         name: str):
                                """Create the proxy.

                                Args:
                                    eventer_client: client used for registering and querying events
                                    event_type_prefix: leading event type segments of all events
                                        handled by this proxy
                                    name: passed to ``common.create_device_logger_adapter``

                                """
                                self._eventer_client = eventer_client
                                self._event_type_prefix = event_type_prefix
                                self._log = common.create_device_logger_adapter(mlog, name)

                            @property
                            def async_group(self) -> aio.Group:
                                """Async group of the wrapped eventer client."""
                                return self._eventer_client.async_group

                            def process_events(self,
                                               events: Collection[hat.event.common.Event]
                                               ) -> Iterable[Request]:
                                """Lazily convert received events to requests.

                                Events that can not be converted are logged at info level and
                                skipped.
                                """
                                self._log.debug('received %s events', len(events))
                                for event in events:
                                    # guard only the conversion - an exception thrown into the
                                    # generator at the yield is not an invalid event
                                    try:
                                        request = _request_from_event(self._event_type_prefix,
                                                                      event)

                                    except Exception as e:
                                        self._log.info('received invalid event: %s', e, exc_info=e)
                                        continue

                                    yield request

                            async def write(self, responses: Iterable[Response]):
                                """Register one event per response with the eventer client."""
                                events = [_response_to_register_event(self._event_type_prefix, res)
                                          for res in responses]
                                await self._eventer_client.register(events)

                            async def query_enabled_devices(self) -> set[int]:
                                """Query enable events and return ids of enabled devices.

                                Events matching ``(*prefix, 'system', 'remote_device', '?',
                                'enable')`` are queried with ``QueryLatestParams``. A device is
                                included when its event has a truthy payload. Events whose device id
                                segment is not an integer are ignored.
                                """
                                self._log.debug('querying enabled devices')
                                enabled_devices: set[int] = set()

                                event_type = (*self._event_type_prefix, 'system', 'remote_device',
                                              '?', 'enable')
                                params = hat.event.common.QueryLatestParams([event_type])
                                result = await self._eventer_client.query(params)
                                self._log.debug('received %s events', len(result.events))

                                # position of the device id segment within the full event type
                                device_id_index = len(self._event_type_prefix) + 2

                                for event in result.events:
                                    if not event.payload or not event.payload.data:
                                        continue

                                    with contextlib.suppress(ValueError):
                                        enabled_devices.add(int(event.type[device_id_index]))

                                self._log.debug('detected %s enabled devices', len(enabled_devices))
                                return enabled_devices


                        def _request_from_event(event_type_prefix, event):
                            """Convert a received event to a request.

                            Only the length of ``event_type_prefix`` is used - the prefix segments
                            themselves are not compared with the event type.

                            Raises:
                                Exception: if the event type is not a supported
                                    ``system/remote_device`` type; malformed event types or
                                    payloads surface as the underlying lookup/conversion errors

                            """
                            event_type_suffix = event.type[len(event_type_prefix):]

                            if event_type_suffix[:2] != ('system', 'remote_device'):
                                raise Exception('unsupported event type')

                            device_id = int(event_type_suffix[2])

                            if event_type_suffix[3] == 'enable':
                                enable = bool(event.payload.data)
                                return RemoteDeviceEnableReq(device_id=device_id,
                                                             enable=enable)

                            if event_type_suffix[3] == 'write':
                                data_name = event_type_suffix[4]
                                request_id = event.payload.data['request_id']
                                value = event.payload.data['value']
                                return RemoteDeviceWriteReq(device_id=device_id,
                                                            data_name=data_name,
                                                            request_id=request_id,
                                                            value=value)

                            raise Exception('unsupported event type')


                        def _response_to_register_event(event_type_prefix, res):
                            """Convert a response to a register event with JSON payload.

                            The event type is ``event_type_prefix`` followed by ``'gateway'`` and
                            response specific segments. Source timestamp is always ``None``.

                            Raises:
                                ValueError: if ``res`` is not one of the ``Response`` types

                            """
                            if isinstance(res, StatusRes):
                                event_type = (*event_type_prefix, 'gateway', 'status')
                                payload = res.status

                            elif isinstance(res, RemoteDeviceStatusRes):
                                event_type = (*event_type_prefix, 'gateway', 'remote_device',
                                              str(res.device_id), 'status')
                                payload = res.status

                            elif isinstance(res, RemoteDeviceReadRes):
                                event_type = (*event_type_prefix, 'gateway', 'remote_device',
                                              str(res.device_id), 'read', res.data_name)
                                # optional entries are included only when set
                                payload = {'result': res.result}
                                if res.value is not None:
                                    payload['value'] = res.value
                                if res.cause is not None:
                                    payload['cause'] = res.cause

                            elif isinstance(res, RemoteDeviceWriteRes):
                                event_type = (*event_type_prefix, 'gateway', 'remote_device',
                                              str(res.device_id), 'write', res.data_name)
                                payload = {'request_id': res.request_id,
                                           'result': res.result}

                            else:
                                raise ValueError('invalid response type')

                            return hat.event.common.RegisterEvent(
                                type=event_type,
                                source_timestamp=None,
                                payload=hat.event.common.EventPayloadJson(payload))
            mlog        =
<Logger hat.gateway.devices.modbus.master.eventer_client (WARNING)>
        
    
    
    
    
                
            
    class
    RemoteDeviceEnableReq(typing.NamedTuple):
                
     
    
            
            RemoteDeviceEnableReq(device_id, enable)
            
    class
    RemoteDeviceWriteReq(typing.NamedTuple):
                
     
    
            class RemoteDeviceWriteReq(typing.NamedTuple):
                """Request built from a ``system/remote_device/<id>/write/<name>``
                event."""
                device_id: int  # integer parsed from the event type
                data_name: str  # event type segment following 'write'
                request_id: str  # payload data entry 'request_id'
                value: int  # payload data entry 'value' (taken as is, not validated)
RemoteDeviceWriteReq(device_id, data_name, request_id, value)
            Request: typing.TypeAlias        =
            RemoteDeviceEnableReq | RemoteDeviceWriteReq
        
    
    
    
    
                
            
    class
    StatusRes(typing.NamedTuple):
                
     
    
            
            StatusRes(status,)
            
    class
    RemoteDeviceStatusRes(typing.NamedTuple):
                
     
    
            
            RemoteDeviceStatusRes(device_id, status)
            
    class
    RemoteDeviceReadRes(typing.NamedTuple):
                
     
    
            41class RemoteDeviceReadRes(typing.NamedTuple): 42 device_id: int 43 data_name: str 44 result: str 45 value: int | None 46 cause: str | None
RemoteDeviceReadRes(device_id, data_name, result, value, cause)
            
    class
    RemoteDeviceWriteRes(typing.NamedTuple):
                
     
    
            class RemoteDeviceWriteRes(typing.NamedTuple):
                """Response registered as a ``gateway/remote_device/<id>/write/<name>``
                event."""
                device_id: int  # stringified into the event type
                data_name: str  # last event type segment
                request_id: str  # payload entry 'request_id'
                result: str  # payload entry 'result'
RemoteDeviceWriteRes(device_id, data_name, request_id, result)
            Response: typing.TypeAlias        =
            StatusRes | RemoteDeviceStatusRes | RemoteDeviceReadRes | RemoteDeviceWriteRes
        
    
    
    
    
                
            
    class
    EventerClientProxy(hat.aio.group.Resource):
                
     
    
            class EventerClientProxy(aio.Resource):
                """Bridge between an eventer client and request/response tuples.

                Received events are converted to ``Request`` tuples and ``Response``
                tuples are converted to register events. The proxy exposes the async
                group of the wrapped eventer client as its own.
                """

                def __init__(self,
                             eventer_client: hat.event.eventer.Client,
                             event_type_prefix: common.EventTypePrefix,
                             name: str):
                    """Create the proxy.

                    Args:
                        eventer_client: client used for registering and querying events
                        event_type_prefix: leading event type segments of all events
                            handled by this proxy
                        name: passed to ``common.create_device_logger_adapter``

                    """
                    self._eventer_client = eventer_client
                    self._event_type_prefix = event_type_prefix
                    self._log = common.create_device_logger_adapter(mlog, name)

                @property
                def async_group(self) -> aio.Group:
                    """Async group of the wrapped eventer client."""
                    return self._eventer_client.async_group

                def process_events(self,
                                   events: Collection[hat.event.common.Event]
                                   ) -> Iterable[Request]:
                    """Lazily convert received events to requests.

                    Events that can not be converted are logged at info level and
                    skipped.
                    """
                    self._log.debug('received %s events', len(events))
                    for event in events:
                        # guard only the conversion - an exception thrown into the
                        # generator at the yield is not an invalid event
                        try:
                            request = _request_from_event(self._event_type_prefix,
                                                          event)

                        except Exception as e:
                            self._log.info('received invalid event: %s', e, exc_info=e)
                            continue

                        yield request

                async def write(self, responses: Iterable[Response]):
                    """Register one event per response with the eventer client."""
                    events = [_response_to_register_event(self._event_type_prefix, res)
                              for res in responses]
                    await self._eventer_client.register(events)

                async def query_enabled_devices(self) -> set[int]:
                    """Query enable events and return ids of enabled devices.

                    Events matching ``(*prefix, 'system', 'remote_device', '?',
                    'enable')`` are queried with ``QueryLatestParams``. A device is
                    included when its event has a truthy payload. Events whose device id
                    segment is not an integer are ignored.
                    """
                    self._log.debug('querying enabled devices')
                    enabled_devices: set[int] = set()

                    event_type = (*self._event_type_prefix, 'system', 'remote_device',
                                  '?', 'enable')
                    params = hat.event.common.QueryLatestParams([event_type])
                    result = await self._eventer_client.query(params)
                    self._log.debug('received %s events', len(result.events))

                    # position of the device id segment within the full event type
                    device_id_index = len(self._event_type_prefix) + 2

                    for event in result.events:
                        if not event.payload or not event.payload.data:
                            continue

                        with contextlib.suppress(ValueError):
                            enabled_devices.add(int(event.type[device_id_index]))

                    self._log.debug('detected %s enabled devices', len(enabled_devices))
                    return enabled_devices
Resource with lifetime control based on Group.
            
        EventerClientProxy(	eventer_client: hat.event.eventer.client.Client,	event_type_prefix: tuple[str, str, str],	name: str)
                
    
    
            
    
                            
            
        def
        process_events(	self,	events: Collection[hat.event.common.common.Event]) -> Iterable[RemoteDeviceEnableReq | RemoteDeviceWriteReq]:
                
    
    
            def process_events(self,
                               events: Collection[hat.event.common.Event]
                               ) -> Iterable[Request]:
                """Lazily convert received events to requests.

                Events that can not be converted are logged at info level and
                skipped.
                """
                self._log.debug('received %s events', len(events))
                for event in events:
                    # guard only the conversion - an exception thrown into the
                    # generator at the yield is not an invalid event
                    try:
                        request = _request_from_event(self._event_type_prefix,
                                                      event)

                    except Exception as e:
                        self._log.info('received invalid event: %s', e, exc_info=e)
                        continue

                    yield request
            
        async def
        write(	self,	responses: Iterable[StatusRes | RemoteDeviceStatusRes | RemoteDeviceReadRes | RemoteDeviceWriteRes]):
                
    
    
            
    
                            
            
        async def
        query_enabled_devices(self) -> set[int]:
                
    
    
            async def query_enabled_devices(self) -> set[int]:
                """Query enable events and return ids of enabled devices.

                Events matching ``(*prefix, 'system', 'remote_device', '?',
                'enable')`` are queried with ``QueryLatestParams``. A device is
                included when its event has a truthy payload. Events whose device id
                segment is not an integer are ignored.
                """
                self._log.debug('querying enabled devices')
                enabled_devices: set[int] = set()

                event_type = (*self._event_type_prefix, 'system', 'remote_device',
                              '?', 'enable')
                params = hat.event.common.QueryLatestParams([event_type])
                result = await self._eventer_client.query(params)
                self._log.debug('received %s events', len(result.events))

                # position of the device id segment within the full event type
                device_id_index = len(self._event_type_prefix) + 2

                for event in result.events:
                    if not event.payload or not event.payload.data:
                        continue

                    with contextlib.suppress(ValueError):
                        enabled_devices.add(int(event.type[device_id_index]))

                self._log.debug('detected %s enabled devices', len(enabled_devices))
                return enabled_devices