hat.gateway.devices.iec103.master

IEC 60870-5-103 master device

  1"""IEC 60870-5-103 master device"""
  2
  3from collections.abc import Collection
  4import asyncio
  5import collections
  6import contextlib
  7import datetime
  8import enum
  9import functools
 10import logging
 11
 12from hat import aio
 13from hat.drivers import iec103
 14from hat.drivers import serial
 15from hat.drivers.iec60870 import link
 16import hat.event.common
 17import hat.event.eventer
 18
 19from hat.gateway import common
 20
 21
 22mlog: logging.Logger = logging.getLogger(__name__)
 23
 24command_timeout: float = 100
 25
 26interrogate_timeout: float = 100
 27
 28
 29async def create(conf: common.DeviceConf,
 30                 eventer_client: hat.event.eventer.Client,
 31                 event_type_prefix: common.EventTypePrefix
 32                 ) -> 'Iec103MasterDevice':
 33    event_types = [(*event_type_prefix, 'system', 'remote_device',
 34                    str(i['address']), 'enable')
 35                   for i in conf['remote_devices']]
 36    params = hat.event.common.QueryLatestParams(event_types)
 37    result = await eventer_client.query(params)
 38
 39    device = Iec103MasterDevice(conf=conf,
 40                                eventer_client=eventer_client,
 41                                event_type_prefix=event_type_prefix)
 42    try:
 43        await device.process_events(result.events)
 44
 45    except BaseException:
 46        await aio.uncancellable(device.async_close())
 47        raise
 48
 49    return device
 50
 51
 52info: common.DeviceInfo = common.DeviceInfo(
 53    type="iec103_master",
 54    create=create,
 55    json_schema_id="hat-gateway://iec103.yaml#/$defs/master",
 56    json_schema_repo=common.json_schema_repo)
 57
 58
 59class Iec103MasterDevice(common.Device):
 60
 61    def __init__(self,
 62                 conf: common.DeviceConf,
 63                 eventer_client: hat.event.eventer.Client,
 64                 event_type_prefix: common.EventTypePrefix):
 65        self._conf = conf
 66        self._eventer_client = eventer_client
 67        self._event_type_prefix = event_type_prefix
 68        self._master = None
 69        self._conns = {}
 70        self._remote_enabled = {i['address']: False
 71                                for i in conf['remote_devices']}
 72        self._remote_confs = {i['address']: i
 73                              for i in conf['remote_devices']}
 74        self._remote_groups = {}
 75        self._async_group = aio.Group()
 76
 77        self.async_group.spawn(self._create_link_master_loop)
 78
 79    @property
 80    def async_group(self) -> aio.Group:
 81        return self._async_group
 82
 83    async def process_events(self, events: Collection[hat.event.common.Event]):
 84        for event in events:
 85            try:
 86                await self._process_event(event)
 87
 88            except Exception as e:
 89                mlog.warning('error processing event: %s', e, exc_info=e)
 90
 91    async def _create_link_master_loop(self):
 92
 93        async def cleanup():
 94            with contextlib.suppress(ConnectionError):
 95                await self._register_status('DISCONNECTED')
 96
 97            if self._master:
 98                await self._master.async_close()
 99
100        try:
101            while True:
102                await self._register_status('CONNECTING')
103
104                try:
105                    self._master = await link.unbalanced.create_master(
106                            port=self._conf['port'],
107                            baudrate=self._conf['baudrate'],
108                            bytesize=serial.ByteSize[self._conf['bytesize']],
109                            parity=serial.Parity[self._conf['parity']],
110                            stopbits=serial.StopBits[self._conf['stopbits']],
111                            xonxoff=self._conf['flow_control']['xonxoff'],
112                            rtscts=self._conf['flow_control']['rtscts'],
113                            dsrdtr=self._conf['flow_control']['dsrdtr'],
114                            silent_interval=self._conf['silent_interval'],
115                            address_size=link.AddressSize.ONE)
116
117                except Exception as e:
118                    mlog.warning('link master (endpoint) failed to create: %s',
119                                 e, exc_info=e)
120                    await self._register_status('DISCONNECTED')
121                    await asyncio.sleep(self._conf['reconnect_delay'])
122                    continue
123
124                await self._register_status('CONNECTED')
125                for address, enabled in self._remote_enabled.items():
126                    if enabled:
127                        self._enable_remote(address)
128
129                await self._master.wait_closed()
130                await self._register_status('DISCONNECTED')
131                self._master = None
132
133        finally:
134            mlog.debug('closing link master loop')
135            self.close()
136            await aio.uncancellable(cleanup())
137
138    async def _connection_loop(self, group, address):
139
140        async def cleanup():
141            with contextlib.suppress(ConnectionError):
142                await self._register_rmt_status(address, 'DISCONNECTED')
143
144            conn = self._conns.pop(address, None)
145            if conn:
146                await conn.async_close()
147
148        remote_conf = self._remote_confs[address]
149        try:
150            while True:
151                await self._register_rmt_status(address, 'CONNECTING')
152
153                try:
154                    conn_link = await self._master.connect(
155                        addr=address,
156                        response_timeout=remote_conf['response_timeout'],
157                        send_retry_count=remote_conf['send_retry_count'],
158                        poll_class1_delay=remote_conf['poll_class1_delay'],
159                        poll_class2_delay=remote_conf['poll_class2_delay'])
160
161                except Exception as e:
162                    mlog.error('connection error to address %s: %s',
163                               address, e, exc_info=e)
164                    await self._register_rmt_status(address, 'DISCONNECTED')
165                    await asyncio.sleep(remote_conf['reconnect_delay'])
166                    continue
167
168                await self._register_rmt_status(address, 'CONNECTED')
169                conn = iec103.MasterConnection(
170                    conn=conn_link,
171                    data_cb=functools.partial(self._on_data, address),
172                    generic_data_cb=None)
173                self._conns[address] = conn
174                if remote_conf['time_sync_delay'] is not None:
175                    group.spawn(self._time_sync_loop, conn,
176                                remote_conf['time_sync_delay'])
177
178                await conn.wait_closed()
179                await self._register_rmt_status(address, 'DISCONNECTED')
180                self._conns.pop(address)
181
182        finally:
183            mlog.debug('closing remote device %s', address)
184            group.close()
185            await aio.uncancellable(cleanup())
186
187    async def _time_sync_loop(self, conn, delay):
188        try:
189            while True:
190                await conn.time_sync()
191                mlog.debug('time sync')
192                await asyncio.sleep(delay)
193
194        except ConnectionError:
195            mlog.debug('connection closed')
196
197        finally:
198            conn.close()
199
200    async def _on_data(self, address, data):
201        events = collections.deque()
202        try:
203            for event in _events_from_data(data, address,
204                                           self._event_type_prefix):
205                events.append(event)
206
207        except Exception as e:
208            mlog.warning('data %s ignored due to: %s', data, e, exc_info=e)
209
210        if events:
211            await self._eventer_client.register(events)
212
213    async def _process_event(self, event):
214        prefix_len = len(self._event_type_prefix)
215        if event.type[prefix_len + 1] != 'remote_device':
216            raise Exception('unexpected event type')
217
218        address = self._address_from_event(event)
219        etype_suffix = event.type[prefix_len + 3:]
220
221        if etype_suffix[0] == 'enable':
222            self._process_enable(event)
223
224        elif etype_suffix[0] == 'command':
225            asdu = int(etype_suffix[1])
226            io = iec103.IoAddress(
227                function_type=int(etype_suffix[2]),
228                information_number=int(etype_suffix[3]))
229            self._process_command(event, address, asdu, io)
230
231        elif etype_suffix[0] == 'interrogation':
232            asdu = int(etype_suffix[1])
233            self._process_interrogation(event, address, asdu)
234
235        else:
236            raise Exception('unexpected event type')
237
238    def _process_enable(self, event):
239        address = self._address_from_event(event)
240        enable = event.payload.data
241        if address not in self._remote_enabled:
242            raise Exception('invalid remote device address')
243
244        if not isinstance(enable, bool):
245            raise Exception('invalid enable event payload')
246
247        self._remote_enabled[address] = enable
248
249        if not self._master:
250            return
251
252        if enable:
253            self._enable_remote(address)
254
255        else:
256            self._disable_remote(address)
257
258    def _enable_remote(self, address):
259        remote_group = self._remote_groups.get(address)
260        if remote_group and remote_group.is_open:
261            return
262
263        remote_group = self._async_group.create_subgroup()
264        self._remote_groups[address] = remote_group
265        remote_group.spawn(self._connection_loop, remote_group, address)
266
267    def _disable_remote(self, address):
268        if address in self._remote_groups:
269            remote_group = self._remote_groups.pop(address)
270            remote_group.close()
271
272    def _process_command(self, event, address, asdu, io):
273        conn = self._conns.get(address)
274        if not conn or not conn.is_open:
275            raise Exception('connection closed')
276
277        value = iec103.DoubleValue[event.payload.data['value']]
278        session_id = event.payload.data['session_id']
279        self._remote_groups[address].spawn(
280            self._cmd_req_res, conn, address, asdu, io, value, session_id)
281
282    async def _cmd_req_res(self, conn, address, asdu, io, value, session_id):
283        try:
284            success = await asyncio.wait_for(
285                conn.send_command(asdu, io, value), timeout=command_timeout)
286
287        except ConnectionError:
288            mlog.warning('command %s %s %s to %s failed: connection closed',
289                         asdu, io, value, address)
290            return
291
292        except asyncio.TimeoutError:
293            mlog.warning(
294                'command %s %s %s to %s timeout', asdu, io, value, address)
295            return
296
297        event = _create_event(
298            event_type=(*self._event_type_prefix, 'gateway', 'remote_device',
299                        str(address), 'command', str(asdu),
300                        str(io.function_type), str(io.information_number)),
301            payload={'success': success,
302                     'session_id': session_id})
303
304        await self._eventer_client.register([event])
305
306    def _process_interrogation(self, event, address, asdu):
307        conn = self._conns.get(address)
308        if not conn or not conn.is_open:
309            mlog.warning("event %s ignored due to connection closed", event)
310            return
311
312        self._remote_groups[address].spawn(
313            self._interrogate_req_res, conn, address, asdu)
314
315    async def _interrogate_req_res(self, conn, address, asdu):
316        try:
317            await asyncio.wait_for(conn.interrogate(asdu),
318                                   timeout=interrogate_timeout)
319
320        except ConnectionError:
321            mlog.warning('interrogation on %s to %s failed: connection closed',
322                         asdu, address)
323            return
324
325        except asyncio.TimeoutError:
326            mlog.warning('interrogation on %s to %s timeout', asdu, address)
327            return
328
329        event = _create_event(
330            event_type=(*self._event_type_prefix, 'gateway', 'remote_device',
331                        str(address), 'interrogation', str(asdu)),
332            payload=None)
333
334        await self._eventer_client.register([event])
335
336    async def _register_status(self, status):
337        event = _create_event(
338            event_type=(*self._event_type_prefix, 'gateway', 'status'),
339            payload=status)
340
341        await self._eventer_client.register([event])
342
343    async def _register_rmt_status(self, address, status):
344        event = _create_event(
345            event_type=(*self._event_type_prefix,
346                        'gateway', 'remote_device', str(address), 'status'),
347            payload=status)
348
349        await self._eventer_client.register([event])
350
351    def _address_from_event(self, event):
352        return int(event.type[len(self._event_type_prefix) + 2])
353
354
355def _events_from_data(data, address, event_type_prefix):
356    cause = (data.cause.name if isinstance(data.cause, enum.Enum)
357             else data.cause)
358
359    if isinstance(data.value, (iec103.DoubleWithTimeValue,
360                               iec103.DoubleWithRelativeTimeValue)):
361        data_type = 'double'
362        payload = {'cause': cause,
363                   'value': data.value.value.name}
364        source_ts = _time_iec103_to_source_ts(data.value.time)
365        event_type = _data_event_type(
366            data, address, data_type, event_type_prefix)
367        yield _create_event(event_type, payload, source_ts)
368
369    elif isinstance(data.value, iec103.MeasurandValues):
370        for meas_type, meas_value in data.value.values.items():
371            payload = {'cause': cause,
372                       'value': meas_value._asdict()}
373            data_type = meas_type.name.lower()
374            event_type = _data_event_type(
375                data, address, data_type, event_type_prefix)
376            yield _create_event(event_type, payload)
377
378    else:
379        raise Exception('unsupported data value')
380
381
382def _data_event_type(data, address, data_type, event_type_prefix):
383    return (
384        *event_type_prefix, 'gateway', 'remote_device', str(address),
385        'data', data_type,
386        str(data.asdu_address),
387        str(data.io_address.function_type),
388        str(data.io_address.information_number))
389
390
391def _time_iec103_to_source_ts(time_four):
392    t_now = datetime.datetime.now(datetime.timezone.utc)
393    candidates_now = [t_now - datetime.timedelta(hours=12),
394                      t_now,
395                      t_now + datetime.timedelta(hours=12)]
396    candidates_103 = [_upgrade_time_four_to_seven(
397                        time_four, iec103.time_from_datetime(t))
398                      for t in candidates_now]
399    candidates_dt = [iec103.time_to_datetime(t)
400                     for t in candidates_103 if t]
401    if not candidates_dt:
402        return
403
404    res = min(candidates_dt, key=lambda i: abs(t_now - i))
405    return hat.event.common.timestamp_from_datetime(res)
406
407
408def _upgrade_time_four_to_seven(time_four, time_seven):
409    if time_four.summer_time != time_seven.summer_time:
410        return
411
412    return time_four._replace(
413        day_of_week=time_seven.day_of_week,
414        day_of_month=time_seven.day_of_month,
415        months=time_seven.months,
416        years=time_seven.years,
417        size=iec103.TimeSize.SEVEN)
418
419
420def _create_event(event_type, payload, source_timestamp=None):
421    return hat.event.common.RegisterEvent(
422        type=event_type,
423        source_timestamp=source_timestamp,
424        payload=hat.event.common.EventPayloadJson(payload))
mlog: logging.Logger = <Logger hat.gateway.devices.iec103.master (WARNING)>
command_timeout: float = 100
interrogate_timeout: float = 100
async def create( conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]], eventer_client: hat.event.eventer.client.Client, event_type_prefix: tuple[str, str, str]) -> Iec103MasterDevice:
30async def create(conf: common.DeviceConf,
31                 eventer_client: hat.event.eventer.Client,
32                 event_type_prefix: common.EventTypePrefix
33                 ) -> 'Iec103MasterDevice':
34    event_types = [(*event_type_prefix, 'system', 'remote_device',
35                    str(i['address']), 'enable')
36                   for i in conf['remote_devices']]
37    params = hat.event.common.QueryLatestParams(event_types)
38    result = await eventer_client.query(params)
39
40    device = Iec103MasterDevice(conf=conf,
41                                eventer_client=eventer_client,
42                                event_type_prefix=event_type_prefix)
43    try:
44        await device.process_events(result.events)
45
46    except BaseException:
47        await aio.uncancellable(device.async_close())
48        raise
49
50    return device
info: hat.gateway.common.DeviceInfo = DeviceInfo(type='iec103_master', create=<function create>, json_schema_id='hat-gateway://iec103.yaml#/$defs/master', json_schema_repo=<hat.json.repository.SchemaRepository object>)
class Iec103MasterDevice(hat.gateway.common.Device):
 60class Iec103MasterDevice(common.Device):
 61
 62    def __init__(self,
 63                 conf: common.DeviceConf,
 64                 eventer_client: hat.event.eventer.Client,
 65                 event_type_prefix: common.EventTypePrefix):
 66        self._conf = conf
 67        self._eventer_client = eventer_client
 68        self._event_type_prefix = event_type_prefix
 69        self._master = None
 70        self._conns = {}
 71        self._remote_enabled = {i['address']: False
 72                                for i in conf['remote_devices']}
 73        self._remote_confs = {i['address']: i
 74                              for i in conf['remote_devices']}
 75        self._remote_groups = {}
 76        self._async_group = aio.Group()
 77
 78        self.async_group.spawn(self._create_link_master_loop)
 79
 80    @property
 81    def async_group(self) -> aio.Group:
 82        return self._async_group
 83
 84    async def process_events(self, events: Collection[hat.event.common.Event]):
 85        for event in events:
 86            try:
 87                await self._process_event(event)
 88
 89            except Exception as e:
 90                mlog.warning('error processing event: %s', e, exc_info=e)
 91
 92    async def _create_link_master_loop(self):
 93
 94        async def cleanup():
 95            with contextlib.suppress(ConnectionError):
 96                await self._register_status('DISCONNECTED')
 97
 98            if self._master:
 99                await self._master.async_close()
100
101        try:
102            while True:
103                await self._register_status('CONNECTING')
104
105                try:
106                    self._master = await link.unbalanced.create_master(
107                            port=self._conf['port'],
108                            baudrate=self._conf['baudrate'],
109                            bytesize=serial.ByteSize[self._conf['bytesize']],
110                            parity=serial.Parity[self._conf['parity']],
111                            stopbits=serial.StopBits[self._conf['stopbits']],
112                            xonxoff=self._conf['flow_control']['xonxoff'],
113                            rtscts=self._conf['flow_control']['rtscts'],
114                            dsrdtr=self._conf['flow_control']['dsrdtr'],
115                            silent_interval=self._conf['silent_interval'],
116                            address_size=link.AddressSize.ONE)
117
118                except Exception as e:
119                    mlog.warning('link master (endpoint) failed to create: %s',
120                                 e, exc_info=e)
121                    await self._register_status('DISCONNECTED')
122                    await asyncio.sleep(self._conf['reconnect_delay'])
123                    continue
124
125                await self._register_status('CONNECTED')
126                for address, enabled in self._remote_enabled.items():
127                    if enabled:
128                        self._enable_remote(address)
129
130                await self._master.wait_closed()
131                await self._register_status('DISCONNECTED')
132                self._master = None
133
134        finally:
135            mlog.debug('closing link master loop')
136            self.close()
137            await aio.uncancellable(cleanup())
138
139    async def _connection_loop(self, group, address):
140
141        async def cleanup():
142            with contextlib.suppress(ConnectionError):
143                await self._register_rmt_status(address, 'DISCONNECTED')
144
145            conn = self._conns.pop(address, None)
146            if conn:
147                await conn.async_close()
148
149        remote_conf = self._remote_confs[address]
150        try:
151            while True:
152                await self._register_rmt_status(address, 'CONNECTING')
153
154                try:
155                    conn_link = await self._master.connect(
156                        addr=address,
157                        response_timeout=remote_conf['response_timeout'],
158                        send_retry_count=remote_conf['send_retry_count'],
159                        poll_class1_delay=remote_conf['poll_class1_delay'],
160                        poll_class2_delay=remote_conf['poll_class2_delay'])
161
162                except Exception as e:
163                    mlog.error('connection error to address %s: %s',
164                               address, e, exc_info=e)
165                    await self._register_rmt_status(address, 'DISCONNECTED')
166                    await asyncio.sleep(remote_conf['reconnect_delay'])
167                    continue
168
169                await self._register_rmt_status(address, 'CONNECTED')
170                conn = iec103.MasterConnection(
171                    conn=conn_link,
172                    data_cb=functools.partial(self._on_data, address),
173                    generic_data_cb=None)
174                self._conns[address] = conn
175                if remote_conf['time_sync_delay'] is not None:
176                    group.spawn(self._time_sync_loop, conn,
177                                remote_conf['time_sync_delay'])
178
179                await conn.wait_closed()
180                await self._register_rmt_status(address, 'DISCONNECTED')
181                self._conns.pop(address)
182
183        finally:
184            mlog.debug('closing remote device %s', address)
185            group.close()
186            await aio.uncancellable(cleanup())
187
188    async def _time_sync_loop(self, conn, delay):
189        try:
190            while True:
191                await conn.time_sync()
192                mlog.debug('time sync')
193                await asyncio.sleep(delay)
194
195        except ConnectionError:
196            mlog.debug('connection closed')
197
198        finally:
199            conn.close()
200
201    async def _on_data(self, address, data):
202        events = collections.deque()
203        try:
204            for event in _events_from_data(data, address,
205                                           self._event_type_prefix):
206                events.append(event)
207
208        except Exception as e:
209            mlog.warning('data %s ignored due to: %s', data, e, exc_info=e)
210
211        if events:
212            await self._eventer_client.register(events)
213
214    async def _process_event(self, event):
215        prefix_len = len(self._event_type_prefix)
216        if event.type[prefix_len + 1] != 'remote_device':
217            raise Exception('unexpected event type')
218
219        address = self._address_from_event(event)
220        etype_suffix = event.type[prefix_len + 3:]
221
222        if etype_suffix[0] == 'enable':
223            self._process_enable(event)
224
225        elif etype_suffix[0] == 'command':
226            asdu = int(etype_suffix[1])
227            io = iec103.IoAddress(
228                function_type=int(etype_suffix[2]),
229                information_number=int(etype_suffix[3]))
230            self._process_command(event, address, asdu, io)
231
232        elif etype_suffix[0] == 'interrogation':
233            asdu = int(etype_suffix[1])
234            self._process_interrogation(event, address, asdu)
235
236        else:
237            raise Exception('unexpected event type')
238
239    def _process_enable(self, event):
240        address = self._address_from_event(event)
241        enable = event.payload.data
242        if address not in self._remote_enabled:
243            raise Exception('invalid remote device address')
244
245        if not isinstance(enable, bool):
246            raise Exception('invalid enable event payload')
247
248        self._remote_enabled[address] = enable
249
250        if not self._master:
251            return
252
253        if enable:
254            self._enable_remote(address)
255
256        else:
257            self._disable_remote(address)
258
259    def _enable_remote(self, address):
260        remote_group = self._remote_groups.get(address)
261        if remote_group and remote_group.is_open:
262            return
263
264        remote_group = self._async_group.create_subgroup()
265        self._remote_groups[address] = remote_group
266        remote_group.spawn(self._connection_loop, remote_group, address)
267
268    def _disable_remote(self, address):
269        if address in self._remote_groups:
270            remote_group = self._remote_groups.pop(address)
271            remote_group.close()
272
273    def _process_command(self, event, address, asdu, io):
274        conn = self._conns.get(address)
275        if not conn or not conn.is_open:
276            raise Exception('connection closed')
277
278        value = iec103.DoubleValue[event.payload.data['value']]
279        session_id = event.payload.data['session_id']
280        self._remote_groups[address].spawn(
281            self._cmd_req_res, conn, address, asdu, io, value, session_id)
282
283    async def _cmd_req_res(self, conn, address, asdu, io, value, session_id):
284        try:
285            success = await asyncio.wait_for(
286                conn.send_command(asdu, io, value), timeout=command_timeout)
287
288        except ConnectionError:
289            mlog.warning('command %s %s %s to %s failed: connection closed',
290                         asdu, io, value, address)
291            return
292
293        except asyncio.TimeoutError:
294            mlog.warning(
295                'command %s %s %s to %s timeout', asdu, io, value, address)
296            return
297
298        event = _create_event(
299            event_type=(*self._event_type_prefix, 'gateway', 'remote_device',
300                        str(address), 'command', str(asdu),
301                        str(io.function_type), str(io.information_number)),
302            payload={'success': success,
303                     'session_id': session_id})
304
305        await self._eventer_client.register([event])
306
307    def _process_interrogation(self, event, address, asdu):
308        conn = self._conns.get(address)
309        if not conn or not conn.is_open:
310            mlog.warning("event %s ignored due to connection closed", event)
311            return
312
313        self._remote_groups[address].spawn(
314            self._interrogate_req_res, conn, address, asdu)
315
316    async def _interrogate_req_res(self, conn, address, asdu):
317        try:
318            await asyncio.wait_for(conn.interrogate(asdu),
319                                   timeout=interrogate_timeout)
320
321        except ConnectionError:
322            mlog.warning('interrogation on %s to %s failed: connection closed',
323                         asdu, address)
324            return
325
326        except asyncio.TimeoutError:
327            mlog.warning('interrogation on %s to %s timeout', asdu, address)
328            return
329
330        event = _create_event(
331            event_type=(*self._event_type_prefix, 'gateway', 'remote_device',
332                        str(address), 'interrogation', str(asdu)),
333            payload=None)
334
335        await self._eventer_client.register([event])
336
337    async def _register_status(self, status):
338        event = _create_event(
339            event_type=(*self._event_type_prefix, 'gateway', 'status'),
340            payload=status)
341
342        await self._eventer_client.register([event])
343
344    async def _register_rmt_status(self, address, status):
345        event = _create_event(
346            event_type=(*self._event_type_prefix,
347                        'gateway', 'remote_device', str(address), 'status'),
348            payload=status)
349
350        await self._eventer_client.register([event])
351
352    def _address_from_event(self, event):
353        return int(event.type[len(self._event_type_prefix) + 2])

Device interface

Iec103MasterDevice( conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]], eventer_client: hat.event.eventer.client.Client, event_type_prefix: tuple[str, str, str])
62    def __init__(self,
63                 conf: common.DeviceConf,
64                 eventer_client: hat.event.eventer.Client,
65                 event_type_prefix: common.EventTypePrefix):
66        self._conf = conf
67        self._eventer_client = eventer_client
68        self._event_type_prefix = event_type_prefix
69        self._master = None
70        self._conns = {}
71        self._remote_enabled = {i['address']: False
72                                for i in conf['remote_devices']}
73        self._remote_confs = {i['address']: i
74                              for i in conf['remote_devices']}
75        self._remote_groups = {}
76        self._async_group = aio.Group()
77
78        self.async_group.spawn(self._create_link_master_loop)
async_group: hat.aio.group.Group
80    @property
81    def async_group(self) -> aio.Group:
82        return self._async_group

Group controlling resource's lifetime.

async def process_events(self, events: Collection[hat.event.common.common.Event]):
84    async def process_events(self, events: Collection[hat.event.common.Event]):
85        for event in events:
86            try:
87                await self._process_event(event)
88
89            except Exception as e:
90                mlog.warning('error processing event: %s', e, exc_info=e)

Process received events

This method can be coroutine or regular function.