hat.gateway.devices.ping

Ping device

  1"""Ping device"""
  2
  3from collections.abc import Collection
  4import asyncio
  5import contextlib
  6import logging
  7
  8from hat import aio
  9from hat.drivers import icmp
 10import hat.event.common
 11
 12from hat.gateway import common
 13
 14
 15mlog: logging.Logger = logging.getLogger(__name__)
 16
 17
 18async def create(conf: common.DeviceConf,
 19                 eventer_client: hat.event.eventer.Client,
 20                 event_type_prefix: common.EventTypePrefix
 21                 ) -> 'PingDevice':
 22    device = PingDevice()
 23    device._eventer_client = eventer_client
 24    device._event_type_prefix = event_type_prefix
 25    device._devices_status = {}
 26
 27    device._endpoint = await icmp.create_endpoint()
 28
 29    for device_conf in conf['remote_devices']:
 30        device.async_group.spawn(device._remote_device_loop, device_conf)
 31
 32    return device
 33
 34
 35class PingDevice(common.Device):
 36
 37    @property
 38    def async_group(self) -> aio.Group:
 39        return self._endpoint._async_group
 40
 41    def process_events(self, events: Collection[hat.event.common.Event]):
 42        pass
 43
 44    async def _remote_device_loop(self, device_conf):
 45        name = device_conf['name']
 46        try:
 47            await self._register_status("NOT_AVAILABLE", name)
 48            while True:
 49                try:
 50                    await self._ping_retry(device_conf)
 51                    status = "AVAILABLE"
 52                    mlog.debug('ping to %s successfull', device_conf['host'])
 53
 54                except Exception as e:
 55                    mlog.debug("device %s not available: %s",
 56                               device_conf['host'], e, exc_info=e)
 57                    status = "NOT_AVAILABLE"
 58
 59                await self._register_status(status, name)
 60                await asyncio.sleep(device_conf['ping_delay'])
 61
 62        except ConnectionError:
 63            pass
 64
 65        except Exception as e:
 66            mlog.error("device %s loop error: %s", name, e, exc_info=e)
 67
 68        finally:
 69            self.close()
 70            with contextlib.suppress(ConnectionError):
 71                await self._register_status("NOT_AVAILABLE", name)
 72
 73    async def _ping_retry(self, device_conf):
 74        retry_count = device_conf['retry_count']
 75        for i in range(retry_count + 1):
 76            try:
 77                return await aio.wait_for(
 78                    self._endpoint.ping(device_conf['host']),
 79                    timeout=device_conf['ping_timeout'])
 80
 81            except Exception as e:
 82                retry_msg = (f", retry {i}/{retry_count}" if i > 0 else "")
 83                mlog.debug('no ping response%s: %s', retry_msg, e, exc_info=e)
 84
 85            await asyncio.sleep(device_conf['retry_delay'])
 86
 87        raise Exception(f"no ping response after {retry_count} retries")
 88
 89    async def _register_status(self, status, name):
 90        old_status = self._devices_status.get(name)
 91        if old_status == status:
 92            return
 93        mlog.debug('remote device %s status %s', name, status)
 94        self._devices_status[name] = status
 95        await self._eventer_client.register([hat.event.common.RegisterEvent(
 96            type=(*self._event_type_prefix, 'gateway', 'status', name),
 97            source_timestamp=None,
 98            payload=hat.event.common.EventPayloadJson(status))])
 99
100
# Device descriptor exported by this module: binds the 'ping' device type
# to the `create` factory and to the schema id/repository given below.
info = common.DeviceInfo(
    json_schema_repo=common.json_schema_repo,
    json_schema_id="hat-gateway://ping.yaml#/$defs/device",
    create=create,
    type='ping')
mlog: logging.Logger = <Logger hat.gateway.devices.ping (WARNING)>
async def create( conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]], eventer_client: hat.event.eventer.client.Client, event_type_prefix: tuple[str, str, str]) -> PingDevice:
async def create(conf: common.DeviceConf,
                 eventer_client: hat.event.eventer.Client,
                 event_type_prefix: common.EventTypePrefix
                 ) -> 'PingDevice':
    """Create ping device

    Opens an ICMP endpoint and spawns one `_remote_device_loop` task for
    each entry of ``conf['remote_devices']``.

    Args:
        conf: device configuration; only ``remote_devices`` is read here
        eventer_client: client used for registering status events
        event_type_prefix: prefix of every registered event type

    Returns:
        device with its monitoring loops already running

    """
    ping_device = PingDevice()
    ping_device._devices_status = {}
    ping_device._event_type_prefix = event_type_prefix
    ping_device._eventer_client = eventer_client

    ping_device._endpoint = await icmp.create_endpoint()

    for remote_conf in conf['remote_devices']:
        ping_device.async_group.spawn(ping_device._remote_device_loop,
                                      remote_conf)

    return ping_device
class PingDevice(hat.gateway.common.Device):
class PingDevice(common.Device):
    """Gateway device that monitors remote hosts with ICMP ping.

    Instances are expected to be built by `create`, which assigns
    ``_eventer_client``, ``_event_type_prefix``, ``_devices_status`` and
    ``_endpoint`` and then spawns one `_remote_device_loop` per configured
    remote device.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group taken from the underlying ICMP endpoint."""
        # NOTE(review): reads the endpoint's private attribute; presumably a
        # public `async_group` is available on the endpoint as well - confirm
        return self._endpoint._async_group

    def process_events(self, events: Collection[hat.event.common.Event]):
        """Accept received events and ignore them."""
        pass

    async def _remote_device_loop(self, device_conf):
        """Repeatedly ping a single remote device and register its status.

        ``NOT_AVAILABLE`` is registered first. Afterwards each cycle pings
        the host (with retries), registers ``AVAILABLE`` or
        ``NOT_AVAILABLE`` and sleeps for ``ping_delay``. When the loop ends
        for any reason, the whole device is closed and ``NOT_AVAILABLE`` is
        registered for this remote device.

        Args:
            device_conf: remote device configuration; keys read here are
                ``name``, ``host`` and ``ping_delay``

        """
        name = device_conf['name']
        try:
            await self._register_status("NOT_AVAILABLE", name)
            while True:
                try:
                    await self._ping_retry(device_conf)
                    status = "AVAILABLE"
                    mlog.debug('ping to %s successfull', device_conf['host'])

                except Exception as e:
                    # any ping failure only changes the reported status,
                    # it does not stop the loop
                    mlog.debug("device %s not available: %s",
                               device_conf['host'], e, exc_info=e)
                    status = "NOT_AVAILABLE"

                await self._register_status(status, name)
                await asyncio.sleep(device_conf['ping_delay'])

        except ConnectionError:
            # raised while registering events; loop stops without logging
            pass

        except Exception as e:
            mlog.error("device %s loop error: %s", name, e, exc_info=e)

        finally:
            # one stopped loop closes the device (and so all other loops)
            self.close()
            with contextlib.suppress(ConnectionError):
                await self._register_status("NOT_AVAILABLE", name)

    async def _ping_retry(self, device_conf):
        """Ping the remote host, retrying on failure.

        Up to ``retry_count + 1`` attempts are made, each limited by
        ``ping_timeout``. ``retry_delay`` is awaited only between
        consecutive attempts, so a failed final attempt is reported without
        an additional delay.

        Args:
            device_conf: remote device configuration; keys read here are
                ``host``, ``retry_count``, ``retry_delay`` and
                ``ping_timeout``

        Returns:
            result of the first successful ``ping`` call

        Raises:
            Exception: no attempt succeeded; the error of the last attempt
                is attached as the cause

        """
        retry_count = device_conf['retry_count']
        last_error = None

        for i in range(retry_count + 1):
            # pause between attempts only - never after the last one
            if i > 0:
                await asyncio.sleep(device_conf['retry_delay'])

            try:
                return await aio.wait_for(
                    self._endpoint.ping(device_conf['host']),
                    timeout=device_conf['ping_timeout'])

            except Exception as e:
                last_error = e
                retry_msg = (f", retry {i}/{retry_count}" if i > 0 else "")
                mlog.debug('no ping response%s: %s', retry_msg, e, exc_info=e)

        raise Exception(
            f"no ping response after {retry_count} retries") from last_error

    async def _register_status(self, status, name):
        """Register a status event when the status differs from the last one.

        The last status of each remote device is kept in
        ``_devices_status``; it is updated before the event is sent.

        Args:
            status: new status (``"AVAILABLE"`` or ``"NOT_AVAILABLE"`` in
                this module)
            name: remote device name, used as the last event type segment

        """
        old_status = self._devices_status.get(name)
        if old_status == status:
            return
        mlog.debug('remote device %s status %s', name, status)
        self._devices_status[name] = status
        await self._eventer_client.register([hat.event.common.RegisterEvent(
            type=(*self._event_type_prefix, 'gateway', 'status', name),
            source_timestamp=None,
            payload=hat.event.common.EventPayloadJson(status))])

Device interface

async_group: hat.aio.group.Group
38    @property
39    def async_group(self) -> aio.Group:
40        return self._endpoint._async_group

Group controlling resource's lifetime.

def process_events(self, events: Collection[hat.event.common.common.Event]):
42    def process_events(self, events: Collection[hat.event.common.Event]):
43        pass

Process received events

This method can be a coroutine or a regular function.

info = DeviceInfo(type='ping', create=<function create>, json_schema_id='hat-gateway://ping.yaml#/$defs/device', json_schema_repo=<hat.json.repository.SchemaRepository object>)