hat.gateway.devices.iec101.common
"""Common definitions shared by the IEC 101 gateway devices.

Provides data/command type enumerations, lookup keys and conversions
between `hat.drivers.iec101` data structures and their JSON form.
"""

from hat.gateway.common import *  # NOQA

import enum
import math
import typing

from hat import json
from hat.drivers import iec101
import hat.event.common


class DataType(enum.Enum):
    """Supported data types, identified by their string names."""

    SINGLE = 'single'
    DOUBLE = 'double'
    STEP_POSITION = 'step_position'
    BITSTRING = 'bitstring'
    NORMALIZED = 'normalized'
    SCALED = 'scaled'
    FLOATING = 'floating'
    BINARY_COUNTER = 'binary_counter'
    PROTECTION = 'protection'
    PROTECTION_START = 'protection_start'
    PROTECTION_COMMAND = 'protection_command'
    STATUS = 'status'


class CommandType(enum.Enum):
    """Supported command types, identified by their string names."""

    SINGLE = 'single'
    DOUBLE = 'double'
    REGULATING = 'regulating'
    NORMALIZED = 'normalized'
    SCALED = 'scaled'
    FLOATING = 'floating'
    BITSTRING = 'bitstring'


class DataKey(typing.NamedTuple):
    """Key combining data type, ASDU address and IO address."""

    data_type: DataType
    asdu_address: iec101.AsduAddress
    io_address: iec101.IoAddress


class CommandKey(typing.NamedTuple):
    """Key combining command type, ASDU address and IO address."""

    cmd_type: CommandType
    asdu_address: iec101.AsduAddress
    io_address: iec101.IoAddress


def data_to_json(data: iec101.Data) -> json.Data:
    """Convert IEC 101 data to a JSON serializable dictionary.

    The result always has ``value`` and ``quality`` entries. Protection
    data additionally carries ``elapsed_time``, ``duration_time`` or
    ``operating_time``. For normalized data, ``quality`` is ``None`` when
    the data has no quality.

    Raises:
        ValueError: if `data` is not one of the supported data classes.
    """
    if isinstance(data, (iec101.SingleData,
                         iec101.DoubleData,
                         iec101.StepPositionData,
                         iec101.BitstringData,
                         iec101.ScaledData,
                         iec101.FloatingData,
                         iec101.BinaryCounterData,
                         iec101.StatusData)):
        return {'value': _value_to_json(data.value),
                'quality': _quality_to_json(data.quality)}

    if isinstance(data, iec101.NormalizedData):
        # normalized data is the only kind whose quality may be absent
        return {'value': _value_to_json(data.value),
                'quality': (_quality_to_json(data.quality)
                            if data.quality else None)}

    if isinstance(data, iec101.ProtectionData):
        return {'value': _value_to_json(data.value),
                'quality': _quality_to_json(data.quality),
                'elapsed_time': data.elapsed_time}

    if isinstance(data, iec101.ProtectionStartData):
        return {'value': _value_to_json(data.value),
                'quality': _quality_to_json(data.quality),
                'duration_time': data.duration_time}

    if isinstance(data, iec101.ProtectionCommandData):
        return {'value': _value_to_json(data.value),
                'quality': _quality_to_json(data.quality),
                'operating_time': data.operating_time}

    raise ValueError('unsupported data')


def data_from_json(data_type: DataType,
                   data: json.Data) -> iec101.Data:
    """Create IEC 101 data of type `data_type` from its JSON form.

    `data` is expected to have the structure produced by `data_to_json`
    for the same data type.

    Raises:
        ValueError: if `data_type` is not a supported data type.
    """
    if data_type == DataType.SINGLE:
        return iec101.SingleData(
            value=_value_from_json(data_type, data['value']),
            quality=_indication_quality_from_json(data['quality']))

    if data_type == DataType.DOUBLE:
        return iec101.DoubleData(
            value=_value_from_json(data_type, data['value']),
            quality=_indication_quality_from_json(data['quality']))

    if data_type == DataType.STEP_POSITION:
        return iec101.StepPositionData(
            value=_value_from_json(data_type, data['value']),
            quality=_measurement_quality_from_json(data['quality']))

    if data_type == DataType.BITSTRING:
        return iec101.BitstringData(
            value=_value_from_json(data_type, data['value']),
            quality=_measurement_quality_from_json(data['quality']))

    if data_type == DataType.NORMALIZED:
        # a falsy quality entry (e.g. None) maps to missing quality
        return iec101.NormalizedData(
            value=_value_from_json(data_type, data['value']),
            quality=(_measurement_quality_from_json(data['quality'])
                     if data['quality'] else None))

    if data_type == DataType.SCALED:
        return iec101.ScaledData(
            value=_value_from_json(data_type, data['value']),
            quality=_measurement_quality_from_json(data['quality']))

    if data_type == DataType.FLOATING:
        return iec101.FloatingData(
            value=_value_from_json(data_type, data['value']),
            quality=_measurement_quality_from_json(data['quality']))

    if data_type == DataType.BINARY_COUNTER:
        return iec101.BinaryCounterData(
            value=_value_from_json(data_type, data['value']),
            quality=_counter_quality_from_json(data['quality']))

    if data_type == DataType.PROTECTION:
        return iec101.ProtectionData(
            value=_value_from_json(data_type, data['value']),
            quality=_protection_quality_from_json(data['quality']),
            elapsed_time=data['elapsed_time'])

    if data_type == DataType.PROTECTION_START:
        return iec101.ProtectionStartData(
            value=_value_from_json(data_type, data['value']),
            quality=_protection_quality_from_json(data['quality']),
            duration_time=data['duration_time'])

    if data_type == DataType.PROTECTION_COMMAND:
        return iec101.ProtectionCommandData(
            value=_value_from_json(data_type, data['value']),
            quality=_protection_quality_from_json(data['quality']),
            operating_time=data['operating_time'])

    if data_type == DataType.STATUS:
        return iec101.StatusData(
            value=_value_from_json(data_type, data['value']),
            quality=_measurement_quality_from_json(data['quality']))

    raise ValueError('unsupported data type')


def command_to_json(cmd: iec101.Command) -> json.Data:
    """Convert an IEC 101 command to a JSON serializable dictionary.

    Single, double and regulating commands yield ``value``, ``select``
    and ``qualifier``; normalized, scaled and floating commands yield
    ``value`` and ``select``; bitstring commands yield only ``value``.

    Raises:
        ValueError: if `cmd` is not one of the supported command classes.
    """
    if isinstance(cmd, (iec101.SingleCommand,
                        iec101.DoubleCommand,
                        iec101.RegulatingCommand)):
        return {'value': _value_to_json(cmd.value),
                'select': cmd.select,
                'qualifier': cmd.qualifier}

    if isinstance(cmd, (iec101.NormalizedCommand,
                        iec101.ScaledCommand,
                        iec101.FloatingCommand)):
        return {'value': _value_to_json(cmd.value),
                'select': cmd.select}

    if isinstance(cmd, iec101.BitstringCommand):
        return {'value': _value_to_json(cmd.value)}

    raise ValueError('unsupported command')


def command_from_json(cmd_type: CommandType,
                      cmd: json.Data
                      ) -> iec101.Command:
    """Create an IEC 101 command of type `cmd_type` from its JSON form.

    `cmd` is expected to have the structure produced by
    `command_to_json` for the same command type.

    Raises:
        ValueError: if `cmd_type` is not a supported command type.
    """
    if cmd_type == CommandType.SINGLE:
        return iec101.SingleCommand(
            value=_value_from_json(cmd_type, cmd['value']),
            select=cmd['select'],
            qualifier=cmd['qualifier'])

    if cmd_type == CommandType.DOUBLE:
        return iec101.DoubleCommand(
            value=_value_from_json(cmd_type, cmd['value']),
            select=cmd['select'],
            qualifier=cmd['qualifier'])

    if cmd_type == CommandType.REGULATING:
        return iec101.RegulatingCommand(
            value=_value_from_json(cmd_type, cmd['value']),
            select=cmd['select'],
            qualifier=cmd['qualifier'])

    if cmd_type == CommandType.NORMALIZED:
        return iec101.NormalizedCommand(
            value=_value_from_json(cmd_type, cmd['value']),
            select=cmd['select'])

    if cmd_type == CommandType.SCALED:
        return iec101.ScaledCommand(
            value=_value_from_json(cmd_type, cmd['value']),
            select=cmd['select'])

    if cmd_type == CommandType.FLOATING:
        return iec101.FloatingCommand(
            value=_value_from_json(cmd_type, cmd['value']),
            select=cmd['select'])

    if cmd_type == CommandType.BITSTRING:
        return iec101.BitstringCommand(
            value=_value_from_json(cmd_type, cmd['value']))

    raise ValueError('unsupported command type')


def time_to_source_timestamp(t: iec101.Time | None
                             ) -> hat.event.common.Timestamp | None:
    """Convert IEC 101 time to an event timestamp (``None`` if falsy)."""
    return (
        hat.event.common.timestamp_from_datetime(iec101.time_to_datetime(t))
        if t else None)


def time_from_source_timestamp(t: hat.event.common.Timestamp | None
                               ) -> iec101.Time | None:
    """Convert an event timestamp to IEC 101 time (``None`` if falsy)."""
    return (
        iec101.time_from_datetime(hat.event.common.timestamp_to_datetime(t))
        if t else None)


def get_data_type(data: iec101.Data) -> DataType:
    """Return the `DataType` member matching the class of `data`.

    Raises:
        ValueError: if `data` is not one of the supported data classes.
    """
    if isinstance(data, iec101.SingleData):
        return DataType.SINGLE

    if isinstance(data, iec101.DoubleData):
        return DataType.DOUBLE

    if isinstance(data, iec101.StepPositionData):
        return DataType.STEP_POSITION

    if isinstance(data, iec101.BitstringData):
        return DataType.BITSTRING

    if isinstance(data, iec101.NormalizedData):
        return DataType.NORMALIZED

    if isinstance(data, iec101.ScaledData):
        return DataType.SCALED

    if isinstance(data, iec101.FloatingData):
        return DataType.FLOATING

    if isinstance(data, iec101.BinaryCounterData):
        return DataType.BINARY_COUNTER

    if isinstance(data, iec101.ProtectionData):
        return DataType.PROTECTION

    if isinstance(data, iec101.ProtectionStartData):
        return DataType.PROTECTION_START

    if isinstance(data, iec101.ProtectionCommandData):
        return DataType.PROTECTION_COMMAND

    if isinstance(data, iec101.StatusData):
        return DataType.STATUS

    raise ValueError('unsupported data')


def get_command_type(cmd: iec101.Command) -> CommandType:
    """Return the `CommandType` member matching the class of `cmd`.

    Raises:
        ValueError: if `cmd` is not one of the supported command classes.
    """
    if isinstance(cmd, iec101.SingleCommand):
        return CommandType.SINGLE

    if isinstance(cmd, iec101.DoubleCommand):
        return CommandType.DOUBLE

    if isinstance(cmd, iec101.RegulatingCommand):
        return CommandType.REGULATING

    if isinstance(cmd, iec101.NormalizedCommand):
        return CommandType.NORMALIZED

    if isinstance(cmd, iec101.ScaledCommand):
        return CommandType.SCALED

    if isinstance(cmd, iec101.FloatingCommand):
        return CommandType.FLOATING

    if isinstance(cmd, iec101.BitstringCommand):
        return CommandType.BITSTRING

    raise ValueError('unsupported command')


def cause_to_json(cls: typing.Type[enum.Enum],
                  cause: enum.Enum | int
                  ) -> json.Data:
    """Convert a cause to JSON.

    Members of `cls` are represented by their name, members of any other
    enumeration by their value, and everything else is returned as is.
    """
    return (cause.name if isinstance(cause, cls) else
            cause.value if isinstance(cause, enum.Enum) else
            cause)


def cause_from_json(cls: typing.Type[enum.Enum],
                    cause: json.Data
                    ) -> enum.Enum | int:
    """Convert a JSON cause back; strings are looked up by name in `cls`."""
    return cls[cause] if isinstance(cause, str) else cause


def _value_to_json(value):
    """Convert an IEC 101 value object to its JSON representation."""
    if isinstance(value, (iec101.SingleValue,
                          iec101.DoubleValue,
                          iec101.RegulatingValue,
                          iec101.ProtectionValue)):
        return value.name

    if isinstance(value, iec101.StepPositionValue):
        return {'value': value.value,
                'transient': value.transient}

    if isinstance(value, iec101.BitstringValue):
        return list(value.value)

    if isinstance(value, (iec101.NormalizedValue,
                          iec101.ScaledValue,
                          iec101.BinaryCounterValue)):
        return value.value

    if isinstance(value, iec101.FloatingValue):
        # non-finite floats are emitted as their string form;
        # `_value_from_json` passes the value through `float()` again
        return value.value if math.isfinite(value.value) else str(value.value)

    if isinstance(value, iec101.ProtectionStartValue):
        return {'general': value.general,
                'l1': value.l1,
                'l2': value.l2,
                'l3': value.l3,
                'ie': value.ie,
                'reverse': value.reverse}

    if isinstance(value, iec101.ProtectionCommandValue):
        return {'general': value.general,
                'l1': value.l1,
                'l2': value.l2,
                'l3': value.l3}

    if isinstance(value, iec101.StatusValue):
        return {'value': value.value,
                'change': value.change}

    raise ValueError('unsupported value')


def _value_from_json(data_cmd_type, value):
    """Create an IEC 101 value for a `DataType` or `CommandType` member."""
    if data_cmd_type in (DataType.SINGLE, CommandType.SINGLE):
        return iec101.SingleValue[value]

    if data_cmd_type in (DataType.DOUBLE, CommandType.DOUBLE):
        return iec101.DoubleValue[value]

    if data_cmd_type == CommandType.REGULATING:
        return iec101.RegulatingValue[value]

    if data_cmd_type == DataType.STEP_POSITION:
        return iec101.StepPositionValue(value=value['value'],
                                        transient=value['transient'])

    if data_cmd_type in (DataType.BITSTRING, CommandType.BITSTRING):
        return iec101.BitstringValue(value=bytes(value))

    if data_cmd_type in (DataType.NORMALIZED, CommandType.NORMALIZED):
        return iec101.NormalizedValue(value=value)

    if data_cmd_type in (DataType.SCALED, CommandType.SCALED):
        return iec101.ScaledValue(value=value)

    if data_cmd_type in (DataType.FLOATING, CommandType.FLOATING):
        return iec101.FloatingValue(value=float(value))

    if data_cmd_type == DataType.BINARY_COUNTER:
        return iec101.BinaryCounterValue(value=value)

    if data_cmd_type == DataType.PROTECTION:
        return iec101.ProtectionValue[value]

    if data_cmd_type == DataType.PROTECTION_START:
        return iec101.ProtectionStartValue(general=value['general'],
                                           l1=value['l1'],
                                           l2=value['l2'],
                                           l3=value['l3'],
                                           ie=value['ie'],
                                           reverse=value['reverse'])

    if data_cmd_type == DataType.PROTECTION_COMMAND:
        return iec101.ProtectionCommandValue(general=value['general'],
                                             l1=value['l1'],
                                             l2=value['l2'],
                                             l3=value['l3'])

    if data_cmd_type == DataType.STATUS:
        return iec101.StatusValue(value=value['value'],
                                  change=value['change'])

    raise ValueError('unsupported data or command type')


def _quality_to_json(quality):
    """Convert an IEC 101 quality object to a dictionary of its flags."""
    if isinstance(quality, iec101.IndicationQuality):
        return {'invalid': quality.invalid,
                'not_topical': quality.not_topical,
                'substituted': quality.substituted,
                'blocked': quality.blocked}

    if isinstance(quality, iec101.MeasurementQuality):
        return {'invalid': quality.invalid,
                'not_topical': quality.not_topical,
                'substituted': quality.substituted,
                'blocked': quality.blocked,
                'overflow': quality.overflow}

    if isinstance(quality, iec101.CounterQuality):
        return {'invalid': quality.invalid,
                'adjusted': quality.adjusted,
                'overflow': quality.overflow,
                'sequence': quality.sequence}

    if isinstance(quality, iec101.ProtectionQuality):
        return {'invalid': quality.invalid,
                'not_topical': quality.not_topical,
                'substituted': quality.substituted,
                'blocked': quality.blocked,
                'time_invalid': quality.time_invalid}

    raise ValueError('unsupported quality')


def _indication_quality_from_json(quality):
    """Create `iec101.IndicationQuality` from its JSON dictionary."""
    return iec101.IndicationQuality(invalid=quality['invalid'],
                                    not_topical=quality['not_topical'],
                                    substituted=quality['substituted'],
                                    blocked=quality['blocked'])


def _measurement_quality_from_json(quality):
    """Create `iec101.MeasurementQuality` from its JSON dictionary."""
    return iec101.MeasurementQuality(invalid=quality['invalid'],
                                     not_topical=quality['not_topical'],
                                     substituted=quality['substituted'],
                                     blocked=quality['blocked'],
                                     overflow=quality['overflow'])


def _counter_quality_from_json(quality):
    """Create `iec101.CounterQuality` from its JSON dictionary."""
    return iec101.CounterQuality(invalid=quality['invalid'],
                                 adjusted=quality['adjusted'],
                                 overflow=quality['overflow'],
                                 sequence=quality['sequence'])


def _protection_quality_from_json(quality):
    """Create `iec101.ProtectionQuality` from its JSON dictionary."""
    return iec101.ProtectionQuality(invalid=quality['invalid'],
                                    not_topical=quality['not_topical'],
                                    substituted=quality['substituted'],
                                    blocked=quality['blocked'],
                                    time_invalid=quality['time_invalid'])
class
DataType(enum.Enum):
class DataType(enum.Enum):
    """Supported data types, identified by their string names."""

    SINGLE = 'single'
    DOUBLE = 'double'
    STEP_POSITION = 'step_position'
    BITSTRING = 'bitstring'
    NORMALIZED = 'normalized'
    SCALED = 'scaled'
    FLOATING = 'floating'
    BINARY_COUNTER = 'binary_counter'
    PROTECTION = 'protection'
    PROTECTION_START = 'protection_start'
    PROTECTION_COMMAND = 'protection_command'
    STATUS = 'status'
SINGLE =
<DataType.SINGLE: 'single'>
DOUBLE =
<DataType.DOUBLE: 'double'>
STEP_POSITION =
<DataType.STEP_POSITION: 'step_position'>
BITSTRING =
<DataType.BITSTRING: 'bitstring'>
NORMALIZED =
<DataType.NORMALIZED: 'normalized'>
SCALED =
<DataType.SCALED: 'scaled'>
FLOATING =
<DataType.FLOATING: 'floating'>
BINARY_COUNTER =
<DataType.BINARY_COUNTER: 'binary_counter'>
PROTECTION =
<DataType.PROTECTION: 'protection'>
PROTECTION_START =
<DataType.PROTECTION_START: 'protection_start'>
PROTECTION_COMMAND =
<DataType.PROTECTION_COMMAND: 'protection_command'>
STATUS =
<DataType.STATUS: 'status'>
class
CommandType(enum.Enum):
class CommandType(enum.Enum):
    """Supported command types, identified by their string names."""

    SINGLE = 'single'
    DOUBLE = 'double'
    REGULATING = 'regulating'
    NORMALIZED = 'normalized'
    SCALED = 'scaled'
    FLOATING = 'floating'
    BITSTRING = 'bitstring'
SINGLE =
<CommandType.SINGLE: 'single'>
DOUBLE =
<CommandType.DOUBLE: 'double'>
REGULATING =
<CommandType.REGULATING: 'regulating'>
NORMALIZED =
<CommandType.NORMALIZED: 'normalized'>
SCALED =
<CommandType.SCALED: 'scaled'>
FLOATING =
<CommandType.FLOATING: 'floating'>
BITSTRING =
<CommandType.BITSTRING: 'bitstring'>
class
DataKey(typing.NamedTuple):
class DataKey(typing.NamedTuple):
    """Key combining data type, ASDU address and IO address."""

    data_type: DataType
    asdu_address: iec101.AsduAddress
    io_address: iec101.IoAddress
DataKey(data_type, asdu_address, io_address)
DataKey( data_type: DataType, asdu_address: int, io_address: int)
Create new instance of DataKey(data_type, asdu_address, io_address)
class
CommandKey(typing.NamedTuple):
class CommandKey(typing.NamedTuple):
    """Key combining command type, ASDU address and IO address."""

    cmd_type: CommandType
    asdu_address: iec101.AsduAddress
    io_address: iec101.IoAddress
CommandKey(cmd_type, asdu_address, io_address)
CommandKey( cmd_type: CommandType, asdu_address: int, io_address: int)
Create new instance of CommandKey(cmd_type, asdu_address, io_address)
def
data_to_json( data: hat.drivers.iec101.common.SingleData | hat.drivers.iec101.common.DoubleData | hat.drivers.iec101.common.StepPositionData | hat.drivers.iec101.common.BitstringData | hat.drivers.iec101.common.NormalizedData | hat.drivers.iec101.common.ScaledData | hat.drivers.iec101.common.FloatingData | hat.drivers.iec101.common.BinaryCounterData | hat.drivers.iec101.common.ProtectionData | hat.drivers.iec101.common.ProtectionStartData | hat.drivers.iec101.common.ProtectionCommandData | hat.drivers.iec101.common.StatusData) -> Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]:
def data_to_json(data: iec101.Data) -> json.Data:
    """Convert IEC 101 data to a JSON serializable dictionary.

    The result always has ``value`` and ``quality`` entries. Protection
    data additionally carries ``elapsed_time``, ``duration_time`` or
    ``operating_time``. For normalized data, ``quality`` is ``None`` when
    the data has no quality.

    Raises:
        ValueError: if `data` is not one of the supported data classes.
    """
    if isinstance(data, (iec101.SingleData,
                         iec101.DoubleData,
                         iec101.StepPositionData,
                         iec101.BitstringData,
                         iec101.ScaledData,
                         iec101.FloatingData,
                         iec101.BinaryCounterData,
                         iec101.StatusData)):
        return {'value': _value_to_json(data.value),
                'quality': _quality_to_json(data.quality)}

    if isinstance(data, iec101.NormalizedData):
        # normalized data is the only kind whose quality may be absent
        return {'value': _value_to_json(data.value),
                'quality': (_quality_to_json(data.quality)
                            if data.quality else None)}

    if isinstance(data, iec101.ProtectionData):
        return {'value': _value_to_json(data.value),
                'quality': _quality_to_json(data.quality),
                'elapsed_time': data.elapsed_time}

    if isinstance(data, iec101.ProtectionStartData):
        return {'value': _value_to_json(data.value),
                'quality': _quality_to_json(data.quality),
                'duration_time': data.duration_time}

    if isinstance(data, iec101.ProtectionCommandData):
        return {'value': _value_to_json(data.value),
                'quality': _quality_to_json(data.quality),
                'operating_time': data.operating_time}

    raise ValueError('unsupported data')
def
data_from_json( data_type: DataType, data: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]) -> hat.drivers.iec101.common.SingleData | hat.drivers.iec101.common.DoubleData | hat.drivers.iec101.common.StepPositionData | hat.drivers.iec101.common.BitstringData | hat.drivers.iec101.common.NormalizedData | hat.drivers.iec101.common.ScaledData | hat.drivers.iec101.common.FloatingData | hat.drivers.iec101.common.BinaryCounterData | hat.drivers.iec101.common.ProtectionData | hat.drivers.iec101.common.ProtectionStartData | hat.drivers.iec101.common.ProtectionCommandData | hat.drivers.iec101.common.StatusData:
def data_from_json(data_type: DataType,
                   data: json.Data) -> iec101.Data:
    """Create IEC 101 data of type `data_type` from its JSON form.

    `data` must provide ``value`` and ``quality`` entries, plus
    ``elapsed_time``, ``duration_time`` or ``operating_time`` for the
    respective protection data types.

    Raises:
        ValueError: if `data_type` is not a supported data type.
    """
    if data_type == DataType.SINGLE:
        return iec101.SingleData(
            value=_value_from_json(data_type, data['value']),
            quality=_indication_quality_from_json(data['quality']))

    if data_type == DataType.DOUBLE:
        return iec101.DoubleData(
            value=_value_from_json(data_type, data['value']),
            quality=_indication_quality_from_json(data['quality']))

    if data_type == DataType.STEP_POSITION:
        return iec101.StepPositionData(
            value=_value_from_json(data_type, data['value']),
            quality=_measurement_quality_from_json(data['quality']))

    if data_type == DataType.BITSTRING:
        return iec101.BitstringData(
            value=_value_from_json(data_type, data['value']),
            quality=_measurement_quality_from_json(data['quality']))

    if data_type == DataType.NORMALIZED:
        # a falsy quality entry (e.g. None) maps to missing quality
        return iec101.NormalizedData(
            value=_value_from_json(data_type, data['value']),
            quality=(_measurement_quality_from_json(data['quality'])
                     if data['quality'] else None))

    if data_type == DataType.SCALED:
        return iec101.ScaledData(
            value=_value_from_json(data_type, data['value']),
            quality=_measurement_quality_from_json(data['quality']))

    if data_type == DataType.FLOATING:
        return iec101.FloatingData(
            value=_value_from_json(data_type, data['value']),
            quality=_measurement_quality_from_json(data['quality']))

    if data_type == DataType.BINARY_COUNTER:
        return iec101.BinaryCounterData(
            value=_value_from_json(data_type, data['value']),
            quality=_counter_quality_from_json(data['quality']))

    if data_type == DataType.PROTECTION:
        return iec101.ProtectionData(
            value=_value_from_json(data_type, data['value']),
            quality=_protection_quality_from_json(data['quality']),
            elapsed_time=data['elapsed_time'])

    if data_type == DataType.PROTECTION_START:
        return iec101.ProtectionStartData(
            value=_value_from_json(data_type, data['value']),
            quality=_protection_quality_from_json(data['quality']),
            duration_time=data['duration_time'])

    if data_type == DataType.PROTECTION_COMMAND:
        return iec101.ProtectionCommandData(
            value=_value_from_json(data_type, data['value']),
            quality=_protection_quality_from_json(data['quality']),
            operating_time=data['operating_time'])

    if data_type == DataType.STATUS:
        return iec101.StatusData(
            value=_value_from_json(data_type, data['value']),
            quality=_measurement_quality_from_json(data['quality']))

    raise ValueError('unsupported data type')
def
command_to_json( cmd: hat.drivers.iec101.common.SingleCommand | hat.drivers.iec101.common.DoubleCommand | hat.drivers.iec101.common.RegulatingCommand | hat.drivers.iec101.common.NormalizedCommand | hat.drivers.iec101.common.ScaledCommand | hat.drivers.iec101.common.FloatingCommand | hat.drivers.iec101.common.BitstringCommand) -> Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]:
def command_to_json(cmd: iec101.Command) -> json.Data:
    """Convert an IEC 101 command to a JSON serializable dictionary.

    Single, double and regulating commands yield ``value``, ``select``
    and ``qualifier``; normalized, scaled and floating commands yield
    ``value`` and ``select``; bitstring commands yield only ``value``.

    Raises:
        ValueError: if `cmd` is not one of the supported command classes.
    """
    if isinstance(cmd, (iec101.SingleCommand,
                        iec101.DoubleCommand,
                        iec101.RegulatingCommand)):
        return {'value': _value_to_json(cmd.value),
                'select': cmd.select,
                'qualifier': cmd.qualifier}

    if isinstance(cmd, (iec101.NormalizedCommand,
                        iec101.ScaledCommand,
                        iec101.FloatingCommand)):
        return {'value': _value_to_json(cmd.value),
                'select': cmd.select}

    if isinstance(cmd, iec101.BitstringCommand):
        return {'value': _value_to_json(cmd.value)}

    raise ValueError('unsupported command')
def
command_from_json( cmd_type: CommandType, cmd: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]) -> hat.drivers.iec101.common.SingleCommand | hat.drivers.iec101.common.DoubleCommand | hat.drivers.iec101.common.RegulatingCommand | hat.drivers.iec101.common.NormalizedCommand | hat.drivers.iec101.common.ScaledCommand | hat.drivers.iec101.common.FloatingCommand | hat.drivers.iec101.common.BitstringCommand:
def command_from_json(cmd_type: CommandType,
                      cmd: json.Data
                      ) -> iec101.Command:
    """Create an IEC 101 command of type `cmd_type` from its JSON form.

    `cmd` must provide ``value``; all types except bitstring also need
    ``select``, and single, double and regulating commands additionally
    need ``qualifier``.

    Raises:
        ValueError: if `cmd_type` is not a supported command type.
    """
    if cmd_type == CommandType.SINGLE:
        return iec101.SingleCommand(
            value=_value_from_json(cmd_type, cmd['value']),
            select=cmd['select'],
            qualifier=cmd['qualifier'])

    if cmd_type == CommandType.DOUBLE:
        return iec101.DoubleCommand(
            value=_value_from_json(cmd_type, cmd['value']),
            select=cmd['select'],
            qualifier=cmd['qualifier'])

    if cmd_type == CommandType.REGULATING:
        return iec101.RegulatingCommand(
            value=_value_from_json(cmd_type, cmd['value']),
            select=cmd['select'],
            qualifier=cmd['qualifier'])

    if cmd_type == CommandType.NORMALIZED:
        return iec101.NormalizedCommand(
            value=_value_from_json(cmd_type, cmd['value']),
            select=cmd['select'])

    if cmd_type == CommandType.SCALED:
        return iec101.ScaledCommand(
            value=_value_from_json(cmd_type, cmd['value']),
            select=cmd['select'])

    if cmd_type == CommandType.FLOATING:
        return iec101.FloatingCommand(
            value=_value_from_json(cmd_type, cmd['value']),
            select=cmd['select'])

    if cmd_type == CommandType.BITSTRING:
        return iec101.BitstringCommand(
            value=_value_from_json(cmd_type, cmd['value']))

    raise ValueError('unsupported command type')
def
time_to_source_timestamp( t: hat.drivers.iec60870.encodings.common.Time | None) -> hat.event.common.common.Timestamp | None:
def
time_from_source_timestamp( t: hat.event.common.common.Timestamp | None) -> hat.drivers.iec60870.encodings.common.Time | None:
def
get_data_type( data: hat.drivers.iec101.common.SingleData | hat.drivers.iec101.common.DoubleData | hat.drivers.iec101.common.StepPositionData | hat.drivers.iec101.common.BitstringData | hat.drivers.iec101.common.NormalizedData | hat.drivers.iec101.common.ScaledData | hat.drivers.iec101.common.FloatingData | hat.drivers.iec101.common.BinaryCounterData | hat.drivers.iec101.common.ProtectionData | hat.drivers.iec101.common.ProtectionStartData | hat.drivers.iec101.common.ProtectionCommandData | hat.drivers.iec101.common.StatusData) -> DataType:
def get_data_type(data: iec101.Data) -> DataType:
    """Return the `DataType` member matching the class of `data`.

    Raises:
        ValueError: if `data` is not one of the supported data classes.
    """
    if isinstance(data, iec101.SingleData):
        return DataType.SINGLE

    if isinstance(data, iec101.DoubleData):
        return DataType.DOUBLE

    if isinstance(data, iec101.StepPositionData):
        return DataType.STEP_POSITION

    if isinstance(data, iec101.BitstringData):
        return DataType.BITSTRING

    if isinstance(data, iec101.NormalizedData):
        return DataType.NORMALIZED

    if isinstance(data, iec101.ScaledData):
        return DataType.SCALED

    if isinstance(data, iec101.FloatingData):
        return DataType.FLOATING

    if isinstance(data, iec101.BinaryCounterData):
        return DataType.BINARY_COUNTER

    if isinstance(data, iec101.ProtectionData):
        return DataType.PROTECTION

    if isinstance(data, iec101.ProtectionStartData):
        return DataType.PROTECTION_START

    if isinstance(data, iec101.ProtectionCommandData):
        return DataType.PROTECTION_COMMAND

    if isinstance(data, iec101.StatusData):
        return DataType.STATUS

    raise ValueError('unsupported data')
def
get_command_type( cmd: hat.drivers.iec101.common.SingleCommand | hat.drivers.iec101.common.DoubleCommand | hat.drivers.iec101.common.RegulatingCommand | hat.drivers.iec101.common.NormalizedCommand | hat.drivers.iec101.common.ScaledCommand | hat.drivers.iec101.common.FloatingCommand | hat.drivers.iec101.common.BitstringCommand) -> CommandType:
def get_command_type(cmd: iec101.Command) -> CommandType:
    """Return the `CommandType` member matching the class of `cmd`.

    Raises:
        ValueError: if `cmd` is not one of the supported command classes.
    """
    if isinstance(cmd, iec101.SingleCommand):
        return CommandType.SINGLE

    if isinstance(cmd, iec101.DoubleCommand):
        return CommandType.DOUBLE

    if isinstance(cmd, iec101.RegulatingCommand):
        return CommandType.REGULATING

    if isinstance(cmd, iec101.NormalizedCommand):
        return CommandType.NORMALIZED

    if isinstance(cmd, iec101.ScaledCommand):
        return CommandType.SCALED

    if isinstance(cmd, iec101.FloatingCommand):
        return CommandType.FLOATING

    if isinstance(cmd, iec101.BitstringCommand):
        return CommandType.BITSTRING

    raise ValueError('unsupported command')
def
cause_to_json( cls: Type[enum.Enum], cause: enum.Enum | int) -> Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]:
def
cause_from_json( cls: Type[enum.Enum], cause: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]) -> enum.Enum | int: