hat.gateway.devices.iec104.master
IEC 60870-5-104 master device
1"""IEC 60870-5-104 master device""" 2 3from collections.abc import Collection 4import asyncio 5import collections 6import contextlib 7import datetime 8import enum 9import logging 10 11from hat import aio 12from hat.drivers import iec104 13from hat.drivers import tcp 14import hat.event.common 15import hat.event.eventer 16 17from hat.gateway.devices.iec104 import common 18from hat.gateway.devices.iec104 import ssl 19 20 21mlog: logging.Logger = logging.getLogger(__name__) 22 23 24class Iec104MasterDevice(common.Device): 25 26 def __init__(self, 27 conf: common.DeviceConf, 28 eventer_client: hat.event.eventer.Client, 29 event_type_prefix: common.EventTypePrefix): 30 self._eventer_client = eventer_client 31 self._event_type_prefix = event_type_prefix 32 self._conn = None 33 self._async_group = aio.Group() 34 35 ssl_ctx = ( 36 ssl.create_ssl_ctx(conf['security'], ssl.SslProtocol.TLS_CLIENT) 37 if conf['security'] else None) 38 39 self.async_group.spawn(self._connection_loop, conf, ssl_ctx) 40 41 @property 42 def async_group(self) -> aio.Group: 43 return self._async_group 44 45 async def process_events(self, events: Collection[hat.event.common.Event]): 46 msgs = collections.deque() 47 for event in events: 48 try: 49 mlog.debug('received event: %s', event) 50 msg = _msg_from_event(self._event_type_prefix, event) 51 msgs.append(msg) 52 53 except Exception as e: 54 mlog.warning('error processing event: %s', 55 e, exc_info=e) 56 continue 57 58 if not msgs: 59 return 60 61 if not self._conn or not self._conn.is_open: 62 mlog.warning('connection closed: %s events ignored', 63 len(msgs)) 64 return 65 66 try: 67 await self._conn.send(msgs) 68 mlog.debug('%s messages sent', len(msgs)) 69 70 except ConnectionError as e: 71 mlog.warning('error sending messages: %s', e, exc_info=e) 72 73 async def _connection_loop(self, conf, ssl_ctx): 74 75 async def cleanup(): 76 with contextlib.suppress(ConnectionError): 77 await self._register_status('DISCONNECTED') 78 79 if self._conn: 80 await self._conn.async_close() 81 82 try: 83 while True: 84 await self._register_status('CONNECTING') 85 for address in conf['remote_addresses']: 86 try: 87 self._conn = await iec104.connect( 88 addr=tcp.Address(host=address['host'], 89 port=address['port']), 90 response_timeout=conf['response_timeout'], 91 supervisory_timeout=conf['supervisory_timeout'], 92 test_timeout=conf['test_timeout'], 93 send_window_size=conf['send_window_size'], 94 receive_window_size=conf['receive_window_size'], 95 ssl=ssl_ctx) 96 97 if conf['security']: 98 try: 99 ssl.init_security(conf['security'], self._conn) 100 101 except Exception: 102 await aio.uncancellable( 103 self._conn.async_close()) 104 raise 105 106 break 107 108 except Exception as e: 109 mlog.warning('connection failed: %s', e, exc_info=e) 110 111 else: 112 await self._register_status('DISCONNECTED') 113 await asyncio.sleep(conf['reconnect_delay']) 114 continue 115 116 await self._register_status('CONNECTED') 117 self.async_group.spawn(self._receive_loop, self._conn) 118 if conf['time_sync_delay'] is not None: 119 self.async_group.spawn(self._time_sync_loop, self._conn, 120 conf['time_sync_delay']) 121 122 await self._conn.wait_closed() 123 await self._register_status('DISCONNECTED') 124 self._conn = None 125 126 except ConnectionError: 127 pass 128 129 except Exception as e: 130 mlog.error('connection loop error: %s', e, exc_info=e) 131 132 finally: 133 mlog.debug('closing connection loop') 134 self.close() 135 await aio.uncancellable(cleanup()) 136 137 async def _receive_loop(self, conn): 138 try: 139 while True: 140 try: 141 msgs = await conn.receive() 142 143 except iec104.AsduTypeError as e: 144 mlog.warning("asdu type error: %s", e) 145 continue 146 147 events = collections.deque() 148 for msg in msgs: 149 try: 150 mlog.debug('received message: %s', msg) 151 if isinstance(msg, iec104.ClockSyncMsg): 152 continue 153 154 event = _msg_to_event(self._event_type_prefix, msg) 155 events.append(event) 156 157 except Exception as e: 158 mlog.warning('error processing message: %s', 159 e, exc_info=e) 160 continue 161 162 if not events: 163 continue 164 165 await self._eventer_client.register(events) 166 mlog.debug('%s events registered', len(events)) 167 168 except ConnectionError: 169 mlog.debug('connection closed') 170 171 except Exception as e: 172 mlog.error('receive loop error: %s', e, exc_info=e) 173 174 finally: 175 mlog.debug('closing receive loop') 176 conn.close() 177 178 async def _time_sync_loop(self, conn, time_sync_delay): 179 try: 180 while True: 181 time_now = datetime.datetime.now(datetime.timezone.utc) 182 time_iec104_now = iec104.time_from_datetime(time_now) 183 184 msg = iec104.ClockSyncMsg( 185 is_test=False, 186 originator_address=0, 187 asdu_address=0xFFFF, 188 time=time_iec104_now, 189 is_negative_confirm=False, 190 cause=iec104.ClockSyncReqCause.ACTIVATION) 191 await conn.send([msg]) 192 193 await conn.drain() 194 mlog.debug('time sync sent %s', time_iec104_now) 195 196 await asyncio.sleep(time_sync_delay) 197 198 except ConnectionError: 199 mlog.debug('connection closed') 200 201 except Exception as e: 202 mlog.error('time sync loop error: %s', e, exc_info=e) 203 204 finally: 205 mlog.debug('closing time sync loop') 206 conn.close() 207 208 async def _register_status(self, status): 209 event = hat.event.common.RegisterEvent( 210 type=(*self._event_type_prefix, 'gateway', 'status'), 211 source_timestamp=None, 212 payload=hat.event.common.EventPayloadJson(status)) 213 214 await self._eventer_client.register([event]) 215 mlog.debug('registered status %s', status) 216 217 218info: common.DeviceInfo = common.DeviceInfo( 219 type="iec104_master", 220 create=Iec104MasterDevice, 221 json_schema_id="hat-gateway://iec104.yaml#/$defs/master", 222 json_schema_repo=common.json_schema_repo) 223 224 225def _msg_to_event(event_type_prefix, msg): 226 if isinstance(msg, iec104.DataMsg): 227 return _data_to_event(event_type_prefix, msg) 228 229 if isinstance(msg, iec104.CommandMsg): 230 return _command_to_event(event_type_prefix, msg) 231 232 if isinstance(msg, iec104.InterrogationMsg): 233 return _interrogation_to_event(event_type_prefix, msg) 234 235 if isinstance(msg, iec104.CounterInterrogationMsg): 236 return _counter_interrogation_to_event(event_type_prefix, msg) 237 238 raise Exception('unsupported message type') 239 240 241def _data_to_event(event_type_prefix, msg): 242 data_type = common.get_data_type(msg.data) 243 cause = _cause_to_json(iec104.DataResCause, msg.cause) 244 data = common.data_to_json(msg.data) 245 event_type = (*event_type_prefix, 'gateway', 'data', data_type.value, 246 str(msg.asdu_address), str(msg.io_address)) 247 source_timestamp = common.time_to_source_timestamp(msg.time) 248 249 return hat.event.common.RegisterEvent( 250 type=event_type, 251 source_timestamp=source_timestamp, 252 payload=hat.event.common.EventPayloadJson({ 253 'is_test': msg.is_test, 254 'cause': cause, 255 'data': data})) 256 257 258def _command_to_event(event_type_prefix, msg): 259 command_type = common.get_command_type(msg.command) 260 cause = _cause_to_json(iec104.CommandResCause, msg.cause) 261 command = common.command_to_json(msg.command) 262 event_type = (*event_type_prefix, 'gateway', 'command', command_type.value, 263 str(msg.asdu_address), str(msg.io_address)) 264 source_timestamp = common.time_to_source_timestamp(msg.time) 265 266 return hat.event.common.RegisterEvent( 267 type=event_type, 268 source_timestamp=source_timestamp, 269 payload=hat.event.common.EventPayloadJson({ 270 'is_test': msg.is_test, 271 'is_negative_confirm': msg.is_negative_confirm, 272 'cause': cause, 273 'command': command})) 274 275 276def _interrogation_to_event(event_type_prefix, msg): 277 cause = _cause_to_json(iec104.CommandResCause, msg.cause) 278 event_type = (*event_type_prefix, 'gateway', 'interrogation', 279 str(msg.asdu_address)) 280 281 return hat.event.common.RegisterEvent( 282 type=event_type, 283 source_timestamp=None, 284 payload=hat.event.common.EventPayloadJson({ 285 'is_test': msg.is_test, 286 'is_negative_confirm': msg.is_negative_confirm, 287 'request': msg.request, 288 'cause': cause})) 289 290 291def _counter_interrogation_to_event(event_type_prefix, msg): 292 cause = _cause_to_json(iec104.CommandResCause, msg.cause) 293 event_type = (*event_type_prefix, 'gateway', 'counter_interrogation', 294 str(msg.asdu_address)) 295 296 return hat.event.common.RegisterEvent( 297 type=event_type, 298 source_timestamp=None, 299 payload=hat.event.common.EventPayloadJson({ 300 'is_test': msg.is_test, 301 'is_negative_confirm': msg.is_negative_confirm, 302 'request': msg.request, 303 'freeze': msg.freeze.name, 304 'cause': cause})) 305 306 307def _msg_from_event(event_type_prefix, event): 308 suffix = event.type[len(event_type_prefix):] 309 310 if suffix[:2] == ('system', 'command'): 311 cmd_type_str, asdu_address_str, io_address_str = suffix[2:] 312 cmd_key = common.CommandKey(cmd_type=common.CommandType(cmd_type_str), 313 asdu_address=int(asdu_address_str), 314 io_address=int(io_address_str)) 315 return _command_from_event(cmd_key, event) 316 317 if suffix[:2] == ('system', 'interrogation'): 318 asdu_address = int(suffix[2]) 319 return _interrogation_from_event(asdu_address, event) 320 321 if suffix[:2] == ('system', 'counter_interrogation'): 322 asdu_address = int(suffix[2]) 323 return _counter_interrogation_from_event(asdu_address, event) 324 325 raise Exception('unsupported event type') 326 327 328def _command_from_event(cmd_key, event): 329 time = common.time_from_source_timestamp(event.source_timestamp) 330 cause = _cause_from_json(iec104.CommandReqCause, 331 event.payload.data['cause']) 332 command = common.command_from_json(cmd_key.cmd_type, 333 event.payload.data['command']) 334 335 return iec104.CommandMsg(is_test=event.payload.data['is_test'], 336 originator_address=0, 337 asdu_address=cmd_key.asdu_address, 338 io_address=cmd_key.io_address, 339 command=command, 340 is_negative_confirm=False, 341 time=time, 342 cause=cause) 343 344 345def _interrogation_from_event(asdu_address, event): 346 cause = _cause_from_json(iec104.CommandReqCause, 347 event.payload.data['cause']) 348 349 return iec104.InterrogationMsg(is_test=event.payload.data['is_test'], 350 originator_address=0, 351 asdu_address=asdu_address, 352 request=event.payload.data['request'], 353 is_negative_confirm=False, 354 cause=cause) 355 356 357def _counter_interrogation_from_event(asdu_address, event): 358 freeze = iec104.FreezeCode[event.payload.data['freeze']] 359 cause = _cause_from_json(iec104.CommandReqCause, 360 event.payload.data['cause']) 361 362 return iec104.CounterInterrogationMsg( 363 is_test=event.payload.data['is_test'], 364 originator_address=0, 365 asdu_address=asdu_address, 366 request=event.payload.data['request'], 367 freeze=freeze, 368 is_negative_confirm=False, 369 cause=cause) 370 371 372def _cause_to_json(cls, cause): 373 return (cause.name if isinstance(cause, cls) else 374 cause.value if isinstance(cause, enum.Enum) else 375 cause) 376 377 378def _cause_from_json(cls, cause): 379 return cls[cause] if isinstance(cause, str) else cause
25class Iec104MasterDevice(common.Device): 26 27 def __init__(self, 28 conf: common.DeviceConf, 29 eventer_client: hat.event.eventer.Client, 30 event_type_prefix: common.EventTypePrefix): 31 self._eventer_client = eventer_client 32 self._event_type_prefix = event_type_prefix 33 self._conn = None 34 self._async_group = aio.Group() 35 36 ssl_ctx = ( 37 ssl.create_ssl_ctx(conf['security'], ssl.SslProtocol.TLS_CLIENT) 38 if conf['security'] else None) 39 40 self.async_group.spawn(self._connection_loop, conf, ssl_ctx) 41 42 @property 43 def async_group(self) -> aio.Group: 44 return self._async_group 45 46 async def process_events(self, events: Collection[hat.event.common.Event]): 47 msgs = collections.deque() 48 for event in events: 49 try: 50 mlog.debug('received event: %s', event) 51 msg = _msg_from_event(self._event_type_prefix, event) 52 msgs.append(msg) 53 54 except Exception as e: 55 mlog.warning('error processing event: %s', 56 e, exc_info=e) 57 continue 58 59 if not msgs: 60 return 61 62 if not self._conn or not self._conn.is_open: 63 mlog.warning('connection closed: %s events ignored', 64 len(msgs)) 65 return 66 67 try: 68 await self._conn.send(msgs) 69 mlog.debug('%s messages sent', len(msgs)) 70 71 except ConnectionError as e: 72 mlog.warning('error sending messages: %s', e, exc_info=e) 73 74 async def _connection_loop(self, conf, ssl_ctx): 75 76 async def cleanup(): 77 with contextlib.suppress(ConnectionError): 78 await self._register_status('DISCONNECTED') 79 80 if self._conn: 81 await self._conn.async_close() 82 83 try: 84 while True: 85 await self._register_status('CONNECTING') 86 for address in conf['remote_addresses']: 87 try: 88 self._conn = await iec104.connect( 89 addr=tcp.Address(host=address['host'], 90 port=address['port']), 91 response_timeout=conf['response_timeout'], 92 supervisory_timeout=conf['supervisory_timeout'], 93 test_timeout=conf['test_timeout'], 94 send_window_size=conf['send_window_size'], 95 receive_window_size=conf['receive_window_size'], 96 ssl=ssl_ctx) 97 98 if conf['security']: 99 try: 100 ssl.init_security(conf['security'], self._conn) 101 102 except Exception: 103 await aio.uncancellable( 104 self._conn.async_close()) 105 raise 106 107 break 108 109 except Exception as e: 110 mlog.warning('connection failed: %s', e, exc_info=e) 111 112 else: 113 await self._register_status('DISCONNECTED') 114 await asyncio.sleep(conf['reconnect_delay']) 115 continue 116 117 await self._register_status('CONNECTED') 118 self.async_group.spawn(self._receive_loop, self._conn) 119 if conf['time_sync_delay'] is not None: 120 self.async_group.spawn(self._time_sync_loop, self._conn, 121 conf['time_sync_delay']) 122 123 await self._conn.wait_closed() 124 await self._register_status('DISCONNECTED') 125 self._conn = None 126 127 except ConnectionError: 128 pass 129 130 except Exception as e: 131 mlog.error('connection loop error: %s', e, exc_info=e) 132 133 finally: 134 mlog.debug('closing connection loop') 135 self.close() 136 await aio.uncancellable(cleanup()) 137 138 async def _receive_loop(self, conn): 139 try: 140 while True: 141 try: 142 msgs = await conn.receive() 143 144 except iec104.AsduTypeError as e: 145 mlog.warning("asdu type error: %s", e) 146 continue 147 148 events = collections.deque() 149 for msg in msgs: 150 try: 151 mlog.debug('received message: %s', msg) 152 if isinstance(msg, iec104.ClockSyncMsg): 153 continue 154 155 event = _msg_to_event(self._event_type_prefix, msg) 156 events.append(event) 157 158 except Exception as e: 159 mlog.warning('error processing message: %s', 160 e, exc_info=e) 161 continue 162 163 if not events: 164 continue 165 166 await self._eventer_client.register(events) 167 mlog.debug('%s events registered', len(events)) 168 169 except ConnectionError: 170 mlog.debug('connection closed') 171 172 except Exception as e: 173 mlog.error('receive loop error: %s', e, exc_info=e) 174 175 finally: 176 mlog.debug('closing receive loop') 177 conn.close() 178 179 async def _time_sync_loop(self, conn, time_sync_delay): 180 try: 181 while True: 182 time_now = datetime.datetime.now(datetime.timezone.utc) 183 time_iec104_now = iec104.time_from_datetime(time_now) 184 185 msg = iec104.ClockSyncMsg( 186 is_test=False, 187 originator_address=0, 188 asdu_address=0xFFFF, 189 time=time_iec104_now, 190 is_negative_confirm=False, 191 cause=iec104.ClockSyncReqCause.ACTIVATION) 192 await conn.send([msg]) 193 194 await conn.drain() 195 mlog.debug('time sync sent %s', time_iec104_now) 196 197 await asyncio.sleep(time_sync_delay) 198 199 except ConnectionError: 200 mlog.debug('connection closed') 201 202 except Exception as e: 203 mlog.error('time sync loop error: %s', e, exc_info=e) 204 205 finally: 206 mlog.debug('closing time sync loop') 207 conn.close() 208 209 async def _register_status(self, status): 210 event = hat.event.common.RegisterEvent( 211 type=(*self._event_type_prefix, 'gateway', 'status'), 212 source_timestamp=None, 213 payload=hat.event.common.EventPayloadJson(status)) 214 215 await self._eventer_client.register([event]) 216 mlog.debug('registered status %s', status)
Device interface
Iec104MasterDevice( conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]], eventer_client: hat.event.eventer.client.Client, event_type_prefix: tuple[str, str, str])
27 def __init__(self, 28 conf: common.DeviceConf, 29 eventer_client: hat.event.eventer.Client, 30 event_type_prefix: common.EventTypePrefix): 31 self._eventer_client = eventer_client 32 self._event_type_prefix = event_type_prefix 33 self._conn = None 34 self._async_group = aio.Group() 35 36 ssl_ctx = ( 37 ssl.create_ssl_ctx(conf['security'], ssl.SslProtocol.TLS_CLIENT) 38 if conf['security'] else None) 39 40 self.async_group.spawn(self._connection_loop, conf, ssl_ctx)
async def
process_events(self, events: Collection[hat.event.common.common.Event]):
46 async def process_events(self, events: Collection[hat.event.common.Event]): 47 msgs = collections.deque() 48 for event in events: 49 try: 50 mlog.debug('received event: %s', event) 51 msg = _msg_from_event(self._event_type_prefix, event) 52 msgs.append(msg) 53 54 except Exception as e: 55 mlog.warning('error processing event: %s', 56 e, exc_info=e) 57 continue 58 59 if not msgs: 60 return 61 62 if not self._conn or not self._conn.is_open: 63 mlog.warning('connection closed: %s events ignored', 64 len(msgs)) 65 return 66 67 try: 68 await self._conn.send(msgs) 69 mlog.debug('%s messages sent', len(msgs)) 70 71 except ConnectionError as e: 72 mlog.warning('error sending messages: %s', e, exc_info=e)
Process received events
This method can be coroutine or regular function.
info: hat.gateway.common.DeviceInfo =
DeviceInfo(type='iec104_master', create=<class 'Iec104MasterDevice'>, json_schema_id='hat-gateway://iec104.yaml#/$defs/master', json_schema_repo=<hat.json.repository.SchemaRepository object>)