hat.gateway.devices.snmp.manager

SNMP manager device

  1"""SNMP manager device"""
  2
  3from collections.abc import Collection
  4import asyncio
  5import logging
  6
  7from hat import aio
  8from hat import util
  9from hat.drivers import snmp
 10from hat.drivers import udp
 11import hat.event.common
 12import hat.event.eventer
 13
 14from hat.gateway import common
 15
 16
 17mlog: logging.Logger = logging.getLogger(__name__)
 18
 19
 20class SnmpManagerDevice(common.Device):
 21
 22    def __init__(self,
 23                 conf: common.DeviceConf,
 24                 eventer_client: hat.event.eventer.Client,
 25                 event_type_prefix: common.EventTypePrefix):
 26        self._conf = conf
 27        self._eventer_client = eventer_client
 28        self._event_type_prefix = event_type_prefix
 29        self._manager = None
 30        self._status = None
 31        self._cache = {}
 32        self._polling_oids = ([_oid_from_str(oid_str)
 33                               for oid_str in conf['polling_oids']]
 34                              if conf['polling_oids']
 35                              else [(0, 0)])
 36        self._string_hex_oids = set(_oid_from_str(oid_str)
 37                                    for oid_str in conf['string_hex_oids'])
 38        self._async_group = aio.Group()
 39
 40        self.async_group.spawn(self._connection_loop)
 41
 42    @property
 43    def async_group(self) -> aio.Group:
 44        return self._async_group
 45
 46    async def process_events(self, events: Collection[hat.event.common.Event]):
 47        for event in events:
 48            try:
 49                await self._process_event(event)
 50
 51            except Exception as e:
 52                mlog.warning('event processing error: %s', e, exc_info=e)
 53
 54    async def _connection_loop(self):
 55        try:
 56            while True:
 57                await self._register_status('CONNECTING')
 58                mlog.debug('connecting to %s:%s',
 59                           self._conf['remote_host'],
 60                           self._conf['remote_port'])
 61
 62                try:
 63                    try:
 64                        self._manager = await _create_manager(self._conf)
 65
 66                    except Exception as e:
 67                        mlog.warning('creating manager failed %s', e,
 68                                     exc_info=e)
 69
 70                    if self._manager:
 71                        mlog.debug('connected to %s:%s',
 72                                   self._conf['remote_host'],
 73                                   self._conf['remote_port'])
 74                        self._manager.async_group.spawn(self._polling_loop)
 75                        await self._manager.wait_closed()
 76
 77                finally:
 78                    connect_delay = (0 if self._status == 'CONNECTED'
 79                                     else self._conf['connect_delay'])
 80                    await self._register_status('DISCONNECTED')
 81
 82                self._manager = None
 83                self._cache = {}
 84                await asyncio.sleep(connect_delay)
 85
 86        except Exception as e:
 87            mlog.error('connection loop error: %s', e, exc_info=e)
 88
 89        finally:
 90            mlog.debug('closing device')
 91            self.close()
 92            if self._manager:
 93                await aio.uncancellable(self._manager.async_close())
 94
 95    async def _polling_loop(self):
 96        try:
 97            while True:
 98                for oid in self._polling_oids:
 99                    req = snmp.GetDataReq(names=[oid])
100                    resp = await self._send(req)
101
102                    try:
103                        _verify_read_response(resp=resp,
104                                              oid=oid)
105
106                    except Exception as e:
107                        mlog.warning('connection closing, error response: %s',
108                                     e, exc_info=e)
109                        return
110
111                    await self._register_status('CONNECTED')
112                    if (not self._conf['polling_oids'] or
113                            self._cache.get(oid) == resp):
114                        continue
115
116                    cause = ('CHANGE' if oid in self._cache
117                             else 'INTERROGATE')
118                    self._cache[oid] = resp
119                    mlog.debug('polling oid %s', oid)
120                    try:
121                        event = self._response_to_read_event(resp=resp,
122                                                             oid=oid,
123                                                             cause=cause,
124                                                             session_id=None)
125
126                    except Exception as e:
127                        mlog.warning('response %s ignored due to: %s',
128                                     resp, e, exc_info=e)
129                        continue
130
131                    await self._eventer_client.register([event])
132
133                await asyncio.sleep(self._conf['polling_delay'])
134
135        except Exception as e:
136            mlog.error('polling loop error: %s', e, exc_info=e)
137
138        finally:
139            mlog.debug('closing manager')
140            self._manager.close()
141
142    async def _process_event(self, event):
143        if self._manager is None or not self._manager.is_open:
144            raise Exception('connection not established')
145
146        etype_suffix = event.type[len(self._event_type_prefix):]
147
148        if etype_suffix[:2] == ('system', 'read'):
149            oid = _oid_from_str(etype_suffix[2])
150            await self._process_read_event(event=event,
151                                           oid=oid)
152
153        elif etype_suffix[:2] == ('system', 'write'):
154            oid = _oid_from_str(etype_suffix[2])
155            await self._process_write_event(event=event,
156                                            oid=oid)
157
158        else:
159            raise Exception('event type not supported')
160
161    async def _process_read_event(self, event, oid):
162        mlog.debug('read request for oid %s', oid)
163        req = snmp.GetDataReq(names=[oid])
164        try:
165            resp = await self._send(req)
166
167        except Exception:
168            self._manager.close()
169            raise
170
171        mlog.debug('read response for oid %s: %s', oid, resp)
172        session_id = event.payload.data['session_id']
173
174        try:
175            event = self._response_to_read_event(resp=resp,
176                                                 oid=oid,
177                                                 cause='REQUESTED',
178                                                 session_id=session_id)
179
180        except Exception as e:
181            mlog.warning('response ignored due to: %s', e, exc_info=e)
182            return
183
184        await self._eventer_client.register([event])
185
186    async def _process_write_event(self, event, oid):
187        set_data = _event_data_to_snmp_data(data=event.payload.data['data'],
188                                            oid=oid)
189        mlog.debug('write request for oid %s: %s', oid, set_data)
190        try:
191            resp = await asyncio.wait_for(
192                self._manager.send(snmp.SetDataReq(data=[set_data])),
193                timeout=self._conf['request_timeout'])
194
195        except asyncio.TimeoutError:
196            mlog.warning('set data request %s timeout', set_data)
197            return
198
199        session_id = event.payload.data['session_id']
200        success = _is_write_response_success(resp=resp,
201                                             oid=oid)
202        mlog.debug('write for oid %s %s',
203                   oid, ('succeeded' if success else 'failed'))
204        event = hat.event.common.RegisterEvent(
205            type=(*self._event_type_prefix, 'gateway', 'write',
206                  _oid_to_str(oid)),
207            source_timestamp=None,
208            payload=hat.event.common.EventPayloadJson({
209                'session_id': session_id,
210                'success': success}))
211        await self._eventer_client.register([event])
212
213    async def _send(self, req):
214        for i in range(self._conf['request_retry_count'] + 1):
215            try:
216                return await asyncio.wait_for(
217                    self._manager.send(req),
218                    timeout=self._conf['request_timeout'])
219
220            except asyncio.TimeoutError:
221                mlog.warning('request %s/%s timeout', i,
222                             self._conf['request_retry_count'])
223                await asyncio.sleep(self._conf['request_retry_delay'])
224
225        raise Exception('request retries exceeded')
226
227    async def _register_status(self, status):
228        if self._status == status:
229            return
230
231        event = hat.event.common.RegisterEvent(
232            type=(*self._event_type_prefix, 'gateway', 'status'),
233            source_timestamp=None,
234            payload=hat.event.common.EventPayloadJson(status))
235        await self._eventer_client.register([event])
236
237        mlog.debug("device status %s -> %s", self._status, status)
238        self._status = status
239
240    def _response_to_read_event(self, resp, oid, cause, session_id):
241        data = _event_data_from_response(resp=resp,
242                                         oid=oid,
243                                         string_hex_oids=self._string_hex_oids)
244        payload = {'session_id': session_id,
245                   'cause': cause,
246                   'data': data}
247        return hat.event.common.RegisterEvent(
248                type=(*self._event_type_prefix, 'gateway', 'read',
249                      _oid_to_str(oid)),
250                source_timestamp=None,
251                payload=hat.event.common.EventPayloadJson(payload))
252
253
254info = common.DeviceInfo(
255    type='snmp_manager',
256    create=SnmpManagerDevice,
257    json_schema_id="hat-gateway://snmp.yaml#/$defs/manager",
258    json_schema_repo=common.json_schema_repo)
259
260
261async def _create_manager(conf):
262    if conf['version'] == 'V1':
263        return await snmp.create_v1_manager(
264            remote_addr=udp.Address(
265                host=conf['remote_host'],
266                port=conf['remote_port']),
267            community=conf['community'])
268
269    if conf['version'] == 'V2C':
270        return await snmp.create_v2c_manager(
271            remote_addr=udp.Address(
272                host=conf['remote_host'],
273                port=conf['remote_port']),
274            community=conf['community'])
275
276    if conf['version'] == 'V3':
277        return await aio.wait_for(
278            snmp.create_v3_manager(
279                remote_addr=udp.Address(
280                    host=conf['remote_host'],
281                    port=conf['remote_port']),
282                context=snmp.Context(
283                    engine_id=bytes.fromhex(conf['context']['engine_id']),
284                    name=conf['context']['name']) if conf['context'] else None,
285                user=snmp.User(
286                    name=conf['user'],
287                    auth_type=(snmp.AuthType[conf['authentication']['type']]
288                               if conf['authentication'] else None),
289                    auth_password=(conf['authentication']['password']
290                                   if conf['authentication'] else None),
291                    priv_type=(snmp.PrivType[conf['privacy']['type']]
292                               if conf['privacy'] else None),
293                    priv_password=(conf['privacy']['password']
294                                   if conf['privacy'] else None))),
295            timeout=conf['request_timeout'])
296
297    raise Exception('unknown version')
298
299
300def _verify_read_response(resp, oid):
301    if isinstance(resp, snmp.Error):
302        return
303
304    resp_data = util.first(resp, lambda i: i.name == oid)
305    if resp_data:
306        return
307
308    if not resp:
309        return
310
311    resp_data = resp[0]
312    if resp_data.name[:10] in _conn_close_oids:
313        raise Exception('unsupported security levels')
314
315
316def _is_write_response_success(resp, oid):
317    if isinstance(resp, snmp.Error):
318        if resp.type == snmp.ErrorType.NO_ERROR:
319            return True
320
321        return False
322
323    if not resp:
324        return True
325
326    resp_data = util.first(resp, lambda i: i.name == oid)
327    if not resp_data:
328        return False
329
330    if isinstance(resp_data,
331                  (snmp.EmptyData,
332                   snmp.UnspecifiedData,
333                   snmp.NoSuchObjectData,
334                   snmp.NoSuchInstanceData,
335                   snmp.EndOfMibViewData)):
336        return False
337
338    return True
339
340
341def _event_data_from_response(resp, oid, string_hex_oids):
342    if isinstance(resp, snmp.Error):
343        if resp.type == snmp.ErrorType.NO_ERROR:
344            raise Exception('received unexpected error type NO_ERROR')
345
346        return {'type': 'ERROR',
347                'value': resp.type.name}
348
349    data = util.first(resp, lambda i: i.name == oid)
350    if data is None:
351        if resp and resp[0].name[:10] in _error_oids:
352            value = _error_oids[resp[0].name]
353
354        else:
355            value = 'GEN_ERR'
356
357        return {'type': 'ERROR',
358                'value': value}
359
360    if isinstance(data, snmp.EmptyData):
361        return {'type': 'ERROR',
362                'value': 'EMPTY'}
363
364    if isinstance(data, snmp.UnspecifiedData):
365        return {'type': 'ERROR',
366                'value': 'UNSPECIFIED'}
367
368    if isinstance(data, snmp.NoSuchObjectData):
369        return {'type': 'ERROR',
370                'value': 'NO_SUCH_OBJECT'}
371
372    if isinstance(data, snmp.NoSuchInstanceData):
373        return {'type': 'ERROR',
374                'value': 'NO_SUCH_INSTANCE'}
375
376    if isinstance(data, snmp.EndOfMibViewData):
377        return {'type': 'ERROR',
378                'value': 'END_OF_MIB_VIEW'}
379
380    if isinstance(data, snmp.IntegerData):
381        return {'type': 'INTEGER',
382                'value': data.value}
383
384    if isinstance(data, snmp.UnsignedData):
385        return {'type': 'UNSIGNED',
386                'value': data.value}
387
388    if isinstance(data, snmp.CounterData):
389        return {'type': 'COUNTER',
390                'value': data.value}
391
392    if isinstance(data, snmp.BigCounterData):
393        return {'type': 'BIG_COUNTER',
394                'value': data.value}
395
396    if isinstance(data, snmp.TimeTicksData):
397        return {'type': 'TIME_TICKS',
398                'value': data.value}
399
400    if isinstance(data, snmp.StringData):
401        if oid in string_hex_oids:
402            return {'type': 'STRING_HEX',
403                    'value': data.value.hex()}
404
405        return {'type': 'STRING',
406                'value': str(data.value, encoding='utf-8', errors='replace')}
407
408    if isinstance(data, snmp.ObjectIdData):
409        return {'type': 'OBJECT_ID',
410                'value': _oid_to_str(data.value)}
411
412    if isinstance(data, snmp.IpAddressData):
413        return {'type': 'IP_ADDRESS',
414                'value': '.'.join(str(i) for i in data.value)}
415
416    if isinstance(data, snmp.ArbitraryData):
417        return {'type': 'ARBITRARY',
418                'value': data.value.hex()}
419
420    raise Exception('invalid response data')
421
422
423def _event_data_to_snmp_data(data, oid):
424    data_type = data['type']
425    data_value = data['value']
426
427    if data_type == 'INTEGER':
428        return snmp.IntegerData(name=oid,
429                                value=data_value)
430
431    if data_type == 'UNSIGNED':
432        return snmp.UnsignedData(name=oid,
433                                 value=data_value)
434
435    if data_type == 'COUNTER':
436        return snmp.CounterData(name=oid,
437                                value=data_value)
438
439    if data_type == 'BIG_COUNTER':
440        return snmp.BigCounterData(name=oid,
441                                   value=data_value)
442
443    if data_type == 'STRING':
444        return snmp.StringData(name=oid,
445                               value=data_value.encode())
446
447    if data_type == 'STRING_HEX':
448        return snmp.StringData(name=oid,
449                               value=bytes.fromhex(data_value))
450
451    if data_type == 'OBJECT_ID':
452        return snmp.ObjectIdData(name=oid,
453                                 value=_oid_from_str(data_value))
454
455    if data_type == 'IP_ADDRESS':
456        return snmp.IpAddressData(
457            name=oid,
458            value=tuple(int(i) for i in data_value.split('.')))
459
460    if data_type == 'TIME_TICKS':
461        return snmp.TimeTicksData(name=oid,
462                                  value=data_value)
463
464    if data_type == 'ARBITRARY':
465        return snmp.ArbitraryData(name=oid,
466                                  value=bytes.fromhex(data_value))
467
468    raise Exception('invalid data type')
469
470
471def _oid_from_str(oid_str):
472    return tuple(int(i) for i in oid_str.split('.'))
473
474
475def _oid_to_str(oid):
476    return '.'.join(str(i) for i in oid)
477
478
479_error_oids = {
480    (1, 3, 6, 1, 6, 3, 15, 1, 1, 2): 'NOT_IN_TIME_WINDOWS',
481    (1, 3, 6, 1, 6, 3, 15, 1, 1, 3): 'UNKNOWN_USER_NAMES',
482    (1, 3, 6, 1, 6, 3, 15, 1, 1, 4): 'UNKNOWN_ENGINE_IDS',
483    (1, 3, 6, 1, 6, 3, 15, 1, 1, 5): 'WRONG_DIGESTS',
484    (1, 3, 6, 1, 6, 3, 15, 1, 1, 6): 'DECRYPTION_ERRORS'}
485
486_conn_close_oids = {(1, 3, 6, 1, 6, 3, 15, 1, 1, 1)}
mlog: logging.Logger = <Logger hat.gateway.devices.snmp.manager (WARNING)>
class SnmpManagerDevice(hat.gateway.common.Device):
 21class SnmpManagerDevice(common.Device):
 22
 23    def __init__(self,
 24                 conf: common.DeviceConf,
 25                 eventer_client: hat.event.eventer.Client,
 26                 event_type_prefix: common.EventTypePrefix):
 27        self._conf = conf
 28        self._eventer_client = eventer_client
 29        self._event_type_prefix = event_type_prefix
 30        self._manager = None
 31        self._status = None
 32        self._cache = {}
 33        self._polling_oids = ([_oid_from_str(oid_str)
 34                               for oid_str in conf['polling_oids']]
 35                              if conf['polling_oids']
 36                              else [(0, 0)])
 37        self._string_hex_oids = set(_oid_from_str(oid_str)
 38                                    for oid_str in conf['string_hex_oids'])
 39        self._async_group = aio.Group()
 40
 41        self.async_group.spawn(self._connection_loop)
 42
 43    @property
 44    def async_group(self) -> aio.Group:
 45        return self._async_group
 46
 47    async def process_events(self, events: Collection[hat.event.common.Event]):
 48        for event in events:
 49            try:
 50                await self._process_event(event)
 51
 52            except Exception as e:
 53                mlog.warning('event processing error: %s', e, exc_info=e)
 54
 55    async def _connection_loop(self):
 56        try:
 57            while True:
 58                await self._register_status('CONNECTING')
 59                mlog.debug('connecting to %s:%s',
 60                           self._conf['remote_host'],
 61                           self._conf['remote_port'])
 62
 63                try:
 64                    try:
 65                        self._manager = await _create_manager(self._conf)
 66
 67                    except Exception as e:
 68                        mlog.warning('creating manager failed %s', e,
 69                                     exc_info=e)
 70
 71                    if self._manager:
 72                        mlog.debug('connected to %s:%s',
 73                                   self._conf['remote_host'],
 74                                   self._conf['remote_port'])
 75                        self._manager.async_group.spawn(self._polling_loop)
 76                        await self._manager.wait_closed()
 77
 78                finally:
 79                    connect_delay = (0 if self._status == 'CONNECTED'
 80                                     else self._conf['connect_delay'])
 81                    await self._register_status('DISCONNECTED')
 82
 83                self._manager = None
 84                self._cache = {}
 85                await asyncio.sleep(connect_delay)
 86
 87        except Exception as e:
 88            mlog.error('connection loop error: %s', e, exc_info=e)
 89
 90        finally:
 91            mlog.debug('closing device')
 92            self.close()
 93            if self._manager:
 94                await aio.uncancellable(self._manager.async_close())
 95
 96    async def _polling_loop(self):
 97        try:
 98            while True:
 99                for oid in self._polling_oids:
100                    req = snmp.GetDataReq(names=[oid])
101                    resp = await self._send(req)
102
103                    try:
104                        _verify_read_response(resp=resp,
105                                              oid=oid)
106
107                    except Exception as e:
108                        mlog.warning('connection closing, error response: %s',
109                                     e, exc_info=e)
110                        return
111
112                    await self._register_status('CONNECTED')
113                    if (not self._conf['polling_oids'] or
114                            self._cache.get(oid) == resp):
115                        continue
116
117                    cause = ('CHANGE' if oid in self._cache
118                             else 'INTERROGATE')
119                    self._cache[oid] = resp
120                    mlog.debug('polling oid %s', oid)
121                    try:
122                        event = self._response_to_read_event(resp=resp,
123                                                             oid=oid,
124                                                             cause=cause,
125                                                             session_id=None)
126
127                    except Exception as e:
128                        mlog.warning('response %s ignored due to: %s',
129                                     resp, e, exc_info=e)
130                        continue
131
132                    await self._eventer_client.register([event])
133
134                await asyncio.sleep(self._conf['polling_delay'])
135
136        except Exception as e:
137            mlog.error('polling loop error: %s', e, exc_info=e)
138
139        finally:
140            mlog.debug('closing manager')
141            self._manager.close()
142
143    async def _process_event(self, event):
144        if self._manager is None or not self._manager.is_open:
145            raise Exception('connection not established')
146
147        etype_suffix = event.type[len(self._event_type_prefix):]
148
149        if etype_suffix[:2] == ('system', 'read'):
150            oid = _oid_from_str(etype_suffix[2])
151            await self._process_read_event(event=event,
152                                           oid=oid)
153
154        elif etype_suffix[:2] == ('system', 'write'):
155            oid = _oid_from_str(etype_suffix[2])
156            await self._process_write_event(event=event,
157                                            oid=oid)
158
159        else:
160            raise Exception('event type not supported')
161
162    async def _process_read_event(self, event, oid):
163        mlog.debug('read request for oid %s', oid)
164        req = snmp.GetDataReq(names=[oid])
165        try:
166            resp = await self._send(req)
167
168        except Exception:
169            self._manager.close()
170            raise
171
172        mlog.debug('read response for oid %s: %s', oid, resp)
173        session_id = event.payload.data['session_id']
174
175        try:
176            event = self._response_to_read_event(resp=resp,
177                                                 oid=oid,
178                                                 cause='REQUESTED',
179                                                 session_id=session_id)
180
181        except Exception as e:
182            mlog.warning('response ignored due to: %s', e, exc_info=e)
183            return
184
185        await self._eventer_client.register([event])
186
187    async def _process_write_event(self, event, oid):
188        set_data = _event_data_to_snmp_data(data=event.payload.data['data'],
189                                            oid=oid)
190        mlog.debug('write request for oid %s: %s', oid, set_data)
191        try:
192            resp = await asyncio.wait_for(
193                self._manager.send(snmp.SetDataReq(data=[set_data])),
194                timeout=self._conf['request_timeout'])
195
196        except asyncio.TimeoutError:
197            mlog.warning('set data request %s timeout', set_data)
198            return
199
200        session_id = event.payload.data['session_id']
201        success = _is_write_response_success(resp=resp,
202                                             oid=oid)
203        mlog.debug('write for oid %s %s',
204                   oid, ('succeeded' if success else 'failed'))
205        event = hat.event.common.RegisterEvent(
206            type=(*self._event_type_prefix, 'gateway', 'write',
207                  _oid_to_str(oid)),
208            source_timestamp=None,
209            payload=hat.event.common.EventPayloadJson({
210                'session_id': session_id,
211                'success': success}))
212        await self._eventer_client.register([event])
213
214    async def _send(self, req):
215        for i in range(self._conf['request_retry_count'] + 1):
216            try:
217                return await asyncio.wait_for(
218                    self._manager.send(req),
219                    timeout=self._conf['request_timeout'])
220
221            except asyncio.TimeoutError:
222                mlog.warning('request %s/%s timeout', i,
223                             self._conf['request_retry_count'])
224                await asyncio.sleep(self._conf['request_retry_delay'])
225
226        raise Exception('request retries exceeded')
227
228    async def _register_status(self, status):
229        if self._status == status:
230            return
231
232        event = hat.event.common.RegisterEvent(
233            type=(*self._event_type_prefix, 'gateway', 'status'),
234            source_timestamp=None,
235            payload=hat.event.common.EventPayloadJson(status))
236        await self._eventer_client.register([event])
237
238        mlog.debug("device status %s -> %s", self._status, status)
239        self._status = status
240
241    def _response_to_read_event(self, resp, oid, cause, session_id):
242        data = _event_data_from_response(resp=resp,
243                                         oid=oid,
244                                         string_hex_oids=self._string_hex_oids)
245        payload = {'session_id': session_id,
246                   'cause': cause,
247                   'data': data}
248        return hat.event.common.RegisterEvent(
249                type=(*self._event_type_prefix, 'gateway', 'read',
250                      _oid_to_str(oid)),
251                source_timestamp=None,
252                payload=hat.event.common.EventPayloadJson(payload))

Device interface

SnmpManagerDevice( conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]], eventer_client: hat.event.eventer.client.Client, event_type_prefix: tuple[str, str, str])
23    def __init__(self,
24                 conf: common.DeviceConf,
25                 eventer_client: hat.event.eventer.Client,
26                 event_type_prefix: common.EventTypePrefix):
27        self._conf = conf
28        self._eventer_client = eventer_client
29        self._event_type_prefix = event_type_prefix
30        self._manager = None
31        self._status = None
32        self._cache = {}
33        self._polling_oids = ([_oid_from_str(oid_str)
34                               for oid_str in conf['polling_oids']]
35                              if conf['polling_oids']
36                              else [(0, 0)])
37        self._string_hex_oids = set(_oid_from_str(oid_str)
38                                    for oid_str in conf['string_hex_oids'])
39        self._async_group = aio.Group()
40
41        self.async_group.spawn(self._connection_loop)
async_group: hat.aio.group.Group
43    @property
44    def async_group(self) -> aio.Group:
45        return self._async_group

Group controlling resource's lifetime.

async def process_events(self, events: Collection[hat.event.common.common.Event]):
47    async def process_events(self, events: Collection[hat.event.common.Event]):
48        for event in events:
49            try:
50                await self._process_event(event)
51
52            except Exception as e:
53                mlog.warning('event processing error: %s', e, exc_info=e)

Process received events

This method can be coroutine or regular function.

info = DeviceInfo(type='snmp_manager', create=<class 'SnmpManagerDevice'>, json_schema_id='hat-gateway://snmp.yaml#/$defs/manager', json_schema_repo=<hat.json.repository.SchemaRepository object>)