hat.gateway.devices.iec101.slave
IEC 60870-5-104 slave device
1"""IEC 60870-5-104 slave device""" 2 3from collections.abc import Collection, Iterable 4import collections 5import contextlib 6import functools 7import itertools 8import logging 9 10from hat import aio 11from hat import json 12from hat.drivers import iec101 13from hat.drivers import serial 14from hat.drivers.iec60870 import link 15import hat.event.common 16import hat.event.eventer 17 18from hat.gateway.devices.iec101 import common 19 20 21mlog: logging.Logger = logging.getLogger(__name__) 22 23 24async def create(conf: common.DeviceConf, 25 eventer_client: hat.event.eventer.Client, 26 event_type_prefix: common.EventTypePrefix 27 ) -> 'Iec101SlaveDevice': 28 device = Iec101SlaveDevice() 29 device._conf = conf 30 device._eventer_client = eventer_client 31 device._event_type_prefix = event_type_prefix 32 device._next_conn_ids = itertools.count(1) 33 device._conns = {} 34 device._buffers = {} 35 device._data_msgs = {} 36 device._data_buffers = {} 37 38 init_buffers(buffers_conf=conf['buffers'], 39 buffers=device._buffers) 40 41 await init_data(data_conf=conf['data'], 42 data_msgs=device._data_msgs, 43 data_buffers=device._data_buffers, 44 buffers=device._buffers, 45 eventer_client=eventer_client, 46 event_type_prefix=event_type_prefix) 47 48 device._slave = await link.unbalanced.create_slave( 49 port=conf['port'], 50 addrs=conf['addresses'], 51 connection_cb=device._on_connection, 52 baudrate=conf['baudrate'], 53 bytesize=serial.ByteSize[conf['bytesize']], 54 parity=serial.Parity[conf['parity']], 55 stopbits=serial.StopBits[conf['stopbits']], 56 xonxoff=conf['flow_control']['xonxoff'], 57 rtscts=conf['flow_control']['rtscts'], 58 dsrdtr=conf['flow_control']['dsrdtr'], 59 silent_interval=conf['silent_interval'], 60 address_size=link.AddressSize[conf['device_address_size']], 61 keep_alive_timeout=conf['keep_alive_timeout']) 62 63 try: 64 await device._register_connections() 65 66 except BaseException: 67 await aio.uncancellable(device.async_close()) 68 raise 69 70 return device 71 72 73info: common.DeviceInfo = common.DeviceInfo( 74 type="iec101_slave", 75 create=create, 76 json_schema_id="hat-gateway://iec101.yaml#/$defs/slave", 77 json_schema_repo=common.json_schema_repo) 78 79 80class Buffer: 81 82 def __init__(self, size: int): 83 self._size = size 84 self._data = collections.OrderedDict() 85 86 def add(self, 87 event_id: hat.event.common.EventId, 88 data_msg: iec101.DataMsg): 89 self._data[event_id] = data_msg 90 while len(self._data) > self._size: 91 self._data.popitem(last=False) 92 93 def remove(self, event_id: hat.event.common.EventId): 94 self._data.pop(event_id, None) 95 96 def get_event_id_data_msgs(self) -> Iterable[tuple[hat.event.common.EventId, # NOQA 97 iec101.DataMsg]]: 98 return self._data.items() 99 100 101def init_buffers(buffers_conf: json.Data, 102 buffers: dict[str, Buffer]): 103 for buffer_conf in buffers_conf: 104 buffers[buffer_conf['name']] = Buffer(buffer_conf['size']) 105 106 107async def init_data(data_conf: json.Data, 108 data_msgs: dict[common.DataKey, iec101.DataMsg], 109 data_buffers: dict[common.DataKey, Buffer], 110 buffers: dict[str, Buffer], 111 eventer_client: hat.event.eventer.Client, 112 event_type_prefix: common.EventTypePrefix): 113 for data in data_conf: 114 data_key = common.DataKey(data_type=common.DataType[data['data_type']], 115 asdu_address=data['asdu_address'], 116 io_address=data['io_address']) 117 data_msgs[data_key] = None 118 if data['buffer']: 119 data_buffers[data_key] = buffers[data['buffer']] 120 121 event_types = [(*event_type_prefix, 'system', 'data', '*')] 122 params = hat.event.common.QueryLatestParams(event_types) 123 result = await eventer_client.query(params) 124 125 for event in result.events: 126 try: 127 data_type_str, asdu_address_str, io_address_str = \ 128 event.type[len(event_type_prefix)+2:] 129 data_key = common.DataKey(data_type=common.DataType(data_type_str), 130 asdu_address=int(asdu_address_str), 131 io_address=int(io_address_str)) 132 if data_key not in data_msgs: 133 raise Exception(f'data {data_key} not configured') 134 135 data_msgs[data_key] = data_msg_from_event(data_key, event) 136 137 except Exception as e: 138 mlog.debug('skipping initial data: %s', e, exc_info=e) 139 140 141class Iec101SlaveDevice(common.Device): 142 143 @property 144 def async_group(self) -> aio.Group: 145 return self._slave.async_group 146 147 async def process_events(self, events: Collection[hat.event.common.Event]): 148 for event in events: 149 try: 150 await self._process_event(event) 151 152 except Exception as e: 153 mlog.warning('error processing event: %s', e, exc_info=e) 154 155 def _on_connection(self, conn): 156 self.async_group.spawn(self._connection_loop, conn) 157 158 async def _connection_loop(self, conn): 159 conn_id = next(self._next_conn_ids) 160 161 try: 162 conn = iec101.SlaveConnection( 163 conn=conn, 164 cause_size=iec101.CauseSize[self._conf['cause_size']], 165 asdu_address_size=iec101.AsduAddressSize[self._conf['asdu_address_size']], # NOQA 166 io_address_size=iec101.IoAddressSize[self._conf['io_address_size']]) # NOQA 167 168 self._conns[conn_id] = conn 169 await self._register_connections() 170 171 with contextlib.suppress(Exception): 172 for buffer in self._buffers.values(): 173 for event_id, data_msg in buffer.get_event_id_data_msgs(): 174 self._send_data_msg(conn, buffer, event_id, data_msg) 175 176 while True: 177 msgs = await conn.receive() 178 179 for msg in msgs: 180 try: 181 mlog.debug('received message: %s', msg) 182 await self._process_msg(conn_id, conn, msg) 183 184 except Exception as e: 185 mlog.warning('error processing message: %s', 186 e, exc_info=e) 187 188 except ConnectionError: 189 mlog.debug('connection close') 190 191 except Exception as e: 192 mlog.warning('connection error: %s', e, exc_info=e) 193 194 finally: 195 mlog.debug('closing connection') 196 conn.close() 197 198 with contextlib.suppress(Exception): 199 self._conns.pop(conn_id) 200 await aio.uncancellable(self._register_connections()) 201 202 async def _register_connections(self): 203 payload = [{'connection_id': conn_id, 204 'address': conn.address} 205 for conn_id, conn in self._conns.items()] 206 207 event = hat.event.common.RegisterEvent( 208 type=(*self._event_type_prefix, 'gateway', 'connections'), 209 source_timestamp=None, 210 payload=hat.event.common.EventPayloadJson(payload)) 211 212 await self._eventer_client.register([event]) 213 214 async def _process_event(self, event): 215 suffix = event.type[len(self._event_type_prefix):] 216 217 if suffix[:2] == ('system', 'data'): 218 data_type_str, asdu_address_str, io_address_str = suffix[2:] 219 data_key = common.DataKey(data_type=common.DataType(data_type_str), 220 asdu_address=int(asdu_address_str), 221 io_address=int(io_address_str)) 222 223 await self._process_data_event(data_key, event) 224 225 elif suffix[:2] == ('system', 'command'): 226 cmd_type_str, asdu_address_str, io_address_str = suffix[2:] 227 cmd_key = common.CommandKey( 228 cmd_type=common.CommandType(cmd_type_str), 229 asdu_address=int(asdu_address_str), 230 io_address=int(io_address_str)) 231 232 await self._process_command_event(cmd_key, event) 233 234 else: 235 raise Exception('unsupported event type') 236 237 async def _process_data_event(self, data_key, event): 238 if data_key not in self._data_msgs: 239 raise Exception('data not configured') 240 241 data_msg = data_msg_from_event(data_key, event) 242 self._data_msgs[data_key] = data_msg 243 244 buffer = self._data_buffers.get(data_key) 245 if buffer: 246 buffer.add(event.id, data_msg) 247 248 for conn in self._conns.values(): 249 self._send_data_msg(conn, buffer, event.id, data_msg) 250 251 async def _process_command_event(self, cmd_key, event): 252 cmd_msg = cmd_msg_from_event(cmd_key, event) 253 conn_id = event.payload.data['connection_id'] 254 conn = self._conns[conn_id] 255 conn.send([cmd_msg]) 256 257 async def _process_msg(self, conn_id, conn, msg): 258 if isinstance(msg, iec101.CommandMsg): 259 await self._process_command_msg(conn_id, conn, msg) 260 261 elif isinstance(msg, iec101.InterrogationMsg): 262 self._process_interrogation_msg(conn_id, conn, msg) 263 264 elif isinstance(msg, iec101.CounterInterrogationMsg): 265 self._process_counter_interrogation_msg(conn_id, conn, msg) 266 267 elif isinstance(msg, iec101.ReadMsg): 268 self._process_read_msg(conn_id, conn, msg) 269 270 elif isinstance(msg, iec101.ClockSyncMsg): 271 self._process_clock_sync_msg(conn_id, conn, msg) 272 273 elif isinstance(msg, iec101.TestMsg): 274 self._process_test_msg(conn_id, conn, msg) 275 276 elif isinstance(msg, iec101.ResetMsg): 277 self._process_reset_msg(conn_id, conn, msg) 278 279 elif isinstance(msg, iec101.ParameterMsg): 280 self._process_parameter_msg(conn_id, conn, msg) 281 282 elif isinstance(msg, iec101.ParameterActivationMsg): 283 self._process_parameter_activation_msg(conn_id, conn, msg) 284 285 else: 286 raise Exception('unsupported message') 287 288 async def _process_command_msg(self, conn_id, conn, msg): 289 if isinstance(msg.cause, iec101.CommandReqCause): 290 event = cmd_msg_to_event(self._event_type_prefix, conn_id, msg) 291 await self._eventer_client.register([event]) 292 293 else: 294 res = msg._replace(cause=iec101.CommandResCause.UNKNOWN_CAUSE, 295 is_negative_confirm=True) 296 conn.send([res]) 297 298 def _process_interrogation_msg(self, conn_id, conn, msg): 299 if msg.cause == iec101.CommandReqCause.ACTIVATION: 300 res = msg._replace( 301 cause=iec101.CommandResCause.ACTIVATION_CONFIRMATION, 302 is_negative_confirm=False) 303 conn.send([res]) 304 305 data_msgs = [ 306 data_msg._replace( 307 is_test=msg.is_test, 308 cause=iec101.DataResCause.INTERROGATED_STATION) 309 for data_msg in self._data_msgs.values() 310 if (data_msg and 311 (msg.asdu_address == 0xFFFF or 312 msg.asdu_address == data_msg.asdu_address) and 313 not isinstance(data_msg.data, iec101.BinaryCounterData))] 314 conn.send(data_msgs) 315 316 res = msg._replace( 317 cause=iec101.CommandResCause.ACTIVATION_TERMINATION, 318 is_negative_confirm=False) 319 conn.send([res]) 320 321 elif msg.cause == iec101.CommandReqCause.DEACTIVATION: 322 res = msg._replace( 323 cause=iec101.CommandResCause.DEACTIVATION_CONFIRMATION, 324 is_negative_confirm=True) 325 conn.send([res]) 326 327 else: 328 res = msg._replace(cause=iec101.CommandResCause.UNKNOWN_CAUSE, 329 is_negative_confirm=True) 330 conn.send([res]) 331 332 def _process_counter_interrogation_msg(self, conn_id, conn, msg): 333 if msg.cause == iec101.CommandReqCause.ACTIVATION: 334 res = msg._replace( 335 cause=iec101.CommandResCause.ACTIVATION_CONFIRMATION, 336 is_negative_confirm=False) 337 conn.send([res]) 338 339 data_msgs = [ 340 data_msg._replace( 341 is_test=msg.is_test, 342 cause=iec101.DataResCause.INTERROGATED_COUNTER) 343 for data_msg in self._data_msgs.values() 344 if (data_msg and 345 (msg.asdu_address == 0xFFFF or 346 msg.asdu_address == data_msg.asdu_address) and 347 isinstance(data_msg.data, iec101.BinaryCounterData))] 348 conn.send(data_msgs) 349 350 res = msg._replace( 351 cause=iec101.CommandResCause.ACTIVATION_TERMINATION, 352 is_negative_confirm=False) 353 conn.send([res]) 354 355 elif msg.cause == iec101.CommandReqCause.DEACTIVATION: 356 res = msg._replace( 357 cause=iec101.CommandResCause.DEACTIVATION_CONFIRMATION, 358 is_negative_confirm=True) 359 conn.send([res]) 360 361 else: 362 res = msg._replace(cause=iec101.CommandResCause.UNKNOWN_CAUSE, 363 is_negative_confirm=True) 364 conn.send([res]) 365 366 def _process_read_msg(self, conn_id, conn, msg): 367 res = msg._replace(cause=iec101.ReadResCause.UNKNOWN_TYPE) 368 conn.send([res]) 369 370 def _process_clock_sync_msg(self, conn_id, conn, msg): 371 if isinstance(msg.cause, iec101.ClockSyncReqCause): 372 res = msg._replace( 373 cause=iec101.ClockSyncResCause.ACTIVATION_CONFIRMATION, 374 is_negative_confirm=True) 375 conn.send([res]) 376 377 else: 378 res = msg._replace(cause=iec101.ClockSyncResCause.UNKNOWN_CAUSE, 379 is_negative_confirm=True) 380 conn.send([res]) 381 382 def _process_test_msg(self, conn_id, conn, msg): 383 res = msg._replace(cause=iec101.ActivationResCause.UNKNOWN_TYPE) 384 conn.send([res]) 385 386 def _process_reset_msg(self, conn_id, conn, msg): 387 res = msg._replace(cause=iec101.ActivationResCause.UNKNOWN_TYPE) 388 conn.send([res]) 389 390 def _process_parameter_msg(self, conn_id, conn, msg): 391 res = msg._replace(cause=iec101.ParameterResCause.UNKNOWN_TYPE) 392 conn.send([res]) 393 394 def _process_parameter_activation_msg(self, conn_id, conn, msg): 395 res = msg._replace( 396 cause=iec101.ParameterActivationResCause.UNKNOWN_TYPE) 397 conn.send([res]) 398 399 def _send_data_msg(self, conn, buffer, event_id, data_msg): 400 sent_cb = (functools.partial(buffer.remove, event_id) 401 if buffer else None) 402 conn.send([data_msg], sent_cb=sent_cb) 403 404 405def cmd_msg_to_event(event_type_prefix: hat.event.common.EventType, 406 conn_id: int, 407 msg: iec101.CommandMsg 408 ) -> hat.event.common.RegisterEvent: 409 command_type = common.get_command_type(msg.command) 410 cause = common.cause_to_json(iec101.CommandReqCause, msg.cause) 411 command = common.command_to_json(msg.command) 412 event_type = (*event_type_prefix, 'gateway', 'command', command_type.value, 413 str(msg.asdu_address), str(msg.io_address)) 414 415 return hat.event.common.RegisterEvent( 416 type=event_type, 417 source_timestamp=None, 418 payload=hat.event.common.EventPayloadJson({ 419 'connection_id': conn_id, 420 'is_test': msg.is_test, 421 'cause': cause, 422 'command': command})) 423 424 425def data_msg_from_event(data_key: common.DataKey, 426 event: hat.event.common.Event 427 ) -> iec101.DataMsg: 428 time = common.time_from_source_timestamp(event.source_timestamp) 429 cause = common.cause_from_json(iec101.DataResCause, 430 event.payload.data['cause']) 431 data = common.data_from_json(data_key.data_type, 432 event.payload.data['data']) 433 434 return iec101.DataMsg(is_test=event.payload.data['is_test'], 435 originator_address=0, 436 asdu_address=data_key.asdu_address, 437 io_address=data_key.io_address, 438 data=data, 439 time=time, 440 cause=cause) 441 442 443def cmd_msg_from_event(cmd_key: common.CommandKey, 444 event: hat.event.common.Event 445 ) -> iec101.CommandMsg: 446 cause = common.cause_from_json(iec101.CommandResCause, 447 event.payload.data['cause']) 448 command = common.command_from_json(cmd_key.cmd_type, 449 event.payload.data['command']) 450 is_negative_confirm = event.payload.data['is_negative_confirm'] 451 452 return iec101.CommandMsg(is_test=event.payload.data['is_test'], 453 originator_address=0, 454 asdu_address=cmd_key.asdu_address, 455 io_address=cmd_key.io_address, 456 command=command, 457 is_negative_confirm=is_negative_confirm, 458 cause=cause)
async def
create( conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]], eventer_client: hat.event.eventer.client.Client, event_type_prefix: tuple[str, str, str]) -> Iec101SlaveDevice:
25async def create(conf: common.DeviceConf, 26 eventer_client: hat.event.eventer.Client, 27 event_type_prefix: common.EventTypePrefix 28 ) -> 'Iec101SlaveDevice': 29 device = Iec101SlaveDevice() 30 device._conf = conf 31 device._eventer_client = eventer_client 32 device._event_type_prefix = event_type_prefix 33 device._next_conn_ids = itertools.count(1) 34 device._conns = {} 35 device._buffers = {} 36 device._data_msgs = {} 37 device._data_buffers = {} 38 39 init_buffers(buffers_conf=conf['buffers'], 40 buffers=device._buffers) 41 42 await init_data(data_conf=conf['data'], 43 data_msgs=device._data_msgs, 44 data_buffers=device._data_buffers, 45 buffers=device._buffers, 46 eventer_client=eventer_client, 47 event_type_prefix=event_type_prefix) 48 49 device._slave = await link.unbalanced.create_slave( 50 port=conf['port'], 51 addrs=conf['addresses'], 52 connection_cb=device._on_connection, 53 baudrate=conf['baudrate'], 54 bytesize=serial.ByteSize[conf['bytesize']], 55 parity=serial.Parity[conf['parity']], 56 stopbits=serial.StopBits[conf['stopbits']], 57 xonxoff=conf['flow_control']['xonxoff'], 58 rtscts=conf['flow_control']['rtscts'], 59 dsrdtr=conf['flow_control']['dsrdtr'], 60 silent_interval=conf['silent_interval'], 61 address_size=link.AddressSize[conf['device_address_size']], 62 keep_alive_timeout=conf['keep_alive_timeout']) 63 64 try: 65 await device._register_connections() 66 67 except BaseException: 68 await aio.uncancellable(device.async_close()) 69 raise 70 71 return device
info: hat.gateway.common.DeviceInfo =
DeviceInfo(type='iec101_slave', create=<function create>, json_schema_id='hat-gateway://iec101.yaml#/$defs/slave', json_schema_repo=<hat.json.repository.SchemaRepository object>)
class
Buffer:
81class Buffer: 82 83 def __init__(self, size: int): 84 self._size = size 85 self._data = collections.OrderedDict() 86 87 def add(self, 88 event_id: hat.event.common.EventId, 89 data_msg: iec101.DataMsg): 90 self._data[event_id] = data_msg 91 while len(self._data) > self._size: 92 self._data.popitem(last=False) 93 94 def remove(self, event_id: hat.event.common.EventId): 95 self._data.pop(event_id, None) 96 97 def get_event_id_data_msgs(self) -> Iterable[tuple[hat.event.common.EventId, # NOQA 98 iec101.DataMsg]]: 99 return self._data.items()
def
add( self, event_id: hat.event.common.common.EventId, data_msg: hat.drivers.iec101.common.DataMsg):
def
init_buffers( buffers_conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]], buffers: dict[str, Buffer]):
async def
init_data( data_conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]], data_msgs: dict[hat.gateway.devices.iec101.common.DataKey, hat.drivers.iec101.common.DataMsg], data_buffers: dict[hat.gateway.devices.iec101.common.DataKey, Buffer], buffers: dict[str, Buffer], eventer_client: hat.event.eventer.client.Client, event_type_prefix: tuple[str, str, str]):
108async def init_data(data_conf: json.Data, 109 data_msgs: dict[common.DataKey, iec101.DataMsg], 110 data_buffers: dict[common.DataKey, Buffer], 111 buffers: dict[str, Buffer], 112 eventer_client: hat.event.eventer.Client, 113 event_type_prefix: common.EventTypePrefix): 114 for data in data_conf: 115 data_key = common.DataKey(data_type=common.DataType[data['data_type']], 116 asdu_address=data['asdu_address'], 117 io_address=data['io_address']) 118 data_msgs[data_key] = None 119 if data['buffer']: 120 data_buffers[data_key] = buffers[data['buffer']] 121 122 event_types = [(*event_type_prefix, 'system', 'data', '*')] 123 params = hat.event.common.QueryLatestParams(event_types) 124 result = await eventer_client.query(params) 125 126 for event in result.events: 127 try: 128 data_type_str, asdu_address_str, io_address_str = \ 129 event.type[len(event_type_prefix)+2:] 130 data_key = common.DataKey(data_type=common.DataType(data_type_str), 131 asdu_address=int(asdu_address_str), 132 io_address=int(io_address_str)) 133 if data_key not in data_msgs: 134 raise Exception(f'data {data_key} not configured') 135 136 data_msgs[data_key] = data_msg_from_event(data_key, event) 137 138 except Exception as e: 139 mlog.debug('skipping initial data: %s', e, exc_info=e)
142class Iec101SlaveDevice(common.Device): 143 144 @property 145 def async_group(self) -> aio.Group: 146 return self._slave.async_group 147 148 async def process_events(self, events: Collection[hat.event.common.Event]): 149 for event in events: 150 try: 151 await self._process_event(event) 152 153 except Exception as e: 154 mlog.warning('error processing event: %s', e, exc_info=e) 155 156 def _on_connection(self, conn): 157 self.async_group.spawn(self._connection_loop, conn) 158 159 async def _connection_loop(self, conn): 160 conn_id = next(self._next_conn_ids) 161 162 try: 163 conn = iec101.SlaveConnection( 164 conn=conn, 165 cause_size=iec101.CauseSize[self._conf['cause_size']], 166 asdu_address_size=iec101.AsduAddressSize[self._conf['asdu_address_size']], # NOQA 167 io_address_size=iec101.IoAddressSize[self._conf['io_address_size']]) # NOQA 168 169 self._conns[conn_id] = conn 170 await self._register_connections() 171 172 with contextlib.suppress(Exception): 173 for buffer in self._buffers.values(): 174 for event_id, data_msg in buffer.get_event_id_data_msgs(): 175 self._send_data_msg(conn, buffer, event_id, data_msg) 176 177 while True: 178 msgs = await conn.receive() 179 180 for msg in msgs: 181 try: 182 mlog.debug('received message: %s', msg) 183 await self._process_msg(conn_id, conn, msg) 184 185 except Exception as e: 186 mlog.warning('error processing message: %s', 187 e, exc_info=e) 188 189 except ConnectionError: 190 mlog.debug('connection close') 191 192 except Exception as e: 193 mlog.warning('connection error: %s', e, exc_info=e) 194 195 finally: 196 mlog.debug('closing connection') 197 conn.close() 198 199 with contextlib.suppress(Exception): 200 self._conns.pop(conn_id) 201 await aio.uncancellable(self._register_connections()) 202 203 async def _register_connections(self): 204 payload = [{'connection_id': conn_id, 205 'address': conn.address} 206 for conn_id, conn in self._conns.items()] 207 208 event = hat.event.common.RegisterEvent( 209 type=(*self._event_type_prefix, 'gateway', 'connections'), 210 source_timestamp=None, 211 payload=hat.event.common.EventPayloadJson(payload)) 212 213 await self._eventer_client.register([event]) 214 215 async def _process_event(self, event): 216 suffix = event.type[len(self._event_type_prefix):] 217 218 if suffix[:2] == ('system', 'data'): 219 data_type_str, asdu_address_str, io_address_str = suffix[2:] 220 data_key = common.DataKey(data_type=common.DataType(data_type_str), 221 asdu_address=int(asdu_address_str), 222 io_address=int(io_address_str)) 223 224 await self._process_data_event(data_key, event) 225 226 elif suffix[:2] == ('system', 'command'): 227 cmd_type_str, asdu_address_str, io_address_str = suffix[2:] 228 cmd_key = common.CommandKey( 229 cmd_type=common.CommandType(cmd_type_str), 230 asdu_address=int(asdu_address_str), 231 io_address=int(io_address_str)) 232 233 await self._process_command_event(cmd_key, event) 234 235 else: 236 raise Exception('unsupported event type') 237 238 async def _process_data_event(self, data_key, event): 239 if data_key not in self._data_msgs: 240 raise Exception('data not configured') 241 242 data_msg = data_msg_from_event(data_key, event) 243 self._data_msgs[data_key] = data_msg 244 245 buffer = self._data_buffers.get(data_key) 246 if buffer: 247 buffer.add(event.id, data_msg) 248 249 for conn in self._conns.values(): 250 self._send_data_msg(conn, buffer, event.id, data_msg) 251 252 async def _process_command_event(self, cmd_key, event): 253 cmd_msg = cmd_msg_from_event(cmd_key, event) 254 conn_id = event.payload.data['connection_id'] 255 conn = self._conns[conn_id] 256 conn.send([cmd_msg]) 257 258 async def _process_msg(self, conn_id, conn, msg): 259 if isinstance(msg, iec101.CommandMsg): 260 await self._process_command_msg(conn_id, conn, msg) 261 262 elif isinstance(msg, iec101.InterrogationMsg): 263 self._process_interrogation_msg(conn_id, conn, msg) 264 265 elif isinstance(msg, iec101.CounterInterrogationMsg): 266 self._process_counter_interrogation_msg(conn_id, conn, msg) 267 268 elif isinstance(msg, iec101.ReadMsg): 269 self._process_read_msg(conn_id, conn, msg) 270 271 elif isinstance(msg, iec101.ClockSyncMsg): 272 self._process_clock_sync_msg(conn_id, conn, msg) 273 274 elif isinstance(msg, iec101.TestMsg): 275 self._process_test_msg(conn_id, conn, msg) 276 277 elif isinstance(msg, iec101.ResetMsg): 278 self._process_reset_msg(conn_id, conn, msg) 279 280 elif isinstance(msg, iec101.ParameterMsg): 281 self._process_parameter_msg(conn_id, conn, msg) 282 283 elif isinstance(msg, iec101.ParameterActivationMsg): 284 self._process_parameter_activation_msg(conn_id, conn, msg) 285 286 else: 287 raise Exception('unsupported message') 288 289 async def _process_command_msg(self, conn_id, conn, msg): 290 if isinstance(msg.cause, iec101.CommandReqCause): 291 event = cmd_msg_to_event(self._event_type_prefix, conn_id, msg) 292 await self._eventer_client.register([event]) 293 294 else: 295 res = msg._replace(cause=iec101.CommandResCause.UNKNOWN_CAUSE, 296 is_negative_confirm=True) 297 conn.send([res]) 298 299 def _process_interrogation_msg(self, conn_id, conn, msg): 300 if msg.cause == iec101.CommandReqCause.ACTIVATION: 301 res = msg._replace( 302 cause=iec101.CommandResCause.ACTIVATION_CONFIRMATION, 303 is_negative_confirm=False) 304 conn.send([res]) 305 306 data_msgs = [ 307 data_msg._replace( 308 is_test=msg.is_test, 309 cause=iec101.DataResCause.INTERROGATED_STATION) 310 for data_msg in self._data_msgs.values() 311 if (data_msg and 312 (msg.asdu_address == 0xFFFF or 313 msg.asdu_address == data_msg.asdu_address) and 314 not isinstance(data_msg.data, iec101.BinaryCounterData))] 315 conn.send(data_msgs) 316 317 res = msg._replace( 318 cause=iec101.CommandResCause.ACTIVATION_TERMINATION, 319 is_negative_confirm=False) 320 conn.send([res]) 321 322 elif msg.cause == iec101.CommandReqCause.DEACTIVATION: 323 res = msg._replace( 324 cause=iec101.CommandResCause.DEACTIVATION_CONFIRMATION, 325 is_negative_confirm=True) 326 conn.send([res]) 327 328 else: 329 res = msg._replace(cause=iec101.CommandResCause.UNKNOWN_CAUSE, 330 is_negative_confirm=True) 331 conn.send([res]) 332 333 def _process_counter_interrogation_msg(self, conn_id, conn, msg): 334 if msg.cause == iec101.CommandReqCause.ACTIVATION: 335 res = msg._replace( 336 cause=iec101.CommandResCause.ACTIVATION_CONFIRMATION, 337 is_negative_confirm=False) 338 conn.send([res]) 339 340 data_msgs = [ 341 data_msg._replace( 342 is_test=msg.is_test, 343 cause=iec101.DataResCause.INTERROGATED_COUNTER) 344 for data_msg in self._data_msgs.values() 345 if (data_msg and 346 (msg.asdu_address == 0xFFFF or 347 msg.asdu_address == data_msg.asdu_address) and 348 isinstance(data_msg.data, iec101.BinaryCounterData))] 349 conn.send(data_msgs) 350 351 res = msg._replace( 352 cause=iec101.CommandResCause.ACTIVATION_TERMINATION, 353 is_negative_confirm=False) 354 conn.send([res]) 355 356 elif msg.cause == iec101.CommandReqCause.DEACTIVATION: 357 res = msg._replace( 358 cause=iec101.CommandResCause.DEACTIVATION_CONFIRMATION, 359 is_negative_confirm=True) 360 conn.send([res]) 361 362 else: 363 res = msg._replace(cause=iec101.CommandResCause.UNKNOWN_CAUSE, 364 is_negative_confirm=True) 365 conn.send([res]) 366 367 def _process_read_msg(self, conn_id, conn, msg): 368 res = msg._replace(cause=iec101.ReadResCause.UNKNOWN_TYPE) 369 conn.send([res]) 370 371 def _process_clock_sync_msg(self, conn_id, conn, msg): 372 if isinstance(msg.cause, iec101.ClockSyncReqCause): 373 res = msg._replace( 374 cause=iec101.ClockSyncResCause.ACTIVATION_CONFIRMATION, 375 is_negative_confirm=True) 376 conn.send([res]) 377 378 else: 379 res = msg._replace(cause=iec101.ClockSyncResCause.UNKNOWN_CAUSE, 380 is_negative_confirm=True) 381 conn.send([res]) 382 383 def _process_test_msg(self, conn_id, conn, msg): 384 res = msg._replace(cause=iec101.ActivationResCause.UNKNOWN_TYPE) 385 conn.send([res]) 386 387 def _process_reset_msg(self, conn_id, conn, msg): 388 res = msg._replace(cause=iec101.ActivationResCause.UNKNOWN_TYPE) 389 conn.send([res]) 390 391 def _process_parameter_msg(self, conn_id, conn, msg): 392 res = msg._replace(cause=iec101.ParameterResCause.UNKNOWN_TYPE) 393 conn.send([res]) 394 395 def _process_parameter_activation_msg(self, conn_id, conn, msg): 396 res = msg._replace( 397 cause=iec101.ParameterActivationResCause.UNKNOWN_TYPE) 398 conn.send([res]) 399 400 def _send_data_msg(self, conn, buffer, event_id, data_msg): 401 sent_cb = (functools.partial(buffer.remove, event_id) 402 if buffer else None) 403 conn.send([data_msg], sent_cb=sent_cb)
Device interface
async def
process_events(self, events: Collection[hat.event.common.common.Event]):
148 async def process_events(self, events: Collection[hat.event.common.Event]): 149 for event in events: 150 try: 151 await self._process_event(event) 152 153 except Exception as e: 154 mlog.warning('error processing event: %s', e, exc_info=e)
Process received events
This method can be coroutine or regular function.
def
cmd_msg_to_event( event_type_prefix: tuple[str, ...], conn_id: int, msg: hat.drivers.iec101.common.CommandMsg) -> hat.event.common.common.RegisterEvent:
406def cmd_msg_to_event(event_type_prefix: hat.event.common.EventType, 407 conn_id: int, 408 msg: iec101.CommandMsg 409 ) -> hat.event.common.RegisterEvent: 410 command_type = common.get_command_type(msg.command) 411 cause = common.cause_to_json(iec101.CommandReqCause, msg.cause) 412 command = common.command_to_json(msg.command) 413 event_type = (*event_type_prefix, 'gateway', 'command', command_type.value, 414 str(msg.asdu_address), str(msg.io_address)) 415 416 return hat.event.common.RegisterEvent( 417 type=event_type, 418 source_timestamp=None, 419 payload=hat.event.common.EventPayloadJson({ 420 'connection_id': conn_id, 421 'is_test': msg.is_test, 422 'cause': cause, 423 'command': command}))
def
data_msg_from_event( data_key: hat.gateway.devices.iec101.common.DataKey, event: hat.event.common.common.Event) -> hat.drivers.iec101.common.DataMsg:
426def data_msg_from_event(data_key: common.DataKey, 427 event: hat.event.common.Event 428 ) -> iec101.DataMsg: 429 time = common.time_from_source_timestamp(event.source_timestamp) 430 cause = common.cause_from_json(iec101.DataResCause, 431 event.payload.data['cause']) 432 data = common.data_from_json(data_key.data_type, 433 event.payload.data['data']) 434 435 return iec101.DataMsg(is_test=event.payload.data['is_test'], 436 originator_address=0, 437 asdu_address=data_key.asdu_address, 438 io_address=data_key.io_address, 439 data=data, 440 time=time, 441 cause=cause)
def
cmd_msg_from_event( cmd_key: hat.gateway.devices.iec101.common.CommandKey, event: hat.event.common.common.Event) -> hat.drivers.iec101.common.CommandMsg:
444def cmd_msg_from_event(cmd_key: common.CommandKey, 445 event: hat.event.common.Event 446 ) -> iec101.CommandMsg: 447 cause = common.cause_from_json(iec101.CommandResCause, 448 event.payload.data['cause']) 449 command = common.command_from_json(cmd_key.cmd_type, 450 event.payload.data['command']) 451 is_negative_confirm = event.payload.data['is_negative_confirm'] 452 453 return iec101.CommandMsg(is_test=event.payload.data['is_test'], 454 originator_address=0, 455 asdu_address=cmd_key.asdu_address, 456 io_address=cmd_key.io_address, 457 command=command, 458 is_negative_confirm=is_negative_confirm, 459 cause=cause)