hat.gateway.devices.iec103.master
IEC 60870-5-103 master device
"""IEC 60870-5-103 master device"""

from collections.abc import Collection
import asyncio
import collections
import contextlib
import datetime
import enum
import functools
import logging

from hat import aio
from hat.drivers import iec103
from hat.drivers import serial
from hat.drivers.iec60870 import link
import hat.event.common
import hat.event.eventer

from hat.gateway import common


mlog: logging.Logger = logging.getLogger(__name__)

# timeout (seconds, passed to asyncio.wait_for) for a single command
command_timeout: float = 100

# timeout (seconds, passed to asyncio.wait_for) for a single interrogation
interrogate_timeout: float = 100


async def create(conf: common.DeviceConf,
                 eventer_client: hat.event.eventer.Client,
                 event_type_prefix: common.EventTypePrefix
                 ) -> 'Iec103MasterDevice':
    """Create IEC 60870-5-103 master device

    Latest ``enable`` events of all configured remote devices are queried
    from the eventer and processed before the device is returned. If that
    initial processing raises, the device is closed and the error is
    propagated.

    """
    event_types = [(*event_type_prefix, 'system', 'remote_device',
                    str(i['address']), 'enable')
                   for i in conf['remote_devices']]
    params = hat.event.common.QueryLatestParams(event_types)
    result = await eventer_client.query(params)

    device = Iec103MasterDevice(conf=conf,
                                eventer_client=eventer_client,
                                event_type_prefix=event_type_prefix)
    try:
        await device.process_events(result.events)

    except BaseException:
        await aio.uncancellable(device.async_close())
        raise

    return device


info: common.DeviceInfo = common.DeviceInfo(
    type="iec103_master",
    create=create,
    json_schema_id="hat-gateway://iec103.yaml#/$defs/master",
    json_schema_repo=common.json_schema_repo)


class Iec103MasterDevice(common.Device):
    """IEC 60870-5-103 master device

    Maintains a single link master and, for each enabled remote device,
    a connection loop running in its own async subgroup.

    """

    def __init__(self,
                 conf: common.DeviceConf,
                 eventer_client: hat.event.eventer.Client,
                 event_type_prefix: common.EventTypePrefix):
        self._conf = conf
        self._eventer_client = eventer_client
        self._event_type_prefix = event_type_prefix
        self._master = None
        # remote address -> active iec103.MasterConnection
        self._conns = {}
        # remote address -> last requested enable state
        self._remote_enabled = {i['address']: False
                                for i in conf['remote_devices']}
        # remote address -> remote device configuration
        self._remote_confs = {i['address']: i
                              for i in conf['remote_devices']}
        # remote address -> async subgroup running its connection loop
        self._remote_groups = {}
        self._async_group = aio.Group()

        self.async_group.spawn(self._create_link_master_loop)

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    async def process_events(self, events: Collection[hat.event.common.Event]):
        """Process received events

        Events are processed in order. An error raised while processing
        a single event is logged and does not stop processing of
        the remaining events.

        """
        for event in events:
            try:
                await self._process_event(event)

            except Exception as e:
                mlog.warning('error processing event: %s', e, exc_info=e)

    async def _create_link_master_loop(self):
        """Create link master and recreate it each time it is closed"""

        async def cleanup():
            with contextlib.suppress(ConnectionError):
                await self._register_status('DISCONNECTED')

            if self._master:
                await self._master.async_close()

        try:
            while True:
                await self._register_status('CONNECTING')

                try:
                    self._master = await link.unbalanced.create_master(
                        port=self._conf['port'],
                        baudrate=self._conf['baudrate'],
                        bytesize=serial.ByteSize[self._conf['bytesize']],
                        parity=serial.Parity[self._conf['parity']],
                        stopbits=serial.StopBits[self._conf['stopbits']],
                        xonxoff=self._conf['flow_control']['xonxoff'],
                        rtscts=self._conf['flow_control']['rtscts'],
                        dsrdtr=self._conf['flow_control']['dsrdtr'],
                        silent_interval=self._conf['silent_interval'],
                        address_size=link.AddressSize.ONE)

                except Exception as e:
                    mlog.warning('link master (endpoint) failed to create: %s',
                                 e, exc_info=e)
                    await self._register_status('DISCONNECTED')
                    await asyncio.sleep(self._conf['reconnect_delay'])
                    continue

                await self._register_status('CONNECTED')
                # start remotes which were enabled while master was missing
                for address, enabled in self._remote_enabled.items():
                    if enabled:
                        self._enable_remote(address)

                await self._master.wait_closed()
                await self._register_status('DISCONNECTED')
                self._master = None

        finally:
            mlog.debug('closing link master loop')
            self.close()
            await aio.uncancellable(cleanup())

    async def _connection_loop(self, group, address):
        """Connect to remote device and reconnect each time it is closed

        Args:
            group: async subgroup owning this loop (closed on loop exit)
            address: remote device address

        """

        async def cleanup():
            with contextlib.suppress(ConnectionError):
                await self._register_rmt_status(address, 'DISCONNECTED')

            conn = self._conns.pop(address, None)
            if conn:
                await conn.async_close()

        remote_conf = self._remote_confs[address]
        try:
            while True:
                await self._register_rmt_status(address, 'CONNECTING')

                try:
                    conn_link = await self._master.connect(
                        addr=address,
                        response_timeout=remote_conf['response_timeout'],
                        send_retry_count=remote_conf['send_retry_count'],
                        poll_class1_delay=remote_conf['poll_class1_delay'],
                        poll_class2_delay=remote_conf['poll_class2_delay'])

                except Exception as e:
                    mlog.error('connection error to address %s: %s',
                               address, e, exc_info=e)
                    await self._register_rmt_status(address, 'DISCONNECTED')
                    await asyncio.sleep(remote_conf['reconnect_delay'])
                    continue

                await self._register_rmt_status(address, 'CONNECTED')
                conn = iec103.MasterConnection(
                    conn=conn_link,
                    data_cb=functools.partial(self._on_data, address),
                    generic_data_cb=None)
                self._conns[address] = conn
                if remote_conf['time_sync_delay'] is not None:
                    group.spawn(self._time_sync_loop, conn,
                                remote_conf['time_sync_delay'])

                await conn.wait_closed()
                await self._register_rmt_status(address, 'DISCONNECTED')
                self._conns.pop(address)

        finally:
            mlog.debug('closing remote device %s', address)
            group.close()
            await aio.uncancellable(cleanup())

    async def _time_sync_loop(self, conn, delay):
        """Send time synchronization every `delay` seconds

        Connection is closed once this loop stops for any reason.

        """
        try:
            while True:
                await conn.time_sync()
                mlog.debug('time sync')
                await asyncio.sleep(delay)

        except ConnectionError:
            mlog.debug('connection closed')

        finally:
            conn.close()

    async def _on_data(self, address, data):
        """Register events created from data received from remote device

        If conversion fails, the failure is logged and only events created
        before the failure are registered.

        """
        events = collections.deque()
        try:
            for event in _events_from_data(data, address,
                                           self._event_type_prefix):
                events.append(event)

        except Exception as e:
            mlog.warning('data %s ignored due to: %s', data, e, exc_info=e)

        if events:
            await self._eventer_client.register(events)

    async def _process_event(self, event):
        """Dispatch single event based on its type

        Segments of event type following the prefix are expected to be
        ``(<any>, 'remote_device', <address>, <kind>, ...)`` where kind is
        one of ``enable``, ``command`` or ``interrogation``.

        """
        prefix_len = len(self._event_type_prefix)
        if event.type[prefix_len + 1] != 'remote_device':
            raise Exception('unexpected event type')

        address = self._address_from_event(event)
        etype_suffix = event.type[prefix_len + 3:]

        if etype_suffix[0] == 'enable':
            self._process_enable(event)

        elif etype_suffix[0] == 'command':
            asdu = int(etype_suffix[1])
            io = iec103.IoAddress(
                function_type=int(etype_suffix[2]),
                information_number=int(etype_suffix[3]))
            self._process_command(event, address, asdu, io)

        elif etype_suffix[0] == 'interrogation':
            asdu = int(etype_suffix[1])
            self._process_interrogation(event, address, asdu)

        else:
            raise Exception('unexpected event type')

    def _process_enable(self, event):
        """Remember enable state and start/stop remote connection loop

        While link master is not available, only the state is remembered.

        """
        address = self._address_from_event(event)
        enable = event.payload.data
        if address not in self._remote_enabled:
            raise Exception('invalid remote device address')

        if not isinstance(enable, bool):
            raise Exception('invalid enable event payload')

        self._remote_enabled[address] = enable

        if not self._master:
            return

        if enable:
            self._enable_remote(address)

        else:
            self._disable_remote(address)

    def _enable_remote(self, address):
        """Start connection loop for remote unless one is already running"""
        remote_group = self._remote_groups.get(address)
        if remote_group and remote_group.is_open:
            return

        remote_group = self._async_group.create_subgroup()
        self._remote_groups[address] = remote_group
        remote_group.spawn(self._connection_loop, remote_group, address)

    def _disable_remote(self, address):
        """Close and forget async subgroup associated with remote"""
        if address in self._remote_groups:
            remote_group = self._remote_groups.pop(address)
            remote_group.close()

    def _process_command(self, event, address, asdu, io):
        """Spawn command request for remote device

        Raises:
            Exception: connection or its async group is not available

        """
        conn = self._conns.get(address)
        # group is removed by `_disable_remote` before the connection loop
        # cleanup removes the connection, so both have to be checked
        remote_group = self._remote_groups.get(address)
        if (not conn or not conn.is_open or
                not remote_group or not remote_group.is_open):
            raise Exception('connection closed')

        value = iec103.DoubleValue[event.payload.data['value']]
        session_id = event.payload.data['session_id']
        remote_group.spawn(
            self._cmd_req_res, conn, address, asdu, io, value, session_id)

    async def _cmd_req_res(self, conn, address, asdu, io, value, session_id):
        """Send command and register event with its result

        No event is registered if connection is closed or if
        `command_timeout` expires.

        """
        try:
            success = await asyncio.wait_for(
                conn.send_command(asdu, io, value), timeout=command_timeout)

        except ConnectionError:
            mlog.warning('command %s %s %s to %s failed: connection closed',
                         asdu, io, value, address)
            return

        except asyncio.TimeoutError:
            mlog.warning(
                'command %s %s %s to %s timeout', asdu, io, value, address)
            return

        event = _create_event(
            event_type=(*self._event_type_prefix, 'gateway', 'remote_device',
                        str(address), 'command', str(asdu),
                        str(io.function_type), str(io.information_number)),
            payload={'success': success,
                     'session_id': session_id})

        await self._eventer_client.register([event])

    def _process_interrogation(self, event, address, asdu):
        """Spawn interrogation request for remote device

        Event is ignored (with logged warning) if connection or its async
        group is not available.

        """
        conn = self._conns.get(address)
        # group is removed by `_disable_remote` before the connection loop
        # cleanup removes the connection, so both have to be checked
        remote_group = self._remote_groups.get(address)
        if (not conn or not conn.is_open or
                not remote_group or not remote_group.is_open):
            mlog.warning("event %s ignored due to connection closed", event)
            return

        remote_group.spawn(
            self._interrogate_req_res, conn, address, asdu)

    async def _interrogate_req_res(self, conn, address, asdu):
        """Run interrogation and register event once it finishes

        No event is registered if connection is closed or if
        `interrogate_timeout` expires.

        """
        try:
            await asyncio.wait_for(conn.interrogate(asdu),
                                   timeout=interrogate_timeout)

        except ConnectionError:
            mlog.warning('interrogation on %s to %s failed: connection closed',
                         asdu, address)
            return

        except asyncio.TimeoutError:
            mlog.warning('interrogation on %s to %s timeout', asdu, address)
            return

        event = _create_event(
            event_type=(*self._event_type_prefix, 'gateway', 'remote_device',
                        str(address), 'interrogation', str(asdu)),
            payload=None)

        await self._eventer_client.register([event])

    async def _register_status(self, status):
        """Register link master status event"""
        event = _create_event(
            event_type=(*self._event_type_prefix, 'gateway', 'status'),
            payload=status)

        await self._eventer_client.register([event])

    async def _register_rmt_status(self, address, status):
        """Register remote device status event"""
        event = _create_event(
            event_type=(*self._event_type_prefix,
                        'gateway', 'remote_device', str(address), 'status'),
            payload=status)

        await self._eventer_client.register([event])

    def _address_from_event(self, event):
        """Extract remote device address from event type"""
        return int(event.type[len(self._event_type_prefix) + 2])


def _events_from_data(data, address, event_type_prefix):
    """Generate register events for data received from remote device

    Double point values (with time or relative time) result in a single
    event with source timestamp. Measurand values result in one event per
    measurand. Any other value raises an exception.

    """
    cause = (data.cause.name if isinstance(data.cause, enum.Enum)
             else data.cause)

    if isinstance(data.value, (iec103.DoubleWithTimeValue,
                               iec103.DoubleWithRelativeTimeValue)):
        data_type = 'double'
        payload = {'cause': cause,
                   'value': data.value.value.name}
        source_ts = _time_iec103_to_source_ts(data.value.time)
        event_type = _data_event_type(
            data, address, data_type, event_type_prefix)
        yield _create_event(event_type, payload, source_ts)

    elif isinstance(data.value, iec103.MeasurandValues):
        for meas_type, meas_value in data.value.values.items():
            payload = {'cause': cause,
                       'value': meas_value._asdict()}
            data_type = meas_type.name.lower()
            event_type = _data_event_type(
                data, address, data_type, event_type_prefix)
            yield _create_event(event_type, payload)

    else:
        raise Exception('unsupported data value')


def _data_event_type(data, address, data_type, event_type_prefix):
    """Create event type of data event"""
    return (
        *event_type_prefix, 'gateway', 'remote_device', str(address),
        'data', data_type,
        str(data.asdu_address),
        str(data.io_address.function_type),
        str(data.io_address.information_number))


def _time_iec103_to_source_ts(time_four):
    """Convert time without date information to event timestamp

    Date is taken from current UTC time and from times 12 hours before and
    after it. From resulting candidates, the one closest to current time
    is used. Returns ``None`` if no candidate could be created.

    """
    t_now = datetime.datetime.now(datetime.timezone.utc)
    candidates_now = [t_now - datetime.timedelta(hours=12),
                      t_now,
                      t_now + datetime.timedelta(hours=12)]
    candidates_103 = [_upgrade_time_four_to_seven(
                        time_four, iec103.time_from_datetime(t))
                      for t in candidates_now]
    candidates_dt = [iec103.time_to_datetime(t)
                     for t in candidates_103 if t]
    if not candidates_dt:
        return

    res = min(candidates_dt, key=lambda i: abs(t_now - i))
    return hat.event.common.timestamp_from_datetime(res)


def _upgrade_time_four_to_seven(time_four, time_seven):
    """Extend `time_four` with date fields taken from `time_seven`

    Returns ``None`` if summer time flags of provided times differ.

    """
    if time_four.summer_time != time_seven.summer_time:
        return

    return time_four._replace(
        day_of_week=time_seven.day_of_week,
        day_of_month=time_seven.day_of_month,
        months=time_seven.months,
        years=time_seven.years,
        size=iec103.TimeSize.SEVEN)


def _create_event(event_type, payload, source_timestamp=None):
    """Create register event with JSON payload"""
    return hat.event.common.RegisterEvent(
        type=event_type,
        source_timestamp=source_timestamp,
        payload=hat.event.common.EventPayloadJson(payload))
command_timeout: float =
100
interrogate_timeout: float =
100
async def
create( conf: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')], eventer_client: hat.event.eventer.client.Client, event_type_prefix: tuple[str, str, str]) -> Iec103MasterDevice:
async def create(conf: common.DeviceConf,
                 eventer_client: hat.event.eventer.Client,
                 event_type_prefix: common.EventTypePrefix
                 ) -> 'Iec103MasterDevice':
    """Create IEC 60870-5-103 master device

    Latest ``enable`` events of all configured remote devices are queried
    from the eventer and processed before the device is returned. If that
    initial processing raises, the device is closed and the error is
    propagated.

    """
    enable_event_types = []
    for remote_conf in conf['remote_devices']:
        enable_event_types.append(
            (*event_type_prefix, 'system', 'remote_device',
             str(remote_conf['address']), 'enable'))

    query_result = await eventer_client.query(
        hat.event.common.QueryLatestParams(enable_event_types))

    device = Iec103MasterDevice(conf=conf,
                                eventer_client=eventer_client,
                                event_type_prefix=event_type_prefix)
    try:
        await device.process_events(query_result.events)

    except BaseException:
        # do not leave a half-initialized device running
        await aio.uncancellable(device.async_close())
        raise

    return device
info: hat.gateway.common.DeviceInfo =
DeviceInfo(type='iec103_master', create=<function create>, json_schema_id='hat-gateway://iec103.yaml#/$defs/master', json_schema_repo=<hat.json.repository.SchemaRepository object>)
class Iec103MasterDevice(common.Device):
    """IEC 60870-5-103 master device

    Maintains a single link master and, for each enabled remote device,
    a connection loop running in its own async subgroup.

    """

    def __init__(self,
                 conf: common.DeviceConf,
                 eventer_client: hat.event.eventer.Client,
                 event_type_prefix: common.EventTypePrefix):
        self._conf = conf
        self._eventer_client = eventer_client
        self._event_type_prefix = event_type_prefix
        self._master = None
        # remote address -> active iec103.MasterConnection
        self._conns = {}
        # remote address -> last requested enable state
        self._remote_enabled = {i['address']: False
                                for i in conf['remote_devices']}
        # remote address -> remote device configuration
        self._remote_confs = {i['address']: i
                              for i in conf['remote_devices']}
        # remote address -> async subgroup running its connection loop
        self._remote_groups = {}
        self._async_group = aio.Group()

        self.async_group.spawn(self._create_link_master_loop)

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    async def process_events(self, events: Collection[hat.event.common.Event]):
        """Process received events

        Events are processed in order. An error raised while processing
        a single event is logged and does not stop processing of
        the remaining events.

        """
        for event in events:
            try:
                await self._process_event(event)

            except Exception as e:
                mlog.warning('error processing event: %s', e, exc_info=e)

    async def _create_link_master_loop(self):
        """Create link master and recreate it each time it is closed"""

        async def cleanup():
            with contextlib.suppress(ConnectionError):
                await self._register_status('DISCONNECTED')

            if self._master:
                await self._master.async_close()

        try:
            while True:
                await self._register_status('CONNECTING')

                try:
                    self._master = await link.unbalanced.create_master(
                        port=self._conf['port'],
                        baudrate=self._conf['baudrate'],
                        bytesize=serial.ByteSize[self._conf['bytesize']],
                        parity=serial.Parity[self._conf['parity']],
                        stopbits=serial.StopBits[self._conf['stopbits']],
                        xonxoff=self._conf['flow_control']['xonxoff'],
                        rtscts=self._conf['flow_control']['rtscts'],
                        dsrdtr=self._conf['flow_control']['dsrdtr'],
                        silent_interval=self._conf['silent_interval'],
                        address_size=link.AddressSize.ONE)

                except Exception as e:
                    mlog.warning('link master (endpoint) failed to create: %s',
                                 e, exc_info=e)
                    await self._register_status('DISCONNECTED')
                    await asyncio.sleep(self._conf['reconnect_delay'])
                    continue

                await self._register_status('CONNECTED')
                # start remotes which were enabled while master was missing
                for address, enabled in self._remote_enabled.items():
                    if enabled:
                        self._enable_remote(address)

                await self._master.wait_closed()
                await self._register_status('DISCONNECTED')
                self._master = None

        finally:
            mlog.debug('closing link master loop')
            self.close()
            await aio.uncancellable(cleanup())

    async def _connection_loop(self, group, address):
        """Connect to remote device and reconnect each time it is closed

        Args:
            group: async subgroup owning this loop (closed on loop exit)
            address: remote device address

        """

        async def cleanup():
            with contextlib.suppress(ConnectionError):
                await self._register_rmt_status(address, 'DISCONNECTED')

            conn = self._conns.pop(address, None)
            if conn:
                await conn.async_close()

        remote_conf = self._remote_confs[address]
        try:
            while True:
                await self._register_rmt_status(address, 'CONNECTING')

                try:
                    conn_link = await self._master.connect(
                        addr=address,
                        response_timeout=remote_conf['response_timeout'],
                        send_retry_count=remote_conf['send_retry_count'],
                        poll_class1_delay=remote_conf['poll_class1_delay'],
                        poll_class2_delay=remote_conf['poll_class2_delay'])

                except Exception as e:
                    mlog.error('connection error to address %s: %s',
                               address, e, exc_info=e)
                    await self._register_rmt_status(address, 'DISCONNECTED')
                    await asyncio.sleep(remote_conf['reconnect_delay'])
                    continue

                await self._register_rmt_status(address, 'CONNECTED')
                conn = iec103.MasterConnection(
                    conn=conn_link,
                    data_cb=functools.partial(self._on_data, address),
                    generic_data_cb=None)
                self._conns[address] = conn
                if remote_conf['time_sync_delay'] is not None:
                    group.spawn(self._time_sync_loop, conn,
                                remote_conf['time_sync_delay'])

                await conn.wait_closed()
                await self._register_rmt_status(address, 'DISCONNECTED')
                self._conns.pop(address)

        finally:
            mlog.debug('closing remote device %s', address)
            group.close()
            await aio.uncancellable(cleanup())

    async def _time_sync_loop(self, conn, delay):
        """Send time synchronization every `delay` seconds

        Connection is closed once this loop stops for any reason.

        """
        try:
            while True:
                await conn.time_sync()
                mlog.debug('time sync')
                await asyncio.sleep(delay)

        except ConnectionError:
            mlog.debug('connection closed')

        finally:
            conn.close()

    async def _on_data(self, address, data):
        """Register events created from data received from remote device

        If conversion fails, the failure is logged and only events created
        before the failure are registered.

        """
        events = collections.deque()
        try:
            for event in _events_from_data(data, address,
                                           self._event_type_prefix):
                events.append(event)

        except Exception as e:
            mlog.warning('data %s ignored due to: %s', data, e, exc_info=e)

        if events:
            await self._eventer_client.register(events)

    async def _process_event(self, event):
        """Dispatch single event based on its type

        Segments of event type following the prefix are expected to be
        ``(<any>, 'remote_device', <address>, <kind>, ...)`` where kind is
        one of ``enable``, ``command`` or ``interrogation``.

        """
        prefix_len = len(self._event_type_prefix)
        if event.type[prefix_len + 1] != 'remote_device':
            raise Exception('unexpected event type')

        address = self._address_from_event(event)
        etype_suffix = event.type[prefix_len + 3:]

        if etype_suffix[0] == 'enable':
            self._process_enable(event)

        elif etype_suffix[0] == 'command':
            asdu = int(etype_suffix[1])
            io = iec103.IoAddress(
                function_type=int(etype_suffix[2]),
                information_number=int(etype_suffix[3]))
            self._process_command(event, address, asdu, io)

        elif etype_suffix[0] == 'interrogation':
            asdu = int(etype_suffix[1])
            self._process_interrogation(event, address, asdu)

        else:
            raise Exception('unexpected event type')

    def _process_enable(self, event):
        """Remember enable state and start/stop remote connection loop

        While link master is not available, only the state is remembered.

        """
        address = self._address_from_event(event)
        enable = event.payload.data
        if address not in self._remote_enabled:
            raise Exception('invalid remote device address')

        if not isinstance(enable, bool):
            raise Exception('invalid enable event payload')

        self._remote_enabled[address] = enable

        if not self._master:
            return

        if enable:
            self._enable_remote(address)

        else:
            self._disable_remote(address)

    def _enable_remote(self, address):
        """Start connection loop for remote unless one is already running"""
        remote_group = self._remote_groups.get(address)
        if remote_group and remote_group.is_open:
            return

        remote_group = self._async_group.create_subgroup()
        self._remote_groups[address] = remote_group
        remote_group.spawn(self._connection_loop, remote_group, address)

    def _disable_remote(self, address):
        """Close and forget async subgroup associated with remote"""
        if address in self._remote_groups:
            remote_group = self._remote_groups.pop(address)
            remote_group.close()

    def _process_command(self, event, address, asdu, io):
        """Spawn command request for remote device

        Raises:
            Exception: connection or its async group is not available

        """
        conn = self._conns.get(address)
        # group is removed by `_disable_remote` before the connection loop
        # cleanup removes the connection, so both have to be checked
        remote_group = self._remote_groups.get(address)
        if (not conn or not conn.is_open or
                not remote_group or not remote_group.is_open):
            raise Exception('connection closed')

        value = iec103.DoubleValue[event.payload.data['value']]
        session_id = event.payload.data['session_id']
        remote_group.spawn(
            self._cmd_req_res, conn, address, asdu, io, value, session_id)

    async def _cmd_req_res(self, conn, address, asdu, io, value, session_id):
        """Send command and register event with its result

        No event is registered if connection is closed or if
        `command_timeout` expires.

        """
        try:
            success = await asyncio.wait_for(
                conn.send_command(asdu, io, value), timeout=command_timeout)

        except ConnectionError:
            mlog.warning('command %s %s %s to %s failed: connection closed',
                         asdu, io, value, address)
            return

        except asyncio.TimeoutError:
            mlog.warning(
                'command %s %s %s to %s timeout', asdu, io, value, address)
            return

        event = _create_event(
            event_type=(*self._event_type_prefix, 'gateway', 'remote_device',
                        str(address), 'command', str(asdu),
                        str(io.function_type), str(io.information_number)),
            payload={'success': success,
                     'session_id': session_id})

        await self._eventer_client.register([event])

    def _process_interrogation(self, event, address, asdu):
        """Spawn interrogation request for remote device

        Event is ignored (with logged warning) if connection or its async
        group is not available.

        """
        conn = self._conns.get(address)
        # group is removed by `_disable_remote` before the connection loop
        # cleanup removes the connection, so both have to be checked
        remote_group = self._remote_groups.get(address)
        if (not conn or not conn.is_open or
                not remote_group or not remote_group.is_open):
            mlog.warning("event %s ignored due to connection closed", event)
            return

        remote_group.spawn(
            self._interrogate_req_res, conn, address, asdu)

    async def _interrogate_req_res(self, conn, address, asdu):
        """Run interrogation and register event once it finishes

        No event is registered if connection is closed or if
        `interrogate_timeout` expires.

        """
        try:
            await asyncio.wait_for(conn.interrogate(asdu),
                                   timeout=interrogate_timeout)

        except ConnectionError:
            mlog.warning('interrogation on %s to %s failed: connection closed',
                         asdu, address)
            return

        except asyncio.TimeoutError:
            mlog.warning('interrogation on %s to %s timeout', asdu, address)
            return

        event = _create_event(
            event_type=(*self._event_type_prefix, 'gateway', 'remote_device',
                        str(address), 'interrogation', str(asdu)),
            payload=None)

        await self._eventer_client.register([event])

    async def _register_status(self, status):
        """Register link master status event"""
        event = _create_event(
            event_type=(*self._event_type_prefix, 'gateway', 'status'),
            payload=status)

        await self._eventer_client.register([event])

    async def _register_rmt_status(self, address, status):
        """Register remote device status event"""
        event = _create_event(
            event_type=(*self._event_type_prefix,
                        'gateway', 'remote_device', str(address), 'status'),
            payload=status)

        await self._eventer_client.register([event])

    def _address_from_event(self, event):
        """Extract remote device address from event type"""
        return int(event.type[len(self._event_type_prefix) + 2])
Device interface
Iec103MasterDevice( conf: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')], eventer_client: hat.event.eventer.client.Client, event_type_prefix: tuple[str, str, str])
def __init__(self,
             conf: common.DeviceConf,
             eventer_client: hat.event.eventer.Client,
             event_type_prefix: common.EventTypePrefix):
    """Initialize device state and start the link master loop

    Every remote device listed in ``conf['remote_devices']`` starts as
    disabled.

    """
    # remote address -> remote device configuration
    remote_confs = {}
    for remote_conf in conf['remote_devices']:
        remote_confs[remote_conf['address']] = remote_conf

    self._conf = conf
    self._eventer_client = eventer_client
    self._event_type_prefix = event_type_prefix
    self._master = None
    self._conns = {}
    self._remote_enabled = dict.fromkeys(remote_confs, False)
    self._remote_confs = remote_confs
    self._remote_groups = {}
    self._async_group = aio.Group()

    self.async_group.spawn(self._create_link_master_loop)
async def
process_events(self, events: Collection[hat.event.common.common.Event]):
async def process_events(self, events: Collection[hat.event.common.Event]):
    """Process received events

    Events are handled one at a time, in the provided order. Failure of
    a single event is logged as warning and the remaining events are
    still processed.

    """
    for received in events:
        try:
            await self._process_event(received)

        except Exception as exc:
            mlog.warning('error processing event: %s', exc, exc_info=exc)
Process received events
This method can be a coroutine or a regular function.