hat.gateway.devices.iec101.master

IEC 60870-5-101 master device

  1"""IEC 60870-5-101 master device"""
  2
  3from collections.abc import Collection
  4import asyncio
  5import collections
  6import contextlib
  7import datetime
  8import functools
  9import logging
 10
 11from hat import aio
 12from hat.drivers import iec101
 13from hat.drivers import serial
 14from hat.drivers.iec60870 import link
 15import hat.event.common
 16import hat.event.eventer
 17
 18from hat.gateway.devices.iec101 import common
 19
 20
# Module-level logger, named after this module via ``__name__``.
mlog: logging.Logger = logging.getLogger(__name__)
 22
 23
 24async def create(conf: common.DeviceConf,
 25                 eventer_client: hat.event.eventer.Client,
 26                 event_type_prefix: common.EventTypePrefix
 27                 ) -> 'Iec101MasterDevice':
 28    event_types = [(*event_type_prefix, 'system', 'remote_device',
 29                    str(i['address']), 'enable')
 30                   for i in conf['remote_devices']]
 31    params = hat.event.common.QueryLatestParams(event_types)
 32    result = await eventer_client.query(params)
 33
 34    device = Iec101MasterDevice(conf=conf,
 35                                eventer_client=eventer_client,
 36                                event_type_prefix=event_type_prefix)
 37    try:
 38        await device.process_events(result.events)
 39
 40    except BaseException:
 41        await aio.uncancellable(device.async_close())
 42        raise
 43
 44    return device
 45
 46
# Device descriptor exported by this module: device type name, the `create`
# factory coroutine and the JSON schema (id and repository) used for the
# device configuration.
info: common.DeviceInfo = common.DeviceInfo(
    type="iec101_master",
    create=create,
    json_schema_id="hat-gateway://iec101.yaml#/$defs/master",
    json_schema_repo=common.json_schema_repo)
 52
 53
 54class Iec101MasterDevice(common.Device):
 55
 56    def __init__(self,
 57                 conf: common.DeviceConf,
 58                 eventer_client: hat.event.eventer.Client,
 59                 event_type_prefix: common.EventTypePrefix,
 60                 send_queue_size: int = 1024):
 61        self._conf = conf
 62        self._event_type_prefix = event_type_prefix
 63        self._eventer_client = eventer_client
 64        self._master = None
 65        self._conns = {}
 66        self._send_queue = aio.Queue(send_queue_size)
 67        self._async_group = aio.Group()
 68        self._remote_enabled = {i['address']: False
 69                                for i in conf['remote_devices']}
 70        self._remote_confs = {i['address']: i
 71                              for i in conf['remote_devices']}
 72        self._remote_groups = {}
 73
 74        self.async_group.spawn(self._create_link_master_loop)
 75        self.async_group.spawn(self._send_loop)
 76
 77    @property
 78    def async_group(self) -> aio.Group:
 79        return self._async_group
 80
 81    async def process_events(self, events: Collection[hat.event.common.Event]):
 82        for event in events:
 83            try:
 84                await self._process_event(event)
 85
 86            except Exception as e:
 87                mlog.warning('error processing event: %s', e, exc_info=e)
 88
 89    async def _create_link_master_loop(self):
 90
 91        async def cleanup():
 92            with contextlib.suppress(ConnectionError):
 93                await self._register_status('DISCONNECTED')
 94
 95            if self._master:
 96                await self._master.async_close()
 97
 98        try:
 99            while True:
100                await self._register_status('CONNECTING')
101
102                try:
103                    self._master = await link.unbalanced.create_master(
104                        port=self._conf['port'],
105                        baudrate=self._conf['baudrate'],
106                        bytesize=serial.ByteSize[self._conf['bytesize']],
107                        parity=serial.Parity[self._conf['parity']],
108                        stopbits=serial.StopBits[self._conf['stopbits']],
109                        xonxoff=self._conf['flow_control']['xonxoff'],
110                        rtscts=self._conf['flow_control']['rtscts'],
111                        dsrdtr=self._conf['flow_control']['dsrdtr'],
112                        silent_interval=self._conf['silent_interval'],
113                        address_size=link.AddressSize[
114                            self._conf['device_address_size']])
115
116                except Exception as e:
117                    mlog.warning('link master (endpoint) failed to create: %s',
118                                 e, exc_info=e)
119                    await self._register_status('DISCONNECTED')
120                    await asyncio.sleep(self._conf['reconnect_delay'])
121                    continue
122
123                await self._register_status('CONNECTED')
124
125                for address, enabled in self._remote_enabled.items():
126                    if enabled:
127                        self._enable_remote(address)
128
129                await self._master.wait_closed()
130                await self._register_status('DISCONNECTED')
131                self._master = None
132
133        except Exception as e:
134            mlog.error('create link master error: %s', e, exc_info=e)
135
136        finally:
137            mlog.debug('closing link master loop')
138            self.close()
139            self._conns = {}
140            await aio.uncancellable(cleanup())
141
142    async def _send_loop(self):
143        while True:
144            msg, address = await self._send_queue.get()
145
146            conn = self._conns.get(address)
147            if not conn or not conn.is_open:
148                mlog.warning('msg %s not sent, connection to %s closed',
149                             msg, address)
150                continue
151
152            try:
153                await conn.send([msg])
154                mlog.debug('msg sent asdu=%s', msg.asdu_address)
155
156            except ConnectionError:
157                mlog.warning('msg %s not sent, connection to %s closed',
158                             msg, address)
159
160    async def _connection_loop(self, group, address):
161
162        async def cleanup():
163            with contextlib.suppress(ConnectionError):
164                await self._register_rmt_status(address, 'DISCONNECTED')
165
166            conn = self._conns.pop(address, None)
167            if conn:
168                await conn.async_close()
169
170        remote_conf = self._remote_confs[address]
171        try:
172            while True:
173                await self._register_rmt_status(address, 'CONNECTING')
174
175                try:
176                    master_conn = await self._master.connect(
177                        addr=address,
178                        response_timeout=remote_conf['response_timeout'],
179                        send_retry_count=remote_conf['send_retry_count'],
180                        poll_class1_delay=remote_conf['poll_class1_delay'],
181                        poll_class2_delay=remote_conf['poll_class2_delay'])
182
183                except Exception as e:
184                    mlog.error('connection error to address %s: %s',
185                               address, e, exc_info=e)
186                    await self._register_rmt_status(address, 'DISCONNECTED')
187                    await asyncio.sleep(remote_conf['reconnect_delay'])
188                    continue
189
190                await self._register_rmt_status(address, 'CONNECTED')
191
192                conn = iec101.MasterConnection(
193                    conn=master_conn,
194                    cause_size=iec101.CauseSize[self._conf['cause_size']],
195                    asdu_address_size=iec101.AsduAddressSize[
196                        self._conf['asdu_address_size']],
197                    io_address_size=iec101.IoAddressSize[
198                        self._conf['io_address_size']])
199                self._conns[address] = conn
200                group.spawn(self._receive_loop, conn, address)
201
202                if remote_conf['time_sync_delay'] is not None:
203                    group.spawn(self._time_sync_loop, conn,
204                                remote_conf['time_sync_delay'])
205
206                await conn.wait_closed()
207                await self._register_rmt_status(address, 'DISCONNECTED')
208                self._conns.pop(address)
209
210        except Exception as e:
211            mlog.error('connection loop error: %s', e, exc_info=e)
212
213        finally:
214            mlog.debug('closing remote device %s', address)
215            group.close()
216            await aio.uncancellable(cleanup())
217
218    async def _receive_loop(self, conn, address):
219        try:
220            while True:
221                try:
222                    msgs = await conn.receive()
223
224                except iec101.AsduTypeError as e:
225                    mlog.warning("asdu type error: %s", e)
226                    continue
227
228                events = collections.deque()
229                for msg in msgs:
230                    if isinstance(msg, iec101.ClockSyncMsg):
231                        continue
232
233                    try:
234                        event = _msg_to_event(self._event_type_prefix, address,
235                                              msg)
236                        events.append(event)
237
238                    except Exception as e:
239                        mlog.warning('message %s ignored due to: %s',
240                                     msg, e, exc_info=e)
241
242                if not events:
243                    continue
244
245                await self._eventer_client.register(events)
246                for e in events:
247                    mlog.debug('registered event %s', e)
248
249        except ConnectionError:
250            mlog.debug('connection closed')
251
252        except Exception as e:
253            mlog.error('receive loop error: %s', e, exc_info=e)
254
255        finally:
256            conn.close()
257
258    async def _time_sync_loop(self, conn, delay):
259        try:
260            while True:
261                time_now = datetime.datetime.now(datetime.timezone.utc)
262                time_iec101 = iec101.time_from_datetime(time_now)
263                msg = iec101.ClockSyncMsg(
264                    is_test=False,
265                    originator_address=0,
266                    asdu_address={
267                        'ONE': 0xFF,
268                        'TWO': 0xFFFF}[self._conf['asdu_address_size']],
269                    time=time_iec101,
270                    is_negative_confirm=False,
271                    cause=iec101.ClockSyncReqCause.ACTIVATION)
272                await conn.send([msg])
273                mlog.debug('time sync sent %s', time_iec101)
274
275                await asyncio.sleep(delay)
276
277        except ConnectionError:
278            mlog.debug('connection closed')
279
280        except Exception as e:
281            mlog.error('time sync loop error: %s', e, exc_info=e)
282
283        finally:
284            conn.close()
285
286    async def _process_event(self, event):
287        match_type = functools.partial(hat.event.common.matches_query_type,
288                                       event.type)
289
290        prefix = (*self._event_type_prefix, 'system', 'remote_device', '?')
291        if not match_type((*prefix, '*')):
292            raise Exception('unexpected event type')
293
294        address = int(event.type[len(prefix) - 1])
295        suffix = event.type[len(prefix):]
296
297        if match_type((*prefix, 'enable')):
298            self._process_event_enable(address, event)
299
300        elif match_type((*prefix, 'command', '?', '?', '?')):
301            cmd_key = common.CommandKey(
302                cmd_type=common.CommandType(suffix[1]),
303                asdu_address=int(suffix[2]),
304                io_address=int(suffix[3]))
305            msg = _command_from_event(cmd_key, event)
306
307            await self._send_queue.put((msg, address))
308            mlog.debug('command asdu=%s io=%s prepared for sending',
309                       cmd_key.asdu_address, cmd_key.io_address)
310
311        elif match_type((*prefix, 'interrogation', '?')):
312            asdu_address = int(suffix[1])
313            msg = _interrogation_from_event(asdu_address, event)
314
315            await self._send_queue.put((msg, address))
316            mlog.debug("interrogation request asdu=%s prepared for sending",
317                       asdu_address)
318
319        elif match_type((*prefix, 'counter_interrogation', '?')):
320            asdu_address = int(suffix[1])
321            msg = _counter_interrogation_from_event(asdu_address, event)
322
323            await self._send_queue.put((msg, address))
324            mlog.debug("counter interrogation request asdu=%s prepared for "
325                       "sending", asdu_address)
326
327        else:
328            raise Exception('unexpected event type')
329
330    def _process_event_enable(self, address, event):
331        if address not in self._remote_enabled:
332            raise Exception('invalid remote device address')
333
334        enable = event.payload.data
335        if not isinstance(enable, bool):
336            raise Exception('invalid enable event payload')
337
338        if address not in self._remote_enabled:
339            mlog.warning('received enable for unexpected remote device')
340            return
341
342        self._remote_enabled[address] = enable
343
344        if not enable:
345            self._disable_remote(address)
346
347        elif not self._master:
348            return
349
350        else:
351            self._enable_remote(address)
352
353    def _enable_remote(self, address):
354        mlog.debug('enabling device %s', address)
355        remote_group = self._remote_groups.get(address)
356        if remote_group and remote_group.is_open:
357            mlog.debug('device %s is already running', address)
358            return
359
360        remote_group = self._async_group.create_subgroup()
361        self._remote_groups[address] = remote_group
362        remote_group.spawn(self._connection_loop, remote_group, address)
363
364    def _disable_remote(self, address):
365        mlog.debug('disabling device %s', address)
366        if address in self._remote_groups:
367            remote_group = self._remote_groups.pop(address)
368            remote_group.close()
369
370    async def _register_status(self, status):
371        event = hat.event.common.RegisterEvent(
372            type=(*self._event_type_prefix, 'gateway', 'status'),
373            source_timestamp=None,
374            payload=hat.event.common.EventPayloadJson(status))
375        await self._eventer_client.register([event])
376        mlog.debug('device status -> %s', status)
377
378    async def _register_rmt_status(self, address, status):
379        event = hat.event.common.RegisterEvent(
380            type=(*self._event_type_prefix, 'gateway', 'remote_device',
381                  str(address), 'status'),
382            source_timestamp=None,
383            payload=hat.event.common.EventPayloadJson(status))
384        await self._eventer_client.register([event])
385        mlog.debug('remote device %s status -> %s', address, status)
386
387
def _msg_to_event(event_type_prefix, address, msg):
    """Convert a received message to a register event

    The first matching message class (data, command, interrogation,
    counter interrogation - in that order) selects the converter.

    Raises:
        Exception: if `msg` is not an instance of any supported class

    """
    converters = (
        (iec101.DataMsg, _data_to_event),
        (iec101.CommandMsg, _command_to_event),
        (iec101.InterrogationMsg, _interrogation_to_event),
        (iec101.CounterInterrogationMsg, _counter_interrogation_to_event))

    for msg_cls, to_event in converters:
        if isinstance(msg, msg_cls):
            return to_event(event_type_prefix, address, msg)

    raise Exception('unsupported message type')
402
403
def _data_to_event(event_type_prefix, address, msg):
    """Convert a data message to a ``.../data/...`` register event

    The event type ends with the data type value, ASDU address and IO
    address. The source timestamp is derived from the message time.

    """
    kind = common.get_data_type(msg.data)
    json_cause = common.cause_to_json(iec101.DataResCause, msg.cause)
    json_data = common.data_to_json(msg.data)

    remote_prefix = (*event_type_prefix, 'gateway', 'remote_device',
                     str(address))
    full_type = (*remote_prefix, 'data', kind.value, str(msg.asdu_address),
                 str(msg.io_address))
    timestamp = common.time_to_source_timestamp(msg.time)

    payload = hat.event.common.EventPayloadJson({'is_test': msg.is_test,
                                                 'cause': json_cause,
                                                 'data': json_data})
    return hat.event.common.RegisterEvent(type=full_type,
                                          source_timestamp=timestamp,
                                          payload=payload)
419
420
def _command_to_event(event_type_prefix, address, msg):
    """Convert a command message to a ``.../command/...`` register event

    The event type ends with the command type value, ASDU address and IO
    address. No source timestamp is set.

    """
    kind = common.get_command_type(msg.command)
    json_cause = common.cause_to_json(iec101.CommandResCause, msg.cause)
    json_command = common.command_to_json(msg.command)

    remote_prefix = (*event_type_prefix, 'gateway', 'remote_device',
                     str(address))
    full_type = (*remote_prefix, 'command', kind.value,
                 str(msg.asdu_address), str(msg.io_address))

    payload = hat.event.common.EventPayloadJson({
        'is_test': msg.is_test,
        'is_negative_confirm': msg.is_negative_confirm,
        'cause': json_cause,
        'command': json_command})
    return hat.event.common.RegisterEvent(type=full_type,
                                          source_timestamp=None,
                                          payload=payload)
437
438
def _interrogation_to_event(event_type_prefix, address, msg):
    """Convert an interrogation message to a register event

    The event type ends with ``interrogation`` and the ASDU address. No
    source timestamp is set.

    """
    json_cause = common.cause_to_json(iec101.CommandResCause, msg.cause)

    remote_prefix = (*event_type_prefix, 'gateway', 'remote_device',
                     str(address))
    full_type = (*remote_prefix, 'interrogation', str(msg.asdu_address))

    payload = hat.event.common.EventPayloadJson({
        'is_test': msg.is_test,
        'is_negative_confirm': msg.is_negative_confirm,
        'request': msg.request,
        'cause': json_cause})
    return hat.event.common.RegisterEvent(type=full_type,
                                          source_timestamp=None,
                                          payload=payload)
452
453
def _counter_interrogation_to_event(event_type_prefix, address, msg):
    """Convert a counter interrogation message to a register event

    The event type ends with ``counter_interrogation`` and the ASDU
    address. The freeze code is stored by its enum name. No source
    timestamp is set.

    """
    json_cause = common.cause_to_json(iec101.CommandResCause, msg.cause)

    remote_prefix = (*event_type_prefix, 'gateway', 'remote_device',
                     str(address))
    full_type = (*remote_prefix, 'counter_interrogation',
                 str(msg.asdu_address))

    payload = hat.event.common.EventPayloadJson({
        'is_test': msg.is_test,
        'is_negative_confirm': msg.is_negative_confirm,
        'request': msg.request,
        'freeze': msg.freeze.name,
        'cause': json_cause})
    return hat.event.common.RegisterEvent(type=full_type,
                                          source_timestamp=None,
                                          payload=payload)
468
469
def _command_from_event(cmd_key, event):
    """Build a command message from a command event

    ``cause``, ``command`` and ``is_test`` are read from the event payload;
    command type and addresses come from `cmd_key`. Originator address is
    always ``0`` and the message is never a negative confirmation.

    """
    data = event.payload.data
    req_cause = common.cause_from_json(iec101.CommandReqCause, data['cause'])
    req_command = common.command_from_json(cmd_key.cmd_type, data['command'])

    return iec101.CommandMsg(
        is_test=data['is_test'],
        originator_address=0,
        asdu_address=cmd_key.asdu_address,
        io_address=cmd_key.io_address,
        command=req_command,
        is_negative_confirm=False,
        cause=req_cause)
483
484
def _interrogation_from_event(asdu_address, event):
    """Build an interrogation message from an interrogation event

    ``cause``, ``is_test`` and ``request`` are read from the event payload.
    Originator address is always ``0`` and the message is never a negative
    confirmation.

    """
    data = event.payload.data
    req_cause = common.cause_from_json(iec101.CommandReqCause, data['cause'])

    return iec101.InterrogationMsg(
        is_test=data['is_test'],
        originator_address=0,
        asdu_address=asdu_address,
        request=data['request'],
        is_negative_confirm=False,
        cause=req_cause)
495
496
def _counter_interrogation_from_event(asdu_address, event):
    """Build a counter interrogation message from an event

    ``freeze`` (looked up by enum name), ``cause``, ``is_test`` and
    ``request`` are read from the event payload. Originator address is
    always ``0`` and the message is never a negative confirmation.

    """
    data = event.payload.data
    freeze_code = iec101.FreezeCode[data['freeze']]
    req_cause = common.cause_from_json(iec101.CommandReqCause, data['cause'])

    return iec101.CounterInterrogationMsg(is_test=data['is_test'],
                                          originator_address=0,
                                          asdu_address=asdu_address,
                                          request=data['request'],
                                          freeze=freeze_code,
                                          is_negative_confirm=False,
                                          cause=req_cause)
mlog: logging.Logger = <Logger hat.gateway.devices.iec101.master (WARNING)>
async def create( conf: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')], eventer_client: hat.event.eventer.client.Client, event_type_prefix: tuple[str, str, str]) -> Iec101MasterDevice:
async def create(conf: common.DeviceConf,
                 eventer_client: hat.event.eventer.Client,
                 event_type_prefix: common.EventTypePrefix
                 ) -> 'Iec101MasterDevice':
    """Create an IEC 60870-5-101 master device

    The latest ``enable`` event of every remote device listed in
    ``conf['remote_devices']`` is queried from the eventer server and fed
    to the new device before it is returned.

    Args:
        conf: device configuration
        eventer_client: eventer client used for querying and registering
            events
        event_type_prefix: prefix shared by all event types of this device

    Returns:
        the newly created device

    If processing of the queried events is interrupted by any exception
    (including cancellation), the device is closed and the exception is
    re-raised.

    """
    enable_types = []
    for remote_conf in conf['remote_devices']:
        enable_types.append((*event_type_prefix, 'system', 'remote_device',
                             str(remote_conf['address']), 'enable'))

    latest = await eventer_client.query(
        hat.event.common.QueryLatestParams(enable_types))

    device = Iec101MasterDevice(conf=conf,
                                eventer_client=eventer_client,
                                event_type_prefix=event_type_prefix)
    try:
        await device.process_events(latest.events)

    except BaseException:
        # do not leave a half-initialized device running
        await aio.uncancellable(device.async_close())
        raise

    return device
info: hat.gateway.common.DeviceInfo = DeviceInfo(type='iec101_master', create=<function create>, json_schema_id='hat-gateway://iec101.yaml#/$defs/master', json_schema_repo=<hat.json.repository.SchemaRepository object>)
class Iec101MasterDevice(hat.gateway.common.Device):
 55class Iec101MasterDevice(common.Device):
 56
 57    def __init__(self,
 58                 conf: common.DeviceConf,
 59                 eventer_client: hat.event.eventer.Client,
 60                 event_type_prefix: common.EventTypePrefix,
 61                 send_queue_size: int = 1024):
 62        self._conf = conf
 63        self._event_type_prefix = event_type_prefix
 64        self._eventer_client = eventer_client
 65        self._master = None
 66        self._conns = {}
 67        self._send_queue = aio.Queue(send_queue_size)
 68        self._async_group = aio.Group()
 69        self._remote_enabled = {i['address']: False
 70                                for i in conf['remote_devices']}
 71        self._remote_confs = {i['address']: i
 72                              for i in conf['remote_devices']}
 73        self._remote_groups = {}
 74
 75        self.async_group.spawn(self._create_link_master_loop)
 76        self.async_group.spawn(self._send_loop)
 77
 78    @property
 79    def async_group(self) -> aio.Group:
 80        return self._async_group
 81
 82    async def process_events(self, events: Collection[hat.event.common.Event]):
 83        for event in events:
 84            try:
 85                await self._process_event(event)
 86
 87            except Exception as e:
 88                mlog.warning('error processing event: %s', e, exc_info=e)
 89
 90    async def _create_link_master_loop(self):
 91
 92        async def cleanup():
 93            with contextlib.suppress(ConnectionError):
 94                await self._register_status('DISCONNECTED')
 95
 96            if self._master:
 97                await self._master.async_close()
 98
 99        try:
100            while True:
101                await self._register_status('CONNECTING')
102
103                try:
104                    self._master = await link.unbalanced.create_master(
105                        port=self._conf['port'],
106                        baudrate=self._conf['baudrate'],
107                        bytesize=serial.ByteSize[self._conf['bytesize']],
108                        parity=serial.Parity[self._conf['parity']],
109                        stopbits=serial.StopBits[self._conf['stopbits']],
110                        xonxoff=self._conf['flow_control']['xonxoff'],
111                        rtscts=self._conf['flow_control']['rtscts'],
112                        dsrdtr=self._conf['flow_control']['dsrdtr'],
113                        silent_interval=self._conf['silent_interval'],
114                        address_size=link.AddressSize[
115                            self._conf['device_address_size']])
116
117                except Exception as e:
118                    mlog.warning('link master (endpoint) failed to create: %s',
119                                 e, exc_info=e)
120                    await self._register_status('DISCONNECTED')
121                    await asyncio.sleep(self._conf['reconnect_delay'])
122                    continue
123
124                await self._register_status('CONNECTED')
125
126                for address, enabled in self._remote_enabled.items():
127                    if enabled:
128                        self._enable_remote(address)
129
130                await self._master.wait_closed()
131                await self._register_status('DISCONNECTED')
132                self._master = None
133
134        except Exception as e:
135            mlog.error('create link master error: %s', e, exc_info=e)
136
137        finally:
138            mlog.debug('closing link master loop')
139            self.close()
140            self._conns = {}
141            await aio.uncancellable(cleanup())
142
143    async def _send_loop(self):
144        while True:
145            msg, address = await self._send_queue.get()
146
147            conn = self._conns.get(address)
148            if not conn or not conn.is_open:
149                mlog.warning('msg %s not sent, connection to %s closed',
150                             msg, address)
151                continue
152
153            try:
154                await conn.send([msg])
155                mlog.debug('msg sent asdu=%s', msg.asdu_address)
156
157            except ConnectionError:
158                mlog.warning('msg %s not sent, connection to %s closed',
159                             msg, address)
160
161    async def _connection_loop(self, group, address):
162
163        async def cleanup():
164            with contextlib.suppress(ConnectionError):
165                await self._register_rmt_status(address, 'DISCONNECTED')
166
167            conn = self._conns.pop(address, None)
168            if conn:
169                await conn.async_close()
170
171        remote_conf = self._remote_confs[address]
172        try:
173            while True:
174                await self._register_rmt_status(address, 'CONNECTING')
175
176                try:
177                    master_conn = await self._master.connect(
178                        addr=address,
179                        response_timeout=remote_conf['response_timeout'],
180                        send_retry_count=remote_conf['send_retry_count'],
181                        poll_class1_delay=remote_conf['poll_class1_delay'],
182                        poll_class2_delay=remote_conf['poll_class2_delay'])
183
184                except Exception as e:
185                    mlog.error('connection error to address %s: %s',
186                               address, e, exc_info=e)
187                    await self._register_rmt_status(address, 'DISCONNECTED')
188                    await asyncio.sleep(remote_conf['reconnect_delay'])
189                    continue
190
191                await self._register_rmt_status(address, 'CONNECTED')
192
193                conn = iec101.MasterConnection(
194                    conn=master_conn,
195                    cause_size=iec101.CauseSize[self._conf['cause_size']],
196                    asdu_address_size=iec101.AsduAddressSize[
197                        self._conf['asdu_address_size']],
198                    io_address_size=iec101.IoAddressSize[
199                        self._conf['io_address_size']])
200                self._conns[address] = conn
201                group.spawn(self._receive_loop, conn, address)
202
203                if remote_conf['time_sync_delay'] is not None:
204                    group.spawn(self._time_sync_loop, conn,
205                                remote_conf['time_sync_delay'])
206
207                await conn.wait_closed()
208                await self._register_rmt_status(address, 'DISCONNECTED')
209                self._conns.pop(address)
210
211        except Exception as e:
212            mlog.error('connection loop error: %s', e, exc_info=e)
213
214        finally:
215            mlog.debug('closing remote device %s', address)
216            group.close()
217            await aio.uncancellable(cleanup())
218
219    async def _receive_loop(self, conn, address):
220        try:
221            while True:
222                try:
223                    msgs = await conn.receive()
224
225                except iec101.AsduTypeError as e:
226                    mlog.warning("asdu type error: %s", e)
227                    continue
228
229                events = collections.deque()
230                for msg in msgs:
231                    if isinstance(msg, iec101.ClockSyncMsg):
232                        continue
233
234                    try:
235                        event = _msg_to_event(self._event_type_prefix, address,
236                                              msg)
237                        events.append(event)
238
239                    except Exception as e:
240                        mlog.warning('message %s ignored due to: %s',
241                                     msg, e, exc_info=e)
242
243                if not events:
244                    continue
245
246                await self._eventer_client.register(events)
247                for e in events:
248                    mlog.debug('registered event %s', e)
249
250        except ConnectionError:
251            mlog.debug('connection closed')
252
253        except Exception as e:
254            mlog.error('receive loop error: %s', e, exc_info=e)
255
256        finally:
257            conn.close()
258
259    async def _time_sync_loop(self, conn, delay):
260        try:
261            while True:
262                time_now = datetime.datetime.now(datetime.timezone.utc)
263                time_iec101 = iec101.time_from_datetime(time_now)
264                msg = iec101.ClockSyncMsg(
265                    is_test=False,
266                    originator_address=0,
267                    asdu_address={
268                        'ONE': 0xFF,
269                        'TWO': 0xFFFF}[self._conf['asdu_address_size']],
270                    time=time_iec101,
271                    is_negative_confirm=False,
272                    cause=iec101.ClockSyncReqCause.ACTIVATION)
273                await conn.send([msg])
274                mlog.debug('time sync sent %s', time_iec101)
275
276                await asyncio.sleep(delay)
277
278        except ConnectionError:
279            mlog.debug('connection closed')
280
281        except Exception as e:
282            mlog.error('time sync loop error: %s', e, exc_info=e)
283
284        finally:
285            conn.close()
286
287    async def _process_event(self, event):
288        match_type = functools.partial(hat.event.common.matches_query_type,
289                                       event.type)
290
291        prefix = (*self._event_type_prefix, 'system', 'remote_device', '?')
292        if not match_type((*prefix, '*')):
293            raise Exception('unexpected event type')
294
295        address = int(event.type[len(prefix) - 1])
296        suffix = event.type[len(prefix):]
297
298        if match_type((*prefix, 'enable')):
299            self._process_event_enable(address, event)
300
301        elif match_type((*prefix, 'command', '?', '?', '?')):
302            cmd_key = common.CommandKey(
303                cmd_type=common.CommandType(suffix[1]),
304                asdu_address=int(suffix[2]),
305                io_address=int(suffix[3]))
306            msg = _command_from_event(cmd_key, event)
307
308            await self._send_queue.put((msg, address))
309            mlog.debug('command asdu=%s io=%s prepared for sending',
310                       cmd_key.asdu_address, cmd_key.io_address)
311
312        elif match_type((*prefix, 'interrogation', '?')):
313            asdu_address = int(suffix[1])
314            msg = _interrogation_from_event(asdu_address, event)
315
316            await self._send_queue.put((msg, address))
317            mlog.debug("interrogation request asdu=%s prepared for sending",
318                       asdu_address)
319
320        elif match_type((*prefix, 'counter_interrogation', '?')):
321            asdu_address = int(suffix[1])
322            msg = _counter_interrogation_from_event(asdu_address, event)
323
324            await self._send_queue.put((msg, address))
325            mlog.debug("counter interrogation request asdu=%s prepared for "
326                       "sending", asdu_address)
327
328        else:
329            raise Exception('unexpected event type')
330
331    def _process_event_enable(self, address, event):
332        if address not in self._remote_enabled:
333            raise Exception('invalid remote device address')
334
335        enable = event.payload.data
336        if not isinstance(enable, bool):
337            raise Exception('invalid enable event payload')
338
339        if address not in self._remote_enabled:
340            mlog.warning('received enable for unexpected remote device')
341            return
342
343        self._remote_enabled[address] = enable
344
345        if not enable:
346            self._disable_remote(address)
347
348        elif not self._master:
349            return
350
351        else:
352            self._enable_remote(address)
353
354    def _enable_remote(self, address):
355        mlog.debug('enabling device %s', address)
356        remote_group = self._remote_groups.get(address)
357        if remote_group and remote_group.is_open:
358            mlog.debug('device %s is already running', address)
359            return
360
361        remote_group = self._async_group.create_subgroup()
362        self._remote_groups[address] = remote_group
363        remote_group.spawn(self._connection_loop, remote_group, address)
364
365    def _disable_remote(self, address):
366        mlog.debug('disabling device %s', address)
367        if address in self._remote_groups:
368            remote_group = self._remote_groups.pop(address)
369            remote_group.close()
370
371    async def _register_status(self, status):
372        event = hat.event.common.RegisterEvent(
373            type=(*self._event_type_prefix, 'gateway', 'status'),
374            source_timestamp=None,
375            payload=hat.event.common.EventPayloadJson(status))
376        await self._eventer_client.register([event])
377        mlog.debug('device status -> %s', status)
378
379    async def _register_rmt_status(self, address, status):
380        event = hat.event.common.RegisterEvent(
381            type=(*self._event_type_prefix, 'gateway', 'remote_device',
382                  str(address), 'status'),
383            source_timestamp=None,
384            payload=hat.event.common.EventPayloadJson(status))
385        await self._eventer_client.register([event])
386        mlog.debug('remote device %s status -> %s', address, status)

Device interface

Iec101MasterDevice( conf: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')], eventer_client: hat.event.eventer.client.Client, event_type_prefix: tuple[str, str, str], send_queue_size: int = 1024)
57    def __init__(self,
58                 conf: common.DeviceConf,
59                 eventer_client: hat.event.eventer.Client,
60                 event_type_prefix: common.EventTypePrefix,
61                 send_queue_size: int = 1024):
62        self._conf = conf
63        self._event_type_prefix = event_type_prefix
64        self._eventer_client = eventer_client
65        self._master = None
66        self._conns = {}
67        self._send_queue = aio.Queue(send_queue_size)
68        self._async_group = aio.Group()
69        self._remote_enabled = {i['address']: False
70                                for i in conf['remote_devices']}
71        self._remote_confs = {i['address']: i
72                              for i in conf['remote_devices']}
73        self._remote_groups = {}
74
75        self.async_group.spawn(self._create_link_master_loop)
76        self.async_group.spawn(self._send_loop)
async_group: hat.aio.group.Group
78    @property
79    def async_group(self) -> aio.Group:
80        return self._async_group

Group controlling resource's lifetime.

async def process_events(self, events: Collection[hat.event.common.common.Event]):
82    async def process_events(self, events: Collection[hat.event.common.Event]):
83        for event in events:
84            try:
85                await self._process_event(event)
86
87            except Exception as e:
88                mlog.warning('error processing event: %s', e, exc_info=e)

Process received events

This method can be a coroutine or a regular function.