hat.gateway.devices.modbus.master.device

  1from collections.abc import Collection
  2import asyncio
  3import contextlib
  4import logging
  5
  6from hat import aio
  7import hat.event.common
  8import hat.event.eventer
  9
 10from hat.gateway import common
 11from hat.gateway.devices.modbus.master.connection import connect
 12from hat.gateway.devices.modbus.master.eventer_client import (RemoteDeviceEnableReq,  # NOQA
 13                                                              RemoteDeviceWriteReq,  # NOQA
 14                                                              StatusRes,
 15                                                              RemoteDeviceStatusRes,  # NOQA
 16                                                              RemoteDeviceWriteRes,  # NOQA
 17                                                              EventerClientProxy)  # NOQA
 18from hat.gateway.devices.modbus.master.remote_device import RemoteDevice
 19
 20
 21mlog = logging.getLogger(__name__)
 22
 23
 24class ModbusMasterDevice(aio.Resource):
 25
 26    def __init__(self,
 27                 conf: common.DeviceConf,
 28                 eventer_client: hat.event.eventer.Client,
 29                 event_type_prefix: common.EventTypePrefix):
 30        self._conf = conf
 31        self._log_prefix = f"gateway device {conf['name']}"
 32        self._eventer_client = EventerClientProxy(eventer_client,
 33                                                  event_type_prefix,
 34                                                  self._log_prefix)
 35        self._enabled_devices = set()
 36        self._status = None
 37        self._conn = None
 38        self._devices = {}
 39        self._readers = {}
 40        self._async_group = aio.Group()
 41
 42        self.async_group.spawn(self._connection_loop)
 43
 44    @property
 45    def async_group(self) -> aio.Group:
 46        return self._async_group
 47
 48    async def process_events(self, events: Collection[hat.event.common.Event]):
 49        try:
 50            for request in self._eventer_client.process_events(events):
 51                if isinstance(request, RemoteDeviceEnableReq):
 52                    self._log(logging.DEBUG,
 53                              'received remote device enable request')
 54                    if request.enable:
 55                        self._enable_remote_device(request.device_id)
 56                    else:
 57                        await self._disable_remote_device(request.device_id)
 58
 59                elif isinstance(request, RemoteDeviceWriteReq):
 60                    self._log(logging.DEBUG,
 61                              'received remote device write request')
 62                    if self._conn and self._conn.is_open:
 63                        self._conn.async_group.spawn(
 64                            self._write, request.device_id, request.data_name,
 65                            request.request_id, request.value)
 66
 67                else:
 68                    raise ValueError('invalid request')
 69
 70        except Exception as e:
 71            self._log(logging.ERROR, 'process events error: %s', e, exc_info=e)
 72            self.close()
 73
 74    async def _connection_loop(self):
 75
 76        async def cleanup():
 77            if self._conn:
 78                await self._conn.async_close()
 79
 80            with contextlib.suppress(Exception):
 81                await self._set_status('DISCONNECTED')
 82
 83        try:
 84            self._log(logging.DEBUG, 'starting connection loop')
 85
 86            enabled_devices = await self._eventer_client.query_enabled_devices()  # NOQA
 87            self._enabled_devices.update(enabled_devices)
 88
 89            while True:
 90                await self._set_status('CONNECTING')
 91
 92                try:
 93                    self._conn = await aio.wait_for(
 94                        connect(self._conf['connection'],
 95                                self._log_prefix),
 96                        self._conf['connection']['connect_timeout'])
 97
 98                except aio.CancelledWithResultError as e:
 99                    self._conn = e.result
100                    raise
101
102                except Exception as e:
103                    self._log(logging.INFO, 'connecting error: %s', e,
104                              exc_info=e)
105                    await self._set_status('DISCONNECTED')
106                    await asyncio.sleep(
107                        self._conf['connection']['connect_delay'])
108                    continue
109
110                await self._set_status('CONNECTED')
111                self._devices = {}
112                self._readers = {}
113
114                self._log(logging.DEBUG, 'creating remote devices')
115                for device_conf in self._conf['remote_devices']:
116                    device = RemoteDevice(device_conf, self._conn,
117                                          self._log_prefix)
118                    self._devices[device.device_id] = device
119
120                    if device.device_id in self._enabled_devices:
121                        self._enable_remote_device(device.device_id)
122
123                    else:
124                        await self._notify_response(RemoteDeviceStatusRes(
125                            device_id=device.device_id,
126                            status='DISABLED'))
127
128                await self._conn.wait_closing()
129                await self._conn.async_close()
130
131                await self._set_status('DISCONNECTED')
132
133        except Exception as e:
134            self._log(logging.ERROR, 'connection loop error: %s', e,
135                      exc_info=e)
136
137        finally:
138            self._log(logging.DEBUG, 'closing connection loop')
139            self.close()
140            await aio.uncancellable(cleanup())
141
142    async def _notify_response(self, response):
143        await self._eventer_client.write([response])
144
145    async def _set_status(self, status):
146        if self._status == status:
147            return
148
149        self._log(logging.DEBUG, 'changing status: %s -> %s',
150                  self._status, status)
151        self._status = status
152        await self._notify_response(StatusRes(status))
153
154    def _enable_remote_device(self, device_id):
155        self._log(logging.DEBUG, 'enabling device %s', device_id)
156        self._enabled_devices.add(device_id)
157
158        device = self._devices.get(device_id)
159        if not device:
160            self._log(logging.DEBUG, 'device %s is not available', device_id)
161            return
162        if not device.conn.is_open:
163            self._log(logging.DEBUG, 'connection is not available')
164            return
165
166        reader = self._readers.get(device_id)
167        if reader and reader.is_open:
168            self._log(logging.DEBUG, 'reader %s is already running', device_id)
169            return
170
171        self._log(logging.DEBUG, 'creating reader for device %s', device_id)
172        reader = device.create_reader(self._notify_response)
173        self._readers[device.device_id] = reader
174
175    async def _disable_remote_device(self, device_id):
176        self._log(logging.DEBUG, 'disabling device %s', device_id)
177        self._enabled_devices.discard(device_id)
178
179        reader = self._readers.pop(device_id, None)
180        if not reader:
181            self._log(logging.DEBUG, 'device reader %s is not available',
182                      device_id)
183            return
184
185        await reader.async_close()
186
187    async def _write(self, device_id, data_name, request_id, value):
188        self._log(logging.DEBUG,
189                  'writing (device_id: %s; data_name: %s; value: %s)',
190                  device_id, data_name, value)
191
192        device = self._devices.get(device_id)
193        if not device:
194            self._log(logging.DEBUG, 'device %s is not available', device_id)
195            return
196
197        response = await device.write(data_name, request_id, value)
198        if response:
199            self._log(logging.DEBUG, 'writing result: %s', response.result)
200            await self._notify_response(response)
201
202    def _log(self, level, msg, *args, **kwargs):
203        if not mlog.isEnabledFor(level):
204            return
205
206        mlog.log(level, f"{self._log_prefix}: {msg}", *args, **kwargs)
mlog = <Logger hat.gateway.devices.modbus.master.device (WARNING)>
class ModbusMasterDevice(hat.aio.group.Resource):
 25class ModbusMasterDevice(aio.Resource):
 26
 27    def __init__(self,
 28                 conf: common.DeviceConf,
 29                 eventer_client: hat.event.eventer.Client,
 30                 event_type_prefix: common.EventTypePrefix):
 31        self._conf = conf
 32        self._log_prefix = f"gateway device {conf['name']}"
 33        self._eventer_client = EventerClientProxy(eventer_client,
 34                                                  event_type_prefix,
 35                                                  self._log_prefix)
 36        self._enabled_devices = set()
 37        self._status = None
 38        self._conn = None
 39        self._devices = {}
 40        self._readers = {}
 41        self._async_group = aio.Group()
 42
 43        self.async_group.spawn(self._connection_loop)
 44
 45    @property
 46    def async_group(self) -> aio.Group:
 47        return self._async_group
 48
 49    async def process_events(self, events: Collection[hat.event.common.Event]):
 50        try:
 51            for request in self._eventer_client.process_events(events):
 52                if isinstance(request, RemoteDeviceEnableReq):
 53                    self._log(logging.DEBUG,
 54                              'received remote device enable request')
 55                    if request.enable:
 56                        self._enable_remote_device(request.device_id)
 57                    else:
 58                        await self._disable_remote_device(request.device_id)
 59
 60                elif isinstance(request, RemoteDeviceWriteReq):
 61                    self._log(logging.DEBUG,
 62                              'received remote device write request')
 63                    if self._conn and self._conn.is_open:
 64                        self._conn.async_group.spawn(
 65                            self._write, request.device_id, request.data_name,
 66                            request.request_id, request.value)
 67
 68                else:
 69                    raise ValueError('invalid request')
 70
 71        except Exception as e:
 72            self._log(logging.ERROR, 'process events error: %s', e, exc_info=e)
 73            self.close()
 74
 75    async def _connection_loop(self):
 76
 77        async def cleanup():
 78            if self._conn:
 79                await self._conn.async_close()
 80
 81            with contextlib.suppress(Exception):
 82                await self._set_status('DISCONNECTED')
 83
 84        try:
 85            self._log(logging.DEBUG, 'starting connection loop')
 86
 87            enabled_devices = await self._eventer_client.query_enabled_devices()  # NOQA
 88            self._enabled_devices.update(enabled_devices)
 89
 90            while True:
 91                await self._set_status('CONNECTING')
 92
 93                try:
 94                    self._conn = await aio.wait_for(
 95                        connect(self._conf['connection'],
 96                                self._log_prefix),
 97                        self._conf['connection']['connect_timeout'])
 98
 99                except aio.CancelledWithResultError as e:
100                    self._conn = e.result
101                    raise
102
103                except Exception as e:
104                    self._log(logging.INFO, 'connecting error: %s', e,
105                              exc_info=e)
106                    await self._set_status('DISCONNECTED')
107                    await asyncio.sleep(
108                        self._conf['connection']['connect_delay'])
109                    continue
110
111                await self._set_status('CONNECTED')
112                self._devices = {}
113                self._readers = {}
114
115                self._log(logging.DEBUG, 'creating remote devices')
116                for device_conf in self._conf['remote_devices']:
117                    device = RemoteDevice(device_conf, self._conn,
118                                          self._log_prefix)
119                    self._devices[device.device_id] = device
120
121                    if device.device_id in self._enabled_devices:
122                        self._enable_remote_device(device.device_id)
123
124                    else:
125                        await self._notify_response(RemoteDeviceStatusRes(
126                            device_id=device.device_id,
127                            status='DISABLED'))
128
129                await self._conn.wait_closing()
130                await self._conn.async_close()
131
132                await self._set_status('DISCONNECTED')
133
134        except Exception as e:
135            self._log(logging.ERROR, 'connection loop error: %s', e,
136                      exc_info=e)
137
138        finally:
139            self._log(logging.DEBUG, 'closing connection loop')
140            self.close()
141            await aio.uncancellable(cleanup())
142
143    async def _notify_response(self, response):
144        await self._eventer_client.write([response])
145
146    async def _set_status(self, status):
147        if self._status == status:
148            return
149
150        self._log(logging.DEBUG, 'changing status: %s -> %s',
151                  self._status, status)
152        self._status = status
153        await self._notify_response(StatusRes(status))
154
155    def _enable_remote_device(self, device_id):
156        self._log(logging.DEBUG, 'enabling device %s', device_id)
157        self._enabled_devices.add(device_id)
158
159        device = self._devices.get(device_id)
160        if not device:
161            self._log(logging.DEBUG, 'device %s is not available', device_id)
162            return
163        if not device.conn.is_open:
164            self._log(logging.DEBUG, 'connection is not available')
165            return
166
167        reader = self._readers.get(device_id)
168        if reader and reader.is_open:
169            self._log(logging.DEBUG, 'reader %s is already running', device_id)
170            return
171
172        self._log(logging.DEBUG, 'creating reader for device %s', device_id)
173        reader = device.create_reader(self._notify_response)
174        self._readers[device.device_id] = reader
175
176    async def _disable_remote_device(self, device_id):
177        self._log(logging.DEBUG, 'disabling device %s', device_id)
178        self._enabled_devices.discard(device_id)
179
180        reader = self._readers.pop(device_id, None)
181        if not reader:
182            self._log(logging.DEBUG, 'device reader %s is not available',
183                      device_id)
184            return
185
186        await reader.async_close()
187
188    async def _write(self, device_id, data_name, request_id, value):
189        self._log(logging.DEBUG,
190                  'writing (device_id: %s; data_name: %s; value: %s)',
191                  device_id, data_name, value)
192
193        device = self._devices.get(device_id)
194        if not device:
195            self._log(logging.DEBUG, 'device %s is not available', device_id)
196            return
197
198        response = await device.write(data_name, request_id, value)
199        if response:
200            self._log(logging.DEBUG, 'writing result: %s', response.result)
201            await self._notify_response(response)
202
203    def _log(self, level, msg, *args, **kwargs):
204        if not mlog.isEnabledFor(level):
205            return
206
207        mlog.log(level, f"{self._log_prefix}: {msg}", *args, **kwargs)

Resource with lifetime control based on Group.

ModbusMasterDevice( conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]], eventer_client: hat.event.eventer.client.Client, event_type_prefix: tuple[str, str, str])
27    def __init__(self,
28                 conf: common.DeviceConf,
29                 eventer_client: hat.event.eventer.Client,
30                 event_type_prefix: common.EventTypePrefix):
31        self._conf = conf
32        self._log_prefix = f"gateway device {conf['name']}"
33        self._eventer_client = EventerClientProxy(eventer_client,
34                                                  event_type_prefix,
35                                                  self._log_prefix)
36        self._enabled_devices = set()
37        self._status = None
38        self._conn = None
39        self._devices = {}
40        self._readers = {}
41        self._async_group = aio.Group()
42
43        self.async_group.spawn(self._connection_loop)
async_group: hat.aio.group.Group
45    @property
46    def async_group(self) -> aio.Group:
47        return self._async_group

Group controlling resource's lifetime.

async def process_events(self, events: Collection[hat.event.common.common.Event]):
49    async def process_events(self, events: Collection[hat.event.common.Event]):
50        try:
51            for request in self._eventer_client.process_events(events):
52                if isinstance(request, RemoteDeviceEnableReq):
53                    self._log(logging.DEBUG,
54                              'received remote device enable request')
55                    if request.enable:
56                        self._enable_remote_device(request.device_id)
57                    else:
58                        await self._disable_remote_device(request.device_id)
59
60                elif isinstance(request, RemoteDeviceWriteReq):
61                    self._log(logging.DEBUG,
62                              'received remote device write request')
63                    if self._conn and self._conn.is_open:
64                        self._conn.async_group.spawn(
65                            self._write, request.device_id, request.data_name,
66                            request.request_id, request.value)
67
68                else:
69                    raise ValueError('invalid request')
70
71        except Exception as e:
72            self._log(logging.ERROR, 'process events error: %s', e, exc_info=e)
73            self.close()