hat.gateway.devices.iec104.slave
IEC 60870-5-104 slave device
1"""IEC 60870-5-104 slave device""" 2 3from collections.abc import Collection 4import contextlib 5import functools 6import itertools 7import logging 8 9from hat import aio 10from hat.drivers import iec104 11from hat.drivers import tcp 12import hat.event.common 13import hat.event.eventer 14 15from hat.gateway.devices.iec101 import slave as iec101_slave 16from hat.gateway.devices.iec104 import common 17from hat.gateway.devices.iec104 import ssl 18 19 20mlog: logging.Logger = logging.getLogger(__name__) 21 22 23async def create(conf: common.DeviceConf, 24 eventer_client: hat.event.eventer.Client, 25 event_type_prefix: common.EventTypePrefix 26 ) -> 'Iec104SlaveDevice': 27 device = Iec104SlaveDevice() 28 device._conf = conf 29 device._eventer_client = eventer_client 30 device._event_type_prefix = event_type_prefix 31 device._max_connections = conf['max_connections'] 32 device._next_conn_ids = itertools.count(1) 33 device._conns = {} 34 device._remote_hosts = (set(conf['remote_hosts']) 35 if conf['remote_hosts'] is not None else None) 36 device._buffers = {} 37 device._data_msgs = {} 38 device._data_buffers = {} 39 40 iec101_slave.init_buffers(buffers_conf=conf['buffers'], 41 buffers=device._buffers) 42 43 await iec101_slave.init_data(data_conf=conf['data'], 44 data_msgs=device._data_msgs, 45 data_buffers=device._data_buffers, 46 buffers=device._buffers, 47 eventer_client=eventer_client, 48 event_type_prefix=event_type_prefix) 49 50 ssl_ctx = (ssl.create_ssl_ctx(conf['security'], ssl.SslProtocol.TLS_SERVER) 51 if conf['security'] else None) 52 53 device._srv = await iec104.listen( 54 connection_cb=device._on_connection, 55 addr=tcp.Address(host=conf['local_host'], 56 port=conf['local_port']), 57 response_timeout=conf['response_timeout'], 58 supervisory_timeout=conf['supervisory_timeout'], 59 test_timeout=conf['test_timeout'], 60 send_window_size=conf['send_window_size'], 61 receive_window_size=conf['receive_window_size'], 62 ssl=ssl_ctx) 63 64 try: 65 await device._register_connections() 66 67 except BaseException: 68 await aio.uncancellable(device.async_close()) 69 raise 70 71 return device 72 73 74info: common.DeviceInfo = common.DeviceInfo( 75 type="iec104_slave", 76 create=create, 77 json_schema_id="hat-gateway://iec104.yaml#/$defs/slave", 78 json_schema_repo=common.json_schema_repo) 79 80 81class Iec104SlaveDevice(common.Device): 82 83 @property 84 def async_group(self) -> aio.Group: 85 return self._srv.async_group 86 87 async def process_events(self, events: Collection[hat.event.common.Event]): 88 for event in events: 89 try: 90 mlog.debug('received event: %s', event) 91 await self._process_event(event) 92 93 except Exception as e: 94 mlog.warning('error processing event: %s', e, exc_info=e) 95 96 async def _on_connection(self, conn): 97 if (self._max_connections is not None and 98 len(self._conns) >= self._max_connections): 99 mlog.info('max connections exceeded - rejecting connection') 100 conn.close() 101 return 102 103 if self._conf['security']: 104 try: 105 ssl.init_security(self._conf['security'], conn) 106 107 except Exception as e: 108 mlog.error('init security error: %s', exc_info=e) 109 conn.close() 110 return 111 112 conn_id = next(self._next_conn_ids) 113 114 try: 115 if self._remote_hosts is not None: 116 remote_host = conn.info.remote_addr.host 117 if remote_host not in self._remote_hosts: 118 raise Exception(f'remote host {remote_host} not allowed') 119 120 self._conns[conn_id] = conn 121 await self._register_connections() 122 123 enabled_cb = functools.partial(self._on_enabled, conn) 124 with conn.register_enabled_cb(enabled_cb): 125 enabled_cb(conn.is_enabled) 126 127 while True: 128 msgs = await conn.receive() 129 130 for msg in msgs: 131 try: 132 mlog.debug('received message: %s', msg) 133 await self._process_msg(conn_id, conn, msg) 134 135 except Exception as e: 136 mlog.warning('error processing message: %s', 137 e, exc_info=e) 138 139 except ConnectionError: 140 mlog.debug('connection close') 141 142 except Exception as e: 143 mlog.warning('connection error: %s', e, exc_info=e) 144 145 finally: 146 mlog.debug('closing connection') 147 conn.close() 148 149 with contextlib.suppress(Exception): 150 self._conns.pop(conn_id) 151 await aio.uncancellable(self._register_connections()) 152 153 def _on_enabled(self, conn, enabled): 154 if not enabled: 155 return 156 157 with contextlib.suppress(Exception): 158 for buffer in self._buffers.values(): 159 for event_id, data_msg in buffer.get_event_id_data_msgs(): 160 self._send_data_msg(conn, buffer, event_id, data_msg) 161 162 async def _register_connections(self): 163 payload = [{'connection_id': conn_id, 164 'local': {'host': conn.info.local_addr.host, 165 'port': conn.info.local_addr.port}, 166 'remote': {'host': conn.info.remote_addr.host, 167 'port': conn.info.remote_addr.port}} 168 for conn_id, conn in self._conns.items()] 169 170 event = hat.event.common.RegisterEvent( 171 type=(*self._event_type_prefix, 'gateway', 'connections'), 172 source_timestamp=None, 173 payload=hat.event.common.EventPayloadJson(payload)) 174 175 await self._eventer_client.register([event]) 176 177 async def _process_event(self, event): 178 suffix = event.type[len(self._event_type_prefix):] 179 180 if suffix[:2] == ('system', 'data'): 181 data_type_str, asdu_address_str, io_address_str = suffix[2:] 182 data_key = common.DataKey(data_type=common.DataType(data_type_str), 183 asdu_address=int(asdu_address_str), 184 io_address=int(io_address_str)) 185 186 await self._process_data_event(data_key, event) 187 188 elif suffix[:2] == ('system', 'command'): 189 cmd_type_str, asdu_address_str, io_address_str = suffix[2:] 190 cmd_key = common.CommandKey( 191 cmd_type=common.CommandType(cmd_type_str), 192 asdu_address=int(asdu_address_str), 193 io_address=int(io_address_str)) 194 195 await self._process_command_event(cmd_key, event) 196 197 else: 198 raise Exception('unsupported event type') 199 200 async def _process_data_event(self, data_key, event): 201 if data_key not in self._data_msgs: 202 raise Exception('data not configured') 203 204 data_msg = _data_msg_from_event(data_key, event) 205 self._data_msgs[data_key] = data_msg 206 207 buffer = self._data_buffers.get(data_key) 208 if buffer: 209 buffer.add(event.id, data_msg) 210 211 for conn in self._conns.values(): 212 self._send_data_msg(conn, buffer, event.id, data_msg) 213 214 async def _process_command_event(self, cmd_key, event): 215 cmd_msg = _cmd_msg_from_event(cmd_key, event) 216 conn_id = event.payload.data['connection_id'] 217 conn = self._conns[conn_id] 218 await conn.send([cmd_msg]) 219 220 async def _process_msg(self, conn_id, conn, msg): 221 if isinstance(msg, iec104.CommandMsg): 222 await self._process_command_msg(conn_id, conn, msg) 223 224 elif isinstance(msg, iec104.InterrogationMsg): 225 await self._process_interrogation_msg(conn_id, conn, msg) 226 227 elif isinstance(msg, iec104.CounterInterrogationMsg): 228 await self._process_counter_interrogation_msg(conn_id, conn, msg) 229 230 elif isinstance(msg, iec104.ReadMsg): 231 await self._process_read_msg(conn_id, conn, msg) 232 233 elif isinstance(msg, iec104.ClockSyncMsg): 234 await self._process_clock_sync_msg(conn_id, conn, msg) 235 236 elif isinstance(msg, iec104.TestMsg): 237 await self._process_test_msg(conn_id, conn, msg) 238 239 elif isinstance(msg, iec104.ResetMsg): 240 await self._process_reset_msg(conn_id, conn, msg) 241 242 elif isinstance(msg, iec104.ParameterMsg): 243 await self._process_parameter_msg(conn_id, conn, msg) 244 245 elif isinstance(msg, iec104.ParameterActivationMsg): 246 await self._process_parameter_activation_msg(conn_id, conn, msg) 247 248 else: 249 raise Exception('unsupported message') 250 251 async def _process_command_msg(self, conn_id, conn, msg): 252 if isinstance(msg.cause, iec104.CommandReqCause): 253 event = _cmd_msg_to_event(self._event_type_prefix, conn_id, msg) 254 await self._eventer_client.register([event]) 255 256 else: 257 res = msg._replace(cause=iec104.CommandResCause.UNKNOWN_CAUSE, 258 is_negative_confirm=True) 259 await conn.send([res]) 260 261 async def _process_interrogation_msg(self, conn_id, conn, msg): 262 if msg.cause == iec104.CommandReqCause.ACTIVATION: 263 res = msg._replace( 264 cause=iec104.CommandResCause.ACTIVATION_CONFIRMATION, 265 is_negative_confirm=False) 266 await conn.send([res]) 267 268 data_msgs = [ 269 data_msg._replace( 270 is_test=msg.is_test, 271 cause=iec104.DataResCause.INTERROGATED_STATION) 272 for data_msg in self._data_msgs.values() 273 if (data_msg and 274 (msg.asdu_address == 0xFFFF or 275 msg.asdu_address == data_msg.asdu_address) and 276 not isinstance(data_msg.data, iec104.BinaryCounterData))] 277 await conn.send(data_msgs) 278 279 res = msg._replace( 280 cause=iec104.CommandResCause.ACTIVATION_TERMINATION, 281 is_negative_confirm=False) 282 await conn.send([res]) 283 284 elif msg.cause == iec104.CommandReqCause.DEACTIVATION: 285 res = msg._replace( 286 cause=iec104.CommandResCause.DEACTIVATION_CONFIRMATION, 287 is_negative_confirm=True) 288 await conn.send([res]) 289 290 else: 291 res = msg._replace(cause=iec104.CommandResCause.UNKNOWN_CAUSE, 292 is_negative_confirm=True) 293 await conn.send([res]) 294 295 async def _process_counter_interrogation_msg(self, conn_id, conn, msg): 296 if msg.cause == iec104.CommandReqCause.ACTIVATION: 297 res = msg._replace( 298 cause=iec104.CommandResCause.ACTIVATION_CONFIRMATION, 299 is_negative_confirm=False) 300 await conn.send([res]) 301 302 data_msgs = [ 303 data_msg._replace( 304 is_test=msg.is_test, 305 cause=iec104.DataResCause.INTERROGATED_COUNTER) 306 for data_msg in self._data_msgs.values() 307 if (data_msg and 308 (msg.asdu_address == 0xFFFF or 309 msg.asdu_address == data_msg.asdu_address) and 310 isinstance(data_msg.data, iec104.BinaryCounterData))] 311 await conn.send(data_msgs) 312 313 res = msg._replace( 314 cause=iec104.CommandResCause.ACTIVATION_TERMINATION, 315 is_negative_confirm=False) 316 await conn.send([res]) 317 318 elif msg.cause == iec104.CommandReqCause.DEACTIVATION: 319 res = msg._replace( 320 cause=iec104.CommandResCause.DEACTIVATION_CONFIRMATION, 321 is_negative_confirm=True) 322 await conn.send([res]) 323 324 else: 325 res = msg._replace(cause=iec104.CommandResCause.UNKNOWN_CAUSE, 326 is_negative_confirm=True) 327 await conn.send([res]) 328 329 async def _process_read_msg(self, conn_id, conn, msg): 330 res = msg._replace(cause=iec104.ReadResCause.UNKNOWN_TYPE) 331 await conn.send([res]) 332 333 async def _process_clock_sync_msg(self, conn_id, conn, msg): 334 if isinstance(msg.cause, iec104.ClockSyncReqCause): 335 res = msg._replace( 336 cause=iec104.ClockSyncResCause.ACTIVATION_CONFIRMATION, 337 is_negative_confirm=True) 338 await conn.send([res]) 339 340 else: 341 res = msg._replace(cause=iec104.ClockSyncResCause.UNKNOWN_CAUSE, 342 is_negative_confirm=True) 343 await conn.send([res]) 344 345 async def _process_test_msg(self, conn_id, conn, msg): 346 res = msg._replace(cause=iec104.ActivationResCause.UNKNOWN_TYPE) 347 await conn.send([res]) 348 349 async def _process_reset_msg(self, conn_id, conn, msg): 350 res = msg._replace(cause=iec104.ActivationResCause.UNKNOWN_TYPE) 351 await conn.send([res]) 352 353 async def _process_parameter_msg(self, conn_id, conn, msg): 354 res = msg._replace(cause=iec104.ParameterResCause.UNKNOWN_TYPE) 355 await conn.send([res]) 356 357 async def _process_parameter_activation_msg(self, conn_id, conn, msg): 358 res = msg._replace( 359 cause=iec104.ParameterActivationResCause.UNKNOWN_TYPE) 360 await conn.send([res]) 361 362 def _send_data_msg(self, conn, buffer, event_id, data_msg): 363 self.async_group.spawn(_send_data_msg, conn, buffer, event_id, 364 data_msg) 365 366 367async def _send_data_msg(conn, buffer, event_id, data_msg): 368 try: 369 if buffer: 370 await conn.send([data_msg], wait_ack=True) 371 buffer.remove(event_id) 372 373 else: 374 await conn.send([data_msg]) 375 376 except ConnectionError: 377 pass 378 379 except Exception as e: 380 mlog.warning('send data message error: %s', e, exc_info=e) 381 382 383def _cmd_msg_to_event(event_type_prefix, conn_id, msg): 384 event = iec101_slave.cmd_msg_to_event(event_type_prefix, conn_id, msg) 385 source_timestamp = common.time_to_source_timestamp(msg.time) 386 return event._replace(source_timestamp=source_timestamp) 387 388 389def _data_msg_from_event(data_key, event): 390 return iec101_slave.data_msg_from_event(data_key, event) 391 392 393def _cmd_msg_from_event(cmd_key, event): 394 msg = iec101_slave.cmd_msg_from_event(cmd_key, event) 395 time = common.time_from_source_timestamp(event.source_timestamp) 396 return iec104.CommandMsg(**msg._asdict(), 397 time=time)
async def
create( conf: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')], eventer_client: hat.event.eventer.client.Client, event_type_prefix: tuple[str, str, str]) -> Iec104SlaveDevice:
24async def create(conf: common.DeviceConf, 25 eventer_client: hat.event.eventer.Client, 26 event_type_prefix: common.EventTypePrefix 27 ) -> 'Iec104SlaveDevice': 28 device = Iec104SlaveDevice() 29 device._conf = conf 30 device._eventer_client = eventer_client 31 device._event_type_prefix = event_type_prefix 32 device._max_connections = conf['max_connections'] 33 device._next_conn_ids = itertools.count(1) 34 device._conns = {} 35 device._remote_hosts = (set(conf['remote_hosts']) 36 if conf['remote_hosts'] is not None else None) 37 device._buffers = {} 38 device._data_msgs = {} 39 device._data_buffers = {} 40 41 iec101_slave.init_buffers(buffers_conf=conf['buffers'], 42 buffers=device._buffers) 43 44 await iec101_slave.init_data(data_conf=conf['data'], 45 data_msgs=device._data_msgs, 46 data_buffers=device._data_buffers, 47 buffers=device._buffers, 48 eventer_client=eventer_client, 49 event_type_prefix=event_type_prefix) 50 51 ssl_ctx = (ssl.create_ssl_ctx(conf['security'], ssl.SslProtocol.TLS_SERVER) 52 if conf['security'] else None) 53 54 device._srv = await iec104.listen( 55 connection_cb=device._on_connection, 56 addr=tcp.Address(host=conf['local_host'], 57 port=conf['local_port']), 58 response_timeout=conf['response_timeout'], 59 supervisory_timeout=conf['supervisory_timeout'], 60 test_timeout=conf['test_timeout'], 61 send_window_size=conf['send_window_size'], 62 receive_window_size=conf['receive_window_size'], 63 ssl=ssl_ctx) 64 65 try: 66 await device._register_connections() 67 68 except BaseException: 69 await aio.uncancellable(device.async_close()) 70 raise 71 72 return device
info: hat.gateway.common.DeviceInfo =
DeviceInfo(type='iec104_slave', create=<function create>, json_schema_id='hat-gateway://iec104.yaml#/$defs/slave', json_schema_repo=<hat.json.repository.SchemaRepository object>)
82class Iec104SlaveDevice(common.Device): 83 84 @property 85 def async_group(self) -> aio.Group: 86 return self._srv.async_group 87 88 async def process_events(self, events: Collection[hat.event.common.Event]): 89 for event in events: 90 try: 91 mlog.debug('received event: %s', event) 92 await self._process_event(event) 93 94 except Exception as e: 95 mlog.warning('error processing event: %s', e, exc_info=e) 96 97 async def _on_connection(self, conn): 98 if (self._max_connections is not None and 99 len(self._conns) >= self._max_connections): 100 mlog.info('max connections exceeded - rejecting connection') 101 conn.close() 102 return 103 104 if self._conf['security']: 105 try: 106 ssl.init_security(self._conf['security'], conn) 107 108 except Exception as e: 109 mlog.error('init security error: %s', exc_info=e) 110 conn.close() 111 return 112 113 conn_id = next(self._next_conn_ids) 114 115 try: 116 if self._remote_hosts is not None: 117 remote_host = conn.info.remote_addr.host 118 if remote_host not in self._remote_hosts: 119 raise Exception(f'remote host {remote_host} not allowed') 120 121 self._conns[conn_id] = conn 122 await self._register_connections() 123 124 enabled_cb = functools.partial(self._on_enabled, conn) 125 with conn.register_enabled_cb(enabled_cb): 126 enabled_cb(conn.is_enabled) 127 128 while True: 129 msgs = await conn.receive() 130 131 for msg in msgs: 132 try: 133 mlog.debug('received message: %s', msg) 134 await self._process_msg(conn_id, conn, msg) 135 136 except Exception as e: 137 mlog.warning('error processing message: %s', 138 e, exc_info=e) 139 140 except ConnectionError: 141 mlog.debug('connection close') 142 143 except Exception as e: 144 mlog.warning('connection error: %s', e, exc_info=e) 145 146 finally: 147 mlog.debug('closing connection') 148 conn.close() 149 150 with contextlib.suppress(Exception): 151 self._conns.pop(conn_id) 152 await aio.uncancellable(self._register_connections()) 153 154 def _on_enabled(self, conn, enabled): 155 if not enabled: 156 return 157 158 with contextlib.suppress(Exception): 159 for buffer in self._buffers.values(): 160 for event_id, data_msg in buffer.get_event_id_data_msgs(): 161 self._send_data_msg(conn, buffer, event_id, data_msg) 162 163 async def _register_connections(self): 164 payload = [{'connection_id': conn_id, 165 'local': {'host': conn.info.local_addr.host, 166 'port': conn.info.local_addr.port}, 167 'remote': {'host': conn.info.remote_addr.host, 168 'port': conn.info.remote_addr.port}} 169 for conn_id, conn in self._conns.items()] 170 171 event = hat.event.common.RegisterEvent( 172 type=(*self._event_type_prefix, 'gateway', 'connections'), 173 source_timestamp=None, 174 payload=hat.event.common.EventPayloadJson(payload)) 175 176 await self._eventer_client.register([event]) 177 178 async def _process_event(self, event): 179 suffix = event.type[len(self._event_type_prefix):] 180 181 if suffix[:2] == ('system', 'data'): 182 data_type_str, asdu_address_str, io_address_str = suffix[2:] 183 data_key = common.DataKey(data_type=common.DataType(data_type_str), 184 asdu_address=int(asdu_address_str), 185 io_address=int(io_address_str)) 186 187 await self._process_data_event(data_key, event) 188 189 elif suffix[:2] == ('system', 'command'): 190 cmd_type_str, asdu_address_str, io_address_str = suffix[2:] 191 cmd_key = common.CommandKey( 192 cmd_type=common.CommandType(cmd_type_str), 193 asdu_address=int(asdu_address_str), 194 io_address=int(io_address_str)) 195 196 await self._process_command_event(cmd_key, event) 197 198 else: 199 raise Exception('unsupported event type') 200 201 async def _process_data_event(self, data_key, event): 202 if data_key not in self._data_msgs: 203 raise Exception('data not configured') 204 205 data_msg = _data_msg_from_event(data_key, event) 206 self._data_msgs[data_key] = data_msg 207 208 buffer = self._data_buffers.get(data_key) 209 if buffer: 210 buffer.add(event.id, data_msg) 211 212 for conn in self._conns.values(): 213 self._send_data_msg(conn, buffer, event.id, data_msg) 214 215 async def _process_command_event(self, cmd_key, event): 216 cmd_msg = _cmd_msg_from_event(cmd_key, event) 217 conn_id = event.payload.data['connection_id'] 218 conn = self._conns[conn_id] 219 await conn.send([cmd_msg]) 220 221 async def _process_msg(self, conn_id, conn, msg): 222 if isinstance(msg, iec104.CommandMsg): 223 await self._process_command_msg(conn_id, conn, msg) 224 225 elif isinstance(msg, iec104.InterrogationMsg): 226 await self._process_interrogation_msg(conn_id, conn, msg) 227 228 elif isinstance(msg, iec104.CounterInterrogationMsg): 229 await self._process_counter_interrogation_msg(conn_id, conn, msg) 230 231 elif isinstance(msg, iec104.ReadMsg): 232 await self._process_read_msg(conn_id, conn, msg) 233 234 elif isinstance(msg, iec104.ClockSyncMsg): 235 await self._process_clock_sync_msg(conn_id, conn, msg) 236 237 elif isinstance(msg, iec104.TestMsg): 238 await self._process_test_msg(conn_id, conn, msg) 239 240 elif isinstance(msg, iec104.ResetMsg): 241 await self._process_reset_msg(conn_id, conn, msg) 242 243 elif isinstance(msg, iec104.ParameterMsg): 244 await self._process_parameter_msg(conn_id, conn, msg) 245 246 elif isinstance(msg, iec104.ParameterActivationMsg): 247 await self._process_parameter_activation_msg(conn_id, conn, msg) 248 249 else: 250 raise Exception('unsupported message') 251 252 async def _process_command_msg(self, conn_id, conn, msg): 253 if isinstance(msg.cause, iec104.CommandReqCause): 254 event = _cmd_msg_to_event(self._event_type_prefix, conn_id, msg) 255 await self._eventer_client.register([event]) 256 257 else: 258 res = msg._replace(cause=iec104.CommandResCause.UNKNOWN_CAUSE, 259 is_negative_confirm=True) 260 await conn.send([res]) 261 262 async def _process_interrogation_msg(self, conn_id, conn, msg): 263 if msg.cause == iec104.CommandReqCause.ACTIVATION: 264 res = msg._replace( 265 cause=iec104.CommandResCause.ACTIVATION_CONFIRMATION, 266 is_negative_confirm=False) 267 await conn.send([res]) 268 269 data_msgs = [ 270 data_msg._replace( 271 is_test=msg.is_test, 272 cause=iec104.DataResCause.INTERROGATED_STATION) 273 for data_msg in self._data_msgs.values() 274 if (data_msg and 275 (msg.asdu_address == 0xFFFF or 276 msg.asdu_address == data_msg.asdu_address) and 277 not isinstance(data_msg.data, iec104.BinaryCounterData))] 278 await conn.send(data_msgs) 279 280 res = msg._replace( 281 cause=iec104.CommandResCause.ACTIVATION_TERMINATION, 282 is_negative_confirm=False) 283 await conn.send([res]) 284 285 elif msg.cause == iec104.CommandReqCause.DEACTIVATION: 286 res = msg._replace( 287 cause=iec104.CommandResCause.DEACTIVATION_CONFIRMATION, 288 is_negative_confirm=True) 289 await conn.send([res]) 290 291 else: 292 res = msg._replace(cause=iec104.CommandResCause.UNKNOWN_CAUSE, 293 is_negative_confirm=True) 294 await conn.send([res]) 295 296 async def _process_counter_interrogation_msg(self, conn_id, conn, msg): 297 if msg.cause == iec104.CommandReqCause.ACTIVATION: 298 res = msg._replace( 299 cause=iec104.CommandResCause.ACTIVATION_CONFIRMATION, 300 is_negative_confirm=False) 301 await conn.send([res]) 302 303 data_msgs = [ 304 data_msg._replace( 305 is_test=msg.is_test, 306 cause=iec104.DataResCause.INTERROGATED_COUNTER) 307 for data_msg in self._data_msgs.values() 308 if (data_msg and 309 (msg.asdu_address == 0xFFFF or 310 msg.asdu_address == data_msg.asdu_address) and 311 isinstance(data_msg.data, iec104.BinaryCounterData))] 312 await conn.send(data_msgs) 313 314 res = msg._replace( 315 cause=iec104.CommandResCause.ACTIVATION_TERMINATION, 316 is_negative_confirm=False) 317 await conn.send([res]) 318 319 elif msg.cause == iec104.CommandReqCause.DEACTIVATION: 320 res = msg._replace( 321 cause=iec104.CommandResCause.DEACTIVATION_CONFIRMATION, 322 is_negative_confirm=True) 323 await conn.send([res]) 324 325 else: 326 res = msg._replace(cause=iec104.CommandResCause.UNKNOWN_CAUSE, 327 is_negative_confirm=True) 328 await conn.send([res]) 329 330 async def _process_read_msg(self, conn_id, conn, msg): 331 res = msg._replace(cause=iec104.ReadResCause.UNKNOWN_TYPE) 332 await conn.send([res]) 333 334 async def _process_clock_sync_msg(self, conn_id, conn, msg): 335 if isinstance(msg.cause, iec104.ClockSyncReqCause): 336 res = msg._replace( 337 cause=iec104.ClockSyncResCause.ACTIVATION_CONFIRMATION, 338 is_negative_confirm=True) 339 await conn.send([res]) 340 341 else: 342 res = msg._replace(cause=iec104.ClockSyncResCause.UNKNOWN_CAUSE, 343 is_negative_confirm=True) 344 await conn.send([res]) 345 346 async def _process_test_msg(self, conn_id, conn, msg): 347 res = msg._replace(cause=iec104.ActivationResCause.UNKNOWN_TYPE) 348 await conn.send([res]) 349 350 async def _process_reset_msg(self, conn_id, conn, msg): 351 res = msg._replace(cause=iec104.ActivationResCause.UNKNOWN_TYPE) 352 await conn.send([res]) 353 354 async def _process_parameter_msg(self, conn_id, conn, msg): 355 res = msg._replace(cause=iec104.ParameterResCause.UNKNOWN_TYPE) 356 await conn.send([res]) 357 358 async def _process_parameter_activation_msg(self, conn_id, conn, msg): 359 res = msg._replace( 360 cause=iec104.ParameterActivationResCause.UNKNOWN_TYPE) 361 await conn.send([res]) 362 363 def _send_data_msg(self, conn, buffer, event_id, data_msg): 364 self.async_group.spawn(_send_data_msg, conn, buffer, event_id, 365 data_msg)
Device interface
async def
process_events(self, events: Collection[hat.event.common.common.Event]):
88 async def process_events(self, events: Collection[hat.event.common.Event]): 89 for event in events: 90 try: 91 mlog.debug('received event: %s', event) 92 await self._process_event(event) 93 94 except Exception as e: 95 mlog.warning('error processing event: %s', e, exc_info=e)
Process received events
This method can be coroutine or regular function.