hat.gateway.devices.iec101.master
IEC 60870-5-101 master device
"""IEC 60870-5-101 master device"""

from collections.abc import Collection
import asyncio
import collections
import contextlib
import datetime
import functools
import logging

from hat import aio
from hat.drivers import iec101
from hat.drivers import serial
from hat.drivers.iec60870 import link
import hat.event.common
import hat.event.eventer

from hat.gateway.devices.iec101 import common


mlog: logging.Logger = logging.getLogger(__name__)


async def create(conf: common.DeviceConf,
                 eventer_client: hat.event.eventer.Client,
                 event_type_prefix: common.EventTypePrefix
                 ) -> 'Iec101MasterDevice':
    """Create an IEC 101 master device.

    The latest ``enable`` event of every configured remote device is
    queried and passed to the new device before it is returned.

    Args:
        conf: device configuration; ``conf['remote_devices']`` entries
            provide the remote device addresses
        eventer_client: client used for querying and registering events
        event_type_prefix: prefix of all event types used by this device

    Returns:
        running device instance

    """
    event_types = [(*event_type_prefix, 'system', 'remote_device',
                    str(i['address']), 'enable')
                   for i in conf['remote_devices']]
    params = hat.event.common.QueryLatestParams(event_types)
    result = await eventer_client.query(params)

    device = Iec101MasterDevice(conf=conf,
                                eventer_client=eventer_client,
                                event_type_prefix=event_type_prefix)
    try:
        await device.process_events(result.events)

    except BaseException:
        # includes cancellation - the already running device is closed
        # before the exception is propagated
        await aio.uncancellable(device.async_close())
        raise

    return device


info: common.DeviceInfo = common.DeviceInfo(
    type="iec101_master",
    create=create,
    json_schema_id="hat-gateway://iec101.yaml#/$defs/master",
    json_schema_repo=common.json_schema_repo)


class Iec101MasterDevice(common.Device):
    """IEC 60870-5-101 master gateway device.

    Creating an instance spawns two loops in the device's async group:
    one that (re)creates the unbalanced link master and one that sends
    queued messages. Every enabled remote device runs its own connection
    loop in a dedicated subgroup.

    """

    def __init__(self,
                 conf: common.DeviceConf,
                 eventer_client: hat.event.eventer.Client,
                 event_type_prefix: common.EventTypePrefix,
                 send_queue_size: int = 1024):
        self._conf = conf
        self._event_type_prefix = event_type_prefix
        self._eventer_client = eventer_client
        # link master; None while it is not created
        self._master = None
        # remote address -> iec101.MasterConnection
        self._conns = {}
        # queue of (msg, remote address) pairs consumed by _send_loop
        self._send_queue = aio.Queue(send_queue_size)
        self._async_group = aio.Group()
        # remote address -> last received enable state
        self._remote_enabled = {i['address']: False
                                for i in conf['remote_devices']}
        # remote address -> remote device configuration
        self._remote_confs = {i['address']: i
                              for i in conf['remote_devices']}
        # remote address -> subgroup running that remote's loops
        self._remote_groups = {}

        self.async_group.spawn(self._create_link_master_loop)
        self.async_group.spawn(self._send_loop)

    @property
    def async_group(self) -> aio.Group:
        """Async group controlling the device's lifetime"""
        return self._async_group

    async def process_events(self, events: Collection[hat.event.common.Event]):
        """Process received events.

        Events are processed one by one; a failure while processing an
        event is logged and does not stop processing of the rest.

        """
        for event in events:
            try:
                await self._process_event(event)

            except Exception as e:
                mlog.warning('error processing event: %s', e, exc_info=e)

    async def _create_link_master_loop(self):
        """Keep the link master created and register the device status.

        After the master is created, all currently enabled remote devices
        are started. When the loop finishes, the whole device is closed.

        """

        async def cleanup():
            with contextlib.suppress(ConnectionError):
                await self._register_status('DISCONNECTED')

            if self._master:
                await self._master.async_close()

        try:
            while True:
                await self._register_status('CONNECTING')

                try:
                    self._master = await link.unbalanced.create_master(
                        port=self._conf['port'],
                        baudrate=self._conf['baudrate'],
                        bytesize=serial.ByteSize[self._conf['bytesize']],
                        parity=serial.Parity[self._conf['parity']],
                        stopbits=serial.StopBits[self._conf['stopbits']],
                        xonxoff=self._conf['flow_control']['xonxoff'],
                        rtscts=self._conf['flow_control']['rtscts'],
                        dsrdtr=self._conf['flow_control']['dsrdtr'],
                        silent_interval=self._conf['silent_interval'],
                        address_size=link.AddressSize[
                            self._conf['device_address_size']])

                except Exception as e:
                    mlog.warning('link master (endpoint) failed to create: %s',
                                 e, exc_info=e)
                    await self._register_status('DISCONNECTED')
                    await asyncio.sleep(self._conf['reconnect_delay'])
                    continue

                await self._register_status('CONNECTED')

                # enable events may have arrived while there was no master
                for address, enabled in self._remote_enabled.items():
                    if enabled:
                        self._enable_remote(address)

                await self._master.wait_closed()
                await self._register_status('DISCONNECTED')
                self._master = None

        except Exception as e:
            mlog.error('create link master error: %s', e, exc_info=e)

        finally:
            mlog.debug('closing link master loop')
            self.close()
            self._conns = {}
            await aio.uncancellable(cleanup())

    async def _send_loop(self):
        """Send queued messages over the matching remote connection.

        Messages for remotes without an open connection are dropped with
        a warning.

        """
        while True:
            msg, address = await self._send_queue.get()

            conn = self._conns.get(address)
            if not conn or not conn.is_open:
                mlog.warning('msg %s not sent, connection to %s closed',
                             msg, address)
                continue

            try:
                await conn.send([msg])
                mlog.debug('msg sent asdu=%s', msg.asdu_address)

            except ConnectionError:
                mlog.warning('msg %s not sent, connection to %s closed',
                             msg, address)

    async def _connection_loop(self, group, address):
        """Keep the connection to a single remote device established.

        Args:
            group: subgroup owning this remote's loops; closed on exit
            address: remote device link address

        """

        async def cleanup():
            with contextlib.suppress(ConnectionError):
                await self._register_rmt_status(address, 'DISCONNECTED')

            conn = self._conns.pop(address, None)
            if conn:
                await conn.async_close()

        remote_conf = self._remote_confs[address]
        try:
            while True:
                await self._register_rmt_status(address, 'CONNECTING')

                try:
                    master_conn = await self._master.connect(
                        addr=address,
                        response_timeout=remote_conf['response_timeout'],
                        send_retry_count=remote_conf['send_retry_count'],
                        poll_class1_delay=remote_conf['poll_class1_delay'],
                        poll_class2_delay=remote_conf['poll_class2_delay'])

                except Exception as e:
                    mlog.error('connection error to address %s: %s',
                               address, e, exc_info=e)
                    await self._register_rmt_status(address, 'DISCONNECTED')
                    await asyncio.sleep(remote_conf['reconnect_delay'])
                    continue

                await self._register_rmt_status(address, 'CONNECTED')

                conn = iec101.MasterConnection(
                    conn=master_conn,
                    cause_size=iec101.CauseSize[self._conf['cause_size']],
                    asdu_address_size=iec101.AsduAddressSize[
                        self._conf['asdu_address_size']],
                    io_address_size=iec101.IoAddressSize[
                        self._conf['io_address_size']])
                self._conns[address] = conn
                group.spawn(self._receive_loop, conn, address)

                if remote_conf['time_sync_delay'] is not None:
                    group.spawn(self._time_sync_loop, conn,
                                remote_conf['time_sync_delay'])

                await conn.wait_closed()
                await self._register_rmt_status(address, 'DISCONNECTED')
                self._conns.pop(address)

        except Exception as e:
            mlog.error('connection loop error: %s', e, exc_info=e)

        finally:
            mlog.debug('closing remote device %s', address)
            group.close()
            await aio.uncancellable(cleanup())

    async def _receive_loop(self, conn, address):
        """Register events for messages received from a remote device.

        Clock sync messages are skipped and messages which can not be
        converted to events are logged and ignored. The connection is
        closed when the loop finishes.

        """
        try:
            while True:
                try:
                    msgs = await conn.receive()

                except iec101.AsduTypeError as e:
                    mlog.warning("asdu type error: %s", e)
                    continue

                events = collections.deque()
                for msg in msgs:
                    if isinstance(msg, iec101.ClockSyncMsg):
                        continue

                    try:
                        event = _msg_to_event(self._event_type_prefix, address,
                                              msg)
                        events.append(event)

                    except Exception as e:
                        mlog.warning('message %s ignored due to: %s',
                                     msg, e, exc_info=e)

                if not events:
                    continue

                await self._eventer_client.register(events)
                for e in events:
                    mlog.debug('registered event %s', e)

        except ConnectionError:
            mlog.debug('connection closed')

        except Exception as e:
            mlog.error('receive loop error: %s', e, exc_info=e)

        finally:
            conn.close()

    async def _time_sync_loop(self, conn, delay):
        """Send a clock sync message every `delay` seconds.

        The connection is closed when the loop finishes.

        """
        try:
            while True:
                time_now = datetime.datetime.now(datetime.timezone.utc)
                time_iec101 = iec101.time_from_datetime(time_now)
                msg = iec101.ClockSyncMsg(
                    is_test=False,
                    originator_address=0,
                    # all-ones address for the configured ASDU address size
                    # NOTE(review): presumably the broadcast address - confirm
                    asdu_address={
                        'ONE': 0xFF,
                        'TWO': 0xFFFF}[self._conf['asdu_address_size']],
                    time=time_iec101,
                    is_negative_confirm=False,
                    cause=iec101.ClockSyncReqCause.ACTIVATION)
                await conn.send([msg])
                mlog.debug('time sync sent %s', time_iec101)

                await asyncio.sleep(delay)

        except ConnectionError:
            mlog.debug('connection closed')

        except Exception as e:
            mlog.error('time sync loop error: %s', e, exc_info=e)

        finally:
            conn.close()

    async def _process_event(self, event):
        """Dispatch a single system event.

        Handled event types (relative to
        ``<prefix>/system/remote_device/<address>``) are ``enable``,
        ``command/<type>/<asdu>/<io>``, ``interrogation/<asdu>`` and
        ``counter_interrogation/<asdu>``.

        Raises:
            Exception: event type or content is not supported

        """
        match_type = functools.partial(hat.event.common.matches_query_type,
                                       event.type)

        prefix = (*self._event_type_prefix, 'system', 'remote_device', '?')
        if not match_type((*prefix, '*')):
            raise Exception('unexpected event type')

        # last prefix segment ('?') holds the remote device address
        address = int(event.type[len(prefix) - 1])
        suffix = event.type[len(prefix):]

        if match_type((*prefix, 'enable')):
            self._process_event_enable(address, event)

        elif match_type((*prefix, 'command', '?', '?', '?')):
            cmd_key = common.CommandKey(
                cmd_type=common.CommandType(suffix[1]),
                asdu_address=int(suffix[2]),
                io_address=int(suffix[3]))
            msg = _command_from_event(cmd_key, event)

            await self._send_queue.put((msg, address))
            mlog.debug('command asdu=%s io=%s prepared for sending',
                       cmd_key.asdu_address, cmd_key.io_address)

        elif match_type((*prefix, 'interrogation', '?')):
            asdu_address = int(suffix[1])
            msg = _interrogation_from_event(asdu_address, event)

            await self._send_queue.put((msg, address))
            mlog.debug("interrogation request asdu=%s prepared for sending",
                       asdu_address)

        elif match_type((*prefix, 'counter_interrogation', '?')):
            asdu_address = int(suffix[1])
            msg = _counter_interrogation_from_event(asdu_address, event)

            await self._send_queue.put((msg, address))
            mlog.debug("counter interrogation request asdu=%s prepared for "
                       "sending", asdu_address)

        else:
            raise Exception('unexpected event type')

    def _process_event_enable(self, address, event):
        """Store the enable state of a remote device and apply it.

        Enabling is applied immediately only while the link master
        exists; otherwise the stored state is applied once the master is
        created.

        Raises:
            Exception: unknown remote address or non-boolean payload

        """
        if address not in self._remote_enabled:
            raise Exception('invalid remote device address')

        enable = event.payload.data
        if not isinstance(enable, bool):
            raise Exception('invalid enable event payload')

        self._remote_enabled[address] = enable

        if not enable:
            self._disable_remote(address)

        elif not self._master:
            return

        else:
            self._enable_remote(address)

    def _enable_remote(self, address):
        """Start the connection loop of a remote device if not running"""
        mlog.debug('enabling device %s', address)
        remote_group = self._remote_groups.get(address)
        if remote_group and remote_group.is_open:
            mlog.debug('device %s is already running', address)
            return

        remote_group = self._async_group.create_subgroup()
        self._remote_groups[address] = remote_group
        remote_group.spawn(self._connection_loop, remote_group, address)

    def _disable_remote(self, address):
        """Close the subgroup of a remote device if one exists"""
        mlog.debug('disabling device %s', address)
        if address in self._remote_groups:
            remote_group = self._remote_groups.pop(address)
            remote_group.close()

    async def _register_status(self, status):
        """Register the device (link master) status event"""
        event = hat.event.common.RegisterEvent(
            type=(*self._event_type_prefix, 'gateway', 'status'),
            source_timestamp=None,
            payload=hat.event.common.EventPayloadJson(status))
        await self._eventer_client.register([event])
        mlog.debug('device status -> %s', status)

    async def _register_rmt_status(self, address, status):
        """Register the status event of a single remote device"""
        event = hat.event.common.RegisterEvent(
            type=(*self._event_type_prefix, 'gateway', 'remote_device',
                  str(address), 'status'),
            source_timestamp=None,
            payload=hat.event.common.EventPayloadJson(status))
        await self._eventer_client.register([event])
        mlog.debug('remote device %s status -> %s', address, status)


def _msg_to_event(event_type_prefix, address, msg):
    """Convert a received message to a register event.

    Raises:
        Exception: message type is not supported

    """
    if isinstance(msg, iec101.DataMsg):
        return _data_to_event(event_type_prefix, address, msg)

    if isinstance(msg, iec101.CommandMsg):
        return _command_to_event(event_type_prefix, address, msg)

    if isinstance(msg, iec101.InterrogationMsg):
        return _interrogation_to_event(event_type_prefix, address, msg)

    if isinstance(msg, iec101.CounterInterrogationMsg):
        return _counter_interrogation_to_event(event_type_prefix, address, msg)

    raise Exception('unsupported message type')


def _data_to_event(event_type_prefix, address, msg):
    """Create a ``data`` register event from a data message"""
    data_type = common.get_data_type(msg.data)
    cause = common.cause_to_json(iec101.DataResCause, msg.cause)
    data = common.data_to_json(msg.data)
    event_type = (*event_type_prefix, 'gateway', 'remote_device', str(address),
                  'data', data_type.value, str(msg.asdu_address),
                  str(msg.io_address))
    source_timestamp = common.time_to_source_timestamp(msg.time)

    return hat.event.common.RegisterEvent(
        type=event_type,
        source_timestamp=source_timestamp,
        payload=hat.event.common.EventPayloadJson({'is_test': msg.is_test,
                                                   'cause': cause,
                                                   'data': data}))


def _command_to_event(event_type_prefix, address, msg):
    """Create a ``command`` register event from a command message"""
    command_type = common.get_command_type(msg.command)
    cause = common.cause_to_json(iec101.CommandResCause, msg.cause)
    command = common.command_to_json(msg.command)
    event_type = (*event_type_prefix, 'gateway', 'remote_device', str(address),
                  'command', command_type.value, str(msg.asdu_address),
                  str(msg.io_address))

    return hat.event.common.RegisterEvent(
        type=event_type,
        source_timestamp=None,
        payload=hat.event.common.EventPayloadJson({
            'is_test': msg.is_test,
            'is_negative_confirm': msg.is_negative_confirm,
            'cause': cause,
            'command': command}))


def _interrogation_to_event(event_type_prefix, address, msg):
    """Create an ``interrogation`` register event from a message"""
    cause = common.cause_to_json(iec101.CommandResCause, msg.cause)
    event_type = (*event_type_prefix, 'gateway', 'remote_device', str(address),
                  'interrogation', str(msg.asdu_address))

    return hat.event.common.RegisterEvent(
        type=event_type,
        source_timestamp=None,
        payload=hat.event.common.EventPayloadJson({
            'is_test': msg.is_test,
            'is_negative_confirm': msg.is_negative_confirm,
            'request': msg.request,
            'cause': cause}))


def _counter_interrogation_to_event(event_type_prefix, address, msg):
    """Create a ``counter_interrogation`` register event from a message"""
    cause = common.cause_to_json(iec101.CommandResCause, msg.cause)
    event_type = (*event_type_prefix, 'gateway', 'remote_device', str(address),
                  'counter_interrogation', str(msg.asdu_address))

    return hat.event.common.RegisterEvent(
        type=event_type,
        source_timestamp=None,
        payload=hat.event.common.EventPayloadJson({
            'is_test': msg.is_test,
            'is_negative_confirm': msg.is_negative_confirm,
            'request': msg.request,
            'freeze': msg.freeze.name,
            'cause': cause}))


def _command_from_event(cmd_key, event):
    """Create a command message from a ``command`` system event"""
    cause = common.cause_from_json(iec101.CommandReqCause,
                                   event.payload.data['cause'])
    command = common.command_from_json(cmd_key.cmd_type,
                                       event.payload.data['command'])

    return iec101.CommandMsg(is_test=event.payload.data['is_test'],
                             originator_address=0,
                             asdu_address=cmd_key.asdu_address,
                             io_address=cmd_key.io_address,
                             command=command,
                             is_negative_confirm=False,
                             cause=cause)


def _interrogation_from_event(asdu_address, event):
    """Create an interrogation message from a system event"""
    cause = common.cause_from_json(iec101.CommandReqCause,
                                   event.payload.data['cause'])

    return iec101.InterrogationMsg(is_test=event.payload.data['is_test'],
                                   originator_address=0,
                                   asdu_address=asdu_address,
                                   request=event.payload.data['request'],
                                   is_negative_confirm=False,
                                   cause=cause)


def _counter_interrogation_from_event(asdu_address, event):
    """Create a counter interrogation message from a system event"""
    freeze = iec101.FreezeCode[event.payload.data['freeze']]
    cause = common.cause_from_json(iec101.CommandReqCause,
                                   event.payload.data['cause'])

    return iec101.CounterInterrogationMsg(
        is_test=event.payload.data['is_test'],
        originator_address=0,
        asdu_address=asdu_address,
        request=event.payload.data['request'],
        freeze=freeze,
        is_negative_confirm=False,
        cause=cause)
async def
create( conf: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')], eventer_client: hat.event.eventer.client.Client, event_type_prefix: tuple[str, str, str]) -> Iec101MasterDevice:
async def create(conf: common.DeviceConf,
                 eventer_client: hat.event.eventer.Client,
                 event_type_prefix: common.EventTypePrefix
                 ) -> 'Iec101MasterDevice':
    """Create an IEC 101 master device.

    The latest ``enable`` event of each remote device listed in
    ``conf['remote_devices']`` is queried and handed to the new device
    before the device is returned. If that initial processing raises
    (cancellation included), the device is closed and the exception is
    propagated.

    """
    enable_types = []
    for remote_conf in conf['remote_devices']:
        enable_types.append((*event_type_prefix, 'system', 'remote_device',
                             str(remote_conf['address']), 'enable'))

    query_params = hat.event.common.QueryLatestParams(enable_types)
    query_result = await eventer_client.query(query_params)

    master_device = Iec101MasterDevice(conf=conf,
                                       eventer_client=eventer_client,
                                       event_type_prefix=event_type_prefix)
    try:
        await master_device.process_events(query_result.events)

    except BaseException:
        await aio.uncancellable(master_device.async_close())
        raise

    return master_device
info: hat.gateway.common.DeviceInfo =
DeviceInfo(type='iec101_master', create=<function create>, json_schema_id='hat-gateway://iec101.yaml#/$defs/master', json_schema_repo=<hat.json.repository.SchemaRepository object>)
class Iec101MasterDevice(common.Device):
    """IEC 60870-5-101 master gateway device.

    Creating an instance spawns two loops in the device's async group:
    one that (re)creates the unbalanced link master and one that sends
    queued messages. Every enabled remote device runs its own connection
    loop in a dedicated subgroup.

    """

    def __init__(self,
                 conf: common.DeviceConf,
                 eventer_client: hat.event.eventer.Client,
                 event_type_prefix: common.EventTypePrefix,
                 send_queue_size: int = 1024):
        self._conf = conf
        self._event_type_prefix = event_type_prefix
        self._eventer_client = eventer_client
        # link master; None while it is not created
        self._master = None
        # remote address -> iec101.MasterConnection
        self._conns = {}
        # queue of (msg, remote address) pairs consumed by _send_loop
        self._send_queue = aio.Queue(send_queue_size)
        self._async_group = aio.Group()
        # remote address -> last received enable state
        self._remote_enabled = {i['address']: False
                                for i in conf['remote_devices']}
        # remote address -> remote device configuration
        self._remote_confs = {i['address']: i
                              for i in conf['remote_devices']}
        # remote address -> subgroup running that remote's loops
        self._remote_groups = {}

        self.async_group.spawn(self._create_link_master_loop)
        self.async_group.spawn(self._send_loop)

    @property
    def async_group(self) -> aio.Group:
        """Async group controlling the device's lifetime"""
        return self._async_group

    async def process_events(self, events: Collection[hat.event.common.Event]):
        """Process received events.

        Events are processed one by one; a failure while processing an
        event is logged and does not stop processing of the rest.

        """
        for event in events:
            try:
                await self._process_event(event)

            except Exception as e:
                mlog.warning('error processing event: %s', e, exc_info=e)

    async def _create_link_master_loop(self):
        """Keep the link master created and register the device status.

        After the master is created, all currently enabled remote devices
        are started. When the loop finishes, the whole device is closed.

        """

        async def cleanup():
            with contextlib.suppress(ConnectionError):
                await self._register_status('DISCONNECTED')

            if self._master:
                await self._master.async_close()

        try:
            while True:
                await self._register_status('CONNECTING')

                try:
                    self._master = await link.unbalanced.create_master(
                        port=self._conf['port'],
                        baudrate=self._conf['baudrate'],
                        bytesize=serial.ByteSize[self._conf['bytesize']],
                        parity=serial.Parity[self._conf['parity']],
                        stopbits=serial.StopBits[self._conf['stopbits']],
                        xonxoff=self._conf['flow_control']['xonxoff'],
                        rtscts=self._conf['flow_control']['rtscts'],
                        dsrdtr=self._conf['flow_control']['dsrdtr'],
                        silent_interval=self._conf['silent_interval'],
                        address_size=link.AddressSize[
                            self._conf['device_address_size']])

                except Exception as e:
                    mlog.warning('link master (endpoint) failed to create: %s',
                                 e, exc_info=e)
                    await self._register_status('DISCONNECTED')
                    await asyncio.sleep(self._conf['reconnect_delay'])
                    continue

                await self._register_status('CONNECTED')

                # enable events may have arrived while there was no master
                for address, enabled in self._remote_enabled.items():
                    if enabled:
                        self._enable_remote(address)

                await self._master.wait_closed()
                await self._register_status('DISCONNECTED')
                self._master = None

        except Exception as e:
            mlog.error('create link master error: %s', e, exc_info=e)

        finally:
            mlog.debug('closing link master loop')
            self.close()
            self._conns = {}
            await aio.uncancellable(cleanup())

    async def _send_loop(self):
        """Send queued messages over the matching remote connection.

        Messages for remotes without an open connection are dropped with
        a warning.

        """
        while True:
            msg, address = await self._send_queue.get()

            conn = self._conns.get(address)
            if not conn or not conn.is_open:
                mlog.warning('msg %s not sent, connection to %s closed',
                             msg, address)
                continue

            try:
                await conn.send([msg])
                mlog.debug('msg sent asdu=%s', msg.asdu_address)

            except ConnectionError:
                mlog.warning('msg %s not sent, connection to %s closed',
                             msg, address)

    async def _connection_loop(self, group, address):
        """Keep the connection to a single remote device established.

        Args:
            group: subgroup owning this remote's loops; closed on exit
            address: remote device link address

        """

        async def cleanup():
            with contextlib.suppress(ConnectionError):
                await self._register_rmt_status(address, 'DISCONNECTED')

            conn = self._conns.pop(address, None)
            if conn:
                await conn.async_close()

        remote_conf = self._remote_confs[address]
        try:
            while True:
                await self._register_rmt_status(address, 'CONNECTING')

                try:
                    master_conn = await self._master.connect(
                        addr=address,
                        response_timeout=remote_conf['response_timeout'],
                        send_retry_count=remote_conf['send_retry_count'],
                        poll_class1_delay=remote_conf['poll_class1_delay'],
                        poll_class2_delay=remote_conf['poll_class2_delay'])

                except Exception as e:
                    mlog.error('connection error to address %s: %s',
                               address, e, exc_info=e)
                    await self._register_rmt_status(address, 'DISCONNECTED')
                    await asyncio.sleep(remote_conf['reconnect_delay'])
                    continue

                await self._register_rmt_status(address, 'CONNECTED')

                conn = iec101.MasterConnection(
                    conn=master_conn,
                    cause_size=iec101.CauseSize[self._conf['cause_size']],
                    asdu_address_size=iec101.AsduAddressSize[
                        self._conf['asdu_address_size']],
                    io_address_size=iec101.IoAddressSize[
                        self._conf['io_address_size']])
                self._conns[address] = conn
                group.spawn(self._receive_loop, conn, address)

                if remote_conf['time_sync_delay'] is not None:
                    group.spawn(self._time_sync_loop, conn,
                                remote_conf['time_sync_delay'])

                await conn.wait_closed()
                await self._register_rmt_status(address, 'DISCONNECTED')
                self._conns.pop(address)

        except Exception as e:
            mlog.error('connection loop error: %s', e, exc_info=e)

        finally:
            mlog.debug('closing remote device %s', address)
            group.close()
            await aio.uncancellable(cleanup())

    async def _receive_loop(self, conn, address):
        """Register events for messages received from a remote device.

        Clock sync messages are skipped and messages which can not be
        converted to events are logged and ignored. The connection is
        closed when the loop finishes.

        """
        try:
            while True:
                try:
                    msgs = await conn.receive()

                except iec101.AsduTypeError as e:
                    mlog.warning("asdu type error: %s", e)
                    continue

                events = collections.deque()
                for msg in msgs:
                    if isinstance(msg, iec101.ClockSyncMsg):
                        continue

                    try:
                        event = _msg_to_event(self._event_type_prefix, address,
                                              msg)
                        events.append(event)

                    except Exception as e:
                        mlog.warning('message %s ignored due to: %s',
                                     msg, e, exc_info=e)

                if not events:
                    continue

                await self._eventer_client.register(events)
                for e in events:
                    mlog.debug('registered event %s', e)

        except ConnectionError:
            mlog.debug('connection closed')

        except Exception as e:
            mlog.error('receive loop error: %s', e, exc_info=e)

        finally:
            conn.close()

    async def _time_sync_loop(self, conn, delay):
        """Send a clock sync message every `delay` seconds.

        The connection is closed when the loop finishes.

        """
        try:
            while True:
                time_now = datetime.datetime.now(datetime.timezone.utc)
                time_iec101 = iec101.time_from_datetime(time_now)
                msg = iec101.ClockSyncMsg(
                    is_test=False,
                    originator_address=0,
                    # all-ones address for the configured ASDU address size
                    # NOTE(review): presumably the broadcast address - confirm
                    asdu_address={
                        'ONE': 0xFF,
                        'TWO': 0xFFFF}[self._conf['asdu_address_size']],
                    time=time_iec101,
                    is_negative_confirm=False,
                    cause=iec101.ClockSyncReqCause.ACTIVATION)
                await conn.send([msg])
                mlog.debug('time sync sent %s', time_iec101)

                await asyncio.sleep(delay)

        except ConnectionError:
            mlog.debug('connection closed')

        except Exception as e:
            mlog.error('time sync loop error: %s', e, exc_info=e)

        finally:
            conn.close()

    async def _process_event(self, event):
        """Dispatch a single system event.

        Handled event types (relative to
        ``<prefix>/system/remote_device/<address>``) are ``enable``,
        ``command/<type>/<asdu>/<io>``, ``interrogation/<asdu>`` and
        ``counter_interrogation/<asdu>``.

        Raises:
            Exception: event type or content is not supported

        """
        match_type = functools.partial(hat.event.common.matches_query_type,
                                       event.type)

        prefix = (*self._event_type_prefix, 'system', 'remote_device', '?')
        if not match_type((*prefix, '*')):
            raise Exception('unexpected event type')

        # last prefix segment ('?') holds the remote device address
        address = int(event.type[len(prefix) - 1])
        suffix = event.type[len(prefix):]

        if match_type((*prefix, 'enable')):
            self._process_event_enable(address, event)

        elif match_type((*prefix, 'command', '?', '?', '?')):
            cmd_key = common.CommandKey(
                cmd_type=common.CommandType(suffix[1]),
                asdu_address=int(suffix[2]),
                io_address=int(suffix[3]))
            msg = _command_from_event(cmd_key, event)

            await self._send_queue.put((msg, address))
            mlog.debug('command asdu=%s io=%s prepared for sending',
                       cmd_key.asdu_address, cmd_key.io_address)

        elif match_type((*prefix, 'interrogation', '?')):
            asdu_address = int(suffix[1])
            msg = _interrogation_from_event(asdu_address, event)

            await self._send_queue.put((msg, address))
            mlog.debug("interrogation request asdu=%s prepared for sending",
                       asdu_address)

        elif match_type((*prefix, 'counter_interrogation', '?')):
            asdu_address = int(suffix[1])
            msg = _counter_interrogation_from_event(asdu_address, event)

            await self._send_queue.put((msg, address))
            mlog.debug("counter interrogation request asdu=%s prepared for "
                       "sending", asdu_address)

        else:
            raise Exception('unexpected event type')

    def _process_event_enable(self, address, event):
        """Store the enable state of a remote device and apply it.

        Enabling is applied immediately only while the link master
        exists; otherwise the stored state is applied once the master is
        created.

        Raises:
            Exception: unknown remote address or non-boolean payload

        """
        if address not in self._remote_enabled:
            raise Exception('invalid remote device address')

        enable = event.payload.data
        if not isinstance(enable, bool):
            raise Exception('invalid enable event payload')

        self._remote_enabled[address] = enable

        if not enable:
            self._disable_remote(address)

        elif not self._master:
            return

        else:
            self._enable_remote(address)

    def _enable_remote(self, address):
        """Start the connection loop of a remote device if not running"""
        mlog.debug('enabling device %s', address)
        remote_group = self._remote_groups.get(address)
        if remote_group and remote_group.is_open:
            mlog.debug('device %s is already running', address)
            return

        remote_group = self._async_group.create_subgroup()
        self._remote_groups[address] = remote_group
        remote_group.spawn(self._connection_loop, remote_group, address)

    def _disable_remote(self, address):
        """Close the subgroup of a remote device if one exists"""
        mlog.debug('disabling device %s', address)
        if address in self._remote_groups:
            remote_group = self._remote_groups.pop(address)
            remote_group.close()

    async def _register_status(self, status):
        """Register the device (link master) status event"""
        event = hat.event.common.RegisterEvent(
            type=(*self._event_type_prefix, 'gateway', 'status'),
            source_timestamp=None,
            payload=hat.event.common.EventPayloadJson(status))
        await self._eventer_client.register([event])
        mlog.debug('device status -> %s', status)

    async def _register_rmt_status(self, address, status):
        """Register the status event of a single remote device"""
        event = hat.event.common.RegisterEvent(
            type=(*self._event_type_prefix, 'gateway', 'remote_device',
                  str(address), 'status'),
            source_timestamp=None,
            payload=hat.event.common.EventPayloadJson(status))
        await self._eventer_client.register([event])
        mlog.debug('remote device %s status -> %s', address, status)
Device interface
Iec101MasterDevice( conf: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')], eventer_client: hat.event.eventer.client.Client, event_type_prefix: tuple[str, str, str], send_queue_size: int = 1024)
def __init__(self,
             conf: common.DeviceConf,
             eventer_client: hat.event.eventer.Client,
             event_type_prefix: common.EventTypePrefix,
             send_queue_size: int = 1024):
    """Initialize the device state and start its background loops.

    All remote devices listed in ``conf['remote_devices']`` start as
    disabled. The link master loop and the send loop are spawned in the
    device's async group.

    """
    # collaborators and static configuration
    self._conf = conf
    self._event_type_prefix = event_type_prefix
    self._eventer_client = eventer_client

    # runtime state: no link master and no remote connections yet
    self._master = None
    self._conns = {}
    self._send_queue = aio.Queue(send_queue_size)
    self._async_group = aio.Group()

    # per remote address bookkeeping
    self._remote_enabled = {}
    for remote_conf in conf['remote_devices']:
        self._remote_enabled[remote_conf['address']] = False

    self._remote_confs = {}
    for remote_conf in conf['remote_devices']:
        self._remote_confs[remote_conf['address']] = remote_conf

    self._remote_groups = {}

    for loop_fn in (self._create_link_master_loop, self._send_loop):
        self.async_group.spawn(loop_fn)
async def
process_events(self, events: Collection[hat.event.common.common.Event]):
async def process_events(self, events: Collection[hat.event.common.Event]):
    """Process received events in order.

    Every event is handed to ``_process_event``. An exception raised for
    one event is logged as a warning and the remaining events are still
    processed.

    """
    for received in events:
        try:
            await self._process_event(received)

        except Exception as exc:
            mlog.warning('error processing event: %s', exc, exc_info=exc)
Process received events
This method can be a coroutine or a regular function.