hat.gateway.common

Common gateway interfaces

 1"""Common gateway interfaces"""
 2
 3import abc
 4import importlib.resources
 5import typing
 6
 7from hat import aio
 8from hat import json
 9import hat.event.common
10import hat.monitor.common
11
12
13with importlib.resources.as_file(importlib.resources.files(__package__) /
14                                 'json_schema_repo.json') as _path:
15    json_schema_repo: json.SchemaRepository = json.SchemaRepository(
16        json.json_schema_repo,
17        hat.monitor.common.json_schema_repo,
18        json.SchemaRepository.from_json(_path))
19    """JSON schema repository"""
20
21DeviceConf: typing.TypeAlias = json.Data
22"""Device configuration"""
23
24EventTypePrefix: typing.TypeAlias = hat.event.common.EventType
25"""Event type prefix"""
26
27CreateDevice: typing.TypeAlias = aio.AsyncCallable[[DeviceConf,
28                                                    'DeviceEventClient',
29                                                    EventTypePrefix],
30                                                   'Device']
31"""Create device callable"""
32
33
34class Device(aio.Resource):
35    """Device interface
36
37    Device is implemented as python module which is dynamically imported.
38    It is expected that this module implements:
39
40        * device_type (str): device type identification
41        * json_schema_id (Optional[str]): JSON schema id
42        * json_schema_repo (Optional[json.SchemaRepository]): JSON schema repo
43        * create (CreateDevice): creating new device instance
44
45    If module defines JSON schema repository and JSON schema id, JSON schema
46    repository will be used for additional validation of device configuration
47    with JSON schema id.
48
49    `create` is called with device configuration, appropriate instance of
50    `DeviceEventClient` and event type prefix. Event type prefix is defined
51    as [``gateway``, `<gateway_name>`, `<device_type>`, `<device_name>`].
52
53    """
54
55
56class DeviceEventClient(aio.Resource):
57    """Device's event client interface"""
58
59    @abc.abstractmethod
60    async def receive(self) -> list[hat.event.common.Event]:
61        """Receive device events"""
62
63    @abc.abstractmethod
64    def register(self, events: list[hat.event.common.RegisterEvent]):
65        """Register device events"""
66
67    @abc.abstractmethod
68    async def register_with_response(self,
69                                     events: list[hat.event.common.RegisterEvent]  # NOQA
70                                     ) -> list[hat.event.common.Event | None]:
71        """Register device events
72
73        Each `RegisterEvent` from `events` is paired with results `Event` if
74        new event was successfully created or `None` if new event could not be
75        created.
76
77        """
78
79    @abc.abstractmethod
80    async def query(self,
81                    data: hat.event.common.QueryData
82                    ) -> list[hat.event.common.Event]:
83        """Query device events from server"""
DeviceConf: TypeAlias = None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')]

Device configuration

EventTypePrefix: TypeAlias = Tuple[str, ...]

Event type prefix

CreateDevice: TypeAlias = Callable[[None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')], ForwardRef('DeviceEventClient'), Tuple[str, ...]], Union[ForwardRef('Device'), Awaitable[ForwardRef('Device')]]]

Create device callable

class Device(hat.aio.group.Resource):
35class Device(aio.Resource):
36    """Device interface
37
38    Device is implemented as python module which is dynamically imported.
39    It is expected that this module implements:
40
41        * device_type (str): device type identification
42        * json_schema_id (Optional[str]): JSON schema id
43        * json_schema_repo (Optional[json.SchemaRepository]): JSON schema repo
44        * create (CreateDevice): creating new device instance
45
46    If module defines JSON schema repository and JSON schema id, JSON schema
47    repository will be used for additional validation of device configuration
48    with JSON schema id.
49
50    `create` is called with device configuration, appropriate instance of
51    `DeviceEventClient` and event type prefix. Event type prefix is defined
52    as [``gateway``, `<gateway_name>`, `<device_type>`, `<device_name>`].
53
54    """

Device interface

Device is implemented as python module which is dynamically imported.

It is expected that this module implements:
  • device_type (str): device type identification
  • json_schema_id (Optional[str]): JSON schema id
  • json_schema_repo (Optional[json.SchemaRepository]): JSON schema repo
  • create (CreateDevice): creating new device instance

If module defines JSON schema repository and JSON schema id, JSON schema repository will be used for additional validation of device configuration with JSON schema id.

create is called with device configuration, appropriate instance of DeviceEventClient and event type prefix. Event type prefix is defined as [gateway, <gateway_name>, <device_type>, <device_name>].

Inherited Members
hat.aio.group.Resource
async_group
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close
class DeviceEventClient(hat.aio.group.Resource):
57class DeviceEventClient(aio.Resource):
58    """Device's event client interface"""
59
60    @abc.abstractmethod
61    async def receive(self) -> list[hat.event.common.Event]:
62        """Receive device events"""
63
64    @abc.abstractmethod
65    def register(self, events: list[hat.event.common.RegisterEvent]):
66        """Register device events"""
67
68    @abc.abstractmethod
69    async def register_with_response(self,
70                                     events: list[hat.event.common.RegisterEvent]  # NOQA
71                                     ) -> list[hat.event.common.Event | None]:
72        """Register device events
73
74        Each `RegisterEvent` from `events` is paired with results `Event` if
75        new event was successfully created or `None` if new event could not be
76        created.
77
78        """
79
80    @abc.abstractmethod
81    async def query(self,
82                    data: hat.event.common.QueryData
83                    ) -> list[hat.event.common.Event]:
84        """Query device events from server"""

Device's event client interface

@abc.abstractmethod
async def receive(self) -> list[hat.event.common.data.Event]:
60    @abc.abstractmethod
61    async def receive(self) -> list[hat.event.common.Event]:
62        """Receive device events"""

Receive device events

@abc.abstractmethod
def register(self, events: list[hat.event.common.data.RegisterEvent]):
64    @abc.abstractmethod
65    def register(self, events: list[hat.event.common.RegisterEvent]):
66        """Register device events"""

Register device events

@abc.abstractmethod
async def register_with_response( self, events: list[hat.event.common.data.RegisterEvent]) -> list[hat.event.common.data.Event | None]:
68    @abc.abstractmethod
69    async def register_with_response(self,
70                                     events: list[hat.event.common.RegisterEvent]  # NOQA
71                                     ) -> list[hat.event.common.Event | None]:
72        """Register device events
73
74        Each `RegisterEvent` from `events` is paired with results `Event` if
75        new event was successfully created or `None` if new event could not be
76        created.
77
78        """

Register device events

Each RegisterEvent from events is paired with the resulting Event if new event was successfully created or None if new event could not be created.

@abc.abstractmethod
async def query( self, data: hat.event.common.data.QueryData) -> list[hat.event.common.data.Event]:
80    @abc.abstractmethod
81    async def query(self,
82                    data: hat.event.common.QueryData
83                    ) -> list[hat.event.common.Event]:
84        """Query device events from server"""

Query device events from server

Inherited Members
hat.aio.group.Resource
async_group
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close
json_schema_repo: hat.json.repository.SchemaRepository = <hat.json.repository.SchemaRepository object>