hat.gateway.devices.snmp.manager
SNMP manager device
"""SNMP manager device"""

from collections.abc import Collection
import asyncio
import logging

from hat import aio
from hat import util
from hat.drivers import snmp
from hat.drivers import udp
import hat.event.common
import hat.event.eventer

from hat.gateway import common


mlog: logging.Logger = logging.getLogger(__name__)


class SnmpManagerDevice(common.Device):
    """SNMP manager gateway device.

    Keeps a manager connection to the configured remote endpoint, polls the
    configured oids and serves ``system/read/<oid>`` and
    ``system/write/<oid>`` request events. Connection status and request
    results are registered as ``gateway/...`` events through the eventer
    client.
    """

    def __init__(self,
                 conf: common.DeviceConf,
                 eventer_client: hat.event.eventer.Client,
                 event_type_prefix: common.EventTypePrefix):
        """Store the configuration and spawn the connection loop.

        Args:
            conf: device configuration
            eventer_client: client used for registering events
            event_type_prefix: prefix shared by all event types this device
                handles and registers
        """
        self._conf = conf
        self._eventer_client = eventer_client
        self._event_type_prefix = event_type_prefix
        self._manager = None
        self._status = None
        # last polled response per oid, used for change detection
        self._cache = {}

        # with no configured polling oids, (0, 0) is still requested on each
        # polling cycle; its response only drives the connection status and
        # is never published (see `_polling_loop`)
        if conf['polling_oids']:
            self._polling_oids = [_oid_from_str(oid_str)
                                  for oid_str in conf['polling_oids']]
        else:
            self._polling_oids = [(0, 0)]

        self._string_hex_oids = {_oid_from_str(oid_str)
                                 for oid_str in conf['string_hex_oids']}
        self._async_group = aio.Group()

        self.async_group.spawn(self._connection_loop)

    @property
    def async_group(self) -> aio.Group:
        """Async group controlling the device's lifetime."""
        return self._async_group

    async def process_events(self, events: Collection[hat.event.common.Event]):
        """Process received events one by one.

        A failure while processing one event is logged and does not prevent
        processing of the remaining events.
        """
        for event in events:
            try:
                await self._process_event(event)

            except Exception as e:
                mlog.warning('event processing error: %s', e, exc_info=e)

    async def _connection_loop(self):
        """Create the manager, wait until it closes and repeat.

        Registers ``CONNECTING`` before each attempt and ``DISCONNECTED``
        after it. When the loop ends, the device is closed together with any
        manager that is still referenced.
        """
        try:
            while True:
                await self._register_status('CONNECTING')
                mlog.debug('connecting to %s:%s',
                           self._conf['remote_host'],
                           self._conf['remote_port'])

                try:
                    try:
                        self._manager = await _create_manager(self._conf)

                    except Exception as e:
                        mlog.warning('creating manager failed %s', e,
                                     exc_info=e)

                    if self._manager:
                        mlog.debug('connected to %s:%s',
                                   self._conf['remote_host'],
                                   self._conf['remote_port'])
                        self._manager.async_group.spawn(self._polling_loop)
                        await self._manager.wait_closed()

                finally:
                    # reconnect without delay only if the connection was
                    # confirmed by a successful poll
                    connect_delay = (0 if self._status == 'CONNECTED'
                                     else self._conf['connect_delay'])
                    await self._register_status('DISCONNECTED')

                self._manager = None
                self._cache = {}
                await asyncio.sleep(connect_delay)

        except Exception as e:
            mlog.error('connection loop error: %s', e, exc_info=e)

        finally:
            mlog.debug('closing device')
            self.close()
            if self._manager:
                await aio.uncancellable(self._manager.async_close())

    async def _polling_loop(self):
        """Poll all polling oids, sleeping ``polling_delay`` between cycles.

        A read event is registered when the response for an oid differs from
        the cached one (cause ``INTERROGATE`` for the first response,
        ``CHANGE`` afterwards). The manager is closed when the loop ends for
        any reason.
        """
        try:
            while True:
                for oid in self._polling_oids:
                    req = snmp.GetDataReq(names=[oid])
                    resp = await self._send(req)

                    try:
                        _verify_read_response(resp=resp,
                                              oid=oid)

                    except Exception as e:
                        mlog.warning('connection closing, error response: %s',
                                     e, exc_info=e)
                        return

                    await self._register_status('CONNECTED')
                    # skip the placeholder oid and unchanged responses
                    if (not self._conf['polling_oids'] or
                            self._cache.get(oid) == resp):
                        continue

                    cause = ('CHANGE' if oid in self._cache
                             else 'INTERROGATE')
                    self._cache[oid] = resp
                    mlog.debug('polling oid %s', oid)
                    try:
                        event = self._response_to_read_event(resp=resp,
                                                             oid=oid,
                                                             cause=cause,
                                                             session_id=None)

                    except Exception as e:
                        mlog.warning('response %s ignored due to: %s',
                                     resp, e, exc_info=e)
                        continue

                    await self._eventer_client.register([event])

                await asyncio.sleep(self._conf['polling_delay'])

        except Exception as e:
            mlog.error('polling loop error: %s', e, exc_info=e)

        finally:
            mlog.debug('closing manager')
            self._manager.close()

    async def _process_event(self, event):
        """Dispatch a request event to the read or write handler.

        Raises:
            Exception: if there is no open manager or the event type suffix
                does not start with ``system/read`` or ``system/write``
        """
        if self._manager is None or not self._manager.is_open:
            raise Exception('connection not established')

        etype_suffix = event.type[len(self._event_type_prefix):]

        if etype_suffix[:2] == ('system', 'read'):
            oid = _oid_from_str(etype_suffix[2])
            await self._process_read_event(event=event,
                                           oid=oid)

        elif etype_suffix[:2] == ('system', 'write'):
            oid = _oid_from_str(etype_suffix[2])
            await self._process_write_event(event=event,
                                            oid=oid)

        else:
            raise Exception('event type not supported')

    async def _process_read_event(self, event, oid):
        """Read `oid` on request and register the result.

        The result is registered as a read event with cause ``REQUESTED``.
        If sending fails, the manager is closed and the error is re-raised.
        """
        mlog.debug('read request for oid %s', oid)
        req = snmp.GetDataReq(names=[oid])
        try:
            resp = await self._send(req)

        except Exception:
            self._manager.close()
            raise

        mlog.debug('read response for oid %s: %s', oid, resp)
        session_id = event.payload.data['session_id']

        try:
            event = self._response_to_read_event(resp=resp,
                                                 oid=oid,
                                                 cause='REQUESTED',
                                                 session_id=session_id)

        except Exception as e:
            mlog.warning('response ignored due to: %s', e, exc_info=e)
            return

        await self._eventer_client.register([event])

    async def _process_write_event(self, event, oid):
        """Write the event's data to `oid` and register the outcome.

        The request is sent once, without retries. On timeout a warning is
        logged and no write result event is registered.
        """
        set_data = _event_data_to_snmp_data(data=event.payload.data['data'],
                                            oid=oid)
        mlog.debug('write request for oid %s: %s', oid, set_data)
        try:
            resp = await asyncio.wait_for(
                self._manager.send(snmp.SetDataReq(data=[set_data])),
                timeout=self._conf['request_timeout'])

        except asyncio.TimeoutError:
            mlog.warning('set data request %s timeout', set_data)
            return

        session_id = event.payload.data['session_id']
        success = _is_write_response_success(resp=resp,
                                             oid=oid)
        mlog.debug('write for oid %s %s',
                   oid, ('succeeded' if success else 'failed'))
        event = hat.event.common.RegisterEvent(
            type=(*self._event_type_prefix, 'gateway', 'write',
                  _oid_to_str(oid)),
            source_timestamp=None,
            payload=hat.event.common.EventPayloadJson({
                'session_id': session_id,
                'success': success}))
        await self._eventer_client.register([event])

    async def _send(self, req):
        """Send `req`, retrying on timeout.

        Up to ``request_retry_count + 1`` attempts are made, each limited by
        ``request_timeout`` and separated by ``request_retry_delay``.

        Returns:
            response returned by the manager

        Raises:
            Exception: if every attempt timed out
        """
        retry_count = self._conf['request_retry_count']
        for i in range(retry_count + 1):
            try:
                return await asyncio.wait_for(
                    self._manager.send(req),
                    timeout=self._conf['request_timeout'])

            except asyncio.TimeoutError:
                mlog.warning('request %s/%s timeout', i, retry_count)

            # delay only between attempts - after the final timeout the
            # failure is reported immediately
            if i < retry_count:
                await asyncio.sleep(self._conf['request_retry_delay'])

        raise Exception('request retries exceeded')

    async def _register_status(self, status):
        """Register a status event, unless `status` is already current."""
        if self._status == status:
            return

        event = hat.event.common.RegisterEvent(
            type=(*self._event_type_prefix, 'gateway', 'status'),
            source_timestamp=None,
            payload=hat.event.common.EventPayloadJson(status))
        await self._eventer_client.register([event])

        mlog.debug("device status %s -> %s", self._status, status)
        self._status = status

    def _response_to_read_event(self, resp, oid, cause, session_id):
        """Build a ``gateway/read/<oid>`` register event from a response.

        Raises:
            Exception: if the response can not be converted to event data
        """
        data = _event_data_from_response(resp=resp,
                                         oid=oid,
                                         string_hex_oids=self._string_hex_oids)
        payload = {'session_id': session_id,
                   'cause': cause,
                   'data': data}
        return hat.event.common.RegisterEvent(
            type=(*self._event_type_prefix, 'gateway', 'read',
                  _oid_to_str(oid)),
            source_timestamp=None,
            payload=hat.event.common.EventPayloadJson(payload))


info = common.DeviceInfo(
    type='snmp_manager',
    create=SnmpManagerDevice,
    json_schema_id="hat-gateway://snmp.yaml#/$defs/manager",
    json_schema_repo=common.json_schema_repo)


async def _create_manager(conf):
    """Create a manager matching the configured SNMP version.

    Only the V3 manager creation is limited by ``request_timeout``.

    Raises:
        Exception: if ``conf['version']`` is not ``V1``, ``V2C`` or ``V3``
    """
    if conf['version'] == 'V1':
        return await snmp.create_v1_manager(
            remote_addr=udp.Address(
                host=conf['remote_host'],
                port=conf['remote_port']),
            community=conf['community'])

    if conf['version'] == 'V2C':
        return await snmp.create_v2c_manager(
            remote_addr=udp.Address(
                host=conf['remote_host'],
                port=conf['remote_port']),
            community=conf['community'])

    if conf['version'] == 'V3':
        # context, authentication and privacy are optional - falsy
        # configuration values are passed on as None
        return await aio.wait_for(
            snmp.create_v3_manager(
                remote_addr=udp.Address(
                    host=conf['remote_host'],
                    port=conf['remote_port']),
                context=snmp.Context(
                    engine_id=bytes.fromhex(conf['context']['engine_id']),
                    name=conf['context']['name']) if conf['context'] else None,
                user=snmp.User(
                    name=conf['user'],
                    auth_type=(snmp.AuthType[conf['authentication']['type']]
                               if conf['authentication'] else None),
                    auth_password=(conf['authentication']['password']
                                   if conf['authentication'] else None),
                    priv_type=(snmp.PrivType[conf['privacy']['type']]
                               if conf['privacy'] else None),
                    priv_password=(conf['privacy']['password']
                                   if conf['privacy'] else None))),
            timeout=conf['request_timeout'])

    raise Exception('unknown version')


def _verify_read_response(resp, oid):
    """Raise if a poll response requires closing the connection.

    Error responses, responses containing `oid` and empty responses are
    accepted. Otherwise the first 10 sub-identifiers of the first item's
    name are checked against `_conn_close_oids`.

    Raises:
        Exception: if the first response item matches `_conn_close_oids`
    """
    if isinstance(resp, snmp.Error):
        return

    resp_data = util.first(resp, lambda i: i.name == oid)
    if resp_data:
        return

    if not resp:
        return

    resp_data = resp[0]
    if resp_data.name[:10] in _conn_close_oids:
        raise Exception('unsupported security levels')


def _is_write_response_success(resp, oid):
    """Return ``True`` if a set-data response indicates success.

    An error response counts as success only for ``NO_ERROR``. An empty
    response counts as success. Otherwise the response must contain data for
    `oid` that is not one of the empty/unspecified/no-such/end-of-view
    markers.
    """
    if isinstance(resp, snmp.Error):
        if resp.type == snmp.ErrorType.NO_ERROR:
            return True

        return False

    if not resp:
        return True

    resp_data = util.first(resp, lambda i: i.name == oid)
    if not resp_data:
        return False

    if isinstance(resp_data,
                  (snmp.EmptyData,
                   snmp.UnspecifiedData,
                   snmp.NoSuchObjectData,
                   snmp.NoSuchInstanceData,
                   snmp.EndOfMibViewData)):
        return False

    return True


def _event_data_from_response(resp, oid, string_hex_oids):
    """Convert a get-data response to ``{'type': ..., 'value': ...}``.

    Args:
        resp: ``snmp.Error`` or collection of response data
        oid: requested oid
        string_hex_oids: oids whose string values are reported hex encoded

    Raises:
        Exception: if the response is a ``NO_ERROR`` error or contains an
            unsupported data type
    """
    if isinstance(resp, snmp.Error):
        if resp.type == snmp.ErrorType.NO_ERROR:
            raise Exception('received unexpected error type NO_ERROR')

        return {'type': 'ERROR',
                'value': resp.type.name}

    data = util.first(resp, lambda i: i.name == oid)
    if data is None:
        # a reported oid may be longer than the 10 sub-identifiers used as
        # `_error_oids` keys (e.g. a trailing instance sub-identifier), so
        # the lookup has to use the same truncated prefix as the check
        error_oid = resp[0].name[:10] if resp else None
        return {'type': 'ERROR',
                'value': _error_oids.get(error_oid, 'GEN_ERR')}

    if isinstance(data, snmp.EmptyData):
        return {'type': 'ERROR',
                'value': 'EMPTY'}

    if isinstance(data, snmp.UnspecifiedData):
        return {'type': 'ERROR',
                'value': 'UNSPECIFIED'}

    if isinstance(data, snmp.NoSuchObjectData):
        return {'type': 'ERROR',
                'value': 'NO_SUCH_OBJECT'}

    if isinstance(data, snmp.NoSuchInstanceData):
        return {'type': 'ERROR',
                'value': 'NO_SUCH_INSTANCE'}

    if isinstance(data, snmp.EndOfMibViewData):
        return {'type': 'ERROR',
                'value': 'END_OF_MIB_VIEW'}

    if isinstance(data, snmp.IntegerData):
        return {'type': 'INTEGER',
                'value': data.value}

    if isinstance(data, snmp.UnsignedData):
        return {'type': 'UNSIGNED',
                'value': data.value}

    if isinstance(data, snmp.CounterData):
        return {'type': 'COUNTER',
                'value': data.value}

    if isinstance(data, snmp.BigCounterData):
        return {'type': 'BIG_COUNTER',
                'value': data.value}

    if isinstance(data, snmp.TimeTicksData):
        return {'type': 'TIME_TICKS',
                'value': data.value}

    if isinstance(data, snmp.StringData):
        if oid in string_hex_oids:
            return {'type': 'STRING_HEX',
                    'value': data.value.hex()}

        return {'type': 'STRING',
                'value': str(data.value, encoding='utf-8', errors='replace')}

    if isinstance(data, snmp.ObjectIdData):
        return {'type': 'OBJECT_ID',
                'value': _oid_to_str(data.value)}

    if isinstance(data, snmp.IpAddressData):
        return {'type': 'IP_ADDRESS',
                'value': '.'.join(str(i) for i in data.value)}

    if isinstance(data, snmp.ArbitraryData):
        return {'type': 'ARBITRARY',
                'value': data.value.hex()}

    raise Exception('invalid response data')


def _event_data_to_snmp_data(data, oid):
    """Convert ``{'type': ..., 'value': ...}`` event data to SNMP data.

    Raises:
        Exception: if ``data['type']`` is not a supported data type
    """
    data_type = data['type']
    data_value = data['value']

    if data_type == 'INTEGER':
        return snmp.IntegerData(name=oid,
                                value=data_value)

    if data_type == 'UNSIGNED':
        return snmp.UnsignedData(name=oid,
                                 value=data_value)

    if data_type == 'COUNTER':
        return snmp.CounterData(name=oid,
                                value=data_value)

    if data_type == 'BIG_COUNTER':
        return snmp.BigCounterData(name=oid,
                                   value=data_value)

    if data_type == 'STRING':
        return snmp.StringData(name=oid,
                               value=data_value.encode())

    if data_type == 'STRING_HEX':
        return snmp.StringData(name=oid,
                               value=bytes.fromhex(data_value))

    if data_type == 'OBJECT_ID':
        return snmp.ObjectIdData(name=oid,
                                 value=_oid_from_str(data_value))

    if data_type == 'IP_ADDRESS':
        return snmp.IpAddressData(
            name=oid,
            value=tuple(int(i) for i in data_value.split('.')))

    if data_type == 'TIME_TICKS':
        return snmp.TimeTicksData(name=oid,
                                  value=data_value)

    if data_type == 'ARBITRARY':
        return snmp.ArbitraryData(name=oid,
                                  value=bytes.fromhex(data_value))

    raise Exception('invalid data type')


def _oid_from_str(oid_str):
    """Convert a dot separated oid string to a tuple of integers."""
    return tuple(int(i) for i in oid_str.split('.'))


def _oid_to_str(oid):
    """Convert an oid tuple to a dot separated string."""
    return '.'.join(str(i) for i in oid)


# report oid prefixes (10 sub-identifiers) mapped to reported error values
_error_oids = {
    (1, 3, 6, 1, 6, 3, 15, 1, 1, 2): 'NOT_IN_TIME_WINDOWS',
    (1, 3, 6, 1, 6, 3, 15, 1, 1, 3): 'UNKNOWN_USER_NAMES',
    (1, 3, 6, 1, 6, 3, 15, 1, 1, 4): 'UNKNOWN_ENGINE_IDS',
    (1, 3, 6, 1, 6, 3, 15, 1, 1, 5): 'WRONG_DIGESTS',
    (1, 3, 6, 1, 6, 3, 15, 1, 1, 6): 'DECRYPTION_ERRORS'}

# report oid prefixes which cause the polling loop to close the connection
_conn_close_oids = {(1, 3, 6, 1, 6, 3, 15, 1, 1, 1)}
class SnmpManagerDevice(common.Device):
    """SNMP manager gateway device.

    Keeps a manager connection to the configured remote endpoint, polls the
    configured oids and serves ``system/read/<oid>`` and
    ``system/write/<oid>`` request events. Connection status and request
    results are registered as ``gateway/...`` events through the eventer
    client.
    """

    def __init__(self,
                 conf: common.DeviceConf,
                 eventer_client: hat.event.eventer.Client,
                 event_type_prefix: common.EventTypePrefix):
        """Store the configuration and spawn the connection loop.

        Args:
            conf: device configuration
            eventer_client: client used for registering events
            event_type_prefix: prefix shared by all event types this device
                handles and registers
        """
        self._conf = conf
        self._eventer_client = eventer_client
        self._event_type_prefix = event_type_prefix
        self._manager = None
        self._status = None
        # last polled response per oid, used for change detection
        self._cache = {}

        # with no configured polling oids, (0, 0) is still requested on each
        # polling cycle; its response only drives the connection status and
        # is never published (see `_polling_loop`)
        if conf['polling_oids']:
            self._polling_oids = [_oid_from_str(oid_str)
                                  for oid_str in conf['polling_oids']]
        else:
            self._polling_oids = [(0, 0)]

        self._string_hex_oids = {_oid_from_str(oid_str)
                                 for oid_str in conf['string_hex_oids']}
        self._async_group = aio.Group()

        self.async_group.spawn(self._connection_loop)

    @property
    def async_group(self) -> aio.Group:
        """Async group controlling the device's lifetime."""
        return self._async_group

    async def process_events(self, events: Collection[hat.event.common.Event]):
        """Process received events one by one.

        A failure while processing one event is logged and does not prevent
        processing of the remaining events.
        """
        for event in events:
            try:
                await self._process_event(event)

            except Exception as e:
                mlog.warning('event processing error: %s', e, exc_info=e)

    async def _connection_loop(self):
        """Create the manager, wait until it closes and repeat.

        Registers ``CONNECTING`` before each attempt and ``DISCONNECTED``
        after it. When the loop ends, the device is closed together with any
        manager that is still referenced.
        """
        try:
            while True:
                await self._register_status('CONNECTING')
                mlog.debug('connecting to %s:%s',
                           self._conf['remote_host'],
                           self._conf['remote_port'])

                try:
                    try:
                        self._manager = await _create_manager(self._conf)

                    except Exception as e:
                        mlog.warning('creating manager failed %s', e,
                                     exc_info=e)

                    if self._manager:
                        mlog.debug('connected to %s:%s',
                                   self._conf['remote_host'],
                                   self._conf['remote_port'])
                        self._manager.async_group.spawn(self._polling_loop)
                        await self._manager.wait_closed()

                finally:
                    # reconnect without delay only if the connection was
                    # confirmed by a successful poll
                    connect_delay = (0 if self._status == 'CONNECTED'
                                     else self._conf['connect_delay'])
                    await self._register_status('DISCONNECTED')

                self._manager = None
                self._cache = {}
                await asyncio.sleep(connect_delay)

        except Exception as e:
            mlog.error('connection loop error: %s', e, exc_info=e)

        finally:
            mlog.debug('closing device')
            self.close()
            if self._manager:
                await aio.uncancellable(self._manager.async_close())

    async def _polling_loop(self):
        """Poll all polling oids, sleeping ``polling_delay`` between cycles.

        A read event is registered when the response for an oid differs from
        the cached one (cause ``INTERROGATE`` for the first response,
        ``CHANGE`` afterwards). The manager is closed when the loop ends for
        any reason.
        """
        try:
            while True:
                for oid in self._polling_oids:
                    req = snmp.GetDataReq(names=[oid])
                    resp = await self._send(req)

                    try:
                        _verify_read_response(resp=resp,
                                              oid=oid)

                    except Exception as e:
                        mlog.warning('connection closing, error response: %s',
                                     e, exc_info=e)
                        return

                    await self._register_status('CONNECTED')
                    # skip the placeholder oid and unchanged responses
                    if (not self._conf['polling_oids'] or
                            self._cache.get(oid) == resp):
                        continue

                    cause = ('CHANGE' if oid in self._cache
                             else 'INTERROGATE')
                    self._cache[oid] = resp
                    mlog.debug('polling oid %s', oid)
                    try:
                        event = self._response_to_read_event(resp=resp,
                                                             oid=oid,
                                                             cause=cause,
                                                             session_id=None)

                    except Exception as e:
                        mlog.warning('response %s ignored due to: %s',
                                     resp, e, exc_info=e)
                        continue

                    await self._eventer_client.register([event])

                await asyncio.sleep(self._conf['polling_delay'])

        except Exception as e:
            mlog.error('polling loop error: %s', e, exc_info=e)

        finally:
            mlog.debug('closing manager')
            self._manager.close()

    async def _process_event(self, event):
        """Dispatch a request event to the read or write handler.

        Raises:
            Exception: if there is no open manager or the event type suffix
                does not start with ``system/read`` or ``system/write``
        """
        if self._manager is None or not self._manager.is_open:
            raise Exception('connection not established')

        etype_suffix = event.type[len(self._event_type_prefix):]

        if etype_suffix[:2] == ('system', 'read'):
            oid = _oid_from_str(etype_suffix[2])
            await self._process_read_event(event=event,
                                           oid=oid)

        elif etype_suffix[:2] == ('system', 'write'):
            oid = _oid_from_str(etype_suffix[2])
            await self._process_write_event(event=event,
                                            oid=oid)

        else:
            raise Exception('event type not supported')

    async def _process_read_event(self, event, oid):
        """Read `oid` on request and register the result.

        The result is registered as a read event with cause ``REQUESTED``.
        If sending fails, the manager is closed and the error is re-raised.
        """
        mlog.debug('read request for oid %s', oid)
        req = snmp.GetDataReq(names=[oid])
        try:
            resp = await self._send(req)

        except Exception:
            self._manager.close()
            raise

        mlog.debug('read response for oid %s: %s', oid, resp)
        session_id = event.payload.data['session_id']

        try:
            event = self._response_to_read_event(resp=resp,
                                                 oid=oid,
                                                 cause='REQUESTED',
                                                 session_id=session_id)

        except Exception as e:
            mlog.warning('response ignored due to: %s', e, exc_info=e)
            return

        await self._eventer_client.register([event])

    async def _process_write_event(self, event, oid):
        """Write the event's data to `oid` and register the outcome.

        The request is sent once, without retries. On timeout a warning is
        logged and no write result event is registered.
        """
        set_data = _event_data_to_snmp_data(data=event.payload.data['data'],
                                            oid=oid)
        mlog.debug('write request for oid %s: %s', oid, set_data)
        try:
            resp = await asyncio.wait_for(
                self._manager.send(snmp.SetDataReq(data=[set_data])),
                timeout=self._conf['request_timeout'])

        except asyncio.TimeoutError:
            mlog.warning('set data request %s timeout', set_data)
            return

        session_id = event.payload.data['session_id']
        success = _is_write_response_success(resp=resp,
                                             oid=oid)
        mlog.debug('write for oid %s %s',
                   oid, ('succeeded' if success else 'failed'))
        event = hat.event.common.RegisterEvent(
            type=(*self._event_type_prefix, 'gateway', 'write',
                  _oid_to_str(oid)),
            source_timestamp=None,
            payload=hat.event.common.EventPayloadJson({
                'session_id': session_id,
                'success': success}))
        await self._eventer_client.register([event])

    async def _send(self, req):
        """Send `req`, retrying on timeout.

        Up to ``request_retry_count + 1`` attempts are made, each limited by
        ``request_timeout`` and separated by ``request_retry_delay``.

        Returns:
            response returned by the manager

        Raises:
            Exception: if every attempt timed out
        """
        retry_count = self._conf['request_retry_count']
        for i in range(retry_count + 1):
            try:
                return await asyncio.wait_for(
                    self._manager.send(req),
                    timeout=self._conf['request_timeout'])

            except asyncio.TimeoutError:
                mlog.warning('request %s/%s timeout', i, retry_count)

            # delay only between attempts - after the final timeout the
            # failure is reported immediately
            if i < retry_count:
                await asyncio.sleep(self._conf['request_retry_delay'])

        raise Exception('request retries exceeded')

    async def _register_status(self, status):
        """Register a status event, unless `status` is already current."""
        if self._status == status:
            return

        event = hat.event.common.RegisterEvent(
            type=(*self._event_type_prefix, 'gateway', 'status'),
            source_timestamp=None,
            payload=hat.event.common.EventPayloadJson(status))
        await self._eventer_client.register([event])

        mlog.debug("device status %s -> %s", self._status, status)
        self._status = status

    def _response_to_read_event(self, resp, oid, cause, session_id):
        """Build a ``gateway/read/<oid>`` register event from a response.

        Raises:
            Exception: if the response can not be converted to event data
        """
        data = _event_data_from_response(resp=resp,
                                         oid=oid,
                                         string_hex_oids=self._string_hex_oids)
        payload = {'session_id': session_id,
                   'cause': cause,
                   'data': data}
        return hat.event.common.RegisterEvent(
            type=(*self._event_type_prefix, 'gateway', 'read',
                  _oid_to_str(oid)),
            source_timestamp=None,
            payload=hat.event.common.EventPayloadJson(payload))
Device interface
SnmpManagerDevice( conf: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')], eventer_client: hat.event.eventer.client.Client, event_type_prefix: tuple[str, str, str])
def __init__(self,
             conf: common.DeviceConf,
             eventer_client: hat.event.eventer.Client,
             event_type_prefix: common.EventTypePrefix):
    """Store the configuration and spawn the connection loop.

    Args:
        conf: device configuration
        eventer_client: client used for registering events
        event_type_prefix: prefix shared by all event types this device
            handles and registers
    """
    self._conf = conf
    self._eventer_client = eventer_client
    self._event_type_prefix = event_type_prefix
    self._manager = None
    self._status = None
    self._cache = {}

    # without configured polling oids, a single (0, 0) oid is used instead
    if conf['polling_oids']:
        self._polling_oids = [_oid_from_str(text)
                              for text in conf['polling_oids']]
    else:
        self._polling_oids = [(0, 0)]

    self._string_hex_oids = {_oid_from_str(text)
                             for text in conf['string_hex_oids']}
    self._async_group = aio.Group()

    self.async_group.spawn(self._connection_loop)
async def
process_events(self, events: Collection[hat.event.common.common.Event]):
async def process_events(self, events: Collection[hat.event.common.Event]):
    """Process received events one by one.

    A failure while processing one event is logged as a warning and the
    remaining events are still processed.
    """
    for received in events:
        try:
            await self._process_event(received)

        except Exception as exc:
            mlog.warning('event processing error: %s', exc, exc_info=exc)
Process received events
This method can be a coroutine or a regular function.
info =
DeviceInfo(type='snmp_manager', create=<class 'SnmpManagerDevice'>, json_schema_id='hat-gateway://snmp.yaml#/$defs/manager', json_schema_repo=<hat.json.repository.SchemaRepository object>)