hat.gateway.devices.iec101.slave

IEC 60870-5-101 slave device

"""IEC 60870-5-101 slave device"""
  2
  3from collections.abc import Collection, Iterable
  4import collections
  5import contextlib
  6import functools
  7import itertools
  8import logging
  9
 10from hat import aio
 11from hat import json
 12from hat.drivers import iec101
 13from hat.drivers import serial
 14from hat.drivers.iec60870 import link
 15import hat.event.common
 16import hat.event.eventer
 17
 18from hat.gateway.devices.iec101 import common
 19
 20
# Module level logger named after this module
mlog: logging.Logger = logging.getLogger(__name__)
 22
 23
 24async def create(conf: common.DeviceConf,
 25                 eventer_client: hat.event.eventer.Client,
 26                 event_type_prefix: common.EventTypePrefix
 27                 ) -> 'Iec101SlaveDevice':
 28    device = Iec101SlaveDevice()
 29    device._conf = conf
 30    device._eventer_client = eventer_client
 31    device._event_type_prefix = event_type_prefix
 32    device._next_conn_ids = itertools.count(1)
 33    device._conns = {}
 34    device._buffers = {}
 35    device._data_msgs = {}
 36    device._data_buffers = {}
 37
 38    init_buffers(buffers_conf=conf['buffers'],
 39                 buffers=device._buffers)
 40
 41    await init_data(data_conf=conf['data'],
 42                    data_msgs=device._data_msgs,
 43                    data_buffers=device._data_buffers,
 44                    buffers=device._buffers,
 45                    eventer_client=eventer_client,
 46                    event_type_prefix=event_type_prefix)
 47
 48    device._slave = await link.unbalanced.create_slave(
 49        port=conf['port'],
 50        addrs=conf['addresses'],
 51        connection_cb=device._on_connection,
 52        baudrate=conf['baudrate'],
 53        bytesize=serial.ByteSize[conf['bytesize']],
 54        parity=serial.Parity[conf['parity']],
 55        stopbits=serial.StopBits[conf['stopbits']],
 56        xonxoff=conf['flow_control']['xonxoff'],
 57        rtscts=conf['flow_control']['rtscts'],
 58        dsrdtr=conf['flow_control']['dsrdtr'],
 59        silent_interval=conf['silent_interval'],
 60        address_size=link.AddressSize[conf['device_address_size']],
 61        keep_alive_timeout=conf['keep_alive_timeout'])
 62
 63    try:
 64        await device._register_connections()
 65
 66    except BaseException:
 67        await aio.uncancellable(device.async_close())
 68        raise
 69
 70    return device
 71
 72
# Device info for this module: device type identifier, the `create`
# coroutine and the JSON schema id/repository (presumably used for
# validation of the device configuration - confirm against gateway core)
info: common.DeviceInfo = common.DeviceInfo(
    type="iec101_slave",
    create=create,
    json_schema_id="hat-gateway://iec101.yaml#/$defs/slave",
    json_schema_repo=common.json_schema_repo)
 78
 79
 80class Buffer:
 81
 82    def __init__(self, size: int):
 83        self._size = size
 84        self._data = collections.OrderedDict()
 85
 86    def add(self,
 87            event_id: hat.event.common.EventId,
 88            data_msg: iec101.DataMsg):
 89        self._data[event_id] = data_msg
 90        while len(self._data) > self._size:
 91            self._data.popitem(last=False)
 92
 93    def remove(self, event_id: hat.event.common.EventId):
 94        self._data.pop(event_id, None)
 95
 96    def get_event_id_data_msgs(self) -> Iterable[tuple[hat.event.common.EventId,  # NOQA
 97                                                       iec101.DataMsg]]:
 98        return self._data.items()
 99
100
def init_buffers(buffers_conf: json.Data,
                 buffers: dict[str, Buffer]):
    """Populate `buffers` with a new buffer for each buffer configuration

    Each configuration entry provides the buffer's ``name`` (used as key)
    and ``size``.

    """
    for entry in buffers_conf:
        buffer = Buffer(entry['size'])
        buffers[entry['name']] = buffer
105
106
async def init_data(data_conf: json.Data,
                    data_msgs: dict[common.DataKey, iec101.DataMsg],
                    data_buffers: dict[common.DataKey, Buffer],
                    buffers: dict[str, Buffer],
                    eventer_client: hat.event.eventer.Client,
                    event_type_prefix: common.EventTypePrefix):
    """Initialize data state

    For each configured data, ``None`` is stored in `data_msgs` and, if a
    buffer name is configured, the data key is associated with that buffer
    in `data_buffers`.

    Latest ``system/data`` events are then queried and used as initial data
    messages. Events which can not be decoded, or which reference data that
    is not configured, are skipped (logged with debug level).

    """
    for item in data_conf:
        key = common.DataKey(data_type=common.DataType[item['data_type']],
                             asdu_address=item['asdu_address'],
                             io_address=item['io_address'])
        data_msgs[key] = None

        buffer_name = item['buffer']
        if buffer_name:
            data_buffers[key] = buffers[buffer_name]

    query_params = hat.event.common.QueryLatestParams(
        [(*event_type_prefix, 'system', 'data', '*')])
    result = await eventer_client.query(query_params)

    # event type segments following (*prefix, 'system', 'data')
    suffix_start = len(event_type_prefix) + 2

    for event in result.events:
        try:
            type_str, asdu_str, io_str = event.type[suffix_start:]
            data_key = common.DataKey(data_type=common.DataType(type_str),
                                      asdu_address=int(asdu_str),
                                      io_address=int(io_str))
            if data_key not in data_msgs:
                raise Exception(f'data {data_key} not configured')

            data_msgs[data_key] = data_msg_from_event(data_key, event)

        except Exception as e:
            mlog.debug('skipping initial data: %s', e, exc_info=e)
139
140
class Iec101SlaveDevice(common.Device):
    """IEC 60870-5-101 slave device

    Instances are not initialized by a constructor - `create` instantiates
    the class and assigns all private attributes used by its methods.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group of the underlying link slave"""
        return self._slave.async_group

    async def process_events(self, events: Collection[hat.event.common.Event]):
        """Process provided events

        Events are processed sequentially. Failure to process a single event
        is logged and does not prevent processing of the remaining events.

        """
        for event in events:
            try:
                await self._process_event(event)

            except Exception as e:
                mlog.warning('error processing event: %s', e, exc_info=e)

    def _on_connection(self, conn):
        # each link connection is served by its own task
        self.async_group.spawn(self._connection_loop, conn)

    async def _connection_loop(self, conn):
        """Serve a single connection until it is closed"""
        conn_id = next(self._next_conn_ids)

        try:
            conn = iec101.SlaveConnection(
                conn=conn,
                cause_size=iec101.CauseSize[self._conf['cause_size']],
                asdu_address_size=iec101.AsduAddressSize[
                    self._conf['asdu_address_size']],
                io_address_size=iec101.IoAddressSize[
                    self._conf['io_address_size']])

            self._conns[conn_id] = conn
            await self._register_connections()

            # replay of buffered data is best effort - failure is logged and
            # must not prevent entering the receive loop
            try:
                for buffer in self._buffers.values():
                    # iterate over a snapshot so that removal of sent
                    # entries can not invalidate the iteration
                    entries = list(buffer.get_event_id_data_msgs())
                    for event_id, data_msg in entries:
                        self._send_data_msg(conn, buffer, event_id, data_msg)

            except Exception as e:
                mlog.debug('error sending buffered data: %s', e, exc_info=e)

            while True:
                msgs = await conn.receive()

                for msg in msgs:
                    try:
                        mlog.debug('received message: %s', msg)
                        await self._process_msg(conn_id, conn, msg)

                    except Exception as e:
                        mlog.warning('error processing message: %s',
                                     e, exc_info=e)

        except ConnectionError:
            mlog.debug('connection close')

        except Exception as e:
            mlog.warning('connection error: %s', e, exc_info=e)

        finally:
            mlog.debug('closing connection')
            conn.close()

            # if connection was never added to `_conns`, pop raises and
            # registration (which would report unchanged state) is skipped
            with contextlib.suppress(Exception):
                self._conns.pop(conn_id)
                await aio.uncancellable(self._register_connections())

    async def _register_connections(self):
        """Register event describing all currently active connections"""
        payload = [{'connection_id': conn_id,
                    'address': conn.address}
                   for conn_id, conn in self._conns.items()]

        event = hat.event.common.RegisterEvent(
            type=(*self._event_type_prefix, 'gateway', 'connections'),
            source_timestamp=None,
            payload=hat.event.common.EventPayloadJson(payload))

        await self._eventer_client.register([event])

    async def _process_event(self, event):
        """Dispatch event based on event type segments following the prefix

        Raises:
            Exception: event type is not supported or can not be decoded

        """
        suffix = event.type[len(self._event_type_prefix):]

        if suffix[:2] == ('system', 'data'):
            data_type_str, asdu_address_str, io_address_str = suffix[2:]
            data_key = common.DataKey(data_type=common.DataType(data_type_str),
                                      asdu_address=int(asdu_address_str),
                                      io_address=int(io_address_str))

            await self._process_data_event(data_key, event)

        elif suffix[:2] == ('system', 'command'):
            cmd_type_str, asdu_address_str, io_address_str = suffix[2:]
            cmd_key = common.CommandKey(
                cmd_type=common.CommandType(cmd_type_str),
                asdu_address=int(asdu_address_str),
                io_address=int(io_address_str))

            await self._process_command_event(cmd_key, event)

        else:
            raise Exception('unsupported event type')

    async def _process_data_event(self, data_key, event):
        """Update data state and send data message to all connections

        Raises:
            Exception: data is not configured

        """
        if data_key not in self._data_msgs:
            raise Exception('data not configured')

        data_msg = data_msg_from_event(data_key, event)
        self._data_msgs[data_key] = data_msg

        buffer = self._data_buffers.get(data_key)
        if buffer:
            buffer.add(event.id, data_msg)

        # failure of a single connection must not prevent delivery to the
        # remaining connections
        for conn in list(self._conns.values()):
            try:
                self._send_data_msg(conn, buffer, event.id, data_msg)

            except Exception as e:
                mlog.warning('error sending data message: %s', e, exc_info=e)

    async def _process_command_event(self, cmd_key, event):
        """Send command message to connection referenced by event payload"""
        cmd_msg = cmd_msg_from_event(cmd_key, event)
        conn_id = event.payload.data['connection_id']
        conn = self._conns[conn_id]
        conn.send([cmd_msg])

    async def _process_msg(self, conn_id, conn, msg):
        """Dispatch received message based on message type

        Raises:
            Exception: message type is not supported

        """
        if isinstance(msg, iec101.CommandMsg):
            await self._process_command_msg(conn_id, conn, msg)

        elif isinstance(msg, iec101.InterrogationMsg):
            self._process_interrogation_msg(conn_id, conn, msg)

        elif isinstance(msg, iec101.CounterInterrogationMsg):
            self._process_counter_interrogation_msg(conn_id, conn, msg)

        elif isinstance(msg, iec101.ReadMsg):
            self._process_read_msg(conn_id, conn, msg)

        elif isinstance(msg, iec101.ClockSyncMsg):
            self._process_clock_sync_msg(conn_id, conn, msg)

        elif isinstance(msg, iec101.TestMsg):
            self._process_test_msg(conn_id, conn, msg)

        elif isinstance(msg, iec101.ResetMsg):
            self._process_reset_msg(conn_id, conn, msg)

        elif isinstance(msg, iec101.ParameterMsg):
            self._process_parameter_msg(conn_id, conn, msg)

        elif isinstance(msg, iec101.ParameterActivationMsg):
            self._process_parameter_activation_msg(conn_id, conn, msg)

        else:
            raise Exception('unsupported message')

    async def _process_command_msg(self, conn_id, conn, msg):
        # command requests are forwarded as events - anything else is
        # negatively confirmed with unknown cause
        if isinstance(msg.cause, iec101.CommandReqCause):
            event = cmd_msg_to_event(self._event_type_prefix, conn_id, msg)
            await self._eventer_client.register([event])

        else:
            res = msg._replace(cause=iec101.CommandResCause.UNKNOWN_CAUSE,
                               is_negative_confirm=True)
            conn.send([res])

    def _process_interrogation_msg(self, conn_id, conn, msg):
        # station interrogation reports everything except binary counters
        self._respond_to_interrogation(
            conn=conn,
            msg=msg,
            data_cause=iec101.DataResCause.INTERROGATED_STATION,
            counters=False)

    def _process_counter_interrogation_msg(self, conn_id, conn, msg):
        # counter interrogation reports only binary counters
        self._respond_to_interrogation(
            conn=conn,
            msg=msg,
            data_cause=iec101.DataResCause.INTERROGATED_COUNTER,
            counters=True)

    def _respond_to_interrogation(self, conn, msg, data_cause, counters):
        """Respond to (counter) interrogation message

        Activation is answered with activation confirmation, currently known
        data messages (binary counters only if `counters` is set, everything
        except binary counters otherwise) and activation termination.
        Deactivation and any other cause are negatively confirmed.

        """
        if msg.cause == iec101.CommandReqCause.ACTIVATION:
            res = msg._replace(
                cause=iec101.CommandResCause.ACTIVATION_CONFIRMATION,
                is_negative_confirm=False)
            conn.send([res])

            # NOTE(review): 0xFFFF is treated as broadcast regardless of the
            # configured asdu_address_size - confirm whether one octet
            # addressing needs 0xFF handled as well
            data_msgs = [
                data_msg._replace(is_test=msg.is_test,
                                  cause=data_cause)
                for data_msg in self._data_msgs.values()
                if (data_msg and
                    (msg.asdu_address == 0xFFFF or
                     msg.asdu_address == data_msg.asdu_address) and
                    isinstance(data_msg.data,
                               iec101.BinaryCounterData) == counters)]
            conn.send(data_msgs)

            res = msg._replace(
                cause=iec101.CommandResCause.ACTIVATION_TERMINATION,
                is_negative_confirm=False)
            conn.send([res])

        elif msg.cause == iec101.CommandReqCause.DEACTIVATION:
            res = msg._replace(
                cause=iec101.CommandResCause.DEACTIVATION_CONFIRMATION,
                is_negative_confirm=True)
            conn.send([res])

        else:
            res = msg._replace(cause=iec101.CommandResCause.UNKNOWN_CAUSE,
                               is_negative_confirm=True)
            conn.send([res])

    def _process_read_msg(self, conn_id, conn, msg):
        # read requests are answered with unknown type
        res = msg._replace(cause=iec101.ReadResCause.UNKNOWN_TYPE)
        conn.send([res])

    def _process_clock_sync_msg(self, conn_id, conn, msg):
        # clock sync requests are always negatively confirmed
        if isinstance(msg.cause, iec101.ClockSyncReqCause):
            res = msg._replace(
                cause=iec101.ClockSyncResCause.ACTIVATION_CONFIRMATION,
                is_negative_confirm=True)
            conn.send([res])

        else:
            res = msg._replace(cause=iec101.ClockSyncResCause.UNKNOWN_CAUSE,
                               is_negative_confirm=True)
            conn.send([res])

    def _process_test_msg(self, conn_id, conn, msg):
        # test requests are answered with unknown type
        res = msg._replace(cause=iec101.ActivationResCause.UNKNOWN_TYPE)
        conn.send([res])

    def _process_reset_msg(self, conn_id, conn, msg):
        # reset requests are answered with unknown type
        res = msg._replace(cause=iec101.ActivationResCause.UNKNOWN_TYPE)
        conn.send([res])

    def _process_parameter_msg(self, conn_id, conn, msg):
        # parameter requests are answered with unknown type
        res = msg._replace(cause=iec101.ParameterResCause.UNKNOWN_TYPE)
        conn.send([res])

    def _process_parameter_activation_msg(self, conn_id, conn, msg):
        # parameter activation requests are answered with unknown type
        res = msg._replace(
            cause=iec101.ParameterActivationResCause.UNKNOWN_TYPE)
        conn.send([res])

    def _send_data_msg(self, conn, buffer, event_id, data_msg):
        # once buffered message is sent, it is removed from its buffer
        sent_cb = (functools.partial(buffer.remove, event_id)
                   if buffer else None)
        conn.send([data_msg], sent_cb=sent_cb)
403
404
def cmd_msg_to_event(event_type_prefix: hat.event.common.EventType,
                     conn_id: int,
                     msg: iec101.CommandMsg
                     ) -> hat.event.common.RegisterEvent:
    """Create ``gateway/command`` register event from command message

    Event type is extended with command type, ASDU address and IO address.
    Payload contains connection id, test flag, cause and command.

    """
    cmd_type = common.get_command_type(msg.command)
    cause_json = common.cause_to_json(iec101.CommandReqCause, msg.cause)
    command_json = common.command_to_json(msg.command)

    type_suffix = ('gateway', 'command', cmd_type.value,
                   str(msg.asdu_address), str(msg.io_address))
    payload = {'connection_id': conn_id,
               'is_test': msg.is_test,
               'cause': cause_json,
               'command': command_json}

    return hat.event.common.RegisterEvent(
        type=(*event_type_prefix, *type_suffix),
        source_timestamp=None,
        payload=hat.event.common.EventPayloadJson(payload))
423
424
def data_msg_from_event(data_key: common.DataKey,
                        event: hat.event.common.Event
                        ) -> iec101.DataMsg:
    """Create data message from data event

    Addresses and data type are taken from `data_key`; time is derived from
    the event's source timestamp; cause, data and test flag are read from the
    event's payload. Originator address is always ``0``.

    """
    time = common.time_from_source_timestamp(event.source_timestamp)

    payload = event.payload.data
    cause = common.cause_from_json(iec101.DataResCause, payload['cause'])
    data = common.data_from_json(data_key.data_type, payload['data'])
    is_test = payload['is_test']

    return iec101.DataMsg(is_test=is_test,
                          originator_address=0,
                          asdu_address=data_key.asdu_address,
                          io_address=data_key.io_address,
                          data=data,
                          time=time,
                          cause=cause)
441
442
def cmd_msg_from_event(cmd_key: common.CommandKey,
                       event: hat.event.common.Event
                       ) -> iec101.CommandMsg:
    """Create command message from command event

    Addresses and command type are taken from `cmd_key`; cause, command,
    negative confirm flag and test flag are read from the event's payload.
    Originator address is always ``0``.

    """
    payload = event.payload.data

    cause = common.cause_from_json(iec101.CommandResCause, payload['cause'])
    command = common.command_from_json(cmd_key.cmd_type, payload['command'])
    is_negative_confirm = payload['is_negative_confirm']
    is_test = payload['is_test']

    return iec101.CommandMsg(is_test=is_test,
                             originator_address=0,
                             asdu_address=cmd_key.asdu_address,
                             io_address=cmd_key.io_address,
                             command=command,
                             is_negative_confirm=is_negative_confirm,
                             cause=cause)
mlog: logging.Logger = <Logger hat.gateway.devices.iec101.slave (WARNING)>
async def create( conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]], eventer_client: hat.event.eventer.client.Client, event_type_prefix: tuple[str, str, str]) -> Iec101SlaveDevice:
async def create(conf: common.DeviceConf,
                 eventer_client: hat.event.eventer.Client,
                 event_type_prefix: common.EventTypePrefix
                 ) -> 'Iec101SlaveDevice':
    """Create IEC 60870-5-101 slave device

    Buffers and data state are initialized from `conf` (initial data state
    is additionally populated by querying `eventer_client`), after which the
    unbalanced link slave is created and the (initially empty) list of
    connections is registered.

    If registration of connections fails, the newly created device is closed
    before the exception is propagated.

    """
    buffers = {}
    data_msgs = {}
    data_buffers = {}

    device = Iec101SlaveDevice()
    device._conf = conf
    device._eventer_client = eventer_client
    device._event_type_prefix = event_type_prefix
    device._next_conn_ids = itertools.count(1)
    device._conns = {}
    device._buffers = buffers
    device._data_msgs = data_msgs
    device._data_buffers = data_buffers

    init_buffers(buffers_conf=conf['buffers'], buffers=buffers)

    await init_data(data_conf=conf['data'],
                    data_msgs=data_msgs,
                    data_buffers=data_buffers,
                    buffers=buffers,
                    eventer_client=eventer_client,
                    event_type_prefix=event_type_prefix)

    # link slave arguments, collected in the same order in which they were
    # previously passed as keyword arguments
    slave_kwargs = {
        'port': conf['port'],
        'addrs': conf['addresses'],
        'connection_cb': device._on_connection,
        'baudrate': conf['baudrate'],
        'bytesize': serial.ByteSize[conf['bytesize']],
        'parity': serial.Parity[conf['parity']],
        'stopbits': serial.StopBits[conf['stopbits']],
        'xonxoff': conf['flow_control']['xonxoff'],
        'rtscts': conf['flow_control']['rtscts'],
        'dsrdtr': conf['flow_control']['dsrdtr'],
        'silent_interval': conf['silent_interval'],
        'address_size': link.AddressSize[conf['device_address_size']],
        'keep_alive_timeout': conf['keep_alive_timeout']}
    device._slave = await link.unbalanced.create_slave(**slave_kwargs)

    try:
        await device._register_connections()

    except BaseException:
        # slave is already running - release it before propagating
        await aio.uncancellable(device.async_close())
        raise

    return device
info: hat.gateway.common.DeviceInfo = DeviceInfo(type='iec101_slave', create=<function create>, json_schema_id='hat-gateway://iec101.yaml#/$defs/slave', json_schema_repo=<hat.json.repository.SchemaRepository object>)
class Buffer:
class Buffer:
    """Size limited, insertion ordered collection of data messages

    Data messages are keyed by the id of the event they originate from. Once
    the number of entries exceeds `size`, oldest entries are discarded.

    """

    def __init__(self, size: int):
        self._size = size
        self._data = collections.OrderedDict()

    def add(self,
            event_id: hat.event.common.EventId,
            data_msg: iec101.DataMsg):
        """Add data message and discard oldest entries exceeding the size"""
        self._data[event_id] = data_msg
        while len(self._data) > self._size:
            self._data.popitem(last=False)

    def remove(self, event_id: hat.event.common.EventId):
        """Remove entry associated with event id (no-op if not present)"""
        self._data.pop(event_id, None)

    def get_event_id_data_msgs(self) -> Iterable[tuple[hat.event.common.EventId,  # NOQA
                                                       iec101.DataMsg]]:
        """Get all (event id, data message) pairs, oldest first

        Result is a snapshot - buffer can be safely modified (`add`/`remove`)
        while the result is being iterated.

        """
        # live dict view would raise RuntimeError if the buffer changes size
        # during iteration
        return list(self._data.items())
Buffer(size: int)
83    def __init__(self, size: int):
84        self._size = size
85        self._data = collections.OrderedDict()
def add( self, event_id: hat.event.common.common.EventId, data_msg: hat.drivers.iec101.common.DataMsg):
87    def add(self,
88            event_id: hat.event.common.EventId,
89            data_msg: iec101.DataMsg):
90        self._data[event_id] = data_msg
91        while len(self._data) > self._size:
92            self._data.popitem(last=False)
def remove(self, event_id: hat.event.common.common.EventId):
94    def remove(self, event_id: hat.event.common.EventId):
95        self._data.pop(event_id, None)
def get_event_id_data_msgs( self) -> Iterable[tuple[hat.event.common.common.EventId, hat.drivers.iec101.common.DataMsg]]:
97    def get_event_id_data_msgs(self) -> Iterable[tuple[hat.event.common.EventId,  # NOQA
98                                                       iec101.DataMsg]]:
99        return self._data.items()
def init_buffers( buffers_conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]], buffers: dict[str, Buffer]):
def init_buffers(buffers_conf: json.Data,
                 buffers: dict[str, Buffer]):
    """Populate `buffers` with a new buffer for each buffer configuration

    Each configuration entry provides the buffer's ``name`` (used as key)
    and ``size``.

    """
    for entry in buffers_conf:
        buffer = Buffer(entry['size'])
        buffers[entry['name']] = buffer
async def init_data( data_conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]], data_msgs: dict[hat.gateway.devices.iec101.common.DataKey, hat.drivers.iec101.common.DataMsg], data_buffers: dict[hat.gateway.devices.iec101.common.DataKey, Buffer], buffers: dict[str, Buffer], eventer_client: hat.event.eventer.client.Client, event_type_prefix: tuple[str, str, str]):
async def init_data(data_conf: json.Data,
                    data_msgs: dict[common.DataKey, iec101.DataMsg],
                    data_buffers: dict[common.DataKey, Buffer],
                    buffers: dict[str, Buffer],
                    eventer_client: hat.event.eventer.Client,
                    event_type_prefix: common.EventTypePrefix):
    """Initialize data state

    For each configured data, ``None`` is stored in `data_msgs` and, if a
    buffer name is configured, the data key is associated with that buffer
    in `data_buffers`.

    Latest ``system/data`` events are then queried and used as initial data
    messages. Events which can not be decoded, or which reference data that
    is not configured, are skipped (logged with debug level).

    """
    for item in data_conf:
        key = common.DataKey(data_type=common.DataType[item['data_type']],
                             asdu_address=item['asdu_address'],
                             io_address=item['io_address'])
        data_msgs[key] = None

        buffer_name = item['buffer']
        if buffer_name:
            data_buffers[key] = buffers[buffer_name]

    query_params = hat.event.common.QueryLatestParams(
        [(*event_type_prefix, 'system', 'data', '*')])
    result = await eventer_client.query(query_params)

    # event type segments following (*prefix, 'system', 'data')
    suffix_start = len(event_type_prefix) + 2

    for event in result.events:
        try:
            type_str, asdu_str, io_str = event.type[suffix_start:]
            data_key = common.DataKey(data_type=common.DataType(type_str),
                                      asdu_address=int(asdu_str),
                                      io_address=int(io_str))
            if data_key not in data_msgs:
                raise Exception(f'data {data_key} not configured')

            data_msgs[data_key] = data_msg_from_event(data_key, event)

        except Exception as e:
            mlog.debug('skipping initial data: %s', e, exc_info=e)
class Iec101SlaveDevice(hat.gateway.common.Device):
142class Iec101SlaveDevice(common.Device):
143
144    @property
145    def async_group(self) -> aio.Group:
146        return self._slave.async_group
147
148    async def process_events(self, events: Collection[hat.event.common.Event]):
149        for event in events:
150            try:
151                await self._process_event(event)
152
153            except Exception as e:
154                mlog.warning('error processing event: %s', e, exc_info=e)
155
156    def _on_connection(self, conn):
157        self.async_group.spawn(self._connection_loop, conn)
158
159    async def _connection_loop(self, conn):
160        conn_id = next(self._next_conn_ids)
161
162        try:
163            conn = iec101.SlaveConnection(
164                conn=conn,
165                cause_size=iec101.CauseSize[self._conf['cause_size']],
166                asdu_address_size=iec101.AsduAddressSize[self._conf['asdu_address_size']],  # NOQA
167                io_address_size=iec101.IoAddressSize[self._conf['io_address_size']])  # NOQA
168
169            self._conns[conn_id] = conn
170            await self._register_connections()
171
172            with contextlib.suppress(Exception):
173                for buffer in self._buffers.values():
174                    for event_id, data_msg in buffer.get_event_id_data_msgs():
175                        self._send_data_msg(conn, buffer, event_id, data_msg)
176
177            while True:
178                msgs = await conn.receive()
179
180                for msg in msgs:
181                    try:
182                        mlog.debug('received message: %s', msg)
183                        await self._process_msg(conn_id, conn, msg)
184
185                    except Exception as e:
186                        mlog.warning('error processing message: %s',
187                                     e, exc_info=e)
188
189        except ConnectionError:
190            mlog.debug('connection close')
191
192        except Exception as e:
193            mlog.warning('connection error: %s', e, exc_info=e)
194
195        finally:
196            mlog.debug('closing connection')
197            conn.close()
198
199            with contextlib.suppress(Exception):
200                self._conns.pop(conn_id)
201                await aio.uncancellable(self._register_connections())
202
203    async def _register_connections(self):
204        payload = [{'connection_id': conn_id,
205                    'address': conn.address}
206                   for conn_id, conn in self._conns.items()]
207
208        event = hat.event.common.RegisterEvent(
209            type=(*self._event_type_prefix, 'gateway', 'connections'),
210            source_timestamp=None,
211            payload=hat.event.common.EventPayloadJson(payload))
212
213        await self._eventer_client.register([event])
214
215    async def _process_event(self, event):
216        suffix = event.type[len(self._event_type_prefix):]
217
218        if suffix[:2] == ('system', 'data'):
219            data_type_str, asdu_address_str, io_address_str = suffix[2:]
220            data_key = common.DataKey(data_type=common.DataType(data_type_str),
221                                      asdu_address=int(asdu_address_str),
222                                      io_address=int(io_address_str))
223
224            await self._process_data_event(data_key, event)
225
226        elif suffix[:2] == ('system', 'command'):
227            cmd_type_str, asdu_address_str, io_address_str = suffix[2:]
228            cmd_key = common.CommandKey(
229                cmd_type=common.CommandType(cmd_type_str),
230                asdu_address=int(asdu_address_str),
231                io_address=int(io_address_str))
232
233            await self._process_command_event(cmd_key, event)
234
235        else:
236            raise Exception('unsupported event type')
237
238    async def _process_data_event(self, data_key, event):
239        if data_key not in self._data_msgs:
240            raise Exception('data not configured')
241
242        data_msg = data_msg_from_event(data_key, event)
243        self._data_msgs[data_key] = data_msg
244
245        buffer = self._data_buffers.get(data_key)
246        if buffer:
247            buffer.add(event.id, data_msg)
248
249        for conn in self._conns.values():
250            self._send_data_msg(conn, buffer, event.id, data_msg)
251
252    async def _process_command_event(self, cmd_key, event):
253        cmd_msg = cmd_msg_from_event(cmd_key, event)
254        conn_id = event.payload.data['connection_id']
255        conn = self._conns[conn_id]
256        conn.send([cmd_msg])
257
258    async def _process_msg(self, conn_id, conn, msg):
259        if isinstance(msg, iec101.CommandMsg):
260            await self._process_command_msg(conn_id, conn, msg)
261
262        elif isinstance(msg, iec101.InterrogationMsg):
263            self._process_interrogation_msg(conn_id, conn, msg)
264
265        elif isinstance(msg, iec101.CounterInterrogationMsg):
266            self._process_counter_interrogation_msg(conn_id, conn, msg)
267
268        elif isinstance(msg, iec101.ReadMsg):
269            self._process_read_msg(conn_id, conn, msg)
270
271        elif isinstance(msg, iec101.ClockSyncMsg):
272            self._process_clock_sync_msg(conn_id, conn, msg)
273
274        elif isinstance(msg, iec101.TestMsg):
275            self._process_test_msg(conn_id, conn, msg)
276
277        elif isinstance(msg, iec101.ResetMsg):
278            self._process_reset_msg(conn_id, conn, msg)
279
280        elif isinstance(msg, iec101.ParameterMsg):
281            self._process_parameter_msg(conn_id, conn, msg)
282
283        elif isinstance(msg, iec101.ParameterActivationMsg):
284            self._process_parameter_activation_msg(conn_id, conn, msg)
285
286        else:
287            raise Exception('unsupported message')
288
289    async def _process_command_msg(self, conn_id, conn, msg):
290        if isinstance(msg.cause, iec101.CommandReqCause):
291            event = cmd_msg_to_event(self._event_type_prefix, conn_id, msg)
292            await self._eventer_client.register([event])
293
294        else:
295            res = msg._replace(cause=iec101.CommandResCause.UNKNOWN_CAUSE,
296                               is_negative_confirm=True)
297            conn.send([res])
298
299    def _process_interrogation_msg(self, conn_id, conn, msg):
300        if msg.cause == iec101.CommandReqCause.ACTIVATION:
301            res = msg._replace(
302                cause=iec101.CommandResCause.ACTIVATION_CONFIRMATION,
303                is_negative_confirm=False)
304            conn.send([res])
305
306            data_msgs = [
307                data_msg._replace(
308                    is_test=msg.is_test,
309                    cause=iec101.DataResCause.INTERROGATED_STATION)
310                for data_msg in self._data_msgs.values()
311                if (data_msg and
312                    (msg.asdu_address == 0xFFFF or
313                     msg.asdu_address == data_msg.asdu_address) and
314                    not isinstance(data_msg.data, iec101.BinaryCounterData))]
315            conn.send(data_msgs)
316
317            res = msg._replace(
318                cause=iec101.CommandResCause.ACTIVATION_TERMINATION,
319                is_negative_confirm=False)
320            conn.send([res])
321
322        elif msg.cause == iec101.CommandReqCause.DEACTIVATION:
323            res = msg._replace(
324                cause=iec101.CommandResCause.DEACTIVATION_CONFIRMATION,
325                is_negative_confirm=True)
326            conn.send([res])
327
328        else:
329            res = msg._replace(cause=iec101.CommandResCause.UNKNOWN_CAUSE,
330                               is_negative_confirm=True)
331            conn.send([res])
332
333    def _process_counter_interrogation_msg(self, conn_id, conn, msg):
334        if msg.cause == iec101.CommandReqCause.ACTIVATION:
335            res = msg._replace(
336                cause=iec101.CommandResCause.ACTIVATION_CONFIRMATION,
337                is_negative_confirm=False)
338            conn.send([res])
339
340            data_msgs = [
341                data_msg._replace(
342                    is_test=msg.is_test,
343                    cause=iec101.DataResCause.INTERROGATED_COUNTER)
344                for data_msg in self._data_msgs.values()
345                if (data_msg and
346                    (msg.asdu_address == 0xFFFF or
347                     msg.asdu_address == data_msg.asdu_address) and
348                    isinstance(data_msg.data, iec101.BinaryCounterData))]
349            conn.send(data_msgs)
350
351            res = msg._replace(
352                cause=iec101.CommandResCause.ACTIVATION_TERMINATION,
353                is_negative_confirm=False)
354            conn.send([res])
355
356        elif msg.cause == iec101.CommandReqCause.DEACTIVATION:
357            res = msg._replace(
358                cause=iec101.CommandResCause.DEACTIVATION_CONFIRMATION,
359                is_negative_confirm=True)
360            conn.send([res])
361
362        else:
363            res = msg._replace(cause=iec101.CommandResCause.UNKNOWN_CAUSE,
364                               is_negative_confirm=True)
365            conn.send([res])
366
367    def _process_read_msg(self, conn_id, conn, msg):
368        res = msg._replace(cause=iec101.ReadResCause.UNKNOWN_TYPE)
369        conn.send([res])
370
371    def _process_clock_sync_msg(self, conn_id, conn, msg):
372        if isinstance(msg.cause, iec101.ClockSyncReqCause):
373            res = msg._replace(
374                cause=iec101.ClockSyncResCause.ACTIVATION_CONFIRMATION,
375                is_negative_confirm=True)
376            conn.send([res])
377
378        else:
379            res = msg._replace(cause=iec101.ClockSyncResCause.UNKNOWN_CAUSE,
380                               is_negative_confirm=True)
381            conn.send([res])
382
383    def _process_test_msg(self, conn_id, conn, msg):
384        res = msg._replace(cause=iec101.ActivationResCause.UNKNOWN_TYPE)
385        conn.send([res])
386
387    def _process_reset_msg(self, conn_id, conn, msg):
388        res = msg._replace(cause=iec101.ActivationResCause.UNKNOWN_TYPE)
389        conn.send([res])
390
391    def _process_parameter_msg(self, conn_id, conn, msg):
392        res = msg._replace(cause=iec101.ParameterResCause.UNKNOWN_TYPE)
393        conn.send([res])
394
395    def _process_parameter_activation_msg(self, conn_id, conn, msg):
396        res = msg._replace(
397            cause=iec101.ParameterActivationResCause.UNKNOWN_TYPE)
398        conn.send([res])
399
400    def _send_data_msg(self, conn, buffer, event_id, data_msg):
401        sent_cb = (functools.partial(buffer.remove, event_id)
402                   if buffer else None)
403        conn.send([data_msg], sent_cb=sent_cb)

Device interface

async_group: hat.aio.group.Group
144    @property
145    def async_group(self) -> aio.Group:
146        return self._slave.async_group

Group controlling resource's lifetime.

async def process_events(self, events: Collection[hat.event.common.common.Event]):
148    async def process_events(self, events: Collection[hat.event.common.Event]):
149        for event in events:
150            try:
151                await self._process_event(event)
152
153            except Exception as e:
154                mlog.warning('error processing event: %s', e, exc_info=e)

Process received events

This method can be a coroutine or a regular function.

def cmd_msg_to_event( event_type_prefix: tuple[str, ...], conn_id: int, msg: hat.drivers.iec101.common.CommandMsg) -> hat.event.common.common.RegisterEvent:
def cmd_msg_to_event(event_type_prefix: hat.event.common.EventType,
                     conn_id: int,
                     msg: iec101.CommandMsg
                     ) -> hat.event.common.RegisterEvent:
    """Build the register event describing a received command message.

    The event type is ``(*event_type_prefix, 'gateway', 'command',
    <command type>, <asdu address>, <io address>)``. The JSON payload
    carries the connection id, the test flag, the cause and the command.
    The source timestamp is not set.

    """
    command_type = common.get_command_type(msg.command)

    payload_data = {
        'connection_id': conn_id,
        'is_test': msg.is_test,
        'cause': common.cause_to_json(iec101.CommandReqCause, msg.cause),
        'command': common.command_to_json(msg.command)}

    type_suffix = ('gateway', 'command', command_type.value,
                   str(msg.asdu_address), str(msg.io_address))

    return hat.event.common.RegisterEvent(
        type=(*event_type_prefix, *type_suffix),
        source_timestamp=None,
        payload=hat.event.common.EventPayloadJson(payload_data))
def data_msg_from_event( data_key: hat.gateway.devices.iec101.common.DataKey, event: hat.event.common.common.Event) -> hat.drivers.iec101.common.DataMsg:
def data_msg_from_event(data_key: common.DataKey,
                        event: hat.event.common.Event
                        ) -> iec101.DataMsg:
    """Build a data message from a data event.

    ASDU and IO addresses are taken from `data_key`. The test flag, cause
    and data come from the event payload (keys ``'is_test'``, ``'cause'``,
    ``'data'``), and the time from the event's source timestamp. The
    originator address is always ``0``.

    """
    msg_time = common.time_from_source_timestamp(event.source_timestamp)

    payload = event.payload.data
    msg_cause = common.cause_from_json(iec101.DataResCause, payload['cause'])
    msg_data = common.data_from_json(data_key.data_type, payload['data'])

    return iec101.DataMsg(
        is_test=payload['is_test'],
        originator_address=0,
        asdu_address=data_key.asdu_address,
        io_address=data_key.io_address,
        data=msg_data,
        time=msg_time,
        cause=msg_cause)
def cmd_msg_from_event( cmd_key: hat.gateway.devices.iec101.common.CommandKey, event: hat.event.common.common.Event) -> hat.drivers.iec101.common.CommandMsg:
def cmd_msg_from_event(cmd_key: common.CommandKey,
                       event: hat.event.common.Event
                       ) -> iec101.CommandMsg:
    """Build a command message from a command event.

    ASDU and IO addresses are taken from `cmd_key`. The test flag, cause,
    command and negative confirm flag come from the event payload (keys
    ``'is_test'``, ``'cause'``, ``'command'``, ``'is_negative_confirm'``).
    The originator address is always ``0``.

    """
    payload = event.payload.data

    msg_cause = common.cause_from_json(iec101.CommandResCause,
                                       payload['cause'])
    msg_command = common.command_from_json(cmd_key.cmd_type,
                                           payload['command'])
    negative = payload['is_negative_confirm']

    return iec101.CommandMsg(
        is_test=payload['is_test'],
        originator_address=0,
        asdu_address=cmd_key.asdu_address,
        io_address=cmd_key.io_address,
        command=msg_command,
        is_negative_confirm=negative,
        cause=msg_cause)