hat.gateway.devices.modbus.master.device

  1import asyncio
  2import contextlib
  3import logging
  4
  5from hat import aio
  6import hat.event.common
  7import hat.event.eventer
  8
  9from hat.gateway.devices.modbus.master import common
 10from hat.gateway.devices.modbus.master.connection import connect
 11from hat.gateway.devices.modbus.master.eventer_client import EventerClientProxy
 12from hat.gateway.devices.modbus.master.remote_device import RemoteDevice
 13
 14
 15mlog = logging.getLogger(__name__)
 16
 17
 18class ModbusMasterDevice(aio.Resource):
 19
 20    def __init__(self,
 21                 conf: common.DeviceConf,
 22                 eventer_client: hat.event.eventer.Client,
 23                 event_type_prefix: common.EventTypePrefix):
 24        self._conf = conf
 25        self._eventer_client = EventerClientProxy(eventer_client,
 26                                                  event_type_prefix,
 27                                                  conf['name'])
 28        self._enabled_devices = set()
 29        self._status = None
 30        self._conn = None
 31        self._devices = {}
 32        self._readers = {}
 33        self._async_group = aio.Group()
 34        self._log = common.create_device_logger_adapter(mlog, conf['name'])
 35
 36        self.async_group.spawn(self._connection_loop)
 37
 38    @property
 39    def async_group(self) -> aio.Group:
 40        return self._async_group
 41
 42    async def process_event(self, event: hat.event.common.Event):
 43        try:
 44            request = self._eventer_client.process_event(event)
 45
 46            if isinstance(request, common.RemoteDeviceEnableReq):
 47                self._log.debug('received remote device enable request')
 48
 49                if request.enable:
 50                    self._enable_remote_device(request.device_id)
 51
 52                else:
 53                    await self._disable_remote_device(request.device_id)
 54
 55            elif isinstance(request, common.RemoteDeviceWriteReq):
 56                self._log.debug('received remote device write request')
 57
 58                if self._conn and self._conn.is_open:
 59                    self._conn.async_group.spawn(
 60                        self._write, request.device_id, request.data_name,
 61                        request.request_id, request.value)
 62
 63            else:
 64                raise ValueError('invalid request')
 65
 66        except Exception as e:
 67            self._log.warning('error processing event: %s', e, exc_info=e)
 68
 69    async def _connection_loop(self):
 70
 71        async def cleanup():
 72            if self._conn:
 73                await self._conn.async_close()
 74
 75            with contextlib.suppress(Exception):
 76                await self._set_status('DISCONNECTED')
 77
 78        try:
 79            self._log.debug('starting connection loop')
 80
 81            enabled_devices = await self._eventer_client.query_enabled_devices()  # NOQA
 82            self._enabled_devices.update(enabled_devices)
 83
 84            while True:
 85                await self._set_status('CONNECTING')
 86
 87                try:
 88                    self._conn = await aio.wait_for(
 89                        connect(self._conf['connection'], self._conf['name']),
 90                        self._conf['connection']['connect_timeout'])
 91
 92                except aio.CancelledWithResultError as e:
 93                    self._conn = e.result
 94                    raise
 95
 96                except Exception as e:
 97                    self._log.info('connecting error: %s', e, exc_info=e)
 98                    await self._set_status('DISCONNECTED')
 99                    await asyncio.sleep(
100                        self._conf['connection']['connect_delay'])
101                    continue
102
103                await self._set_status('CONNECTED')
104                self._devices = {}
105                self._readers = {}
106
107                self._log.debug('creating remote devices')
108                for device_conf in self._conf['remote_devices']:
109                    device = RemoteDevice(device_conf, self._conn,
110                                          self._conf['name'])
111                    self._devices[device.device_id] = device
112
113                    if device.device_id in self._enabled_devices:
114                        self._enable_remote_device(device.device_id)
115
116                    else:
117                        await self._notify_response(
118                            common.RemoteDeviceStatusRes(
119                                device_id=device.device_id,
120                                status='DISABLED'))
121
122                await self._conn.wait_closing()
123                await self._conn.async_close()
124
125                await self._set_status('DISCONNECTED')
126
127        except Exception as e:
128            self._log.error('connection loop error: %s', e, exc_info=e)
129
130        finally:
131            self._log.debug('closing connection loop')
132            self.close()
133            await aio.uncancellable(cleanup())
134
135    async def _notify_response(self, response):
136        await self._eventer_client.write([response])
137
138    async def _set_status(self, status):
139        if self._status == status:
140            return
141
142        self._log.debug('changing status: %s -> %s', self._status, status)
143        self._status = status
144        await self._notify_response(common.StatusRes(status))
145
146    def _enable_remote_device(self, device_id):
147        self._log.debug('enabling device %s', device_id)
148        self._enabled_devices.add(device_id)
149
150        device = self._devices.get(device_id)
151        if not device:
152            self._log.debug('device %s is not available', device_id)
153            return
154        if not device.conn.is_open:
155            self._log.debug('connection is not available')
156            return
157
158        reader = self._readers.get(device_id)
159        if reader and reader.is_open:
160            self._log.debug('reader %s is already running', device_id)
161            return
162
163        self._log.debug('creating reader for device %s', device_id)
164        reader = device.create_reader(self._notify_response)
165        self._readers[device.device_id] = reader
166
167    async def _disable_remote_device(self, device_id):
168        self._log.debug('disabling device %s', device_id)
169        self._enabled_devices.discard(device_id)
170
171        reader = self._readers.pop(device_id, None)
172        if not reader:
173            self._log.debug('device reader %s is not available', device_id)
174            return
175
176        await reader.async_close()
177
178    async def _write(self, device_id, data_name, request_id, value):
179        self._log.debug('writing (device_id: %s; data_name: %s; value: %s)',
180                        device_id, data_name, value)
181
182        device = self._devices.get(device_id)
183        if not device:
184            self._log.debug('device %s is not available', device_id)
185            return
186
187        response = await device.write(data_name, request_id, value)
188        if response:
189            self._log.debug('writing result: %s', response.result)
190            await self._notify_response(response)
mlog = <Logger hat.gateway.devices.modbus.master.device (WARNING)>
class ModbusMasterDevice(hat.aio.group.Resource):
 19class ModbusMasterDevice(aio.Resource):
 20
 21    def __init__(self,
 22                 conf: common.DeviceConf,
 23                 eventer_client: hat.event.eventer.Client,
 24                 event_type_prefix: common.EventTypePrefix):
 25        self._conf = conf
 26        self._eventer_client = EventerClientProxy(eventer_client,
 27                                                  event_type_prefix,
 28                                                  conf['name'])
 29        self._enabled_devices = set()
 30        self._status = None
 31        self._conn = None
 32        self._devices = {}
 33        self._readers = {}
 34        self._async_group = aio.Group()
 35        self._log = common.create_device_logger_adapter(mlog, conf['name'])
 36
 37        self.async_group.spawn(self._connection_loop)
 38
 39    @property
 40    def async_group(self) -> aio.Group:
 41        return self._async_group
 42
 43    async def process_event(self, event: hat.event.common.Event):
 44        try:
 45            request = self._eventer_client.process_event(event)
 46
 47            if isinstance(request, common.RemoteDeviceEnableReq):
 48                self._log.debug('received remote device enable request')
 49
 50                if request.enable:
 51                    self._enable_remote_device(request.device_id)
 52
 53                else:
 54                    await self._disable_remote_device(request.device_id)
 55
 56            elif isinstance(request, common.RemoteDeviceWriteReq):
 57                self._log.debug('received remote device write request')
 58
 59                if self._conn and self._conn.is_open:
 60                    self._conn.async_group.spawn(
 61                        self._write, request.device_id, request.data_name,
 62                        request.request_id, request.value)
 63
 64            else:
 65                raise ValueError('invalid request')
 66
 67        except Exception as e:
 68            self._log.warning('error processing event: %s', e, exc_info=e)
 69
 70    async def _connection_loop(self):
 71
 72        async def cleanup():
 73            if self._conn:
 74                await self._conn.async_close()
 75
 76            with contextlib.suppress(Exception):
 77                await self._set_status('DISCONNECTED')
 78
 79        try:
 80            self._log.debug('starting connection loop')
 81
 82            enabled_devices = await self._eventer_client.query_enabled_devices()  # NOQA
 83            self._enabled_devices.update(enabled_devices)
 84
 85            while True:
 86                await self._set_status('CONNECTING')
 87
 88                try:
 89                    self._conn = await aio.wait_for(
 90                        connect(self._conf['connection'], self._conf['name']),
 91                        self._conf['connection']['connect_timeout'])
 92
 93                except aio.CancelledWithResultError as e:
 94                    self._conn = e.result
 95                    raise
 96
 97                except Exception as e:
 98                    self._log.info('connecting error: %s', e, exc_info=e)
 99                    await self._set_status('DISCONNECTED')
100                    await asyncio.sleep(
101                        self._conf['connection']['connect_delay'])
102                    continue
103
104                await self._set_status('CONNECTED')
105                self._devices = {}
106                self._readers = {}
107
108                self._log.debug('creating remote devices')
109                for device_conf in self._conf['remote_devices']:
110                    device = RemoteDevice(device_conf, self._conn,
111                                          self._conf['name'])
112                    self._devices[device.device_id] = device
113
114                    if device.device_id in self._enabled_devices:
115                        self._enable_remote_device(device.device_id)
116
117                    else:
118                        await self._notify_response(
119                            common.RemoteDeviceStatusRes(
120                                device_id=device.device_id,
121                                status='DISABLED'))
122
123                await self._conn.wait_closing()
124                await self._conn.async_close()
125
126                await self._set_status('DISCONNECTED')
127
128        except Exception as e:
129            self._log.error('connection loop error: %s', e, exc_info=e)
130
131        finally:
132            self._log.debug('closing connection loop')
133            self.close()
134            await aio.uncancellable(cleanup())
135
136    async def _notify_response(self, response):
137        await self._eventer_client.write([response])
138
139    async def _set_status(self, status):
140        if self._status == status:
141            return
142
143        self._log.debug('changing status: %s -> %s', self._status, status)
144        self._status = status
145        await self._notify_response(common.StatusRes(status))
146
147    def _enable_remote_device(self, device_id):
148        self._log.debug('enabling device %s', device_id)
149        self._enabled_devices.add(device_id)
150
151        device = self._devices.get(device_id)
152        if not device:
153            self._log.debug('device %s is not available', device_id)
154            return
155        if not device.conn.is_open:
156            self._log.debug('connection is not available')
157            return
158
159        reader = self._readers.get(device_id)
160        if reader and reader.is_open:
161            self._log.debug('reader %s is already running', device_id)
162            return
163
164        self._log.debug('creating reader for device %s', device_id)
165        reader = device.create_reader(self._notify_response)
166        self._readers[device.device_id] = reader
167
168    async def _disable_remote_device(self, device_id):
169        self._log.debug('disabling device %s', device_id)
170        self._enabled_devices.discard(device_id)
171
172        reader = self._readers.pop(device_id, None)
173        if not reader:
174            self._log.debug('device reader %s is not available', device_id)
175            return
176
177        await reader.async_close()
178
179    async def _write(self, device_id, data_name, request_id, value):
180        self._log.debug('writing (device_id: %s; data_name: %s; value: %s)',
181                        device_id, data_name, value)
182
183        device = self._devices.get(device_id)
184        if not device:
185            self._log.debug('device %s is not available', device_id)
186            return
187
188        response = await device.write(data_name, request_id, value)
189        if response:
190            self._log.debug('writing result: %s', response.result)
191            await self._notify_response(response)

Resource with lifetime control based on Group.

ModbusMasterDevice(conf: None | bool | int | float | str | List[ForwardRef('Data')] | Dict[str, ForwardRef('Data')], eventer_client: hat.event.eventer.client.Client, event_type_prefix: tuple[str, str, str])
21    def __init__(self,
22                 conf: common.DeviceConf,
23                 eventer_client: hat.event.eventer.Client,
24                 event_type_prefix: common.EventTypePrefix):
25        self._conf = conf
26        self._eventer_client = EventerClientProxy(eventer_client,
27                                                  event_type_prefix,
28                                                  conf['name'])
29        self._enabled_devices = set()
30        self._status = None
31        self._conn = None
32        self._devices = {}
33        self._readers = {}
34        self._async_group = aio.Group()
35        self._log = common.create_device_logger_adapter(mlog, conf['name'])
36
37        self.async_group.spawn(self._connection_loop)
async_group: hat.aio.group.Group
39    @property
40    def async_group(self) -> aio.Group:
41        return self._async_group

Group controlling resource's lifetime.

async def process_event(self, event: hat.event.common.common.Event):
43    async def process_event(self, event: hat.event.common.Event):
44        try:
45            request = self._eventer_client.process_event(event)
46
47            if isinstance(request, common.RemoteDeviceEnableReq):
48                self._log.debug('received remote device enable request')
49
50                if request.enable:
51                    self._enable_remote_device(request.device_id)
52
53                else:
54                    await self._disable_remote_device(request.device_id)
55
56            elif isinstance(request, common.RemoteDeviceWriteReq):
57                self._log.debug('received remote device write request')
58
59                if self._conn and self._conn.is_open:
60                    self._conn.async_group.spawn(
61                        self._write, request.device_id, request.data_name,
62                        request.request_id, request.value)
63
64            else:
65                raise ValueError('invalid request')
66
67        except Exception as e:
68            self._log.warning('error processing event: %s', e, exc_info=e)