hat.gateway.devices.iec101.common

  1from hat.gateway.common import *  # NOQA
  2
  3import enum
  4import typing
  5
  6from hat import json
  7from hat.drivers import iec101
  8import hat.event.common
  9
 10
 11class DataType(enum.Enum):
 12    SINGLE = 'single'
 13    DOUBLE = 'double'
 14    STEP_POSITION = 'step_position'
 15    BITSTRING = 'bitstring'
 16    NORMALIZED = 'normalized'
 17    SCALED = 'scaled'
 18    FLOATING = 'floating'
 19    BINARY_COUNTER = 'binary_counter'
 20    PROTECTION = 'protection'
 21    PROTECTION_START = 'protection_start'
 22    PROTECTION_COMMAND = 'protection_command'
 23    STATUS = 'status'
 24
 25
 26class CommandType(enum.Enum):
 27    SINGLE = 'single'
 28    DOUBLE = 'double'
 29    REGULATING = 'regulating'
 30    NORMALIZED = 'normalized'
 31    SCALED = 'scaled'
 32    FLOATING = 'floating'
 33    BITSTRING = 'bitstring'
 34
 35
 36class DataKey(typing.NamedTuple):
 37    data_type: DataType
 38    asdu_address: iec101.AsduAddress
 39    io_address: iec101.IoAddress
 40
 41
 42class CommandKey(typing.NamedTuple):
 43    cmd_type: CommandType
 44    asdu_address: iec101.AsduAddress
 45    io_address: iec101.IoAddress
 46
 47
 48def data_to_json(data: iec101.Data) -> json.Data:
 49    if isinstance(data, (iec101.SingleData,
 50                         iec101.DoubleData,
 51                         iec101.StepPositionData,
 52                         iec101.BitstringData,
 53                         iec101.ScaledData,
 54                         iec101.FloatingData,
 55                         iec101.BinaryCounterData,
 56                         iec101.StatusData)):
 57        return {'value': _value_to_json(data.value),
 58                'quality': _quality_to_json(data.quality)}
 59
 60    if isinstance(data, iec101.NormalizedData):
 61        return {'value': _value_to_json(data.value),
 62                'quality': (_quality_to_json(data.quality)
 63                            if data.quality else None)}
 64
 65    if isinstance(data, iec101.ProtectionData):
 66        return {'value': _value_to_json(data.value),
 67                'quality': _quality_to_json(data.quality),
 68                'elapsed_time': data.elapsed_time}
 69
 70    if isinstance(data, iec101.ProtectionStartData):
 71        return {'value': _value_to_json(data.value),
 72                'quality': _quality_to_json(data.quality),
 73                'duration_time': data.duration_time}
 74
 75    if isinstance(data, iec101.ProtectionCommandData):
 76        return {'value': _value_to_json(data.value),
 77                'quality': _quality_to_json(data.quality),
 78                'operating_time': data.operating_time}
 79
 80    raise ValueError('unsupported data')
 81
 82
 83def data_from_json(data_type: DataType,
 84                   data: json.Data) -> iec101.Data:
 85    if data_type == DataType.SINGLE:
 86        return iec101.SingleData(
 87            value=_value_from_json(data_type, data['value']),
 88            quality=_indication_quality_from_json(data['quality']))
 89
 90    if data_type == DataType.DOUBLE:
 91        return iec101.DoubleData(
 92            value=_value_from_json(data_type, data['value']),
 93            quality=_indication_quality_from_json(data['quality']))
 94
 95    if data_type == DataType.STEP_POSITION:
 96        return iec101.StepPositionData(
 97            value=_value_from_json(data_type, data['value']),
 98            quality=_measurement_quality_from_json(data['quality']))
 99
100    if data_type == DataType.BITSTRING:
101        return iec101.BitstringData(
102            value=_value_from_json(data_type, data['value']),
103            quality=_measurement_quality_from_json(data['quality']))
104
105    if data_type == DataType.NORMALIZED:
106        return iec101.NormalizedData(
107            value=_value_from_json(data_type, data['value']),
108            quality=(_measurement_quality_from_json(data['quality'])
109                     if data['quality'] else None))
110
111    if data_type == DataType.SCALED:
112        return iec101.ScaledData(
113            value=_value_from_json(data_type, data['value']),
114            quality=_measurement_quality_from_json(data['quality']))
115
116    if data_type == DataType.FLOATING:
117        return iec101.FloatingData(
118            value=_value_from_json(data_type, data['value']),
119            quality=_measurement_quality_from_json(data['quality']))
120
121    if data_type == DataType.BINARY_COUNTER:
122        return iec101.BinaryCounterData(
123            value=_value_from_json(data_type, data['value']),
124            quality=_counter_quality_from_json(data['quality']))
125
126    if data_type == DataType.PROTECTION:
127        return iec101.ProtectionData(
128            value=_value_from_json(data_type, data['value']),
129            quality=_protection_quality_from_json(data['quality']),
130            elapsed_time=data['elapsed_time'])
131
132    if data_type == DataType.PROTECTION_START:
133        return iec101.ProtectionStartData(
134            value=_value_from_json(data_type, data['value']),
135            quality=_protection_quality_from_json(data['quality']),
136            duration_time=data['duration_time'])
137
138    if data_type == DataType.PROTECTION_COMMAND:
139        return iec101.ProtectionCommandData(
140            value=_value_from_json(data_type, data['value']),
141            quality=_protection_quality_from_json(data['quality']),
142            operating_time=data['operating_time'])
143
144    if data_type == DataType.STATUS:
145        return iec101.StatusData(
146            value=_value_from_json(data_type, data['value']),
147            quality=_measurement_quality_from_json(data['quality']))
148
149    raise ValueError('unsupported data type')
150
151
152def command_to_json(cmd: iec101.Command) -> json.Data:
153    if isinstance(cmd, (iec101.SingleCommand,
154                        iec101.DoubleCommand,
155                        iec101.RegulatingCommand)):
156        return {'value': _value_to_json(cmd.value),
157                'select': cmd.select,
158                'qualifier': cmd.qualifier}
159
160    if isinstance(cmd, (iec101.NormalizedCommand,
161                        iec101.ScaledCommand,
162                        iec101.FloatingCommand)):
163        return {'value': _value_to_json(cmd.value),
164                'select': cmd.select}
165
166    if isinstance(cmd, iec101.BitstringCommand):
167        return {'value': _value_to_json(cmd.value)}
168
169    raise ValueError('unsupported command')
170
171
172def command_from_json(cmd_type: CommandType,
173                      cmd: json.Data
174                      ) -> iec101.Command:
175    if cmd_type == CommandType.SINGLE:
176        return iec101.SingleCommand(
177            value=_value_from_json(cmd_type, cmd['value']),
178            select=cmd['select'],
179            qualifier=cmd['qualifier'])
180
181    if cmd_type == CommandType.DOUBLE:
182        return iec101.DoubleCommand(
183            value=_value_from_json(cmd_type, cmd['value']),
184            select=cmd['select'],
185            qualifier=cmd['qualifier'])
186
187    if cmd_type == CommandType.REGULATING:
188        return iec101.RegulatingCommand(
189            value=_value_from_json(cmd_type, cmd['value']),
190            select=cmd['select'],
191            qualifier=cmd['qualifier'])
192
193    if cmd_type == CommandType.NORMALIZED:
194        return iec101.NormalizedCommand(
195            value=_value_from_json(cmd_type, cmd['value']),
196            select=cmd['select'])
197
198    if cmd_type == CommandType.SCALED:
199        return iec101.ScaledCommand(
200            value=_value_from_json(cmd_type, cmd['value']),
201            select=cmd['select'])
202
203    if cmd_type == CommandType.FLOATING:
204        return iec101.FloatingCommand(
205            value=_value_from_json(cmd_type, cmd['value']),
206            select=cmd['select'])
207
208    if cmd_type == CommandType.BITSTRING:
209        return iec101.BitstringCommand(
210            value=_value_from_json(cmd_type, cmd['value']))
211
212    raise ValueError('unsupported command type')
213
214
215def time_to_source_timestamp(t: iec101.Time | None
216                             ) -> hat.event.common.Timestamp | None:
217    return (
218        hat.event.common.timestamp_from_datetime(iec101.time_to_datetime(t))
219        if t else None)
220
221
222def time_from_source_timestamp(t: hat.event.common.Timestamp | None,
223                               ) -> iec101.Time | None:
224    return (
225        iec101.time_from_datetime(hat.event.common.timestamp_to_datetime(t))
226        if t else None)
227
228
229def get_data_type(data: iec101.Data) -> DataType:
230    if isinstance(data, iec101.SingleData):
231        return DataType.SINGLE
232
233    if isinstance(data, iec101.DoubleData):
234        return DataType.DOUBLE
235
236    if isinstance(data, iec101.StepPositionData):
237        return DataType.STEP_POSITION
238
239    if isinstance(data, iec101.BitstringData):
240        return DataType.BITSTRING
241
242    if isinstance(data, iec101.NormalizedData):
243        return DataType.NORMALIZED
244
245    if isinstance(data, iec101.ScaledData):
246        return DataType.SCALED
247
248    if isinstance(data, iec101.FloatingData):
249        return DataType.FLOATING
250
251    if isinstance(data, iec101.BinaryCounterData):
252        return DataType.BINARY_COUNTER
253
254    if isinstance(data, iec101.ProtectionData):
255        return DataType.PROTECTION
256
257    if isinstance(data, iec101.ProtectionStartData):
258        return DataType.PROTECTION_START
259
260    if isinstance(data, iec101.ProtectionCommandData):
261        return DataType.PROTECTION_COMMAND
262
263    if isinstance(data, iec101.StatusData):
264        return DataType.STATUS
265
266    raise ValueError('unsupported data')
267
268
269def get_command_type(cmd: iec101.Command) -> CommandType:
270    if isinstance(cmd, iec101.SingleCommand):
271        return CommandType.SINGLE
272
273    if isinstance(cmd, iec101.DoubleCommand):
274        return CommandType.DOUBLE
275
276    if isinstance(cmd, iec101.RegulatingCommand):
277        return CommandType.REGULATING
278
279    if isinstance(cmd, iec101.NormalizedCommand):
280        return CommandType.NORMALIZED
281
282    if isinstance(cmd, iec101.ScaledCommand):
283        return CommandType.SCALED
284
285    if isinstance(cmd, iec101.FloatingCommand):
286        return CommandType.FLOATING
287
288    if isinstance(cmd, iec101.BitstringCommand):
289        return CommandType.BITSTRING
290
291    raise ValueError('unsupported command')
292
293
294def cause_to_json(cls: typing.Type[enum.Enum],
295                  cause: enum.Enum | int
296                  ) -> json.Data:
297    return (cause.name if isinstance(cause, cls) else
298            cause.value if isinstance(cause, enum.Enum) else
299            cause)
300
301
302def cause_from_json(cls: typing.Type[enum.Enum],
303                    cause: json.Data
304                    ) -> enum.Enum | int:
305    return cls[cause] if isinstance(cause, str) else cause
306
307
308def _value_to_json(value):
309    if isinstance(value, (iec101.SingleValue,
310                          iec101.DoubleValue,
311                          iec101.RegulatingValue,
312                          iec101.ProtectionValue)):
313        return value.name
314
315    if isinstance(value, iec101.StepPositionValue):
316        return {'value': value.value,
317                'transient': value.transient}
318
319    if isinstance(value, iec101.BitstringValue):
320        return list(value.value)
321
322    if isinstance(value, (iec101.NormalizedValue,
323                          iec101.ScaledValue,
324                          iec101.FloatingValue,
325                          iec101.BinaryCounterValue)):
326        return value.value
327
328    if isinstance(value, iec101.ProtectionStartValue):
329        return {'general': value.general,
330                'l1': value.l1,
331                'l2': value.l2,
332                'l3': value.l3,
333                'ie': value.ie,
334                'reverse': value.reverse}
335
336    if isinstance(value, iec101.ProtectionCommandValue):
337        return {'general': value.general,
338                'l1': value.l1,
339                'l2': value.l2,
340                'l3': value.l3}
341
342    if isinstance(value, iec101.StatusValue):
343        return {'value': value.value,
344                'change': value.change}
345
346    raise ValueError('unsupported value')
347
348
349def _value_from_json(data_cmd_type, value):
350    if data_cmd_type in (DataType.SINGLE, CommandType.SINGLE):
351        return iec101.SingleValue[value]
352
353    if data_cmd_type in (DataType.DOUBLE, CommandType.DOUBLE):
354        return iec101.DoubleValue[value]
355
356    if data_cmd_type == CommandType.REGULATING:
357        return iec101.RegulatingValue[value]
358
359    if data_cmd_type == DataType.STEP_POSITION:
360        return iec101.StepPositionValue(value=value['value'],
361                                        transient=value['transient'])
362
363    if data_cmd_type in (DataType.BITSTRING, CommandType.BITSTRING):
364        return iec101.BitstringValue(value=bytes(value))
365
366    if data_cmd_type in (DataType.NORMALIZED, CommandType.NORMALIZED):
367        return iec101.NormalizedValue(value=value)
368
369    if data_cmd_type in (DataType.SCALED, CommandType.SCALED):
370        return iec101.ScaledValue(value=value)
371
372    if data_cmd_type in (DataType.FLOATING, CommandType.FLOATING):
373        return iec101.FloatingValue(value=value)
374
375    if data_cmd_type == DataType.BINARY_COUNTER:
376        return iec101.BinaryCounterValue(value=value)
377
378    if data_cmd_type == DataType.PROTECTION:
379        return iec101.ProtectionValue[value]
380
381    if data_cmd_type == DataType.PROTECTION_START:
382        return iec101.ProtectionStartValue(general=value['general'],
383                                           l1=value['l1'],
384                                           l2=value['l2'],
385                                           l3=value['l3'],
386                                           ie=value['ie'],
387                                           reverse=value['reverse'])
388
389    if data_cmd_type == DataType.PROTECTION_COMMAND:
390        return iec101.ProtectionCommandValue(general=value['general'],
391                                             l1=value['l1'],
392                                             l2=value['l2'],
393                                             l3=value['l3'])
394
395    if data_cmd_type == DataType.STATUS:
396        return iec101.StatusValue(value=value['value'],
397                                  change=value['change'])
398
399    raise ValueError('unsupported data or command type')
400
401
402def _quality_to_json(quality):
403    if isinstance(quality, iec101.IndicationQuality):
404        return {'invalid': quality.invalid,
405                'not_topical': quality.not_topical,
406                'substituted': quality.substituted,
407                'blocked': quality.blocked}
408
409    if isinstance(quality, iec101.MeasurementQuality):
410        return {'invalid': quality.invalid,
411                'not_topical': quality.not_topical,
412                'substituted': quality.substituted,
413                'blocked': quality.blocked,
414                'overflow': quality.overflow}
415
416    if isinstance(quality, iec101.CounterQuality):
417        return {'invalid': quality.invalid,
418                'adjusted': quality.adjusted,
419                'overflow': quality.overflow,
420                'sequence': quality.sequence}
421
422    if isinstance(quality, iec101.ProtectionQuality):
423        return {'invalid': quality.invalid,
424                'not_topical': quality.not_topical,
425                'substituted': quality.substituted,
426                'blocked': quality.blocked,
427                'time_invalid': quality.time_invalid}
428
429    raise ValueError('unsupported quality')
430
431
432def _indication_quality_from_json(quality):
433    return iec101.IndicationQuality(invalid=quality['invalid'],
434                                    not_topical=quality['not_topical'],
435                                    substituted=quality['substituted'],
436                                    blocked=quality['blocked'])
437
438
439def _measurement_quality_from_json(quality):
440    return iec101.MeasurementQuality(invalid=quality['invalid'],
441                                     not_topical=quality['not_topical'],
442                                     substituted=quality['substituted'],
443                                     blocked=quality['blocked'],
444                                     overflow=quality['overflow'])
445
446
447def _counter_quality_from_json(quality):
448    return iec101.CounterQuality(invalid=quality['invalid'],
449                                 adjusted=quality['adjusted'],
450                                 overflow=quality['overflow'],
451                                 sequence=quality['sequence'])
452
453
454def _protection_quality_from_json(quality):
455    return iec101.ProtectionQuality(invalid=quality['invalid'],
456                                    not_topical=quality['not_topical'],
457                                    substituted=quality['substituted'],
458                                    blocked=quality['blocked'],
459                                    time_invalid=quality['time_invalid'])
class DataType(enum.Enum):
12class DataType(enum.Enum):
13    SINGLE = 'single'
14    DOUBLE = 'double'
15    STEP_POSITION = 'step_position'
16    BITSTRING = 'bitstring'
17    NORMALIZED = 'normalized'
18    SCALED = 'scaled'
19    FLOATING = 'floating'
20    BINARY_COUNTER = 'binary_counter'
21    PROTECTION = 'protection'
22    PROTECTION_START = 'protection_start'
23    PROTECTION_COMMAND = 'protection_command'
24    STATUS = 'status'

An enumeration.

SINGLE = <DataType.SINGLE: 'single'>
DOUBLE = <DataType.DOUBLE: 'double'>
STEP_POSITION = <DataType.STEP_POSITION: 'step_position'>
BITSTRING = <DataType.BITSTRING: 'bitstring'>
NORMALIZED = <DataType.NORMALIZED: 'normalized'>
SCALED = <DataType.SCALED: 'scaled'>
FLOATING = <DataType.FLOATING: 'floating'>
BINARY_COUNTER = <DataType.BINARY_COUNTER: 'binary_counter'>
PROTECTION = <DataType.PROTECTION: 'protection'>
PROTECTION_START = <DataType.PROTECTION_START: 'protection_start'>
PROTECTION_COMMAND = <DataType.PROTECTION_COMMAND: 'protection_command'>
STATUS = <DataType.STATUS: 'status'>
class CommandType(enum.Enum):
27class CommandType(enum.Enum):
28    SINGLE = 'single'
29    DOUBLE = 'double'
30    REGULATING = 'regulating'
31    NORMALIZED = 'normalized'
32    SCALED = 'scaled'
33    FLOATING = 'floating'
34    BITSTRING = 'bitstring'

An enumeration.

SINGLE = <CommandType.SINGLE: 'single'>
DOUBLE = <CommandType.DOUBLE: 'double'>
REGULATING = <CommandType.REGULATING: 'regulating'>
NORMALIZED = <CommandType.NORMALIZED: 'normalized'>
SCALED = <CommandType.SCALED: 'scaled'>
FLOATING = <CommandType.FLOATING: 'floating'>
BITSTRING = <CommandType.BITSTRING: 'bitstring'>
class DataKey(typing.NamedTuple):
37class DataKey(typing.NamedTuple):
38    data_type: DataType
39    asdu_address: iec101.AsduAddress
40    io_address: iec101.IoAddress

DataKey(data_type, asdu_address, io_address)

DataKey( data_type: DataType, asdu_address: int, io_address: int)

Create new instance of DataKey(data_type, asdu_address, io_address)

data_type: DataType

Alias for field number 0

asdu_address: int

Alias for field number 1

io_address: int

Alias for field number 2

class CommandKey(typing.NamedTuple):
43class CommandKey(typing.NamedTuple):
44    cmd_type: CommandType
45    asdu_address: iec101.AsduAddress
46    io_address: iec101.IoAddress

CommandKey(cmd_type, asdu_address, io_address)

CommandKey( cmd_type: CommandType, asdu_address: int, io_address: int)

Create new instance of CommandKey(cmd_type, asdu_address, io_address)

cmd_type: CommandType

Alias for field number 0

asdu_address: int

Alias for field number 1

io_address: int

Alias for field number 2

def data_to_json( data: hat.drivers.iec101.common.SingleData | hat.drivers.iec101.common.DoubleData | hat.drivers.iec101.common.StepPositionData | hat.drivers.iec101.common.BitstringData | hat.drivers.iec101.common.NormalizedData | hat.drivers.iec101.common.ScaledData | hat.drivers.iec101.common.FloatingData | hat.drivers.iec101.common.BinaryCounterData | hat.drivers.iec101.common.ProtectionData | hat.drivers.iec101.common.ProtectionStartData | hat.drivers.iec101.common.ProtectionCommandData | hat.drivers.iec101.common.StatusData) -> Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]:
49def data_to_json(data: iec101.Data) -> json.Data:
50    if isinstance(data, (iec101.SingleData,
51                         iec101.DoubleData,
52                         iec101.StepPositionData,
53                         iec101.BitstringData,
54                         iec101.ScaledData,
55                         iec101.FloatingData,
56                         iec101.BinaryCounterData,
57                         iec101.StatusData)):
58        return {'value': _value_to_json(data.value),
59                'quality': _quality_to_json(data.quality)}
60
61    if isinstance(data, iec101.NormalizedData):
62        return {'value': _value_to_json(data.value),
63                'quality': (_quality_to_json(data.quality)
64                            if data.quality else None)}
65
66    if isinstance(data, iec101.ProtectionData):
67        return {'value': _value_to_json(data.value),
68                'quality': _quality_to_json(data.quality),
69                'elapsed_time': data.elapsed_time}
70
71    if isinstance(data, iec101.ProtectionStartData):
72        return {'value': _value_to_json(data.value),
73                'quality': _quality_to_json(data.quality),
74                'duration_time': data.duration_time}
75
76    if isinstance(data, iec101.ProtectionCommandData):
77        return {'value': _value_to_json(data.value),
78                'quality': _quality_to_json(data.quality),
79                'operating_time': data.operating_time}
80
81    raise ValueError('unsupported data')
def data_from_json( data_type: DataType, data: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]) -> hat.drivers.iec101.common.SingleData | hat.drivers.iec101.common.DoubleData | hat.drivers.iec101.common.StepPositionData | hat.drivers.iec101.common.BitstringData | hat.drivers.iec101.common.NormalizedData | hat.drivers.iec101.common.ScaledData | hat.drivers.iec101.common.FloatingData | hat.drivers.iec101.common.BinaryCounterData | hat.drivers.iec101.common.ProtectionData | hat.drivers.iec101.common.ProtectionStartData | hat.drivers.iec101.common.ProtectionCommandData | hat.drivers.iec101.common.StatusData:
 84def data_from_json(data_type: DataType,
 85                   data: json.Data) -> iec101.Data:
 86    if data_type == DataType.SINGLE:
 87        return iec101.SingleData(
 88            value=_value_from_json(data_type, data['value']),
 89            quality=_indication_quality_from_json(data['quality']))
 90
 91    if data_type == DataType.DOUBLE:
 92        return iec101.DoubleData(
 93            value=_value_from_json(data_type, data['value']),
 94            quality=_indication_quality_from_json(data['quality']))
 95
 96    if data_type == DataType.STEP_POSITION:
 97        return iec101.StepPositionData(
 98            value=_value_from_json(data_type, data['value']),
 99            quality=_measurement_quality_from_json(data['quality']))
100
101    if data_type == DataType.BITSTRING:
102        return iec101.BitstringData(
103            value=_value_from_json(data_type, data['value']),
104            quality=_measurement_quality_from_json(data['quality']))
105
106    if data_type == DataType.NORMALIZED:
107        return iec101.NormalizedData(
108            value=_value_from_json(data_type, data['value']),
109            quality=(_measurement_quality_from_json(data['quality'])
110                     if data['quality'] else None))
111
112    if data_type == DataType.SCALED:
113        return iec101.ScaledData(
114            value=_value_from_json(data_type, data['value']),
115            quality=_measurement_quality_from_json(data['quality']))
116
117    if data_type == DataType.FLOATING:
118        return iec101.FloatingData(
119            value=_value_from_json(data_type, data['value']),
120            quality=_measurement_quality_from_json(data['quality']))
121
122    if data_type == DataType.BINARY_COUNTER:
123        return iec101.BinaryCounterData(
124            value=_value_from_json(data_type, data['value']),
125            quality=_counter_quality_from_json(data['quality']))
126
127    if data_type == DataType.PROTECTION:
128        return iec101.ProtectionData(
129            value=_value_from_json(data_type, data['value']),
130            quality=_protection_quality_from_json(data['quality']),
131            elapsed_time=data['elapsed_time'])
132
133    if data_type == DataType.PROTECTION_START:
134        return iec101.ProtectionStartData(
135            value=_value_from_json(data_type, data['value']),
136            quality=_protection_quality_from_json(data['quality']),
137            duration_time=data['duration_time'])
138
139    if data_type == DataType.PROTECTION_COMMAND:
140        return iec101.ProtectionCommandData(
141            value=_value_from_json(data_type, data['value']),
142            quality=_protection_quality_from_json(data['quality']),
143            operating_time=data['operating_time'])
144
145    if data_type == DataType.STATUS:
146        return iec101.StatusData(
147            value=_value_from_json(data_type, data['value']),
148            quality=_measurement_quality_from_json(data['quality']))
149
150    raise ValueError('unsupported data type')
def command_to_json( cmd: hat.drivers.iec101.common.SingleCommand | hat.drivers.iec101.common.DoubleCommand | hat.drivers.iec101.common.RegulatingCommand | hat.drivers.iec101.common.NormalizedCommand | hat.drivers.iec101.common.ScaledCommand | hat.drivers.iec101.common.FloatingCommand | hat.drivers.iec101.common.BitstringCommand) -> Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]:
153def command_to_json(cmd: iec101.Command) -> json.Data:
154    if isinstance(cmd, (iec101.SingleCommand,
155                        iec101.DoubleCommand,
156                        iec101.RegulatingCommand)):
157        return {'value': _value_to_json(cmd.value),
158                'select': cmd.select,
159                'qualifier': cmd.qualifier}
160
161    if isinstance(cmd, (iec101.NormalizedCommand,
162                        iec101.ScaledCommand,
163                        iec101.FloatingCommand)):
164        return {'value': _value_to_json(cmd.value),
165                'select': cmd.select}
166
167    if isinstance(cmd, iec101.BitstringCommand):
168        return {'value': _value_to_json(cmd.value)}
169
170    raise ValueError('unsupported command')
def command_from_json( cmd_type: CommandType, cmd: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]) -> hat.drivers.iec101.common.SingleCommand | hat.drivers.iec101.common.DoubleCommand | hat.drivers.iec101.common.RegulatingCommand | hat.drivers.iec101.common.NormalizedCommand | hat.drivers.iec101.common.ScaledCommand | hat.drivers.iec101.common.FloatingCommand | hat.drivers.iec101.common.BitstringCommand:
173def command_from_json(cmd_type: CommandType,
174                      cmd: json.Data
175                      ) -> iec101.Command:
176    if cmd_type == CommandType.SINGLE:
177        return iec101.SingleCommand(
178            value=_value_from_json(cmd_type, cmd['value']),
179            select=cmd['select'],
180            qualifier=cmd['qualifier'])
181
182    if cmd_type == CommandType.DOUBLE:
183        return iec101.DoubleCommand(
184            value=_value_from_json(cmd_type, cmd['value']),
185            select=cmd['select'],
186            qualifier=cmd['qualifier'])
187
188    if cmd_type == CommandType.REGULATING:
189        return iec101.RegulatingCommand(
190            value=_value_from_json(cmd_type, cmd['value']),
191            select=cmd['select'],
192            qualifier=cmd['qualifier'])
193
194    if cmd_type == CommandType.NORMALIZED:
195        return iec101.NormalizedCommand(
196            value=_value_from_json(cmd_type, cmd['value']),
197            select=cmd['select'])
198
199    if cmd_type == CommandType.SCALED:
200        return iec101.ScaledCommand(
201            value=_value_from_json(cmd_type, cmd['value']),
202            select=cmd['select'])
203
204    if cmd_type == CommandType.FLOATING:
205        return iec101.FloatingCommand(
206            value=_value_from_json(cmd_type, cmd['value']),
207            select=cmd['select'])
208
209    if cmd_type == CommandType.BITSTRING:
210        return iec101.BitstringCommand(
211            value=_value_from_json(cmd_type, cmd['value']))
212
213    raise ValueError('unsupported command type')
def time_to_source_timestamp( t: hat.drivers.iec60870.encodings.common.Time | None) -> hat.event.common.common.Timestamp | None:
216def time_to_source_timestamp(t: iec101.Time | None
217                             ) -> hat.event.common.Timestamp | None:
218    return (
219        hat.event.common.timestamp_from_datetime(iec101.time_to_datetime(t))
220        if t else None)
def time_from_source_timestamp( t: hat.event.common.common.Timestamp | None) -> hat.drivers.iec60870.encodings.common.Time | None:
223def time_from_source_timestamp(t: hat.event.common.Timestamp | None,
224                               ) -> iec101.Time | None:
225    return (
226        iec101.time_from_datetime(hat.event.common.timestamp_to_datetime(t))
227        if t else None)
def get_data_type( data: hat.drivers.iec101.common.SingleData | hat.drivers.iec101.common.DoubleData | hat.drivers.iec101.common.StepPositionData | hat.drivers.iec101.common.BitstringData | hat.drivers.iec101.common.NormalizedData | hat.drivers.iec101.common.ScaledData | hat.drivers.iec101.common.FloatingData | hat.drivers.iec101.common.BinaryCounterData | hat.drivers.iec101.common.ProtectionData | hat.drivers.iec101.common.ProtectionStartData | hat.drivers.iec101.common.ProtectionCommandData | hat.drivers.iec101.common.StatusData) -> DataType:
230def get_data_type(data: iec101.Data) -> DataType:
231    if isinstance(data, iec101.SingleData):
232        return DataType.SINGLE
233
234    if isinstance(data, iec101.DoubleData):
235        return DataType.DOUBLE
236
237    if isinstance(data, iec101.StepPositionData):
238        return DataType.STEP_POSITION
239
240    if isinstance(data, iec101.BitstringData):
241        return DataType.BITSTRING
242
243    if isinstance(data, iec101.NormalizedData):
244        return DataType.NORMALIZED
245
246    if isinstance(data, iec101.ScaledData):
247        return DataType.SCALED
248
249    if isinstance(data, iec101.FloatingData):
250        return DataType.FLOATING
251
252    if isinstance(data, iec101.BinaryCounterData):
253        return DataType.BINARY_COUNTER
254
255    if isinstance(data, iec101.ProtectionData):
256        return DataType.PROTECTION
257
258    if isinstance(data, iec101.ProtectionStartData):
259        return DataType.PROTECTION_START
260
261    if isinstance(data, iec101.ProtectionCommandData):
262        return DataType.PROTECTION_COMMAND
263
264    if isinstance(data, iec101.StatusData):
265        return DataType.STATUS
266
267    raise ValueError('unsupported data')
def get_command_type( cmd: hat.drivers.iec101.common.SingleCommand | hat.drivers.iec101.common.DoubleCommand | hat.drivers.iec101.common.RegulatingCommand | hat.drivers.iec101.common.NormalizedCommand | hat.drivers.iec101.common.ScaledCommand | hat.drivers.iec101.common.FloatingCommand | hat.drivers.iec101.common.BitstringCommand) -> CommandType:
270def get_command_type(cmd: iec101.Command) -> CommandType:
271    if isinstance(cmd, iec101.SingleCommand):
272        return CommandType.SINGLE
273
274    if isinstance(cmd, iec101.DoubleCommand):
275        return CommandType.DOUBLE
276
277    if isinstance(cmd, iec101.RegulatingCommand):
278        return CommandType.REGULATING
279
280    if isinstance(cmd, iec101.NormalizedCommand):
281        return CommandType.NORMALIZED
282
283    if isinstance(cmd, iec101.ScaledCommand):
284        return CommandType.SCALED
285
286    if isinstance(cmd, iec101.FloatingCommand):
287        return CommandType.FLOATING
288
289    if isinstance(cmd, iec101.BitstringCommand):
290        return CommandType.BITSTRING
291
292    raise ValueError('unsupported command')
def cause_to_json( cls: Type[enum.Enum], cause: enum.Enum | int) -> Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]:
295def cause_to_json(cls: typing.Type[enum.Enum],
296                  cause: enum.Enum | int
297                  ) -> json.Data:
298    return (cause.name if isinstance(cause, cls) else
299            cause.value if isinstance(cause, enum.Enum) else
300            cause)
def cause_from_json( cls: Type[enum.Enum], cause: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]) -> enum.Enum | int:
303def cause_from_json(cls: typing.Type[enum.Enum],
304                    cause: json.Data
305                    ) -> enum.Enum | int:
306    return cls[cause] if isinstance(cause, str) else cause