hat.gateway.devices.modbus.master.device

  1from collections.abc import Collection
  2import asyncio
  3import contextlib
  4import logging
  5
  6from hat import aio
  7import hat.event.common
  8import hat.event.eventer
  9
 10from hat.gateway.devices.modbus.master import common
 11from hat.gateway.devices.modbus.master.connection import connect
 12from hat.gateway.devices.modbus.master.eventer_client import (RemoteDeviceEnableReq,  # NOQA
 13                                                              RemoteDeviceWriteReq,  # NOQA
 14                                                              StatusRes,
 15                                                              RemoteDeviceStatusRes,  # NOQA
 16                                                              RemoteDeviceWriteRes,  # NOQA
 17                                                              EventerClientProxy)  # NOQA
 18from hat.gateway.devices.modbus.master.remote_device import RemoteDevice
 19
 20
 21mlog = logging.getLogger(__name__)
 22
 23
 24class ModbusMasterDevice(aio.Resource):
 25
 26    def __init__(self,
 27                 conf: common.DeviceConf,
 28                 eventer_client: hat.event.eventer.Client,
 29                 event_type_prefix: common.EventTypePrefix):
 30        self._conf = conf
 31        self._eventer_client = EventerClientProxy(eventer_client,
 32                                                  event_type_prefix,
 33                                                  conf['name'])
 34        self._enabled_devices = set()
 35        self._status = None
 36        self._conn = None
 37        self._devices = {}
 38        self._readers = {}
 39        self._async_group = aio.Group()
 40        self._log = common.create_device_logger_adapter(mlog, conf['name'])
 41
 42        self.async_group.spawn(self._connection_loop)
 43
 44    @property
 45    def async_group(self) -> aio.Group:
 46        return self._async_group
 47
 48    async def process_events(self, events: Collection[hat.event.common.Event]):
 49        try:
 50            for request in self._eventer_client.process_events(events):
 51                if isinstance(request, RemoteDeviceEnableReq):
 52                    self._log.debug('received remote device enable request')
 53                    if request.enable:
 54                        self._enable_remote_device(request.device_id)
 55                    else:
 56                        await self._disable_remote_device(request.device_id)
 57
 58                elif isinstance(request, RemoteDeviceWriteReq):
 59                    self._log.debug('received remote device write request')
 60                    if self._conn and self._conn.is_open:
 61                        self._conn.async_group.spawn(
 62                            self._write, request.device_id, request.data_name,
 63                            request.request_id, request.value)
 64
 65                else:
 66                    raise ValueError('invalid request')
 67
 68        except Exception as e:
 69            self._log.error('process events error: %s', e, exc_info=e)
 70            self.close()
 71
 72    async def _connection_loop(self):
 73
 74        async def cleanup():
 75            if self._conn:
 76                await self._conn.async_close()
 77
 78            with contextlib.suppress(Exception):
 79                await self._set_status('DISCONNECTED')
 80
 81        try:
 82            self._log.debug('starting connection loop')
 83
 84            enabled_devices = await self._eventer_client.query_enabled_devices()  # NOQA
 85            self._enabled_devices.update(enabled_devices)
 86
 87            while True:
 88                await self._set_status('CONNECTING')
 89
 90                try:
 91                    self._conn = await aio.wait_for(
 92                        connect(self._conf['connection'], self._conf['name']),
 93                        self._conf['connection']['connect_timeout'])
 94
 95                except aio.CancelledWithResultError as e:
 96                    self._conn = e.result
 97                    raise
 98
 99                except Exception as e:
100                    self._log.info('connecting error: %s', e, exc_info=e)
101                    await self._set_status('DISCONNECTED')
102                    await asyncio.sleep(
103                        self._conf['connection']['connect_delay'])
104                    continue
105
106                await self._set_status('CONNECTED')
107                self._devices = {}
108                self._readers = {}
109
110                self._log.debug('creating remote devices')
111                for device_conf in self._conf['remote_devices']:
112                    device = RemoteDevice(device_conf, self._conn,
113                                          self._conf['name'])
114                    self._devices[device.device_id] = device
115
116                    if device.device_id in self._enabled_devices:
117                        self._enable_remote_device(device.device_id)
118
119                    else:
120                        await self._notify_response(RemoteDeviceStatusRes(
121                            device_id=device.device_id,
122                            status='DISABLED'))
123
124                await self._conn.wait_closing()
125                await self._conn.async_close()
126
127                await self._set_status('DISCONNECTED')
128
129        except Exception as e:
130            self._log.error('connection loop error: %s', e, exc_info=e)
131
132        finally:
133            self._log.debug('closing connection loop')
134            self.close()
135            await aio.uncancellable(cleanup())
136
137    async def _notify_response(self, response):
138        await self._eventer_client.write([response])
139
140    async def _set_status(self, status):
141        if self._status == status:
142            return
143
144        self._log.debug('changing status: %s -> %s', self._status, status)
145        self._status = status
146        await self._notify_response(StatusRes(status))
147
148    def _enable_remote_device(self, device_id):
149        self._log.debug('enabling device %s', device_id)
150        self._enabled_devices.add(device_id)
151
152        device = self._devices.get(device_id)
153        if not device:
154            self._log.debug('device %s is not available', device_id)
155            return
156        if not device.conn.is_open:
157            self._log.debug('connection is not available')
158            return
159
160        reader = self._readers.get(device_id)
161        if reader and reader.is_open:
162            self._log.debug('reader %s is already running', device_id)
163            return
164
165        self._log.debug('creating reader for device %s', device_id)
166        reader = device.create_reader(self._notify_response)
167        self._readers[device.device_id] = reader
168
169    async def _disable_remote_device(self, device_id):
170        self._log.debug('disabling device %s', device_id)
171        self._enabled_devices.discard(device_id)
172
173        reader = self._readers.pop(device_id, None)
174        if not reader:
175            self._log.debug('device reader %s is not available', device_id)
176            return
177
178        await reader.async_close()
179
180    async def _write(self, device_id, data_name, request_id, value):
181        self._log.debug('writing (device_id: %s; data_name: %s; value: %s)',
182                        device_id, data_name, value)
183
184        device = self._devices.get(device_id)
185        if not device:
186            self._log.debug('device %s is not available', device_id)
187            return
188
189        response = await device.write(data_name, request_id, value)
190        if response:
191            self._log.debug('writing result: %s', response.result)
192            await self._notify_response(response)
mlog = <Logger hat.gateway.devices.modbus.master.device (WARNING)>
class ModbusMasterDevice(hat.aio.group.Resource):
 25class ModbusMasterDevice(aio.Resource):
 26
 27    def __init__(self,
 28                 conf: common.DeviceConf,
 29                 eventer_client: hat.event.eventer.Client,
 30                 event_type_prefix: common.EventTypePrefix):
 31        self._conf = conf
 32        self._eventer_client = EventerClientProxy(eventer_client,
 33                                                  event_type_prefix,
 34                                                  conf['name'])
 35        self._enabled_devices = set()
 36        self._status = None
 37        self._conn = None
 38        self._devices = {}
 39        self._readers = {}
 40        self._async_group = aio.Group()
 41        self._log = common.create_device_logger_adapter(mlog, conf['name'])
 42
 43        self.async_group.spawn(self._connection_loop)
 44
 45    @property
 46    def async_group(self) -> aio.Group:
 47        return self._async_group
 48
 49    async def process_events(self, events: Collection[hat.event.common.Event]):
 50        try:
 51            for request in self._eventer_client.process_events(events):
 52                if isinstance(request, RemoteDeviceEnableReq):
 53                    self._log.debug('received remote device enable request')
 54                    if request.enable:
 55                        self._enable_remote_device(request.device_id)
 56                    else:
 57                        await self._disable_remote_device(request.device_id)
 58
 59                elif isinstance(request, RemoteDeviceWriteReq):
 60                    self._log.debug('received remote device write request')
 61                    if self._conn and self._conn.is_open:
 62                        self._conn.async_group.spawn(
 63                            self._write, request.device_id, request.data_name,
 64                            request.request_id, request.value)
 65
 66                else:
 67                    raise ValueError('invalid request')
 68
 69        except Exception as e:
 70            self._log.error('process events error: %s', e, exc_info=e)
 71            self.close()
 72
 73    async def _connection_loop(self):
 74
 75        async def cleanup():
 76            if self._conn:
 77                await self._conn.async_close()
 78
 79            with contextlib.suppress(Exception):
 80                await self._set_status('DISCONNECTED')
 81
 82        try:
 83            self._log.debug('starting connection loop')
 84
 85            enabled_devices = await self._eventer_client.query_enabled_devices()  # NOQA
 86            self._enabled_devices.update(enabled_devices)
 87
 88            while True:
 89                await self._set_status('CONNECTING')
 90
 91                try:
 92                    self._conn = await aio.wait_for(
 93                        connect(self._conf['connection'], self._conf['name']),
 94                        self._conf['connection']['connect_timeout'])
 95
 96                except aio.CancelledWithResultError as e:
 97                    self._conn = e.result
 98                    raise
 99
100                except Exception as e:
101                    self._log.info('connecting error: %s', e, exc_info=e)
102                    await self._set_status('DISCONNECTED')
103                    await asyncio.sleep(
104                        self._conf['connection']['connect_delay'])
105                    continue
106
107                await self._set_status('CONNECTED')
108                self._devices = {}
109                self._readers = {}
110
111                self._log.debug('creating remote devices')
112                for device_conf in self._conf['remote_devices']:
113                    device = RemoteDevice(device_conf, self._conn,
114                                          self._conf['name'])
115                    self._devices[device.device_id] = device
116
117                    if device.device_id in self._enabled_devices:
118                        self._enable_remote_device(device.device_id)
119
120                    else:
121                        await self._notify_response(RemoteDeviceStatusRes(
122                            device_id=device.device_id,
123                            status='DISABLED'))
124
125                await self._conn.wait_closing()
126                await self._conn.async_close()
127
128                await self._set_status('DISCONNECTED')
129
130        except Exception as e:
131            self._log.error('connection loop error: %s', e, exc_info=e)
132
133        finally:
134            self._log.debug('closing connection loop')
135            self.close()
136            await aio.uncancellable(cleanup())
137
138    async def _notify_response(self, response):
139        await self._eventer_client.write([response])
140
141    async def _set_status(self, status):
142        if self._status == status:
143            return
144
145        self._log.debug('changing status: %s -> %s', self._status, status)
146        self._status = status
147        await self._notify_response(StatusRes(status))
148
149    def _enable_remote_device(self, device_id):
150        self._log.debug('enabling device %s', device_id)
151        self._enabled_devices.add(device_id)
152
153        device = self._devices.get(device_id)
154        if not device:
155            self._log.debug('device %s is not available', device_id)
156            return
157        if not device.conn.is_open:
158            self._log.debug('connection is not available')
159            return
160
161        reader = self._readers.get(device_id)
162        if reader and reader.is_open:
163            self._log.debug('reader %s is already running', device_id)
164            return
165
166        self._log.debug('creating reader for device %s', device_id)
167        reader = device.create_reader(self._notify_response)
168        self._readers[device.device_id] = reader
169
170    async def _disable_remote_device(self, device_id):
171        self._log.debug('disabling device %s', device_id)
172        self._enabled_devices.discard(device_id)
173
174        reader = self._readers.pop(device_id, None)
175        if not reader:
176            self._log.debug('device reader %s is not available', device_id)
177            return
178
179        await reader.async_close()
180
181    async def _write(self, device_id, data_name, request_id, value):
182        self._log.debug('writing (device_id: %s; data_name: %s; value: %s)',
183                        device_id, data_name, value)
184
185        device = self._devices.get(device_id)
186        if not device:
187            self._log.debug('device %s is not available', device_id)
188            return
189
190        response = await device.write(data_name, request_id, value)
191        if response:
192            self._log.debug('writing result: %s', response.result)
193            await self._notify_response(response)

Resource with lifetime control based on Group.

ModbusMasterDevice( conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]], eventer_client: hat.event.eventer.client.Client, event_type_prefix: tuple[str, str, str])
27    def __init__(self,
28                 conf: common.DeviceConf,
29                 eventer_client: hat.event.eventer.Client,
30                 event_type_prefix: common.EventTypePrefix):
31        self._conf = conf
32        self._eventer_client = EventerClientProxy(eventer_client,
33                                                  event_type_prefix,
34                                                  conf['name'])
35        self._enabled_devices = set()
36        self._status = None
37        self._conn = None
38        self._devices = {}
39        self._readers = {}
40        self._async_group = aio.Group()
41        self._log = common.create_device_logger_adapter(mlog, conf['name'])
42
43        self.async_group.spawn(self._connection_loop)
async_group: hat.aio.group.Group
45    @property
46    def async_group(self) -> aio.Group:
47        return self._async_group

Group controlling resource's lifetime.

async def process_events(self, events: Collection[hat.event.common.common.Event]):
49    async def process_events(self, events: Collection[hat.event.common.Event]):
50        try:
51            for request in self._eventer_client.process_events(events):
52                if isinstance(request, RemoteDeviceEnableReq):
53                    self._log.debug('received remote device enable request')
54                    if request.enable:
55                        self._enable_remote_device(request.device_id)
56                    else:
57                        await self._disable_remote_device(request.device_id)
58
59                elif isinstance(request, RemoteDeviceWriteReq):
60                    self._log.debug('received remote device write request')
61                    if self._conn and self._conn.is_open:
62                        self._conn.async_group.spawn(
63                            self._write, request.device_id, request.data_name,
64                            request.request_id, request.value)
65
66                else:
67                    raise ValueError('invalid request')
68
69        except Exception as e:
70            self._log.error('process events error: %s', e, exc_info=e)
71            self.close()