hat.gateway.devices.modbus.master.common

 1from hat.gateway.common import *  # NOQA
 2
 3import logging
 4import typing
 5
 6
 7class RemoteDeviceEnableReq(typing.NamedTuple):
 8    device_id: int
 9    enable: bool
10
11
class RemoteDeviceWriteReq(typing.NamedTuple):
    """Request to write a value to a named data point of a remote device.

    Positional order is ``(device_id, data_name, request_id, value)``.
    """

    # identifier of the addressed remote device
    device_id: int
    # name of the data point to write
    data_name: str
    # caller supplied identifier of this request
    # NOTE(review): presumably echoed in the write response - confirm
    request_id: str
    # integer value to write
    value: int
17
18
# Union of the request tuples defined in this module.
Request: typing.TypeAlias = RemoteDeviceEnableReq | RemoteDeviceWriteReq
20
21
class StatusRes(typing.NamedTuple):
    """Response carrying a single status string.

    NOTE(review): the set of valid status values is not defined in this
    module - confirm against the producer.
    """

    # status text
    status: str
24
25
class RemoteDeviceStatusRes(typing.NamedTuple):
    """Response carrying the status string of a single remote device.

    Positional order is ``(device_id, status)``.
    """

    # identifier of the remote device the status belongs to
    device_id: int
    # status text (valid values are not defined in this module)
    status: str
29
30
31class RemoteDeviceReadRes(typing.NamedTuple):
32    device_id: int
33    data_name: str
34    result: str
35    value: int | None
36    cause: str | None
37
38
class RemoteDeviceWriteRes(typing.NamedTuple):
    """Response describing a write to a named data point of a remote device.

    Positional order is ``(device_id, data_name, request_id, result)``.
    """

    # identifier of the remote device that was written
    device_id: int
    # name of the data point that was written
    data_name: str
    # identifier of the write request
    # NOTE(review): presumably copied from the originating request - confirm
    request_id: str
    # result text (valid values are not defined in this module)
    result: str
44
45
# Union of the response tuples defined in this module.
Response: typing.TypeAlias = (StatusRes |
                              RemoteDeviceStatusRes |
                              RemoteDeviceReadRes |
                              RemoteDeviceWriteRes)
50
51
class Timeout(typing.NamedTuple):
    """Field-less marker tuple.

    Instances carry no data; every instance equals the empty tuple.
    NOTE(review): presumably signals an expired wait - confirm with users.
    """
54
55
def create_device_logger_adapter(logger: logging.Logger,
                                 name: str
                                 ) -> logging.LoggerAdapter:
    """Wrap `logger` in an adapter tagged with master device metadata.

    Args:
        logger: logger to wrap
        name: value stored under ``extra['meta']['name']``

    Returns:
        adapter whose ``extra`` is
        ``{'meta': {'type': 'ModbusMasterDevice', 'name': name}}``

    """
    extra = {'meta': {'type': 'ModbusMasterDevice',
                      'name': name}}

    return logging.LoggerAdapter(logger, extra)
63
64
def create_remote_device_logger_adapter(logger: logging.Logger,
                                        name: str,
                                        device_id: int
                                        ) -> logging.LoggerAdapter:
    """Wrap `logger` in an adapter tagged with remote device metadata.

    Args:
        logger: logger to wrap
        name: value stored under ``extra['meta']['name']``
        device_id: value stored under ``extra['meta']['device_id']``

    Returns:
        adapter whose ``extra`` is ``{'meta': {...}}`` with keys ``type``
        (always ``'ModbusMasterRemoteDevice'``), ``name`` and ``device_id``

    """
    extra = {'meta': {'type': 'ModbusMasterRemoteDevice',
                      'name': name,
                      'device_id': device_id}}

    return logging.LoggerAdapter(logger, extra)
class RemoteDeviceEnableReq(typing.NamedTuple):
    """Request carrying an enable flag for a single remote device.

    Positional order is ``(device_id, enable)``.
    """

    # identifier of the addressed remote device
    device_id: int
    # requested enable state
    # NOTE(review): presumably True enables and False disables - confirm
    enable: bool

RemoteDeviceEnableReq(device_id, enable)

RemoteDeviceEnableReq(device_id: int, enable: bool)

Create new instance of RemoteDeviceEnableReq(device_id, enable)

device_id: int

Alias for field number 0

enable: bool

Alias for field number 1

class RemoteDeviceWriteReq(typing.NamedTuple):
    """Request to write a value to a named data point of a remote device.

    Positional order is ``(device_id, data_name, request_id, value)``.
    """

    # identifier of the addressed remote device
    device_id: int
    # name of the data point to write
    data_name: str
    # caller supplied identifier of this request
    # NOTE(review): presumably echoed in the write response - confirm
    request_id: str
    # integer value to write
    value: int

RemoteDeviceWriteReq(device_id, data_name, request_id, value)

RemoteDeviceWriteReq(device_id: int, data_name: str, request_id: str, value: int)

Create new instance of RemoteDeviceWriteReq(device_id, data_name, request_id, value)

device_id: int

Alias for field number 0

data_name: str

Alias for field number 1

request_id: str

Alias for field number 2

value: int

Alias for field number 3

class StatusRes(typing.NamedTuple):
    """Response carrying a single status string.

    NOTE(review): the set of valid status values is not defined in this
    module - confirm against the producer.
    """

    # status text
    status: str

StatusRes(status,)

StatusRes(status: str)

Create new instance of StatusRes(status,)

status: str

Alias for field number 0

class RemoteDeviceStatusRes(typing.NamedTuple):
    """Response carrying the status string of a single remote device.

    Positional order is ``(device_id, status)``.
    """

    # identifier of the remote device the status belongs to
    device_id: int
    # status text (valid values are not defined in this module)
    status: str

RemoteDeviceStatusRes(device_id, status)

RemoteDeviceStatusRes(device_id: int, status: str)

Create new instance of RemoteDeviceStatusRes(device_id, status)

device_id: int

Alias for field number 0

status: str

Alias for field number 1

class RemoteDeviceReadRes(typing.NamedTuple):
32class RemoteDeviceReadRes(typing.NamedTuple):
33    device_id: int
34    data_name: str
35    result: str
36    value: int | None
37    cause: str | None

RemoteDeviceReadRes(device_id, data_name, result, value, cause)

RemoteDeviceReadRes( device_id: int, data_name: str, result: str, value: int | None, cause: str | None)

Create new instance of RemoteDeviceReadRes(device_id, data_name, result, value, cause)

device_id: int

Alias for field number 0

data_name: str

Alias for field number 1

result: str

Alias for field number 2

value: int | None

Alias for field number 3

cause: str | None

Alias for field number 4

class RemoteDeviceWriteRes(typing.NamedTuple):
    """Response describing a write to a named data point of a remote device.

    Positional order is ``(device_id, data_name, request_id, result)``.
    """

    # identifier of the remote device that was written
    device_id: int
    # name of the data point that was written
    data_name: str
    # identifier of the write request
    # NOTE(review): presumably copied from the originating request - confirm
    request_id: str
    # result text (valid values are not defined in this module)
    result: str

RemoteDeviceWriteRes(device_id, data_name, request_id, result)

RemoteDeviceWriteRes(device_id: int, data_name: str, request_id: str, result: str)

Create new instance of RemoteDeviceWriteRes(device_id, data_name, request_id, result)

device_id: int

Alias for field number 0

data_name: str

Alias for field number 1

request_id: str

Alias for field number 2

result: str

Alias for field number 3

class Timeout(typing.NamedTuple):
    """Field-less marker tuple.

    Instances carry no data; every instance equals the empty tuple.
    NOTE(review): presumably signals an expired wait - confirm with users.
    """

Timeout()

Timeout()

Create new instance of Timeout()

def create_device_logger_adapter(logger: logging.Logger,
                                 name: str
                                 ) -> logging.LoggerAdapter:
    """Wrap `logger` in an adapter tagged with master device metadata.

    Args:
        logger: logger to wrap
        name: value stored under ``extra['meta']['name']``

    Returns:
        adapter whose ``extra`` is
        ``{'meta': {'type': 'ModbusMasterDevice', 'name': name}}``

    """
    extra = {'meta': {'type': 'ModbusMasterDevice',
                      'name': name}}

    return logging.LoggerAdapter(logger, extra)
def create_remote_device_logger_adapter(logger: logging.Logger,
                                        name: str,
                                        device_id: int
                                        ) -> logging.LoggerAdapter:
    """Wrap `logger` in an adapter tagged with remote device metadata.

    Args:
        logger: logger to wrap
        name: value stored under ``extra['meta']['name']``
        device_id: value stored under ``extra['meta']['device_id']``

    Returns:
        adapter whose ``extra`` is ``{'meta': {...}}`` with keys ``type``
        (always ``'ModbusMasterRemoteDevice'``), ``name`` and ``device_id``

    """
    extra = {'meta': {'type': 'ModbusMasterRemoteDevice',
                      'name': name,
                      'device_id': device_id}}

    return logging.LoggerAdapter(logger, extra)