hat.gateway.devices.iec104.master

IEC 60870-5-104 master device

  1"""IEC 60870-5-104 master device"""
  2
  3from collections.abc import Collection
  4import asyncio
  5import collections
  6import contextlib
  7import datetime
  8import enum
  9import logging
 10
 11from hat import aio
 12from hat.drivers import iec104
 13from hat.drivers import tcp
 14import hat.event.common
 15import hat.event.eventer
 16
 17from hat.gateway.devices.iec104 import common
 18from hat.gateway.devices.iec104 import ssl
 19
 20
# Module-level logger, named after this module via `__name__`.
mlog: logging.Logger = logging.getLogger(__name__)
 22
 23
class Iec104MasterDevice(common.Device):
    """IEC 60870-5-104 master device.

    Creating an instance spawns a background connection loop which keeps
    trying to connect to one of the configured remote addresses. While a
    connection is open, received messages are converted to events and
    registered with the eventer client, and events passed to
    `process_events` are converted to messages and sent over the
    connection. Connection state changes are registered as
    ``(*event_type_prefix, 'gateway', 'status')`` events.
    """

    def __init__(self,
                 conf: common.DeviceConf,
                 eventer_client: hat.event.eventer.Client,
                 event_type_prefix: common.EventTypePrefix):
        """Create the device and start its connection loop.

        Args:
            conf: device configuration; keys read by this class are
                ``security``, ``remote_addresses``, ``response_timeout``,
                ``supervisory_timeout``, ``test_timeout``,
                ``send_window_size``, ``receive_window_size``,
                ``reconnect_delay`` and ``time_sync_delay``
            eventer_client: client used for registering events
            event_type_prefix: prefix of event types handled by this device

        """
        self._eventer_client = eventer_client
        self._event_type_prefix = event_type_prefix
        # currently active connection (``None`` until one is established)
        self._conn = None
        self._async_group = aio.Group()

        # SSL context is created only when security configuration is truthy
        ssl_ctx = (
            ssl.create_ssl_ctx(conf['security'], ssl.SslProtocol.TLS_CLIENT)
            if conf['security'] else None)

        self.async_group.spawn(self._connection_loop, conf, ssl_ctx)

    @property
    def async_group(self) -> aio.Group:
        """Group controlling resource's lifetime."""
        return self._async_group

    async def process_events(self, events: Collection[hat.event.common.Event]):
        """Convert events to IEC 104 messages and send them.

        Events which can not be converted are logged and skipped. If no
        connection is currently open, all converted messages are dropped
        with a warning.

        Args:
            events: events received for this device

        """
        msgs = collections.deque()
        for event in events:
            try:
                mlog.debug('received event: %s', event)
                msg = _msg_from_event(self._event_type_prefix, event)
                msgs.append(msg)

            except Exception as e:
                # a single bad event must not prevent sending of the others
                mlog.warning('error processing event: %s',
                             e, exc_info=e)
                continue

        if not msgs:
            return

        if not self._conn or not self._conn.is_open:
            mlog.warning('connection closed: %s events ignored',
                         len(msgs))
            return

        try:
            await self._conn.send(msgs)
            mlog.debug('%s messages sent', len(msgs))

        except ConnectionError as e:
            mlog.warning('error sending messages: %s', e, exc_info=e)

    async def _connection_loop(self, conf, ssl_ctx):
        """Keep a connection to one of the remote addresses established.

        Each iteration registers ``CONNECTING`` status, tries the configured
        addresses in order and, once connected, registers ``CONNECTED``,
        spawns the receive loop (and the time sync loop when
        ``time_sync_delay`` is not ``None``) and waits for the connection to
        close before registering ``DISCONNECTED`` and starting over.

        Args:
            conf: device configuration
            ssl_ctx: SSL context passed to `iec104.connect` (or ``None``)

        """

        async def cleanup():
            # status registration is best-effort during shutdown
            with contextlib.suppress(ConnectionError):
                await self._register_status('DISCONNECTED')

            if self._conn:
                await self._conn.async_close()

        try:
            while True:
                await self._register_status('CONNECTING')
                for address in conf['remote_addresses']:
                    try:
                        self._conn = await iec104.connect(
                            addr=tcp.Address(host=address['host'],
                                             port=address['port']),
                            response_timeout=conf['response_timeout'],
                            supervisory_timeout=conf['supervisory_timeout'],
                            test_timeout=conf['test_timeout'],
                            send_window_size=conf['send_window_size'],
                            receive_window_size=conf['receive_window_size'],
                            ssl=ssl_ctx)

                        if conf['security']:
                            try:
                                ssl.init_security(conf['security'], self._conn)

                            except Exception:
                                # close the fresh connection before the
                                # failure is reported by the outer handler
                                await aio.uncancellable(
                                    self._conn.async_close())
                                raise

                        # connected - do not try remaining addresses
                        break

                    except Exception as e:
                        # log and continue with the next address
                        mlog.warning('connection failed: %s', e, exc_info=e)

                else:
                    # no address could be connected - wait and retry
                    await self._register_status('DISCONNECTED')
                    await asyncio.sleep(conf['reconnect_delay'])
                    continue

                await self._register_status('CONNECTED')
                self.async_group.spawn(self._receive_loop, self._conn)
                if conf['time_sync_delay'] is not None:
                    self.async_group.spawn(self._time_sync_loop, self._conn,
                                           conf['time_sync_delay'])

                await self._conn.wait_closed()
                await self._register_status('DISCONNECTED')
                self._conn = None

        except ConnectionError:
            # ends the loop silently; cleanup still runs in `finally`
            pass

        except Exception as e:
            mlog.error('connection loop error: %s', e, exc_info=e)

        finally:
            mlog.debug('closing connection loop')
            # NOTE(review): `close` is not defined in this class - presumably
            # inherited and closing the device's async group; confirm
            self.close()
            await aio.uncancellable(cleanup())

    async def _receive_loop(self, conn):
        """Receive messages from `conn` and register corresponding events.

        Clock sync messages are ignored and messages which can not be
        converted are logged and skipped. The connection is closed when the
        loop ends for any reason.

        Args:
            conn: connection whose messages are received

        """
        try:
            while True:
                try:
                    msgs = await conn.receive()

                except iec104.AsduTypeError as e:
                    # this error only skips the current receive
                    mlog.warning("asdu type error: %s", e)
                    continue

                events = collections.deque()
                for msg in msgs:
                    try:
                        mlog.debug('received message: %s', msg)
                        if isinstance(msg, iec104.ClockSyncMsg):
                            continue

                        event = _msg_to_event(self._event_type_prefix, msg)
                        events.append(event)

                    except Exception as e:
                        mlog.warning('error processing message: %s',
                                     e, exc_info=e)
                        continue

                if not events:
                    continue

                await self._eventer_client.register(events)
                mlog.debug('%s events registered', len(events))

        except ConnectionError:
            mlog.debug('connection closed')

        except Exception as e:
            mlog.error('receive loop error: %s', e, exc_info=e)

        finally:
            mlog.debug('closing receive loop')
            conn.close()

    async def _time_sync_loop(self, conn, time_sync_delay):
        """Periodically send a clock sync message with current UTC time.

        The connection is closed when the loop ends for any reason.

        Args:
            conn: connection used for sending
            time_sync_delay: delay passed to `asyncio.sleep` between sends

        """
        try:
            while True:
                time_now = datetime.datetime.now(datetime.timezone.utc)
                time_iec104_now = iec104.time_from_datetime(time_now)

                # NOTE(review): 0xFFFF is presumably the broadcast ASDU
                # address - confirm against the protocol documentation
                msg = iec104.ClockSyncMsg(
                    is_test=False,
                    originator_address=0,
                    asdu_address=0xFFFF,
                    time=time_iec104_now,
                    is_negative_confirm=False,
                    cause=iec104.ClockSyncReqCause.ACTIVATION)
                await conn.send([msg])

                await conn.drain()
                mlog.debug('time sync sent %s', time_iec104_now)

                await asyncio.sleep(time_sync_delay)

        except ConnectionError:
            mlog.debug('connection closed')

        except Exception as e:
            mlog.error('time sync loop error: %s', e, exc_info=e)

        finally:
            mlog.debug('closing time sync loop')
            conn.close()

    async def _register_status(self, status):
        """Register a ``gateway/status`` event carrying `status` as payload.

        Args:
            status: JSON payload of the status event (this class uses
                ``'CONNECTING'``, ``'CONNECTED'`` and ``'DISCONNECTED'``)

        """
        event = hat.event.common.RegisterEvent(
            type=(*self._event_type_prefix, 'gateway', 'status'),
            source_timestamp=None,
            payload=hat.event.common.EventPayloadJson(status))

        await self._eventer_client.register([event])
        mlog.debug('registered status %s', status)
216
217
# Device descriptor exported by this module: device type name, factory
# (the device class) and JSON schema id/repository.
# NOTE(review): schema is presumably used for validating device
# configuration - confirm against `common.DeviceInfo`
info: common.DeviceInfo = common.DeviceInfo(
    type="iec104_master",
    create=Iec104MasterDevice,
    json_schema_id="hat-gateway://iec104.yaml#/$defs/master",
    json_schema_repo=common.json_schema_repo)
223
224
def _msg_to_event(event_type_prefix, msg):
    """Convert a received IEC 104 message to a register event.

    Supported message types are data, command, interrogation and counter
    interrogation messages; each is delegated to its dedicated converter.

    Args:
        event_type_prefix: prefix of the resulting event type
        msg: received message

    Raises:
        Exception: if message type is not supported

    """
    # checked in order - first matching message class wins
    converters = (
        (iec104.DataMsg, _data_to_event),
        (iec104.CommandMsg, _command_to_event),
        (iec104.InterrogationMsg, _interrogation_to_event),
        (iec104.CounterInterrogationMsg,
         _counter_interrogation_to_event))

    for msg_cls, convert in converters:
        if isinstance(msg, msg_cls):
            return convert(event_type_prefix, msg)

    raise Exception('unsupported message type')
239
240
def _data_to_event(event_type_prefix, msg):
    """Convert a data message to a ``gateway/data`` register event.

    Event type is composed of the prefix, ``gateway``, ``data``, data type
    value, ASDU address and IO address. Source timestamp is derived from
    the message time.

    Args:
        event_type_prefix: prefix of the resulting event type
        msg: received data message

    """
    kind = common.get_data_type(msg.data)
    cause_json = _cause_to_json(iec104.DataResCause, msg.cause)
    data_json = common.data_to_json(msg.data)
    type_tail = ('gateway', 'data', kind.value,
                 str(msg.asdu_address), str(msg.io_address))
    timestamp = common.time_to_source_timestamp(msg.time)

    payload = {'is_test': msg.is_test,
               'cause': cause_json,
               'data': data_json}

    return hat.event.common.RegisterEvent(
        type=(*event_type_prefix, *type_tail),
        source_timestamp=timestamp,
        payload=hat.event.common.EventPayloadJson(payload))
256
257
def _command_to_event(event_type_prefix, msg):
    """Convert a command message to a ``gateway/command`` register event.

    Event type is composed of the prefix, ``gateway``, ``command``, command
    type value, ASDU address and IO address. Source timestamp is derived
    from the message time.

    Args:
        event_type_prefix: prefix of the resulting event type
        msg: received command message

    """
    kind = common.get_command_type(msg.command)
    cause_json = _cause_to_json(iec104.CommandResCause, msg.cause)
    command_json = common.command_to_json(msg.command)
    type_tail = ('gateway', 'command', kind.value,
                 str(msg.asdu_address), str(msg.io_address))
    timestamp = common.time_to_source_timestamp(msg.time)

    payload = {'is_test': msg.is_test,
               'is_negative_confirm': msg.is_negative_confirm,
               'cause': cause_json,
               'command': command_json}

    return hat.event.common.RegisterEvent(
        type=(*event_type_prefix, *type_tail),
        source_timestamp=timestamp,
        payload=hat.event.common.EventPayloadJson(payload))
274
275
def _interrogation_to_event(event_type_prefix, msg):
    """Convert an interrogation message to a register event.

    Event type is composed of the prefix, ``gateway``, ``interrogation``
    and ASDU address. The event has no source timestamp.

    Args:
        event_type_prefix: prefix of the resulting event type
        msg: received interrogation message

    """
    cause_json = _cause_to_json(iec104.CommandResCause, msg.cause)
    type_tail = ('gateway', 'interrogation', str(msg.asdu_address))

    payload = {'is_test': msg.is_test,
               'is_negative_confirm': msg.is_negative_confirm,
               'request': msg.request,
               'cause': cause_json}

    return hat.event.common.RegisterEvent(
        type=(*event_type_prefix, *type_tail),
        source_timestamp=None,
        payload=hat.event.common.EventPayloadJson(payload))
289
290
def _counter_interrogation_to_event(event_type_prefix, msg):
    """Convert a counter interrogation message to a register event.

    Event type is composed of the prefix, ``gateway``,
    ``counter_interrogation`` and ASDU address. The freeze code is stored
    by its enum member name. The event has no source timestamp.

    Args:
        event_type_prefix: prefix of the resulting event type
        msg: received counter interrogation message

    """
    cause_json = _cause_to_json(iec104.CommandResCause, msg.cause)
    type_tail = ('gateway', 'counter_interrogation', str(msg.asdu_address))

    payload = {'is_test': msg.is_test,
               'is_negative_confirm': msg.is_negative_confirm,
               'request': msg.request,
               'freeze': msg.freeze.name,
               'cause': cause_json}

    return hat.event.common.RegisterEvent(
        type=(*event_type_prefix, *type_tail),
        source_timestamp=None,
        payload=hat.event.common.EventPayloadJson(payload))
305
306
def _msg_from_event(event_type_prefix, event):
    """Convert an event to the IEC 104 message it requests.

    The part of the event type following the prefix selects the message:
    ``system/command/<type>/<asdu>/<io>``, ``system/interrogation/<asdu>``
    or ``system/counter_interrogation/<asdu>``.

    Args:
        event_type_prefix: prefix stripped (by length) from the event type
        event: event to convert

    Raises:
        Exception: if event type is not supported

    """
    suffix = event.type[len(event_type_prefix):]
    head, args = suffix[:2], suffix[2:]

    if head == ('system', 'command'):
        # exactly three segments are expected after ``system/command``
        cmd_type_str, asdu_address_str, io_address_str = args
        key = common.CommandKey(cmd_type=common.CommandType(cmd_type_str),
                                asdu_address=int(asdu_address_str),
                                io_address=int(io_address_str))
        return _command_from_event(key, event)

    if head == ('system', 'interrogation'):
        return _interrogation_from_event(int(args[0]), event)

    if head == ('system', 'counter_interrogation'):
        return _counter_interrogation_from_event(int(args[0]), event)

    raise Exception('unsupported event type')
326
327
def _command_from_event(cmd_key, event):
    """Create a command message from a command event.

    Message time is derived from the event's source timestamp; ``cause``,
    ``command`` and ``is_test`` are read from the event's JSON payload.

    Args:
        cmd_key: command key providing command type, ASDU and IO address
        event: command event

    """
    msg_time = common.time_from_source_timestamp(event.source_timestamp)
    req_cause = _cause_from_json(iec104.CommandReqCause,
                                 event.payload.data['cause'])
    cmd = common.command_from_json(cmd_key.cmd_type,
                                   event.payload.data['command'])

    return iec104.CommandMsg(
        is_test=event.payload.data['is_test'],
        originator_address=0,
        asdu_address=cmd_key.asdu_address,
        io_address=cmd_key.io_address,
        command=cmd,
        is_negative_confirm=False,
        time=msg_time,
        cause=req_cause)
343
344
def _interrogation_from_event(asdu_address, event):
    """Create an interrogation message from an interrogation event.

    ``cause``, ``is_test`` and ``request`` are read from the event's JSON
    payload.

    Args:
        asdu_address: ASDU address of the resulting message
        event: interrogation event

    """
    req_cause = _cause_from_json(iec104.CommandReqCause,
                                 event.payload.data['cause'])

    return iec104.InterrogationMsg(
        is_test=event.payload.data['is_test'],
        originator_address=0,
        asdu_address=asdu_address,
        request=event.payload.data['request'],
        is_negative_confirm=False,
        cause=req_cause)
355
356
def _counter_interrogation_from_event(asdu_address, event):
    """Create a counter interrogation message from an event.

    ``freeze`` (looked up by name in `iec104.FreezeCode`), ``cause``,
    ``is_test`` and ``request`` are read from the event's JSON payload.

    Args:
        asdu_address: ASDU address of the resulting message
        event: counter interrogation event

    """
    freeze_code = iec104.FreezeCode[event.payload.data['freeze']]
    req_cause = _cause_from_json(iec104.CommandReqCause,
                                 event.payload.data['cause'])

    return iec104.CounterInterrogationMsg(
        is_test=event.payload.data['is_test'],
        originator_address=0,
        asdu_address=asdu_address,
        request=event.payload.data['request'],
        freeze=freeze_code,
        is_negative_confirm=False,
        cause=req_cause)
370
371
def _cause_to_json(cls, cause):
    """Convert a message cause to its JSON representation.

    Args:
        cls: enum class whose members are represented by name
        cause: cause to convert

    Returns:
        Member name if `cause` is an instance of `cls`, member value if it
        is any other enum member, otherwise `cause` unchanged.

    """
    if isinstance(cause, cls):
        return cause.name

    if isinstance(cause, enum.Enum):
        return cause.value

    return cause
376
377
def _cause_from_json(cls, cause):
    """Convert a JSON cause representation to a message cause.

    Args:
        cls: enum class used for looking up string causes by member name
        cause: JSON representation of the cause

    Returns:
        Member of `cls` named `cause` if `cause` is a string, otherwise
        `cause` unchanged.

    """
    if isinstance(cause, str):
        return cls[cause]

    return cause
mlog: logging.Logger = <Logger hat.gateway.devices.iec104.master (WARNING)>
class Iec104MasterDevice(hat.gateway.common.Device):
class Iec104MasterDevice(common.Device):
    """IEC 60870-5-104 master device.

    Creating an instance spawns a background connection loop which keeps
    trying to connect to one of the configured remote addresses. While a
    connection is open, received messages are converted to events and
    registered with the eventer client, and events passed to
    `process_events` are converted to messages and sent over the
    connection. Connection state changes are registered as
    ``(*event_type_prefix, 'gateway', 'status')`` events.
    """

    def __init__(self,
                 conf: common.DeviceConf,
                 eventer_client: hat.event.eventer.Client,
                 event_type_prefix: common.EventTypePrefix):
        """Create the device and start its connection loop.

        Args:
            conf: device configuration; keys read by this class are
                ``security``, ``remote_addresses``, ``response_timeout``,
                ``supervisory_timeout``, ``test_timeout``,
                ``send_window_size``, ``receive_window_size``,
                ``reconnect_delay`` and ``time_sync_delay``
            eventer_client: client used for registering events
            event_type_prefix: prefix of event types handled by this device

        """
        self._eventer_client = eventer_client
        self._event_type_prefix = event_type_prefix
        # currently active connection (``None`` until one is established)
        self._conn = None
        self._async_group = aio.Group()

        # SSL context is created only when security configuration is truthy
        ssl_ctx = (
            ssl.create_ssl_ctx(conf['security'], ssl.SslProtocol.TLS_CLIENT)
            if conf['security'] else None)

        self.async_group.spawn(self._connection_loop, conf, ssl_ctx)

    @property
    def async_group(self) -> aio.Group:
        """Group controlling resource's lifetime."""
        return self._async_group

    async def process_events(self, events: Collection[hat.event.common.Event]):
        """Convert events to IEC 104 messages and send them.

        Events which can not be converted are logged and skipped. If no
        connection is currently open, all converted messages are dropped
        with a warning.

        Args:
            events: events received for this device

        """
        msgs = collections.deque()
        for event in events:
            try:
                mlog.debug('received event: %s', event)
                msg = _msg_from_event(self._event_type_prefix, event)
                msgs.append(msg)

            except Exception as e:
                # a single bad event must not prevent sending of the others
                mlog.warning('error processing event: %s',
                             e, exc_info=e)
                continue

        if not msgs:
            return

        if not self._conn or not self._conn.is_open:
            mlog.warning('connection closed: %s events ignored',
                         len(msgs))
            return

        try:
            await self._conn.send(msgs)
            mlog.debug('%s messages sent', len(msgs))

        except ConnectionError as e:
            mlog.warning('error sending messages: %s', e, exc_info=e)

    async def _connection_loop(self, conf, ssl_ctx):
        """Keep a connection to one of the remote addresses established.

        Each iteration registers ``CONNECTING`` status, tries the configured
        addresses in order and, once connected, registers ``CONNECTED``,
        spawns the receive loop (and the time sync loop when
        ``time_sync_delay`` is not ``None``) and waits for the connection to
        close before registering ``DISCONNECTED`` and starting over.

        Args:
            conf: device configuration
            ssl_ctx: SSL context passed to `iec104.connect` (or ``None``)

        """

        async def cleanup():
            # status registration is best-effort during shutdown
            with contextlib.suppress(ConnectionError):
                await self._register_status('DISCONNECTED')

            if self._conn:
                await self._conn.async_close()

        try:
            while True:
                await self._register_status('CONNECTING')
                for address in conf['remote_addresses']:
                    try:
                        self._conn = await iec104.connect(
                            addr=tcp.Address(host=address['host'],
                                             port=address['port']),
                            response_timeout=conf['response_timeout'],
                            supervisory_timeout=conf['supervisory_timeout'],
                            test_timeout=conf['test_timeout'],
                            send_window_size=conf['send_window_size'],
                            receive_window_size=conf['receive_window_size'],
                            ssl=ssl_ctx)

                        if conf['security']:
                            try:
                                ssl.init_security(conf['security'], self._conn)

                            except Exception:
                                # close the fresh connection before the
                                # failure is reported by the outer handler
                                await aio.uncancellable(
                                    self._conn.async_close())
                                raise

                        # connected - do not try remaining addresses
                        break

                    except Exception as e:
                        # log and continue with the next address
                        mlog.warning('connection failed: %s', e, exc_info=e)

                else:
                    # no address could be connected - wait and retry
                    await self._register_status('DISCONNECTED')
                    await asyncio.sleep(conf['reconnect_delay'])
                    continue

                await self._register_status('CONNECTED')
                self.async_group.spawn(self._receive_loop, self._conn)
                if conf['time_sync_delay'] is not None:
                    self.async_group.spawn(self._time_sync_loop, self._conn,
                                           conf['time_sync_delay'])

                await self._conn.wait_closed()
                await self._register_status('DISCONNECTED')
                self._conn = None

        except ConnectionError:
            # ends the loop silently; cleanup still runs in `finally`
            pass

        except Exception as e:
            mlog.error('connection loop error: %s', e, exc_info=e)

        finally:
            mlog.debug('closing connection loop')
            # NOTE(review): `close` is not defined in this class - presumably
            # inherited and closing the device's async group; confirm
            self.close()
            await aio.uncancellable(cleanup())

    async def _receive_loop(self, conn):
        """Receive messages from `conn` and register corresponding events.

        Clock sync messages are ignored and messages which can not be
        converted are logged and skipped. The connection is closed when the
        loop ends for any reason.

        Args:
            conn: connection whose messages are received

        """
        try:
            while True:
                try:
                    msgs = await conn.receive()

                except iec104.AsduTypeError as e:
                    # this error only skips the current receive
                    mlog.warning("asdu type error: %s", e)
                    continue

                events = collections.deque()
                for msg in msgs:
                    try:
                        mlog.debug('received message: %s', msg)
                        if isinstance(msg, iec104.ClockSyncMsg):
                            continue

                        event = _msg_to_event(self._event_type_prefix, msg)
                        events.append(event)

                    except Exception as e:
                        mlog.warning('error processing message: %s',
                                     e, exc_info=e)
                        continue

                if not events:
                    continue

                await self._eventer_client.register(events)
                mlog.debug('%s events registered', len(events))

        except ConnectionError:
            mlog.debug('connection closed')

        except Exception as e:
            mlog.error('receive loop error: %s', e, exc_info=e)

        finally:
            mlog.debug('closing receive loop')
            conn.close()

    async def _time_sync_loop(self, conn, time_sync_delay):
        """Periodically send a clock sync message with current UTC time.

        The connection is closed when the loop ends for any reason.

        Args:
            conn: connection used for sending
            time_sync_delay: delay passed to `asyncio.sleep` between sends

        """
        try:
            while True:
                time_now = datetime.datetime.now(datetime.timezone.utc)
                time_iec104_now = iec104.time_from_datetime(time_now)

                # NOTE(review): 0xFFFF is presumably the broadcast ASDU
                # address - confirm against the protocol documentation
                msg = iec104.ClockSyncMsg(
                    is_test=False,
                    originator_address=0,
                    asdu_address=0xFFFF,
                    time=time_iec104_now,
                    is_negative_confirm=False,
                    cause=iec104.ClockSyncReqCause.ACTIVATION)
                await conn.send([msg])

                await conn.drain()
                mlog.debug('time sync sent %s', time_iec104_now)

                await asyncio.sleep(time_sync_delay)

        except ConnectionError:
            mlog.debug('connection closed')

        except Exception as e:
            mlog.error('time sync loop error: %s', e, exc_info=e)

        finally:
            mlog.debug('closing time sync loop')
            conn.close()

    async def _register_status(self, status):
        """Register a ``gateway/status`` event carrying `status` as payload.

        Args:
            status: JSON payload of the status event (this class uses
                ``'CONNECTING'``, ``'CONNECTED'`` and ``'DISCONNECTED'``)

        """
        event = hat.event.common.RegisterEvent(
            type=(*self._event_type_prefix, 'gateway', 'status'),
            source_timestamp=None,
            payload=hat.event.common.EventPayloadJson(status))

        await self._eventer_client.register([event])
        mlog.debug('registered status %s', status)

Device interface

Iec104MasterDevice( conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]], eventer_client: hat.event.eventer.client.Client, event_type_prefix: tuple[str, str, str])
27    def __init__(self,
28                 conf: common.DeviceConf,
29                 eventer_client: hat.event.eventer.Client,
30                 event_type_prefix: common.EventTypePrefix):
31        self._eventer_client = eventer_client
32        self._event_type_prefix = event_type_prefix
33        self._conn = None
34        self._async_group = aio.Group()
35
36        ssl_ctx = (
37            ssl.create_ssl_ctx(conf['security'], ssl.SslProtocol.TLS_CLIENT)
38            if conf['security'] else None)
39
40        self.async_group.spawn(self._connection_loop, conf, ssl_ctx)
async_group: hat.aio.group.Group
    @property
    def async_group(self) -> aio.Group:
        """Group controlling resource's lifetime."""
        return self._async_group

Group controlling resource's lifetime.

async def process_events(self, events: Collection[hat.event.common.common.Event]):
46    async def process_events(self, events: Collection[hat.event.common.Event]):
47        msgs = collections.deque()
48        for event in events:
49            try:
50                mlog.debug('received event: %s', event)
51                msg = _msg_from_event(self._event_type_prefix, event)
52                msgs.append(msg)
53
54            except Exception as e:
55                mlog.warning('error processing event: %s',
56                             e, exc_info=e)
57                continue
58
59        if not msgs:
60            return
61
62        if not self._conn or not self._conn.is_open:
63            mlog.warning('connection closed: %s events ignored',
64                         len(msgs))
65            return
66
67        try:
68            await self._conn.send(msgs)
69            mlog.debug('%s messages sent', len(msgs))
70
71        except ConnectionError as e:
72            mlog.warning('error sending messages: %s', e, exc_info=e)

Process received events

This method can be a coroutine or a regular function.

info: hat.gateway.common.DeviceInfo = DeviceInfo(type='iec104_master', create=<class 'Iec104MasterDevice'>, json_schema_id='hat-gateway://iec104.yaml#/$defs/master', json_schema_repo=<hat.json.repository.SchemaRepository object>)