hat.gateway.devices.snmp.trap_listener

SNMP trap listener device

  1"""SNMP trap listener device"""
  2
  3from collections.abc import Collection
  4import collections
  5import logging
  6import typing
  7
  8from hat import aio
  9from hat import asn1
 10from hat.drivers import snmp
 11from hat.drivers import udp
 12import hat.event.common
 13import hat.event.eventer
 14
 15from hat.gateway import common
 16
 17
 18mlog: logging.Logger = logging.getLogger(__name__)
 19
 20
 21async def create(conf: common.DeviceConf,
 22                 eventer_client: hat.event.eventer.Client,
 23                 event_type_prefix: common.EventTypePrefix
 24                 ) -> 'SnmpTrapListenerDevice':
 25    device = SnmpTrapListenerDevice()
 26    device._eventer_client = eventer_client
 27    device._event_type_prefix = event_type_prefix
 28    device._remote_devices = collections.defaultdict(collections.deque)
 29
 30    for remote_device_conf in conf['remote_devices']:
 31        version = snmp.Version[remote_device_conf['version']]
 32        remote_device = _RemoteDevice(
 33            name=remote_device_conf['name'],
 34            oids={_oid_from_str(oid)
 35                  for oid in remote_device_conf['oids']},
 36            string_hex_oids={_oid_from_str(oid)
 37                             for oid in remote_device_conf['string_hex_oids']})
 38
 39        if remote_device_conf['version'] == 'V1':
 40            subkey = remote_device_conf['community']
 41
 42        elif remote_device_conf['version'] == 'V2C':
 43            subkey = remote_device_conf['community']
 44
 45        elif remote_device_conf['version'] == 'V3':
 46            subkey = (
 47                snmp.Context(
 48                    engine_id=bytes.fromhex(
 49                        remote_device_conf['context']['engine_id']),
 50                    name=remote_device_conf['context']['name'])
 51                if remote_device_conf['context'] else None)
 52
 53        else:
 54            raise Exception('invalid version')
 55
 56        device._remote_devices[(version, subkey)].append(remote_device)
 57
 58    users = [
 59        snmp.User(
 60            name=user_conf['name'],
 61            auth_type=(snmp.AuthType[user_conf['authentication']['type']]
 62                       if user_conf['authentication'] else None),
 63            auth_password=(user_conf['authentication']['password']
 64                           if user_conf['authentication'] else None),
 65            priv_type=(snmp.PrivType[user_conf['privacy']['type']]
 66                       if user_conf['privacy'] else None),
 67            priv_password=(user_conf['privacy']['password']
 68                           if user_conf['privacy'] else None))
 69        for user_conf in conf['users']]
 70
 71    device._listener = await snmp.create_trap_listener(
 72        local_addr=udp.Address(host=conf['local_host'],
 73                               port=conf['local_port']),
 74        v1_trap_cb=device._on_v1_trap,
 75        v2c_trap_cb=device._on_v2c_trap,
 76        v2c_inform_cb=device._on_v2c_inform,
 77        v3_trap_cb=device._on_v3_trap,
 78        v3_inform_cb=device._on_v3_inform,
 79        users=users)
 80
 81    return device
 82
 83
 84info = common.DeviceInfo(
 85    type='snmp_trap_listener',
 86    create=create,
 87    json_schema_id="hat-gateway://snmp.yaml#/$defs/trap_listener",
 88    json_schema_repo=common.json_schema_repo)
 89
 90
 91class SnmpTrapListenerDevice(common.Device):
 92
 93    @property
 94    def async_group(self) -> aio.Group:
 95        return self._listener.async_group
 96
 97    async def process_events(self, events: Collection[hat.event.common.Event]):
 98        pass
 99
100    async def _on_v1_trap(self, addr, community, trap):
101        await self._process_data(version=snmp.Version.V1,
102                                 subkey=community,
103                                 data=trap.data)
104
105    async def _on_v2c_trap(self, addr, community, trap):
106        await self._process_data(version=snmp.Version.V2C,
107                                 subkey=community,
108                                 data=trap.data)
109
110    async def _on_v2c_inform(self, addr, community, inform):
111        await self._process_data(version=snmp.Version.V2C,
112                                 subkey=community,
113                                 data=inform.data)
114
115    async def _on_v3_trap(self, addr, user, context, trap):
116        await self._process_data(version=snmp.Version.V3,
117                                 subkey=context,
118                                 data=trap.data)
119
120    async def _on_v3_inform(self, addr, user, context, inform):
121        await self._process_data(version=snmp.Version.V3,
122                                 subkey=context,
123                                 data=inform.data)
124
125    async def _process_data(self, version, subkey, data):
126        try:
127            events = collections.deque()
128
129            for key in [(version, subkey),
130                        (version, None)]:
131                for remote_device in self._remote_devices.get(key, []):
132                    for i in data:
133                        if i.name not in remote_device.oids:
134                            continue
135
136                        event = hat.event.common.RegisterEvent(
137                            type=(*self._event_type_prefix, 'gateway', 'data',
138                                  remote_device.name, _oid_to_str(i.name)),
139                            source_timestamp=None,
140                            payload=hat.event.common.EventPayloadJson(
141                                _event_payload_from_data(
142                                    i, remote_device.string_hex_oids)))
143                        events.append(event)
144
145            if not events:
146                return
147
148            await self._eventer_client.register(events)
149
150        except ConnectionError:
151            pass
152
153        except Exception as e:
154            mlog.error("error processing data: %s", e, exc_info=e)
155
156
class _RemoteDevice(typing.NamedTuple):
    """Configuration of a single remote device used for matching data."""

    # name used as an event type segment
    name: str
    # object identifiers for which events are registered
    oids: set[asn1.ObjectIdentifier]
    # object identifiers whose string values are reported as hex strings
    string_hex_oids: set[asn1.ObjectIdentifier]
161
162
def _oid_from_str(oid_str):
    """Parse a dot-separated OID string into a tuple of integers."""
    return tuple(map(int, oid_str.split('.')))
165
166
def _oid_to_str(oid):
    """Join OID sub-identifiers into a dot-separated string."""
    return '.'.join(map(str, oid))
169
170
def _event_payload_from_data(data, string_hex_oids):
    """Convert a single SNMP data item into an event payload dictionary.

    The result always has ``'type'`` and ``'value'`` keys. String data whose
    name is in `string_hex_oids` is reported as ``'STRING_HEX'`` (hex
    encoded); other string data is decoded as UTF-8 with replacement of
    invalid bytes.

    Raises:
        Exception: if `data` is none of the handled snmp data types

    """
    # data types without a value, reported as errors
    error_values = ((snmp.EmptyData, 'EMPTY'),
                    (snmp.UnspecifiedData, 'UNSPECIFIED'),
                    (snmp.NoSuchObjectData, 'NO_SUCH_OBJECT'),
                    (snmp.NoSuchInstanceData, 'NO_SUCH_INSTANCE'),
                    (snmp.EndOfMibViewData, 'END_OF_MIB_VIEW'))
    for data_cls, error_value in error_values:
        if isinstance(data, data_cls):
            return {'type': 'ERROR',
                    'value': error_value}

    # data types whose value is passed through unchanged
    plain_types = ((snmp.IntegerData, 'INTEGER'),
                   (snmp.UnsignedData, 'UNSIGNED'),
                   (snmp.CounterData, 'COUNTER'),
                   (snmp.BigCounterData, 'BIG_COUNTER'),
                   (snmp.TimeTicksData, 'TIME_TICKS'))
    for data_cls, type_name in plain_types:
        if isinstance(data, data_cls):
            return {'type': type_name,
                    'value': data.value}

    if isinstance(data, snmp.StringData):
        if data.name not in string_hex_oids:
            text = str(data.value, encoding='utf-8', errors='replace')
            return {'type': 'STRING',
                    'value': text}

        return {'type': 'STRING_HEX',
                'value': data.value.hex()}

    if isinstance(data, snmp.ObjectIdData):
        return {'type': 'OBJECT_ID',
                'value': _oid_to_str(data.value)}

    if isinstance(data, snmp.IpAddressData):
        return {'type': 'IP_ADDRESS',
                'value': '.'.join(map(str, data.value))}

    if isinstance(data, snmp.ArbitraryData):
        return {'type': 'ARBITRARY',
                'value': data.value.hex()}

    raise Exception('invalid response data')
mlog: logging.Logger = <Logger hat.gateway.devices.snmp.trap_listener (WARNING)>
async def create( conf: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')], eventer_client: hat.event.eventer.client.Client, event_type_prefix: tuple[str, str, str]) -> SnmpTrapListenerDevice:
async def create(conf: common.DeviceConf,
                 eventer_client: hat.event.eventer.Client,
                 event_type_prefix: common.EventTypePrefix
                 ) -> 'SnmpTrapListenerDevice':
    """Create an SNMP trap listener device.

    Remote devices from ``conf['remote_devices']`` are grouped under
    ``(version, subkey)`` keys, where the subkey is the configured community
    for V1/V2C and the configured context for V3 (``None`` when the context
    configuration is empty). Users from ``conf['users']`` are passed to the
    trap listener, which is bound to ``conf['local_host']`` and
    ``conf['local_port']``.

    Raises:
        Exception: if a remote device version is not 'V1', 'V2C' or 'V3'
            (an unknown name may already fail in the ``snmp.Version`` lookup)

    """
    device = SnmpTrapListenerDevice()
    device._eventer_client = eventer_client
    device._event_type_prefix = event_type_prefix

    remote_devices = collections.defaultdict(collections.deque)
    device._remote_devices = remote_devices

    for remote_conf in conf['remote_devices']:
        version_name = remote_conf['version']
        version = snmp.Version[version_name]

        name = remote_conf['name']
        oids = {_oid_from_str(oid) for oid in remote_conf['oids']}
        string_hex_oids = {_oid_from_str(oid)
                           for oid in remote_conf['string_hex_oids']}
        remote_device = _RemoteDevice(name=name,
                                      oids=oids,
                                      string_hex_oids=string_hex_oids)

        if version_name in ('V1', 'V2C'):
            # community based versions are keyed by community
            subkey = remote_conf['community']

        elif version_name == 'V3':
            # V3 is keyed by context; empty configuration maps to None
            if remote_conf['context']:
                context_conf = remote_conf['context']
                subkey = snmp.Context(
                    engine_id=bytes.fromhex(context_conf['engine_id']),
                    name=context_conf['name'])

            else:
                subkey = None

        else:
            raise Exception('invalid version')

        remote_devices[(version, subkey)].append(remote_device)

    users = []
    for user_conf in conf['users']:
        user_name = user_conf['name']
        auth_conf = user_conf['authentication']
        priv_conf = user_conf['privacy']

        users.append(snmp.User(
            name=user_name,
            auth_type=(snmp.AuthType[auth_conf['type']]
                       if auth_conf else None),
            auth_password=auth_conf['password'] if auth_conf else None,
            priv_type=(snmp.PrivType[priv_conf['type']]
                       if priv_conf else None),
            priv_password=priv_conf['password'] if priv_conf else None))

    local_addr = udp.Address(host=conf['local_host'],
                             port=conf['local_port'])

    device._listener = await snmp.create_trap_listener(
        local_addr=local_addr,
        v1_trap_cb=device._on_v1_trap,
        v2c_trap_cb=device._on_v2c_trap,
        v2c_inform_cb=device._on_v2c_inform,
        v3_trap_cb=device._on_v3_trap,
        v3_inform_cb=device._on_v3_inform,
        users=users)

    return device
info = DeviceInfo(type='snmp_trap_listener', create=<function create>, json_schema_id='hat-gateway://snmp.yaml#/$defs/trap_listener', json_schema_repo=<hat.json.repository.SchemaRepository object>)
class SnmpTrapListenerDevice(hat.gateway.common.Device):
 92class SnmpTrapListenerDevice(common.Device):
 93
 94    @property
 95    def async_group(self) -> aio.Group:
 96        return self._listener.async_group
 97
 98    async def process_events(self, events: Collection[hat.event.common.Event]):
 99        pass
100
101    async def _on_v1_trap(self, addr, community, trap):
102        await self._process_data(version=snmp.Version.V1,
103                                 subkey=community,
104                                 data=trap.data)
105
106    async def _on_v2c_trap(self, addr, community, trap):
107        await self._process_data(version=snmp.Version.V2C,
108                                 subkey=community,
109                                 data=trap.data)
110
111    async def _on_v2c_inform(self, addr, community, inform):
112        await self._process_data(version=snmp.Version.V2C,
113                                 subkey=community,
114                                 data=inform.data)
115
116    async def _on_v3_trap(self, addr, user, context, trap):
117        await self._process_data(version=snmp.Version.V3,
118                                 subkey=context,
119                                 data=trap.data)
120
121    async def _on_v3_inform(self, addr, user, context, inform):
122        await self._process_data(version=snmp.Version.V3,
123                                 subkey=context,
124                                 data=inform.data)
125
126    async def _process_data(self, version, subkey, data):
127        try:
128            events = collections.deque()
129
130            for key in [(version, subkey),
131                        (version, None)]:
132                for remote_device in self._remote_devices.get(key, []):
133                    for i in data:
134                        if i.name not in remote_device.oids:
135                            continue
136
137                        event = hat.event.common.RegisterEvent(
138                            type=(*self._event_type_prefix, 'gateway', 'data',
139                                  remote_device.name, _oid_to_str(i.name)),
140                            source_timestamp=None,
141                            payload=hat.event.common.EventPayloadJson(
142                                _event_payload_from_data(
143                                    i, remote_device.string_hex_oids)))
144                        events.append(event)
145
146            if not events:
147                return
148
149            await self._eventer_client.register(events)
150
151        except ConnectionError:
152            pass
153
154        except Exception as e:
155            mlog.error("error processing data: %s", e, exc_info=e)

Device interface

async_group: hat.aio.group.Group
94    @property
95    def async_group(self) -> aio.Group:
96        return self._listener.async_group

Group controlling resource's lifetime.

async def process_events(self, events: Collection[hat.event.common.common.Event]):
98    async def process_events(self, events: Collection[hat.event.common.Event]):
99        pass

Process received events

This method can be a coroutine or a regular function.