hat.gateway.devices.snmp.trap_listener
SNMP trap listener device
"""SNMP trap listener device"""

from collections.abc import Collection
import collections
import logging
import typing

from hat import aio
from hat import asn1
from hat.drivers import snmp
from hat.drivers import udp
import hat.event.common
import hat.event.eventer

from hat.gateway import common


mlog: logging.Logger = logging.getLogger(__name__)


async def create(conf: common.DeviceConf,
                 eventer_client: hat.event.eventer.Client,
                 event_type_prefix: common.EventTypePrefix
                 ) -> 'SnmpTrapListenerDevice':
    """Create SNMP trap listener device

    Remote devices listed in ``conf['remote_devices']`` are grouped by
    ``(version, subkey)``, where subkey is the configured community (V1 and
    V2C) or the configured context (V3, ``None`` when no context is
    configured). Users from ``conf['users']`` are passed to the trap
    listener bound to ``conf['local_host']`` and ``conf['local_port']``.

    """
    device = SnmpTrapListenerDevice()
    device._eventer_client = eventer_client
    device._event_type_prefix = event_type_prefix
    device._remote_devices = collections.defaultdict(collections.deque)

    for dev_conf in conf['remote_devices']:
        version_name = dev_conf['version']
        version = snmp.Version[version_name]
        remote_device = _RemoteDevice(
            name=dev_conf['name'],
            oids=set(map(_oid_from_str, dev_conf['oids'])),
            string_hex_oids=set(map(_oid_from_str,
                                    dev_conf['string_hex_oids'])))

        if version_name in ('V1', 'V2C'):
            # community based versions share the same subkey rule
            subkey = dev_conf['community']

        elif version_name == 'V3':
            context_conf = dev_conf['context']
            if context_conf:
                subkey = snmp.Context(
                    engine_id=bytes.fromhex(context_conf['engine_id']),
                    name=context_conf['name'])
            else:
                subkey = None

        else:
            raise Exception('invalid version')

        device._remote_devices[(version, subkey)].append(remote_device)

    users = []
    for user_conf in conf['users']:
        name = user_conf['name']

        auth_conf = user_conf['authentication']
        auth_type = snmp.AuthType[auth_conf['type']] if auth_conf else None
        auth_password = auth_conf['password'] if auth_conf else None

        priv_conf = user_conf['privacy']
        priv_type = snmp.PrivType[priv_conf['type']] if priv_conf else None
        priv_password = priv_conf['password'] if priv_conf else None

        users.append(snmp.User(name=name,
                               auth_type=auth_type,
                               auth_password=auth_password,
                               priv_type=priv_type,
                               priv_password=priv_password))

    device._listener = await snmp.create_trap_listener(
        local_addr=udp.Address(host=conf['local_host'],
                               port=conf['local_port']),
        v1_trap_cb=device._on_v1_trap,
        v2c_trap_cb=device._on_v2c_trap,
        v2c_inform_cb=device._on_v2c_inform,
        v3_trap_cb=device._on_v3_trap,
        v3_inform_cb=device._on_v3_inform,
        users=users)

    return device


info = common.DeviceInfo(
    type='snmp_trap_listener',
    create=create,
    json_schema_id="hat-gateway://snmp.yaml#/$defs/trap_listener",
    json_schema_repo=common.json_schema_repo)


class SnmpTrapListenerDevice(common.Device):
    """Device registering events for received SNMP traps and informs

    Instances are expected to be initialized by `create`, which sets the
    private attributes used by the methods of this class.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group of the underlying trap listener"""
        return self._listener.async_group

    async def process_events(self, events: Collection[hat.event.common.Event]):
        """Process received events (all events are ignored)"""
        pass

    async def _on_v1_trap(self, addr, community, trap):
        await self._process_data(snmp.Version.V1, community, trap.data)

    async def _on_v2c_trap(self, addr, community, trap):
        await self._process_data(snmp.Version.V2C, community, trap.data)

    async def _on_v2c_inform(self, addr, community, inform):
        await self._process_data(snmp.Version.V2C, community, inform.data)

    async def _on_v3_trap(self, addr, user, context, trap):
        await self._process_data(snmp.Version.V3, context, trap.data)

    async def _on_v3_inform(self, addr, user, context, inform):
        await self._process_data(snmp.Version.V3, context, inform.data)

    async def _process_data(self, version, subkey, data):
        """Register events for data matching configured remote devices

        Remote devices stored under ``(version, subkey)`` are checked first,
        followed by those stored under ``(version, None)``. For each remote
        device, one event is created for every data item whose name is one
        of the device's oids.

        """
        try:
            events = collections.deque()

            for key in ((version, subkey), (version, None)):
                for remote_device in self._remote_devices.get(key, ()):
                    events.extend(
                        hat.event.common.RegisterEvent(
                            type=(*self._event_type_prefix, 'gateway',
                                  'data', remote_device.name,
                                  _oid_to_str(item.name)),
                            source_timestamp=None,
                            payload=hat.event.common.EventPayloadJson(
                                _event_payload_from_data(
                                    item, remote_device.string_hex_oids)))
                        for item in data
                        if item.name in remote_device.oids)

            if events:
                await self._eventer_client.register(events)

        except ConnectionError:
            # connection errors are silently ignored
            pass

        except Exception as e:
            mlog.error("error processing data: %s", e, exc_info=e)


class _RemoteDevice(typing.NamedTuple):
    # name used as part of the registered event type
    name: str
    # oids for which events are registered
    oids: set[asn1.ObjectIdentifier]
    # oids whose string values are reported as hex strings
    string_hex_oids: set[asn1.ObjectIdentifier]


def _oid_from_str(oid_str):
    """Convert dot separated string to tuple of integers"""
    return tuple(map(int, oid_str.split('.')))


def _oid_to_str(oid):
    """Convert iterable of oid components to dot separated string"""
    return '.'.join(map(str, oid))


def _event_payload_from_data(data, string_hex_oids):
    """Create JSON serializable event payload from SNMP data

    Result is a dictionary with keys ``type`` and ``value``. Raises
    `Exception` if `data` is not an instance of any of the checked data
    types.

    """

    def payload(type_name, value):
        return {'type': type_name,
                'value': value}

    if isinstance(data, snmp.EmptyData):
        return payload('ERROR', 'EMPTY')

    if isinstance(data, snmp.UnspecifiedData):
        return payload('ERROR', 'UNSPECIFIED')

    if isinstance(data, snmp.NoSuchObjectData):
        return payload('ERROR', 'NO_SUCH_OBJECT')

    if isinstance(data, snmp.NoSuchInstanceData):
        return payload('ERROR', 'NO_SUCH_INSTANCE')

    if isinstance(data, snmp.EndOfMibViewData):
        return payload('ERROR', 'END_OF_MIB_VIEW')

    if isinstance(data, snmp.IntegerData):
        return payload('INTEGER', data.value)

    if isinstance(data, snmp.UnsignedData):
        return payload('UNSIGNED', data.value)

    if isinstance(data, snmp.CounterData):
        return payload('COUNTER', data.value)

    if isinstance(data, snmp.BigCounterData):
        return payload('BIG_COUNTER', data.value)

    if isinstance(data, snmp.TimeTicksData):
        return payload('TIME_TICKS', data.value)

    if isinstance(data, snmp.StringData):
        if data.name in string_hex_oids:
            return payload('STRING_HEX', data.value.hex())

        # undecodable bytes are replaced instead of raising
        return payload('STRING',
                       str(data.value, encoding='utf-8', errors='replace'))

    if isinstance(data, snmp.ObjectIdData):
        return payload('OBJECT_ID', _oid_to_str(data.value))

    if isinstance(data, snmp.IpAddressData):
        return payload('IP_ADDRESS', '.'.join(map(str, data.value)))

    if isinstance(data, snmp.ArbitraryData):
        return payload('ARBITRARY', data.value.hex())

    raise Exception('invalid response data')
async def create(conf: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')], eventer_client: hat.event.eventer.client.Client, event_type_prefix: tuple[str, str, str]) -> SnmpTrapListenerDevice:
async def create(conf: common.DeviceConf,
                 eventer_client: hat.event.eventer.Client,
                 event_type_prefix: common.EventTypePrefix
                 ) -> 'SnmpTrapListenerDevice':
    """Create SNMP trap listener device

    Remote devices listed in ``conf['remote_devices']`` are grouped by
    ``(version, subkey)``, where subkey is the configured community (V1 and
    V2C) or the configured context (V3, ``None`` when no context is
    configured). Users from ``conf['users']`` are passed to the trap
    listener bound to ``conf['local_host']`` and ``conf['local_port']``.

    """
    device = SnmpTrapListenerDevice()
    device._eventer_client = eventer_client
    device._event_type_prefix = event_type_prefix
    device._remote_devices = collections.defaultdict(collections.deque)

    for dev_conf in conf['remote_devices']:
        version_name = dev_conf['version']
        version = snmp.Version[version_name]
        remote_device = _RemoteDevice(
            name=dev_conf['name'],
            oids=set(map(_oid_from_str, dev_conf['oids'])),
            string_hex_oids=set(map(_oid_from_str,
                                    dev_conf['string_hex_oids'])))

        if version_name in ('V1', 'V2C'):
            # community based versions share the same subkey rule
            subkey = dev_conf['community']

        elif version_name == 'V3':
            context_conf = dev_conf['context']
            if context_conf:
                subkey = snmp.Context(
                    engine_id=bytes.fromhex(context_conf['engine_id']),
                    name=context_conf['name'])
            else:
                subkey = None

        else:
            raise Exception('invalid version')

        device._remote_devices[(version, subkey)].append(remote_device)

    users = []
    for user_conf in conf['users']:
        name = user_conf['name']

        auth_conf = user_conf['authentication']
        auth_type = snmp.AuthType[auth_conf['type']] if auth_conf else None
        auth_password = auth_conf['password'] if auth_conf else None

        priv_conf = user_conf['privacy']
        priv_type = snmp.PrivType[priv_conf['type']] if priv_conf else None
        priv_password = priv_conf['password'] if priv_conf else None

        users.append(snmp.User(name=name,
                               auth_type=auth_type,
                               auth_password=auth_password,
                               priv_type=priv_type,
                               priv_password=priv_password))

    device._listener = await snmp.create_trap_listener(
        local_addr=udp.Address(host=conf['local_host'],
                               port=conf['local_port']),
        v1_trap_cb=device._on_v1_trap,
        v2c_trap_cb=device._on_v2c_trap,
        v2c_inform_cb=device._on_v2c_inform,
        v3_trap_cb=device._on_v3_trap,
        v3_inform_cb=device._on_v3_inform,
        users=users)

    return device
info = DeviceInfo(type='snmp_trap_listener', create=<function create>, json_schema_id='hat-gateway://snmp.yaml#/$defs/trap_listener', json_schema_repo=<hat.json.repository.SchemaRepository object>)
class SnmpTrapListenerDevice(common.Device):
    """Device registering events for received SNMP traps and informs

    Instances are expected to be initialized by `create`, which sets the
    private attributes used by the methods of this class.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group of the underlying trap listener"""
        return self._listener.async_group

    async def process_events(self, events: Collection[hat.event.common.Event]):
        """Process received events (all events are ignored)"""
        pass

    async def _on_v1_trap(self, addr, community, trap):
        await self._process_data(snmp.Version.V1, community, trap.data)

    async def _on_v2c_trap(self, addr, community, trap):
        await self._process_data(snmp.Version.V2C, community, trap.data)

    async def _on_v2c_inform(self, addr, community, inform):
        await self._process_data(snmp.Version.V2C, community, inform.data)

    async def _on_v3_trap(self, addr, user, context, trap):
        await self._process_data(snmp.Version.V3, context, trap.data)

    async def _on_v3_inform(self, addr, user, context, inform):
        await self._process_data(snmp.Version.V3, context, inform.data)

    async def _process_data(self, version, subkey, data):
        """Register events for data matching configured remote devices

        Remote devices stored under ``(version, subkey)`` are checked first,
        followed by those stored under ``(version, None)``. For each remote
        device, one event is created for every data item whose name is one
        of the device's oids.

        """
        try:
            events = collections.deque()

            for key in ((version, subkey), (version, None)):
                for remote_device in self._remote_devices.get(key, ()):
                    events.extend(
                        hat.event.common.RegisterEvent(
                            type=(*self._event_type_prefix, 'gateway',
                                  'data', remote_device.name,
                                  _oid_to_str(item.name)),
                            source_timestamp=None,
                            payload=hat.event.common.EventPayloadJson(
                                _event_payload_from_data(
                                    item, remote_device.string_hex_oids)))
                        for item in data
                        if item.name in remote_device.oids)

            if events:
                await self._eventer_client.register(events)

        except ConnectionError:
            # connection errors are silently ignored
            pass

        except Exception as e:
            mlog.error("error processing data: %s", e, exc_info=e)
Device interface