hat.gateway.devices.iec104.slave

IEC 60870-5-104 slave device

  1"""IEC 60870-5-104 slave device"""
  2
  3from collections.abc import Collection
  4import contextlib
  5import functools
  6import itertools
  7import logging
  8
  9from hat import aio
 10from hat.drivers import iec104
 11from hat.drivers import tcp
 12import hat.event.common
 13import hat.event.eventer
 14
 15from hat.gateway.devices.iec101 import slave as iec101_slave
 16from hat.gateway.devices.iec104 import common
 17from hat.gateway.devices.iec104 import ssl
 18
 19
 20mlog: logging.Logger = logging.getLogger(__name__)
 21
 22
 23async def create(conf: common.DeviceConf,
 24                 eventer_client: hat.event.eventer.Client,
 25                 event_type_prefix: common.EventTypePrefix
 26                 ) -> 'Iec104SlaveDevice':
 27    device = Iec104SlaveDevice()
 28    device._conf = conf
 29    device._eventer_client = eventer_client
 30    device._event_type_prefix = event_type_prefix
 31    device._max_connections = conf['max_connections']
 32    device._next_conn_ids = itertools.count(1)
 33    device._conns = {}
 34    device._remote_hosts = (set(conf['remote_hosts'])
 35                            if conf['remote_hosts'] is not None else None)
 36    device._buffers = {}
 37    device._data_msgs = {}
 38    device._data_buffers = {}
 39
 40    iec101_slave.init_buffers(buffers_conf=conf['buffers'],
 41                              buffers=device._buffers)
 42
 43    await iec101_slave.init_data(data_conf=conf['data'],
 44                                 data_msgs=device._data_msgs,
 45                                 data_buffers=device._data_buffers,
 46                                 buffers=device._buffers,
 47                                 eventer_client=eventer_client,
 48                                 event_type_prefix=event_type_prefix)
 49
 50    ssl_ctx = (ssl.create_ssl_ctx(conf['security'], ssl.SslProtocol.TLS_SERVER)
 51               if conf['security'] else None)
 52
 53    device._srv = await iec104.listen(
 54        connection_cb=device._on_connection,
 55        addr=tcp.Address(host=conf['local_host'],
 56                         port=conf['local_port']),
 57        response_timeout=conf['response_timeout'],
 58        supervisory_timeout=conf['supervisory_timeout'],
 59        test_timeout=conf['test_timeout'],
 60        send_window_size=conf['send_window_size'],
 61        receive_window_size=conf['receive_window_size'],
 62        ssl=ssl_ctx)
 63
 64    try:
 65        await device._register_connections()
 66
 67    except BaseException:
 68        await aio.uncancellable(device.async_close())
 69        raise
 70
 71    return device
 72
 73
 74info: common.DeviceInfo = common.DeviceInfo(
 75    type="iec104_slave",
 76    create=create,
 77    json_schema_id="hat-gateway://iec104.yaml#/$defs/slave",
 78    json_schema_repo=common.json_schema_repo)
 79
 80
 81class Iec104SlaveDevice(common.Device):
 82
 83    @property
 84    def async_group(self) -> aio.Group:
 85        return self._srv.async_group
 86
 87    async def process_events(self, events: Collection[hat.event.common.Event]):
 88        for event in events:
 89            try:
 90                mlog.debug('received event: %s', event)
 91                await self._process_event(event)
 92
 93            except Exception as e:
 94                mlog.warning('error processing event: %s', e, exc_info=e)
 95
 96    async def _on_connection(self, conn):
 97        if (self._max_connections is not None and
 98                len(self._conns) >= self._max_connections):
 99            mlog.info('max connections exceeded - rejecting connection')
100            conn.close()
101            return
102
103        if self._conf['security']:
104            try:
105                ssl.init_security(self._conf['security'], conn)
106
107            except Exception as e:
108                mlog.error('init security error: %s', exc_info=e)
109                conn.close()
110                return
111
112        conn_id = next(self._next_conn_ids)
113
114        try:
115            if self._remote_hosts is not None:
116                remote_host = conn.info.remote_addr.host
117                if remote_host not in self._remote_hosts:
118                    raise Exception(f'remote host {remote_host} not allowed')
119
120            self._conns[conn_id] = conn
121            await self._register_connections()
122
123            enabled_cb = functools.partial(self._on_enabled, conn)
124            with conn.register_enabled_cb(enabled_cb):
125                enabled_cb(conn.is_enabled)
126
127                while True:
128                    msgs = await conn.receive()
129
130                    for msg in msgs:
131                        try:
132                            mlog.debug('received message: %s', msg)
133                            await self._process_msg(conn_id, conn, msg)
134
135                        except Exception as e:
136                            mlog.warning('error processing message: %s',
137                                         e, exc_info=e)
138
139        except ConnectionError:
140            mlog.debug('connection close')
141
142        except Exception as e:
143            mlog.warning('connection error: %s', e, exc_info=e)
144
145        finally:
146            mlog.debug('closing connection')
147            conn.close()
148
149            with contextlib.suppress(Exception):
150                self._conns.pop(conn_id)
151                await aio.uncancellable(self._register_connections())
152
153    def _on_enabled(self, conn, enabled):
154        if not enabled:
155            return
156
157        with contextlib.suppress(Exception):
158            for buffer in self._buffers.values():
159                for event_id, data_msg in buffer.get_event_id_data_msgs():
160                    self._send_data_msg(conn, buffer, event_id, data_msg)
161
162    async def _register_connections(self):
163        payload = [{'connection_id': conn_id,
164                    'local': {'host': conn.info.local_addr.host,
165                              'port': conn.info.local_addr.port},
166                    'remote': {'host': conn.info.remote_addr.host,
167                               'port': conn.info.remote_addr.port}}
168                   for conn_id, conn in self._conns.items()]
169
170        event = hat.event.common.RegisterEvent(
171            type=(*self._event_type_prefix, 'gateway', 'connections'),
172            source_timestamp=None,
173            payload=hat.event.common.EventPayloadJson(payload))
174
175        await self._eventer_client.register([event])
176
177    async def _process_event(self, event):
178        suffix = event.type[len(self._event_type_prefix):]
179
180        if suffix[:2] == ('system', 'data'):
181            data_type_str, asdu_address_str, io_address_str = suffix[2:]
182            data_key = common.DataKey(data_type=common.DataType(data_type_str),
183                                      asdu_address=int(asdu_address_str),
184                                      io_address=int(io_address_str))
185
186            await self._process_data_event(data_key, event)
187
188        elif suffix[:2] == ('system', 'command'):
189            cmd_type_str, asdu_address_str, io_address_str = suffix[2:]
190            cmd_key = common.CommandKey(
191                cmd_type=common.CommandType(cmd_type_str),
192                asdu_address=int(asdu_address_str),
193                io_address=int(io_address_str))
194
195            await self._process_command_event(cmd_key, event)
196
197        else:
198            raise Exception('unsupported event type')
199
200    async def _process_data_event(self, data_key, event):
201        if data_key not in self._data_msgs:
202            raise Exception('data not configured')
203
204        data_msg = _data_msg_from_event(data_key, event)
205        self._data_msgs[data_key] = data_msg
206
207        buffer = self._data_buffers.get(data_key)
208        if buffer:
209            buffer.add(event.id, data_msg)
210
211        for conn in self._conns.values():
212            self._send_data_msg(conn, buffer, event.id, data_msg)
213
214    async def _process_command_event(self, cmd_key, event):
215        cmd_msg = _cmd_msg_from_event(cmd_key, event)
216        conn_id = event.payload.data['connection_id']
217        conn = self._conns[conn_id]
218        await conn.send([cmd_msg])
219
220    async def _process_msg(self, conn_id, conn, msg):
221        if isinstance(msg, iec104.CommandMsg):
222            await self._process_command_msg(conn_id, conn, msg)
223
224        elif isinstance(msg, iec104.InterrogationMsg):
225            await self._process_interrogation_msg(conn_id, conn, msg)
226
227        elif isinstance(msg, iec104.CounterInterrogationMsg):
228            await self._process_counter_interrogation_msg(conn_id, conn, msg)
229
230        elif isinstance(msg, iec104.ReadMsg):
231            await self._process_read_msg(conn_id, conn, msg)
232
233        elif isinstance(msg, iec104.ClockSyncMsg):
234            await self._process_clock_sync_msg(conn_id, conn, msg)
235
236        elif isinstance(msg, iec104.TestMsg):
237            await self._process_test_msg(conn_id, conn, msg)
238
239        elif isinstance(msg, iec104.ResetMsg):
240            await self._process_reset_msg(conn_id, conn, msg)
241
242        elif isinstance(msg, iec104.ParameterMsg):
243            await self._process_parameter_msg(conn_id, conn, msg)
244
245        elif isinstance(msg, iec104.ParameterActivationMsg):
246            await self._process_parameter_activation_msg(conn_id, conn, msg)
247
248        else:
249            raise Exception('unsupported message')
250
251    async def _process_command_msg(self, conn_id, conn, msg):
252        if isinstance(msg.cause, iec104.CommandReqCause):
253            event = _cmd_msg_to_event(self._event_type_prefix, conn_id, msg)
254            await self._eventer_client.register([event])
255
256        else:
257            res = msg._replace(cause=iec104.CommandResCause.UNKNOWN_CAUSE,
258                               is_negative_confirm=True)
259            await conn.send([res])
260
261    async def _process_interrogation_msg(self, conn_id, conn, msg):
262        if msg.cause == iec104.CommandReqCause.ACTIVATION:
263            res = msg._replace(
264                cause=iec104.CommandResCause.ACTIVATION_CONFIRMATION,
265                is_negative_confirm=False)
266            await conn.send([res])
267
268            data_msgs = [
269                data_msg._replace(
270                    is_test=msg.is_test,
271                    cause=iec104.DataResCause.INTERROGATED_STATION)
272                for data_msg in self._data_msgs.values()
273                if (data_msg and
274                    (msg.asdu_address == 0xFFFF or
275                     msg.asdu_address == data_msg.asdu_address) and
276                    not isinstance(data_msg.data, iec104.BinaryCounterData))]
277            await conn.send(data_msgs)
278
279            res = msg._replace(
280                cause=iec104.CommandResCause.ACTIVATION_TERMINATION,
281                is_negative_confirm=False)
282            await conn.send([res])
283
284        elif msg.cause == iec104.CommandReqCause.DEACTIVATION:
285            res = msg._replace(
286                cause=iec104.CommandResCause.DEACTIVATION_CONFIRMATION,
287                is_negative_confirm=True)
288            await conn.send([res])
289
290        else:
291            res = msg._replace(cause=iec104.CommandResCause.UNKNOWN_CAUSE,
292                               is_negative_confirm=True)
293            await conn.send([res])
294
295    async def _process_counter_interrogation_msg(self, conn_id, conn, msg):
296        if msg.cause == iec104.CommandReqCause.ACTIVATION:
297            res = msg._replace(
298                cause=iec104.CommandResCause.ACTIVATION_CONFIRMATION,
299                is_negative_confirm=False)
300            await conn.send([res])
301
302            data_msgs = [
303                data_msg._replace(
304                    is_test=msg.is_test,
305                    cause=iec104.DataResCause.INTERROGATED_COUNTER)
306                for data_msg in self._data_msgs.values()
307                if (data_msg and
308                    (msg.asdu_address == 0xFFFF or
309                     msg.asdu_address == data_msg.asdu_address) and
310                    isinstance(data_msg.data, iec104.BinaryCounterData))]
311            await conn.send(data_msgs)
312
313            res = msg._replace(
314                cause=iec104.CommandResCause.ACTIVATION_TERMINATION,
315                is_negative_confirm=False)
316            await conn.send([res])
317
318        elif msg.cause == iec104.CommandReqCause.DEACTIVATION:
319            res = msg._replace(
320                cause=iec104.CommandResCause.DEACTIVATION_CONFIRMATION,
321                is_negative_confirm=True)
322            await conn.send([res])
323
324        else:
325            res = msg._replace(cause=iec104.CommandResCause.UNKNOWN_CAUSE,
326                               is_negative_confirm=True)
327            await conn.send([res])
328
329    async def _process_read_msg(self, conn_id, conn, msg):
330        res = msg._replace(cause=iec104.ReadResCause.UNKNOWN_TYPE)
331        await conn.send([res])
332
333    async def _process_clock_sync_msg(self, conn_id, conn, msg):
334        if isinstance(msg.cause, iec104.ClockSyncReqCause):
335            res = msg._replace(
336                cause=iec104.ClockSyncResCause.ACTIVATION_CONFIRMATION,
337                is_negative_confirm=True)
338            await conn.send([res])
339
340        else:
341            res = msg._replace(cause=iec104.ClockSyncResCause.UNKNOWN_CAUSE,
342                               is_negative_confirm=True)
343            await conn.send([res])
344
345    async def _process_test_msg(self, conn_id, conn, msg):
346        res = msg._replace(cause=iec104.ActivationResCause.UNKNOWN_TYPE)
347        await conn.send([res])
348
349    async def _process_reset_msg(self, conn_id, conn, msg):
350        res = msg._replace(cause=iec104.ActivationResCause.UNKNOWN_TYPE)
351        await conn.send([res])
352
353    async def _process_parameter_msg(self, conn_id, conn, msg):
354        res = msg._replace(cause=iec104.ParameterResCause.UNKNOWN_TYPE)
355        await conn.send([res])
356
357    async def _process_parameter_activation_msg(self, conn_id, conn, msg):
358        res = msg._replace(
359            cause=iec104.ParameterActivationResCause.UNKNOWN_TYPE)
360        await conn.send([res])
361
362    def _send_data_msg(self, conn, buffer, event_id, data_msg):
363        self.async_group.spawn(_send_data_msg, conn, buffer, event_id,
364                               data_msg)
365
366
367async def _send_data_msg(conn, buffer, event_id, data_msg):
368    try:
369        if buffer:
370            await conn.send([data_msg], wait_ack=True)
371            buffer.remove(event_id)
372
373        else:
374            await conn.send([data_msg])
375
376    except ConnectionError:
377        pass
378
379    except Exception as e:
380        mlog.warning('send data message error: %s', e, exc_info=e)
381
382
383def _cmd_msg_to_event(event_type_prefix, conn_id, msg):
384    event = iec101_slave.cmd_msg_to_event(event_type_prefix, conn_id, msg)
385    source_timestamp = common.time_to_source_timestamp(msg.time)
386    return event._replace(source_timestamp=source_timestamp)
387
388
389def _data_msg_from_event(data_key, event):
390    return iec101_slave.data_msg_from_event(data_key, event)
391
392
393def _cmd_msg_from_event(cmd_key, event):
394    msg = iec101_slave.cmd_msg_from_event(cmd_key, event)
395    time = common.time_from_source_timestamp(event.source_timestamp)
396    return iec104.CommandMsg(**msg._asdict(),
397                             time=time)
mlog: logging.Logger = <Logger hat.gateway.devices.iec104.slave (WARNING)>
async def create( conf: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')], eventer_client: hat.event.eventer.client.Client, event_type_prefix: tuple[str, str, str]) -> Iec104SlaveDevice:
24async def create(conf: common.DeviceConf,
25                 eventer_client: hat.event.eventer.Client,
26                 event_type_prefix: common.EventTypePrefix
27                 ) -> 'Iec104SlaveDevice':
28    device = Iec104SlaveDevice()
29    device._conf = conf
30    device._eventer_client = eventer_client
31    device._event_type_prefix = event_type_prefix
32    device._max_connections = conf['max_connections']
33    device._next_conn_ids = itertools.count(1)
34    device._conns = {}
35    device._remote_hosts = (set(conf['remote_hosts'])
36                            if conf['remote_hosts'] is not None else None)
37    device._buffers = {}
38    device._data_msgs = {}
39    device._data_buffers = {}
40
41    iec101_slave.init_buffers(buffers_conf=conf['buffers'],
42                              buffers=device._buffers)
43
44    await iec101_slave.init_data(data_conf=conf['data'],
45                                 data_msgs=device._data_msgs,
46                                 data_buffers=device._data_buffers,
47                                 buffers=device._buffers,
48                                 eventer_client=eventer_client,
49                                 event_type_prefix=event_type_prefix)
50
51    ssl_ctx = (ssl.create_ssl_ctx(conf['security'], ssl.SslProtocol.TLS_SERVER)
52               if conf['security'] else None)
53
54    device._srv = await iec104.listen(
55        connection_cb=device._on_connection,
56        addr=tcp.Address(host=conf['local_host'],
57                         port=conf['local_port']),
58        response_timeout=conf['response_timeout'],
59        supervisory_timeout=conf['supervisory_timeout'],
60        test_timeout=conf['test_timeout'],
61        send_window_size=conf['send_window_size'],
62        receive_window_size=conf['receive_window_size'],
63        ssl=ssl_ctx)
64
65    try:
66        await device._register_connections()
67
68    except BaseException:
69        await aio.uncancellable(device.async_close())
70        raise
71
72    return device
info: hat.gateway.common.DeviceInfo = DeviceInfo(type='iec104_slave', create=<function create>, json_schema_id='hat-gateway://iec104.yaml#/$defs/slave', json_schema_repo=<hat.json.repository.SchemaRepository object>)
class Iec104SlaveDevice(hat.gateway.common.Device):
 82class Iec104SlaveDevice(common.Device):
 83
 84    @property
 85    def async_group(self) -> aio.Group:
 86        return self._srv.async_group
 87
 88    async def process_events(self, events: Collection[hat.event.common.Event]):
 89        for event in events:
 90            try:
 91                mlog.debug('received event: %s', event)
 92                await self._process_event(event)
 93
 94            except Exception as e:
 95                mlog.warning('error processing event: %s', e, exc_info=e)
 96
 97    async def _on_connection(self, conn):
 98        if (self._max_connections is not None and
 99                len(self._conns) >= self._max_connections):
100            mlog.info('max connections exceeded - rejecting connection')
101            conn.close()
102            return
103
104        if self._conf['security']:
105            try:
106                ssl.init_security(self._conf['security'], conn)
107
108            except Exception as e:
109                mlog.error('init security error: %s', exc_info=e)
110                conn.close()
111                return
112
113        conn_id = next(self._next_conn_ids)
114
115        try:
116            if self._remote_hosts is not None:
117                remote_host = conn.info.remote_addr.host
118                if remote_host not in self._remote_hosts:
119                    raise Exception(f'remote host {remote_host} not allowed')
120
121            self._conns[conn_id] = conn
122            await self._register_connections()
123
124            enabled_cb = functools.partial(self._on_enabled, conn)
125            with conn.register_enabled_cb(enabled_cb):
126                enabled_cb(conn.is_enabled)
127
128                while True:
129                    msgs = await conn.receive()
130
131                    for msg in msgs:
132                        try:
133                            mlog.debug('received message: %s', msg)
134                            await self._process_msg(conn_id, conn, msg)
135
136                        except Exception as e:
137                            mlog.warning('error processing message: %s',
138                                         e, exc_info=e)
139
140        except ConnectionError:
141            mlog.debug('connection close')
142
143        except Exception as e:
144            mlog.warning('connection error: %s', e, exc_info=e)
145
146        finally:
147            mlog.debug('closing connection')
148            conn.close()
149
150            with contextlib.suppress(Exception):
151                self._conns.pop(conn_id)
152                await aio.uncancellable(self._register_connections())
153
154    def _on_enabled(self, conn, enabled):
155        if not enabled:
156            return
157
158        with contextlib.suppress(Exception):
159            for buffer in self._buffers.values():
160                for event_id, data_msg in buffer.get_event_id_data_msgs():
161                    self._send_data_msg(conn, buffer, event_id, data_msg)
162
163    async def _register_connections(self):
164        payload = [{'connection_id': conn_id,
165                    'local': {'host': conn.info.local_addr.host,
166                              'port': conn.info.local_addr.port},
167                    'remote': {'host': conn.info.remote_addr.host,
168                               'port': conn.info.remote_addr.port}}
169                   for conn_id, conn in self._conns.items()]
170
171        event = hat.event.common.RegisterEvent(
172            type=(*self._event_type_prefix, 'gateway', 'connections'),
173            source_timestamp=None,
174            payload=hat.event.common.EventPayloadJson(payload))
175
176        await self._eventer_client.register([event])
177
178    async def _process_event(self, event):
179        suffix = event.type[len(self._event_type_prefix):]
180
181        if suffix[:2] == ('system', 'data'):
182            data_type_str, asdu_address_str, io_address_str = suffix[2:]
183            data_key = common.DataKey(data_type=common.DataType(data_type_str),
184                                      asdu_address=int(asdu_address_str),
185                                      io_address=int(io_address_str))
186
187            await self._process_data_event(data_key, event)
188
189        elif suffix[:2] == ('system', 'command'):
190            cmd_type_str, asdu_address_str, io_address_str = suffix[2:]
191            cmd_key = common.CommandKey(
192                cmd_type=common.CommandType(cmd_type_str),
193                asdu_address=int(asdu_address_str),
194                io_address=int(io_address_str))
195
196            await self._process_command_event(cmd_key, event)
197
198        else:
199            raise Exception('unsupported event type')
200
201    async def _process_data_event(self, data_key, event):
202        if data_key not in self._data_msgs:
203            raise Exception('data not configured')
204
205        data_msg = _data_msg_from_event(data_key, event)
206        self._data_msgs[data_key] = data_msg
207
208        buffer = self._data_buffers.get(data_key)
209        if buffer:
210            buffer.add(event.id, data_msg)
211
212        for conn in self._conns.values():
213            self._send_data_msg(conn, buffer, event.id, data_msg)
214
215    async def _process_command_event(self, cmd_key, event):
216        cmd_msg = _cmd_msg_from_event(cmd_key, event)
217        conn_id = event.payload.data['connection_id']
218        conn = self._conns[conn_id]
219        await conn.send([cmd_msg])
220
221    async def _process_msg(self, conn_id, conn, msg):
222        if isinstance(msg, iec104.CommandMsg):
223            await self._process_command_msg(conn_id, conn, msg)
224
225        elif isinstance(msg, iec104.InterrogationMsg):
226            await self._process_interrogation_msg(conn_id, conn, msg)
227
228        elif isinstance(msg, iec104.CounterInterrogationMsg):
229            await self._process_counter_interrogation_msg(conn_id, conn, msg)
230
231        elif isinstance(msg, iec104.ReadMsg):
232            await self._process_read_msg(conn_id, conn, msg)
233
234        elif isinstance(msg, iec104.ClockSyncMsg):
235            await self._process_clock_sync_msg(conn_id, conn, msg)
236
237        elif isinstance(msg, iec104.TestMsg):
238            await self._process_test_msg(conn_id, conn, msg)
239
240        elif isinstance(msg, iec104.ResetMsg):
241            await self._process_reset_msg(conn_id, conn, msg)
242
243        elif isinstance(msg, iec104.ParameterMsg):
244            await self._process_parameter_msg(conn_id, conn, msg)
245
246        elif isinstance(msg, iec104.ParameterActivationMsg):
247            await self._process_parameter_activation_msg(conn_id, conn, msg)
248
249        else:
250            raise Exception('unsupported message')
251
252    async def _process_command_msg(self, conn_id, conn, msg):
253        if isinstance(msg.cause, iec104.CommandReqCause):
254            event = _cmd_msg_to_event(self._event_type_prefix, conn_id, msg)
255            await self._eventer_client.register([event])
256
257        else:
258            res = msg._replace(cause=iec104.CommandResCause.UNKNOWN_CAUSE,
259                               is_negative_confirm=True)
260            await conn.send([res])
261
262    async def _process_interrogation_msg(self, conn_id, conn, msg):
263        if msg.cause == iec104.CommandReqCause.ACTIVATION:
264            res = msg._replace(
265                cause=iec104.CommandResCause.ACTIVATION_CONFIRMATION,
266                is_negative_confirm=False)
267            await conn.send([res])
268
269            data_msgs = [
270                data_msg._replace(
271                    is_test=msg.is_test,
272                    cause=iec104.DataResCause.INTERROGATED_STATION)
273                for data_msg in self._data_msgs.values()
274                if (data_msg and
275                    (msg.asdu_address == 0xFFFF or
276                     msg.asdu_address == data_msg.asdu_address) and
277                    not isinstance(data_msg.data, iec104.BinaryCounterData))]
278            await conn.send(data_msgs)
279
280            res = msg._replace(
281                cause=iec104.CommandResCause.ACTIVATION_TERMINATION,
282                is_negative_confirm=False)
283            await conn.send([res])
284
285        elif msg.cause == iec104.CommandReqCause.DEACTIVATION:
286            res = msg._replace(
287                cause=iec104.CommandResCause.DEACTIVATION_CONFIRMATION,
288                is_negative_confirm=True)
289            await conn.send([res])
290
291        else:
292            res = msg._replace(cause=iec104.CommandResCause.UNKNOWN_CAUSE,
293                               is_negative_confirm=True)
294            await conn.send([res])
295
296    async def _process_counter_interrogation_msg(self, conn_id, conn, msg):
297        if msg.cause == iec104.CommandReqCause.ACTIVATION:
298            res = msg._replace(
299                cause=iec104.CommandResCause.ACTIVATION_CONFIRMATION,
300                is_negative_confirm=False)
301            await conn.send([res])
302
303            data_msgs = [
304                data_msg._replace(
305                    is_test=msg.is_test,
306                    cause=iec104.DataResCause.INTERROGATED_COUNTER)
307                for data_msg in self._data_msgs.values()
308                if (data_msg and
309                    (msg.asdu_address == 0xFFFF or
310                     msg.asdu_address == data_msg.asdu_address) and
311                    isinstance(data_msg.data, iec104.BinaryCounterData))]
312            await conn.send(data_msgs)
313
314            res = msg._replace(
315                cause=iec104.CommandResCause.ACTIVATION_TERMINATION,
316                is_negative_confirm=False)
317            await conn.send([res])
318
319        elif msg.cause == iec104.CommandReqCause.DEACTIVATION:
320            res = msg._replace(
321                cause=iec104.CommandResCause.DEACTIVATION_CONFIRMATION,
322                is_negative_confirm=True)
323            await conn.send([res])
324
325        else:
326            res = msg._replace(cause=iec104.CommandResCause.UNKNOWN_CAUSE,
327                               is_negative_confirm=True)
328            await conn.send([res])
329
330    async def _process_read_msg(self, conn_id, conn, msg):
331        res = msg._replace(cause=iec104.ReadResCause.UNKNOWN_TYPE)
332        await conn.send([res])
333
334    async def _process_clock_sync_msg(self, conn_id, conn, msg):
335        if isinstance(msg.cause, iec104.ClockSyncReqCause):
336            res = msg._replace(
337                cause=iec104.ClockSyncResCause.ACTIVATION_CONFIRMATION,
338                is_negative_confirm=True)
339            await conn.send([res])
340
341        else:
342            res = msg._replace(cause=iec104.ClockSyncResCause.UNKNOWN_CAUSE,
343                               is_negative_confirm=True)
344            await conn.send([res])
345
346    async def _process_test_msg(self, conn_id, conn, msg):
347        res = msg._replace(cause=iec104.ActivationResCause.UNKNOWN_TYPE)
348        await conn.send([res])
349
350    async def _process_reset_msg(self, conn_id, conn, msg):
351        res = msg._replace(cause=iec104.ActivationResCause.UNKNOWN_TYPE)
352        await conn.send([res])
353
354    async def _process_parameter_msg(self, conn_id, conn, msg):
355        res = msg._replace(cause=iec104.ParameterResCause.UNKNOWN_TYPE)
356        await conn.send([res])
357
358    async def _process_parameter_activation_msg(self, conn_id, conn, msg):
359        res = msg._replace(
360            cause=iec104.ParameterActivationResCause.UNKNOWN_TYPE)
361        await conn.send([res])
362
363    def _send_data_msg(self, conn, buffer, event_id, data_msg):
364        self.async_group.spawn(_send_data_msg, conn, buffer, event_id,
365                               data_msg)

Device interface

async_group: hat.aio.group.Group
84    @property
85    def async_group(self) -> aio.Group:
86        return self._srv.async_group

Group controlling resource's lifetime.

async def process_events(self, events: Collection[hat.event.common.common.Event]):
88    async def process_events(self, events: Collection[hat.event.common.Event]):
89        for event in events:
90            try:
91                mlog.debug('received event: %s', event)
92                await self._process_event(event)
93
94            except Exception as e:
95                mlog.warning('error processing event: %s', e, exc_info=e)

Process received events

This method can be coroutine or regular function.