hat.gateway.devices.ping
Ping device
"""Ping device"""

from collections.abc import Collection
import asyncio
import contextlib
import logging

from hat import aio
from hat.drivers import icmp
import hat.event.common

from hat.gateway import common


mlog: logging.Logger = logging.getLogger(__name__)


async def create(conf: common.DeviceConf,
                 eventer_client: hat.event.eventer.Client,
                 event_type_prefix: common.EventTypePrefix
                 ) -> 'PingDevice':
    """Create a ping device and start monitoring its remote devices.

    Args:
        conf: device configuration; every entry of ``conf['remote_devices']``
            gets its own monitoring loop
        eventer_client: client used for registering status events
        event_type_prefix: prefix prepended to every registered event type

    Returns:
        device whose monitoring loops are already spawned

    """
    device = PingDevice()
    device._eventer_client = eventer_client
    device._event_type_prefix = event_type_prefix
    # last registered status per remote device name
    device._devices_status = {}

    device._endpoint = await icmp.create_endpoint()

    for device_conf in conf['remote_devices']:
        device.async_group.spawn(device._remote_device_loop, device_conf)

    return device


class PingDevice(common.Device):
    """Device that pings configured remote hosts and reports availability.

    Instances are built by the module-level ``create`` coroutine, which
    assigns the private attributes used here (``_eventer_client``,
    ``_event_type_prefix``, ``_devices_status`` and ``_endpoint``) and spawns
    one ``_remote_device_loop`` per configured remote device.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group of the underlying ICMP endpoint."""
        return self._endpoint._async_group

    def process_events(self, events: Collection[hat.event.common.Event]):
        """Ignore incoming events; nothing is done with them."""
        pass

    async def _remote_device_loop(self, device_conf):
        """Monitor a single remote device until the loop fails or is stopped.

        Registers ``NOT_AVAILABLE`` first and then repeats: ping (with
        retries), register the resulting status, sleep
        ``device_conf['ping_delay']`` seconds. When the loop ends for any
        reason, the whole device is closed and ``NOT_AVAILABLE`` is
        registered for this remote device.

        """
        name = device_conf['name']
        try:
            await self._register_status("NOT_AVAILABLE", name)
            while True:
                try:
                    await self._ping_retry(device_conf)
                    status = "AVAILABLE"
                    mlog.debug('ping to %s successfull', device_conf['host'])

                except Exception as e:
                    mlog.debug("device %s not available: %s",
                               device_conf['host'], e, exc_info=e)
                    status = "NOT_AVAILABLE"

                await self._register_status(status, name)
                await asyncio.sleep(device_conf['ping_delay'])

        except ConnectionError:
            # raised while registering events; handled as a silent stop
            pass

        except Exception as e:
            mlog.error("device %s loop error: %s", name, e, exc_info=e)

        finally:
            self.close()
            with contextlib.suppress(ConnectionError):
                await self._register_status("NOT_AVAILABLE", name)

    async def _ping_retry(self, device_conf):
        """Ping the remote host, retrying on failure.

        At most ``device_conf['retry_count'] + 1`` attempts are made, each
        limited by ``device_conf['ping_timeout']`` (presumably seconds -
        confirm against ``aio.wait_for``). Between attempts the coroutine
        sleeps ``device_conf['retry_delay']`` seconds.

        Returns:
            result of the first successful ``ping`` call

        Raises:
            Exception: if none of the attempts succeeded

        """
        retry_count = device_conf['retry_count']
        for i in range(retry_count + 1):
            try:
                return await aio.wait_for(
                    self._endpoint.ping(device_conf['host']),
                    timeout=device_conf['ping_timeout'])

            except Exception as e:
                retry_msg = (f", retry {i}/{retry_count}" if i > 0 else "")
                mlog.debug('no ping response%s: %s', retry_msg, e, exc_info=e)

            # wait only when another attempt follows - sleeping after the
            # final failure would just postpone reporting NOT_AVAILABLE
            if i < retry_count:
                await asyncio.sleep(device_conf['retry_delay'])

        raise Exception(f"no ping response after {retry_count} retries")

    async def _register_status(self, status, name):
        """Register a status event for remote device `name` if it changed.

        Nothing is registered when `status` equals the last status remembered
        for `name`. The remembered status is updated before the event is
        registered.

        """
        old_status = self._devices_status.get(name)
        if old_status == status:
            return
        mlog.debug('remote device %s status %s', name, status)
        self._devices_status[name] = status
        await self._eventer_client.register([hat.event.common.RegisterEvent(
            type=(*self._event_type_prefix, 'gateway', 'status', name),
            source_timestamp=None,
            payload=hat.event.common.EventPayloadJson(status))])


# device description exposing this module as the 'ping' device type
info = common.DeviceInfo(
    type='ping',
    create=create,
    json_schema_id="hat-gateway://ping.yaml#/$defs/device",
    json_schema_repo=common.json_schema_repo)
async def create(conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]], eventer_client: hat.event.eventer.client.Client, event_type_prefix: tuple[str, str, str]) -> PingDevice:
async def create(conf: common.DeviceConf,
                 eventer_client: hat.event.eventer.Client,
                 event_type_prefix: common.EventTypePrefix
                 ) -> 'PingDevice':
    """Create a ping device and start monitoring its remote devices.

    Args:
        conf: device configuration; every entry of ``conf['remote_devices']``
            gets its own monitoring loop
        eventer_client: client used for registering status events
        event_type_prefix: prefix prepended to every registered event type

    Returns:
        device whose monitoring loops are already spawned

    """
    ping_device = PingDevice()
    ping_device._eventer_client = eventer_client
    ping_device._event_type_prefix = event_type_prefix
    # last registered status per remote device name
    ping_device._devices_status = {}
    ping_device._endpoint = await icmp.create_endpoint()

    # one independent monitoring loop per configured remote device
    group = ping_device.async_group
    for remote_conf in conf['remote_devices']:
        group.spawn(ping_device._remote_device_loop, remote_conf)

    return ping_device
class PingDevice(common.Device):
    """Device that pings configured remote hosts and reports availability.

    Instances are built by the module-level ``create`` coroutine, which
    assigns the private attributes used here (``_eventer_client``,
    ``_event_type_prefix``, ``_devices_status`` and ``_endpoint``) and spawns
    one ``_remote_device_loop`` per configured remote device.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group of the underlying ICMP endpoint."""
        return self._endpoint._async_group

    def process_events(self, events: Collection[hat.event.common.Event]):
        """Ignore incoming events; nothing is done with them."""
        pass

    async def _remote_device_loop(self, device_conf):
        """Monitor a single remote device until the loop fails or is stopped.

        Registers ``NOT_AVAILABLE`` first and then repeats: ping (with
        retries), register the resulting status, sleep
        ``device_conf['ping_delay']`` seconds. When the loop ends for any
        reason, the whole device is closed and ``NOT_AVAILABLE`` is
        registered for this remote device.

        """
        name = device_conf['name']
        try:
            await self._register_status("NOT_AVAILABLE", name)
            while True:
                try:
                    await self._ping_retry(device_conf)
                    status = "AVAILABLE"
                    mlog.debug('ping to %s successfull', device_conf['host'])

                except Exception as e:
                    mlog.debug("device %s not available: %s",
                               device_conf['host'], e, exc_info=e)
                    status = "NOT_AVAILABLE"

                await self._register_status(status, name)
                await asyncio.sleep(device_conf['ping_delay'])

        except ConnectionError:
            # raised while registering events; handled as a silent stop
            pass

        except Exception as e:
            mlog.error("device %s loop error: %s", name, e, exc_info=e)

        finally:
            self.close()
            with contextlib.suppress(ConnectionError):
                await self._register_status("NOT_AVAILABLE", name)

    async def _ping_retry(self, device_conf):
        """Ping the remote host, retrying on failure.

        At most ``device_conf['retry_count'] + 1`` attempts are made, each
        limited by ``device_conf['ping_timeout']`` (presumably seconds -
        confirm against ``aio.wait_for``). Between attempts the coroutine
        sleeps ``device_conf['retry_delay']`` seconds.

        Returns:
            result of the first successful ``ping`` call

        Raises:
            Exception: if none of the attempts succeeded

        """
        retry_count = device_conf['retry_count']
        for i in range(retry_count + 1):
            try:
                return await aio.wait_for(
                    self._endpoint.ping(device_conf['host']),
                    timeout=device_conf['ping_timeout'])

            except Exception as e:
                retry_msg = (f", retry {i}/{retry_count}" if i > 0 else "")
                mlog.debug('no ping response%s: %s', retry_msg, e, exc_info=e)

            # wait only when another attempt follows - sleeping after the
            # final failure would just postpone reporting NOT_AVAILABLE
            if i < retry_count:
                await asyncio.sleep(device_conf['retry_delay'])

        raise Exception(f"no ping response after {retry_count} retries")

    async def _register_status(self, status, name):
        """Register a status event for remote device `name` if it changed.

        Nothing is registered when `status` equals the last status remembered
        for `name`. The remembered status is updated before the event is
        registered.

        """
        old_status = self._devices_status.get(name)
        if old_status == status:
            return
        mlog.debug('remote device %s status %s', name, status)
        self._devices_status[name] = status
        await self._eventer_client.register([hat.event.common.RegisterEvent(
            type=(*self._event_type_prefix, 'gateway', 'status', name),
            source_timestamp=None,
            payload=hat.event.common.EventPayloadJson(status))])
Device interface
info = DeviceInfo(type='ping', create=<function create>, json_schema_id='hat-gateway://ping.yaml#/$defs/device', json_schema_repo=<hat.json.repository.SchemaRepository object>)