hat.gateway.devices.iec101.common
1from hat.gateway.common import * # NOQA 2 3import enum 4import typing 5 6from hat import json 7from hat.drivers import iec101 8import hat.event.common 9 10 11class DataType(enum.Enum): 12 SINGLE = 'single' 13 DOUBLE = 'double' 14 STEP_POSITION = 'step_position' 15 BITSTRING = 'bitstring' 16 NORMALIZED = 'normalized' 17 SCALED = 'scaled' 18 FLOATING = 'floating' 19 BINARY_COUNTER = 'binary_counter' 20 PROTECTION = 'protection' 21 PROTECTION_START = 'protection_start' 22 PROTECTION_COMMAND = 'protection_command' 23 STATUS = 'status' 24 25 26class CommandType(enum.Enum): 27 SINGLE = 'single' 28 DOUBLE = 'double' 29 REGULATING = 'regulating' 30 NORMALIZED = 'normalized' 31 SCALED = 'scaled' 32 FLOATING = 'floating' 33 BITSTRING = 'bitstring' 34 35 36class DataKey(typing.NamedTuple): 37 data_type: DataType 38 asdu_address: iec101.AsduAddress 39 io_address: iec101.IoAddress 40 41 42class CommandKey(typing.NamedTuple): 43 cmd_type: CommandType 44 asdu_address: iec101.AsduAddress 45 io_address: iec101.IoAddress 46 47 48def data_to_json(data: iec101.Data) -> json.Data: 49 if isinstance(data, (iec101.SingleData, 50 iec101.DoubleData, 51 iec101.StepPositionData, 52 iec101.BitstringData, 53 iec101.ScaledData, 54 iec101.FloatingData, 55 iec101.BinaryCounterData, 56 iec101.StatusData)): 57 return {'value': _value_to_json(data.value), 58 'quality': _quality_to_json(data.quality)} 59 60 if isinstance(data, iec101.NormalizedData): 61 return {'value': _value_to_json(data.value), 62 'quality': (_quality_to_json(data.quality) 63 if data.quality else None)} 64 65 if isinstance(data, iec101.ProtectionData): 66 return {'value': _value_to_json(data.value), 67 'quality': _quality_to_json(data.quality), 68 'elapsed_time': data.elapsed_time} 69 70 if isinstance(data, iec101.ProtectionStartData): 71 return {'value': _value_to_json(data.value), 72 'quality': _quality_to_json(data.quality), 73 'duration_time': data.duration_time} 74 75 if isinstance(data, iec101.ProtectionCommandData): 76 return {'value': _value_to_json(data.value), 77 'quality': _quality_to_json(data.quality), 78 'operating_time': data.operating_time} 79 80 raise ValueError('unsupported data') 81 82 83def data_from_json(data_type: DataType, 84 data: json.Data) -> iec101.Data: 85 if data_type == DataType.SINGLE: 86 return iec101.SingleData( 87 value=_value_from_json(data_type, data['value']), 88 quality=_indication_quality_from_json(data['quality'])) 89 90 if data_type == DataType.DOUBLE: 91 return iec101.DoubleData( 92 value=_value_from_json(data_type, data['value']), 93 quality=_indication_quality_from_json(data['quality'])) 94 95 if data_type == DataType.STEP_POSITION: 96 return iec101.StepPositionData( 97 value=_value_from_json(data_type, data['value']), 98 quality=_measurement_quality_from_json(data['quality'])) 99 100 if data_type == DataType.BITSTRING: 101 return iec101.BitstringData( 102 value=_value_from_json(data_type, data['value']), 103 quality=_measurement_quality_from_json(data['quality'])) 104 105 if data_type == DataType.NORMALIZED: 106 return iec101.NormalizedData( 107 value=_value_from_json(data_type, data['value']), 108 quality=(_measurement_quality_from_json(data['quality']) 109 if data['quality'] else None)) 110 111 if data_type == DataType.SCALED: 112 return iec101.ScaledData( 113 value=_value_from_json(data_type, data['value']), 114 quality=_measurement_quality_from_json(data['quality'])) 115 116 if data_type == DataType.FLOATING: 117 return iec101.FloatingData( 118 value=_value_from_json(data_type, data['value']), 119 quality=_measurement_quality_from_json(data['quality'])) 120 121 if data_type == DataType.BINARY_COUNTER: 122 return iec101.BinaryCounterData( 123 value=_value_from_json(data_type, data['value']), 124 quality=_counter_quality_from_json(data['quality'])) 125 126 if data_type == DataType.PROTECTION: 127 return iec101.ProtectionData( 128 value=_value_from_json(data_type, data['value']), 129 quality=_protection_quality_from_json(data['quality']), 130 elapsed_time=data['elapsed_time']) 131 132 if data_type == DataType.PROTECTION_START: 133 return iec101.ProtectionStartData( 134 value=_value_from_json(data_type, data['value']), 135 quality=_protection_quality_from_json(data['quality']), 136 duration_time=data['duration_time']) 137 138 if data_type == DataType.PROTECTION_COMMAND: 139 return iec101.ProtectionCommandData( 140 value=_value_from_json(data_type, data['value']), 141 quality=_protection_quality_from_json(data['quality']), 142 operating_time=data['operating_time']) 143 144 if data_type == DataType.STATUS: 145 return iec101.StatusData( 146 value=_value_from_json(data_type, data['value']), 147 quality=_measurement_quality_from_json(data['quality'])) 148 149 raise ValueError('unsupported data type') 150 151 152def command_to_json(cmd: iec101.Command) -> json.Data: 153 if isinstance(cmd, (iec101.SingleCommand, 154 iec101.DoubleCommand, 155 iec101.RegulatingCommand)): 156 return {'value': _value_to_json(cmd.value), 157 'select': cmd.select, 158 'qualifier': cmd.qualifier} 159 160 if isinstance(cmd, (iec101.NormalizedCommand, 161 iec101.ScaledCommand, 162 iec101.FloatingCommand)): 163 return {'value': _value_to_json(cmd.value), 164 'select': cmd.select} 165 166 if isinstance(cmd, iec101.BitstringCommand): 167 return {'value': _value_to_json(cmd.value)} 168 169 raise ValueError('unsupported command') 170 171 172def command_from_json(cmd_type: CommandType, 173 cmd: json.Data 174 ) -> iec101.Command: 175 if cmd_type == CommandType.SINGLE: 176 return iec101.SingleCommand( 177 value=_value_from_json(cmd_type, cmd['value']), 178 select=cmd['select'], 179 qualifier=cmd['qualifier']) 180 181 if cmd_type == CommandType.DOUBLE: 182 return iec101.DoubleCommand( 183 value=_value_from_json(cmd_type, cmd['value']), 184 select=cmd['select'], 185 qualifier=cmd['qualifier']) 186 187 if cmd_type == CommandType.REGULATING: 188 return iec101.RegulatingCommand( 189 value=_value_from_json(cmd_type, cmd['value']), 190 select=cmd['select'], 191 qualifier=cmd['qualifier']) 192 193 if cmd_type == CommandType.NORMALIZED: 194 return iec101.NormalizedCommand( 195 value=_value_from_json(cmd_type, cmd['value']), 196 select=cmd['select']) 197 198 if cmd_type == CommandType.SCALED: 199 return iec101.ScaledCommand( 200 value=_value_from_json(cmd_type, cmd['value']), 201 select=cmd['select']) 202 203 if cmd_type == CommandType.FLOATING: 204 return iec101.FloatingCommand( 205 value=_value_from_json(cmd_type, cmd['value']), 206 select=cmd['select']) 207 208 if cmd_type == CommandType.BITSTRING: 209 return iec101.BitstringCommand( 210 value=_value_from_json(cmd_type, cmd['value'])) 211 212 raise ValueError('unsupported command type') 213 214 215def time_to_source_timestamp(t: iec101.Time | None 216 ) -> hat.event.common.Timestamp | None: 217 return ( 218 hat.event.common.timestamp_from_datetime(iec101.time_to_datetime(t)) 219 if t else None) 220 221 222def time_from_source_timestamp(t: hat.event.common.Timestamp | None, 223 ) -> iec101.Time | None: 224 return ( 225 iec101.time_from_datetime(hat.event.common.timestamp_to_datetime(t)) 226 if t else None) 227 228 229def get_data_type(data: iec101.Data) -> DataType: 230 if isinstance(data, iec101.SingleData): 231 return DataType.SINGLE 232 233 if isinstance(data, iec101.DoubleData): 234 return DataType.DOUBLE 235 236 if isinstance(data, iec101.StepPositionData): 237 return DataType.STEP_POSITION 238 239 if isinstance(data, iec101.BitstringData): 240 return DataType.BITSTRING 241 242 if isinstance(data, iec101.NormalizedData): 243 return DataType.NORMALIZED 244 245 if isinstance(data, iec101.ScaledData): 246 return DataType.SCALED 247 248 if isinstance(data, iec101.FloatingData): 249 return DataType.FLOATING 250 251 if isinstance(data, iec101.BinaryCounterData): 252 return DataType.BINARY_COUNTER 253 254 if isinstance(data, iec101.ProtectionData): 255 return DataType.PROTECTION 256 257 if isinstance(data, iec101.ProtectionStartData): 258 return DataType.PROTECTION_START 259 260 if isinstance(data, iec101.ProtectionCommandData): 261 return DataType.PROTECTION_COMMAND 262 263 if isinstance(data, iec101.StatusData): 264 return DataType.STATUS 265 266 raise ValueError('unsupported data') 267 268 269def get_command_type(cmd: iec101.Command) -> CommandType: 270 if isinstance(cmd, iec101.SingleCommand): 271 return CommandType.SINGLE 272 273 if isinstance(cmd, iec101.DoubleCommand): 274 return CommandType.DOUBLE 275 276 if isinstance(cmd, iec101.RegulatingCommand): 277 return CommandType.REGULATING 278 279 if isinstance(cmd, iec101.NormalizedCommand): 280 return CommandType.NORMALIZED 281 282 if isinstance(cmd, iec101.ScaledCommand): 283 return CommandType.SCALED 284 285 if isinstance(cmd, iec101.FloatingCommand): 286 return CommandType.FLOATING 287 288 if isinstance(cmd, iec101.BitstringCommand): 289 return CommandType.BITSTRING 290 291 raise ValueError('unsupported command') 292 293 294def cause_to_json(cls: typing.Type[enum.Enum], 295 cause: enum.Enum | int 296 ) -> json.Data: 297 return (cause.name if isinstance(cause, cls) else 298 cause.value if isinstance(cause, enum.Enum) else 299 cause) 300 301 302def cause_from_json(cls: typing.Type[enum.Enum], 303 cause: json.Data 304 ) -> enum.Enum | int: 305 return cls[cause] if isinstance(cause, str) else cause 306 307 308def _value_to_json(value): 309 if isinstance(value, (iec101.SingleValue, 310 iec101.DoubleValue, 311 iec101.RegulatingValue, 312 iec101.ProtectionValue)): 313 return value.name 314 315 if isinstance(value, iec101.StepPositionValue): 316 return {'value': value.value, 317 'transient': value.transient} 318 319 if isinstance(value, iec101.BitstringValue): 320 return list(value.value) 321 322 if isinstance(value, (iec101.NormalizedValue, 323 iec101.ScaledValue, 324 iec101.FloatingValue, 325 iec101.BinaryCounterValue)): 326 return value.value 327 328 if isinstance(value, iec101.ProtectionStartValue): 329 return {'general': value.general, 330 'l1': value.l1, 331 'l2': value.l2, 332 'l3': value.l3, 333 'ie': value.ie, 334 'reverse': value.reverse} 335 336 if isinstance(value, iec101.ProtectionCommandValue): 337 return {'general': value.general, 338 'l1': value.l1, 339 'l2': value.l2, 340 'l3': value.l3} 341 342 if isinstance(value, iec101.StatusValue): 343 return {'value': value.value, 344 'change': value.change} 345 346 raise ValueError('unsupported value') 347 348 349def _value_from_json(data_cmd_type, value): 350 if data_cmd_type in (DataType.SINGLE, CommandType.SINGLE): 351 return iec101.SingleValue[value] 352 353 if data_cmd_type in (DataType.DOUBLE, CommandType.DOUBLE): 354 return iec101.DoubleValue[value] 355 356 if data_cmd_type == CommandType.REGULATING: 357 return iec101.RegulatingValue[value] 358 359 if data_cmd_type == DataType.STEP_POSITION: 360 return iec101.StepPositionValue(value=value['value'], 361 transient=value['transient']) 362 363 if data_cmd_type in (DataType.BITSTRING, CommandType.BITSTRING): 364 return iec101.BitstringValue(value=bytes(value)) 365 366 if data_cmd_type in (DataType.NORMALIZED, CommandType.NORMALIZED): 367 return iec101.NormalizedValue(value=value) 368 369 if data_cmd_type in (DataType.SCALED, CommandType.SCALED): 370 return iec101.ScaledValue(value=value) 371 372 if data_cmd_type in (DataType.FLOATING, CommandType.FLOATING): 373 return iec101.FloatingValue(value=value) 374 375 if data_cmd_type == DataType.BINARY_COUNTER: 376 return iec101.BinaryCounterValue(value=value) 377 378 if data_cmd_type == DataType.PROTECTION: 379 return iec101.ProtectionValue[value] 380 381 if data_cmd_type == DataType.PROTECTION_START: 382 return iec101.ProtectionStartValue(general=value['general'], 383 l1=value['l1'], 384 l2=value['l2'], 385 l3=value['l3'], 386 ie=value['ie'], 387 reverse=value['reverse']) 388 389 if data_cmd_type == DataType.PROTECTION_COMMAND: 390 return iec101.ProtectionCommandValue(general=value['general'], 391 l1=value['l1'], 392 l2=value['l2'], 393 l3=value['l3']) 394 395 if data_cmd_type == DataType.STATUS: 396 return iec101.StatusValue(value=value['value'], 397 change=value['change']) 398 399 raise ValueError('unsupported data or command type') 400 401 402def _quality_to_json(quality): 403 if isinstance(quality, iec101.IndicationQuality): 404 return {'invalid': quality.invalid, 405 'not_topical': quality.not_topical, 406 'substituted': quality.substituted, 407 'blocked': quality.blocked} 408 409 if isinstance(quality, iec101.MeasurementQuality): 410 return {'invalid': quality.invalid, 411 'not_topical': quality.not_topical, 412 'substituted': quality.substituted, 413 'blocked': quality.blocked, 414 'overflow': quality.overflow} 415 416 if isinstance(quality, iec101.CounterQuality): 417 return {'invalid': quality.invalid, 418 'adjusted': quality.adjusted, 419 'overflow': quality.overflow, 420 'sequence': quality.sequence} 421 422 if isinstance(quality, iec101.ProtectionQuality): 423 return {'invalid': quality.invalid, 424 'not_topical': quality.not_topical, 425 'substituted': quality.substituted, 426 'blocked': quality.blocked, 427 'time_invalid': quality.time_invalid} 428 429 raise ValueError('unsupported quality') 430 431 432def _indication_quality_from_json(quality): 433 return iec101.IndicationQuality(invalid=quality['invalid'], 434 not_topical=quality['not_topical'], 435 substituted=quality['substituted'], 436 blocked=quality['blocked']) 437 438 439def _measurement_quality_from_json(quality): 440 return iec101.MeasurementQuality(invalid=quality['invalid'], 441 not_topical=quality['not_topical'], 442 substituted=quality['substituted'], 443 blocked=quality['blocked'], 444 overflow=quality['overflow']) 445 446 447def _counter_quality_from_json(quality): 448 return iec101.CounterQuality(invalid=quality['invalid'], 449 adjusted=quality['adjusted'], 450 overflow=quality['overflow'], 451 sequence=quality['sequence']) 452 453 454def _protection_quality_from_json(quality): 455 return iec101.ProtectionQuality(invalid=quality['invalid'], 456 not_topical=quality['not_topical'], 457 substituted=quality['substituted'], 458 blocked=quality['blocked'], 459 time_invalid=quality['time_invalid'])
class
DataType(enum.Enum):
12class DataType(enum.Enum): 13 SINGLE = 'single' 14 DOUBLE = 'double' 15 STEP_POSITION = 'step_position' 16 BITSTRING = 'bitstring' 17 NORMALIZED = 'normalized' 18 SCALED = 'scaled' 19 FLOATING = 'floating' 20 BINARY_COUNTER = 'binary_counter' 21 PROTECTION = 'protection' 22 PROTECTION_START = 'protection_start' 23 PROTECTION_COMMAND = 'protection_command' 24 STATUS = 'status'
An enumeration.
SINGLE =
<DataType.SINGLE: 'single'>
DOUBLE =
<DataType.DOUBLE: 'double'>
STEP_POSITION =
<DataType.STEP_POSITION: 'step_position'>
BITSTRING =
<DataType.BITSTRING: 'bitstring'>
NORMALIZED =
<DataType.NORMALIZED: 'normalized'>
SCALED =
<DataType.SCALED: 'scaled'>
FLOATING =
<DataType.FLOATING: 'floating'>
BINARY_COUNTER =
<DataType.BINARY_COUNTER: 'binary_counter'>
PROTECTION =
<DataType.PROTECTION: 'protection'>
PROTECTION_START =
<DataType.PROTECTION_START: 'protection_start'>
PROTECTION_COMMAND =
<DataType.PROTECTION_COMMAND: 'protection_command'>
STATUS =
<DataType.STATUS: 'status'>
class
CommandType(enum.Enum):
27class CommandType(enum.Enum): 28 SINGLE = 'single' 29 DOUBLE = 'double' 30 REGULATING = 'regulating' 31 NORMALIZED = 'normalized' 32 SCALED = 'scaled' 33 FLOATING = 'floating' 34 BITSTRING = 'bitstring'
An enumeration.
SINGLE =
<CommandType.SINGLE: 'single'>
DOUBLE =
<CommandType.DOUBLE: 'double'>
REGULATING =
<CommandType.REGULATING: 'regulating'>
NORMALIZED =
<CommandType.NORMALIZED: 'normalized'>
SCALED =
<CommandType.SCALED: 'scaled'>
FLOATING =
<CommandType.FLOATING: 'floating'>
BITSTRING =
<CommandType.BITSTRING: 'bitstring'>
class
DataKey(typing.NamedTuple):
37class DataKey(typing.NamedTuple): 38 data_type: DataType 39 asdu_address: iec101.AsduAddress 40 io_address: iec101.IoAddress
DataKey(data_type, asdu_address, io_address)
DataKey( data_type: DataType, asdu_address: int, io_address: int)
Create new instance of DataKey(data_type, asdu_address, io_address)
class
CommandKey(typing.NamedTuple):
43class CommandKey(typing.NamedTuple): 44 cmd_type: CommandType 45 asdu_address: iec101.AsduAddress 46 io_address: iec101.IoAddress
CommandKey(cmd_type, asdu_address, io_address)
CommandKey( cmd_type: CommandType, asdu_address: int, io_address: int)
Create new instance of CommandKey(cmd_type, asdu_address, io_address)
def
data_to_json( data: hat.drivers.iec101.common.SingleData | hat.drivers.iec101.common.DoubleData | hat.drivers.iec101.common.StepPositionData | hat.drivers.iec101.common.BitstringData | hat.drivers.iec101.common.NormalizedData | hat.drivers.iec101.common.ScaledData | hat.drivers.iec101.common.FloatingData | hat.drivers.iec101.common.BinaryCounterData | hat.drivers.iec101.common.ProtectionData | hat.drivers.iec101.common.ProtectionStartData | hat.drivers.iec101.common.ProtectionCommandData | hat.drivers.iec101.common.StatusData) -> Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]:
49def data_to_json(data: iec101.Data) -> json.Data: 50 if isinstance(data, (iec101.SingleData, 51 iec101.DoubleData, 52 iec101.StepPositionData, 53 iec101.BitstringData, 54 iec101.ScaledData, 55 iec101.FloatingData, 56 iec101.BinaryCounterData, 57 iec101.StatusData)): 58 return {'value': _value_to_json(data.value), 59 'quality': _quality_to_json(data.quality)} 60 61 if isinstance(data, iec101.NormalizedData): 62 return {'value': _value_to_json(data.value), 63 'quality': (_quality_to_json(data.quality) 64 if data.quality else None)} 65 66 if isinstance(data, iec101.ProtectionData): 67 return {'value': _value_to_json(data.value), 68 'quality': _quality_to_json(data.quality), 69 'elapsed_time': data.elapsed_time} 70 71 if isinstance(data, iec101.ProtectionStartData): 72 return {'value': _value_to_json(data.value), 73 'quality': _quality_to_json(data.quality), 74 'duration_time': data.duration_time} 75 76 if isinstance(data, iec101.ProtectionCommandData): 77 return {'value': _value_to_json(data.value), 78 'quality': _quality_to_json(data.quality), 79 'operating_time': data.operating_time} 80 81 raise ValueError('unsupported data')
def
data_from_json( data_type: DataType, data: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]) -> hat.drivers.iec101.common.SingleData | hat.drivers.iec101.common.DoubleData | hat.drivers.iec101.common.StepPositionData | hat.drivers.iec101.common.BitstringData | hat.drivers.iec101.common.NormalizedData | hat.drivers.iec101.common.ScaledData | hat.drivers.iec101.common.FloatingData | hat.drivers.iec101.common.BinaryCounterData | hat.drivers.iec101.common.ProtectionData | hat.drivers.iec101.common.ProtectionStartData | hat.drivers.iec101.common.ProtectionCommandData | hat.drivers.iec101.common.StatusData:
84def data_from_json(data_type: DataType, 85 data: json.Data) -> iec101.Data: 86 if data_type == DataType.SINGLE: 87 return iec101.SingleData( 88 value=_value_from_json(data_type, data['value']), 89 quality=_indication_quality_from_json(data['quality'])) 90 91 if data_type == DataType.DOUBLE: 92 return iec101.DoubleData( 93 value=_value_from_json(data_type, data['value']), 94 quality=_indication_quality_from_json(data['quality'])) 95 96 if data_type == DataType.STEP_POSITION: 97 return iec101.StepPositionData( 98 value=_value_from_json(data_type, data['value']), 99 quality=_measurement_quality_from_json(data['quality'])) 100 101 if data_type == DataType.BITSTRING: 102 return iec101.BitstringData( 103 value=_value_from_json(data_type, data['value']), 104 quality=_measurement_quality_from_json(data['quality'])) 105 106 if data_type == DataType.NORMALIZED: 107 return iec101.NormalizedData( 108 value=_value_from_json(data_type, data['value']), 109 quality=(_measurement_quality_from_json(data['quality']) 110 if data['quality'] else None)) 111 112 if data_type == DataType.SCALED: 113 return iec101.ScaledData( 114 value=_value_from_json(data_type, data['value']), 115 quality=_measurement_quality_from_json(data['quality'])) 116 117 if data_type == DataType.FLOATING: 118 return iec101.FloatingData( 119 value=_value_from_json(data_type, data['value']), 120 quality=_measurement_quality_from_json(data['quality'])) 121 122 if data_type == DataType.BINARY_COUNTER: 123 return iec101.BinaryCounterData( 124 value=_value_from_json(data_type, data['value']), 125 quality=_counter_quality_from_json(data['quality'])) 126 127 if data_type == DataType.PROTECTION: 128 return iec101.ProtectionData( 129 value=_value_from_json(data_type, data['value']), 130 quality=_protection_quality_from_json(data['quality']), 131 elapsed_time=data['elapsed_time']) 132 133 if data_type == DataType.PROTECTION_START: 134 return iec101.ProtectionStartData( 135 value=_value_from_json(data_type, data['value']), 136 quality=_protection_quality_from_json(data['quality']), 137 duration_time=data['duration_time']) 138 139 if data_type == DataType.PROTECTION_COMMAND: 140 return iec101.ProtectionCommandData( 141 value=_value_from_json(data_type, data['value']), 142 quality=_protection_quality_from_json(data['quality']), 143 operating_time=data['operating_time']) 144 145 if data_type == DataType.STATUS: 146 return iec101.StatusData( 147 value=_value_from_json(data_type, data['value']), 148 quality=_measurement_quality_from_json(data['quality'])) 149 150 raise ValueError('unsupported data type')
def
command_to_json( cmd: hat.drivers.iec101.common.SingleCommand | hat.drivers.iec101.common.DoubleCommand | hat.drivers.iec101.common.RegulatingCommand | hat.drivers.iec101.common.NormalizedCommand | hat.drivers.iec101.common.ScaledCommand | hat.drivers.iec101.common.FloatingCommand | hat.drivers.iec101.common.BitstringCommand) -> Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]:
153def command_to_json(cmd: iec101.Command) -> json.Data: 154 if isinstance(cmd, (iec101.SingleCommand, 155 iec101.DoubleCommand, 156 iec101.RegulatingCommand)): 157 return {'value': _value_to_json(cmd.value), 158 'select': cmd.select, 159 'qualifier': cmd.qualifier} 160 161 if isinstance(cmd, (iec101.NormalizedCommand, 162 iec101.ScaledCommand, 163 iec101.FloatingCommand)): 164 return {'value': _value_to_json(cmd.value), 165 'select': cmd.select} 166 167 if isinstance(cmd, iec101.BitstringCommand): 168 return {'value': _value_to_json(cmd.value)} 169 170 raise ValueError('unsupported command')
def
command_from_json( cmd_type: CommandType, cmd: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]) -> hat.drivers.iec101.common.SingleCommand | hat.drivers.iec101.common.DoubleCommand | hat.drivers.iec101.common.RegulatingCommand | hat.drivers.iec101.common.NormalizedCommand | hat.drivers.iec101.common.ScaledCommand | hat.drivers.iec101.common.FloatingCommand | hat.drivers.iec101.common.BitstringCommand:
173def command_from_json(cmd_type: CommandType, 174 cmd: json.Data 175 ) -> iec101.Command: 176 if cmd_type == CommandType.SINGLE: 177 return iec101.SingleCommand( 178 value=_value_from_json(cmd_type, cmd['value']), 179 select=cmd['select'], 180 qualifier=cmd['qualifier']) 181 182 if cmd_type == CommandType.DOUBLE: 183 return iec101.DoubleCommand( 184 value=_value_from_json(cmd_type, cmd['value']), 185 select=cmd['select'], 186 qualifier=cmd['qualifier']) 187 188 if cmd_type == CommandType.REGULATING: 189 return iec101.RegulatingCommand( 190 value=_value_from_json(cmd_type, cmd['value']), 191 select=cmd['select'], 192 qualifier=cmd['qualifier']) 193 194 if cmd_type == CommandType.NORMALIZED: 195 return iec101.NormalizedCommand( 196 value=_value_from_json(cmd_type, cmd['value']), 197 select=cmd['select']) 198 199 if cmd_type == CommandType.SCALED: 200 return iec101.ScaledCommand( 201 value=_value_from_json(cmd_type, cmd['value']), 202 select=cmd['select']) 203 204 if cmd_type == CommandType.FLOATING: 205 return iec101.FloatingCommand( 206 value=_value_from_json(cmd_type, cmd['value']), 207 select=cmd['select']) 208 209 if cmd_type == CommandType.BITSTRING: 210 return iec101.BitstringCommand( 211 value=_value_from_json(cmd_type, cmd['value'])) 212 213 raise ValueError('unsupported command type')
def
time_to_source_timestamp( t: hat.drivers.iec60870.encodings.common.Time | None) -> hat.event.common.common.Timestamp | None:
def
time_from_source_timestamp( t: hat.event.common.common.Timestamp | None) -> hat.drivers.iec60870.encodings.common.Time | None:
def
get_data_type( data: hat.drivers.iec101.common.SingleData | hat.drivers.iec101.common.DoubleData | hat.drivers.iec101.common.StepPositionData | hat.drivers.iec101.common.BitstringData | hat.drivers.iec101.common.NormalizedData | hat.drivers.iec101.common.ScaledData | hat.drivers.iec101.common.FloatingData | hat.drivers.iec101.common.BinaryCounterData | hat.drivers.iec101.common.ProtectionData | hat.drivers.iec101.common.ProtectionStartData | hat.drivers.iec101.common.ProtectionCommandData | hat.drivers.iec101.common.StatusData) -> DataType:
230def get_data_type(data: iec101.Data) -> DataType: 231 if isinstance(data, iec101.SingleData): 232 return DataType.SINGLE 233 234 if isinstance(data, iec101.DoubleData): 235 return DataType.DOUBLE 236 237 if isinstance(data, iec101.StepPositionData): 238 return DataType.STEP_POSITION 239 240 if isinstance(data, iec101.BitstringData): 241 return DataType.BITSTRING 242 243 if isinstance(data, iec101.NormalizedData): 244 return DataType.NORMALIZED 245 246 if isinstance(data, iec101.ScaledData): 247 return DataType.SCALED 248 249 if isinstance(data, iec101.FloatingData): 250 return DataType.FLOATING 251 252 if isinstance(data, iec101.BinaryCounterData): 253 return DataType.BINARY_COUNTER 254 255 if isinstance(data, iec101.ProtectionData): 256 return DataType.PROTECTION 257 258 if isinstance(data, iec101.ProtectionStartData): 259 return DataType.PROTECTION_START 260 261 if isinstance(data, iec101.ProtectionCommandData): 262 return DataType.PROTECTION_COMMAND 263 264 if isinstance(data, iec101.StatusData): 265 return DataType.STATUS 266 267 raise ValueError('unsupported data')
def
get_command_type( cmd: hat.drivers.iec101.common.SingleCommand | hat.drivers.iec101.common.DoubleCommand | hat.drivers.iec101.common.RegulatingCommand | hat.drivers.iec101.common.NormalizedCommand | hat.drivers.iec101.common.ScaledCommand | hat.drivers.iec101.common.FloatingCommand | hat.drivers.iec101.common.BitstringCommand) -> CommandType:
270def get_command_type(cmd: iec101.Command) -> CommandType: 271 if isinstance(cmd, iec101.SingleCommand): 272 return CommandType.SINGLE 273 274 if isinstance(cmd, iec101.DoubleCommand): 275 return CommandType.DOUBLE 276 277 if isinstance(cmd, iec101.RegulatingCommand): 278 return CommandType.REGULATING 279 280 if isinstance(cmd, iec101.NormalizedCommand): 281 return CommandType.NORMALIZED 282 283 if isinstance(cmd, iec101.ScaledCommand): 284 return CommandType.SCALED 285 286 if isinstance(cmd, iec101.FloatingCommand): 287 return CommandType.FLOATING 288 289 if isinstance(cmd, iec101.BitstringCommand): 290 return CommandType.BITSTRING 291 292 raise ValueError('unsupported command')
def
cause_to_json( cls: Type[enum.Enum], cause: enum.Enum | int) -> Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]:
def
cause_from_json( cls: Type[enum.Enum], cause: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]) -> enum.Enum | int: