~krystianch/es51922-serial

aa3bdf121772a72a61bddaf8f438a916f9813f1a — Krystian Chachuła 10 months ago
Initial commit
1 files changed, 180 insertions(+), 0 deletions(-)

A es51922-serial.py
A  => es51922-serial.py +180 -0
@@ 1,180 @@
import argparse
import enum
import sys
from decimal import Decimal

import serial


class Function(enum.Enum):
    VOLTAGE = 0x3b
    CURRENT_MICRO = 0x3d
    CURRENT_MILLI = 0x3f
    CURRENT_22 = 0x30
    CURRENT_MANUAL = 0x39
    RESISTANCE = 0x33
    CONTINUITY = 0x35
    DIODE = 0x31
    FREQUENCY = 0x32
    CAPACITANCE = 0x36
    TEMPERATURE = 0x34
    ADP = 0x3e


UNITS = {
    (Function.VOLTAGE, 0, 0): "V",
    (Function.VOLTAGE, 1, 0): "Hz",
    (Function.VOLTAGE, 1, 1): "%",
    (Function.CURRENT_MICRO, 0, 0): "A",
    (Function.CURRENT_MICRO, 1, 0): "Hz",
    (Function.CURRENT_MICRO, 1, 1): "%",
    (Function.CURRENT_MILLI, 0, 0): "A",
    (Function.CURRENT_MILLI, 1, 0): "Hz",
    (Function.CURRENT_MILLI, 1, 1): "%",
    (Function.CURRENT_22, 0, 0): "A",
    (Function.CURRENT_22, 1, 0): "Hz",
    (Function.CURRENT_22, 1, 1): "%",
    (Function.CURRENT_MANUAL, 0, 0): "A",
    (Function.CURRENT_MANUAL, 1, 0): "Hz",
    (Function.CURRENT_MANUAL, 1, 1): "%",
    (Function.RESISTANCE, 0, 0): "Ω",
    (Function.CONTINUITY, 0, 0): "Ω",
    (Function.DIODE, 0, 0): "V",
    (Function.FREQUENCY, 0, 0): "Hz",
    (Function.FREQUENCY, 0, 1): "%",
    (Function.CAPACITANCE, 0, 0): "F",
    (Function.TEMPERATURE, 0, 0): "°C",
    (Function.TEMPERATURE, 0, 1): "°C",
}


EXPONENTS = {
    (Function.VOLTAGE, 0, 0): [-4, -3, -2, -1, -5],
    (Function.CURRENT_MICRO, 0, 0): [-8, -7],
    (Function.CURRENT_MILLI, 0, 0): [-6, -5],
    (Function.CURRENT_22, 0, 0): [-3],
    (Function.CURRENT_MANUAL, 0, 0): [-4, -3, -2, -1, 0],
    (Function.RESISTANCE, 0, 0): [-2, -1, 0, 1, 2, 3, 4],
    (Function.CONTINUITY, 0, 0): [-2],
    (Function.DIODE, 0, 0): [-4],
    (Function.FREQUENCY, 0, 0): [-2, -1, None, 0, 1, 2, 3, 4],
    (Function.FREQUENCY, 0, 1): [-1],
    (Function.CAPACITANCE, 0, 0): [-12, -11, -10, -9, -8, -7, -6, -5],
}


class Serial(serial.Serial):
    def __init__(self, port=None, timeout=None, inter_byte_timeout=None, exclusive=None):
        super().__init__(
            baudrate=19200,
            bytesize=serial.SEVENBITS,
            parity=serial.PARITY_ODD,
            stopbits=serial.STOPBITS_ONE,
            timeout=timeout,
            xonxoff=False,
            rtscts=False,
            dsrdtr=False,
            inter_byte_timeout=inter_byte_timeout,
            exclusive=exclusive,
        )
        self.port = port
        self.dtr = True
        self.rts = False
        if port is not None:
            self.open()

    def write(self, _):
        raise NotImplementedError()


def split_bits(b):
    for _ in range(8):
        yield (b & 0x80) >> 7
        b <<= 1


def parse(packet):
    if len(packet) != 14:
        raise ValueError("Invalid packet length: %d, expecting 14" % len(packet))

    range_, *digits_, function, status, option1, option2, option3, option4, _, _ \
        = packet

    *hnib, judge, sign, battery_low, overflow = split_bits(status)
    if hnib != [0, 0, 1, 1]:
        raise ValueError("Corrupted status: 0x%x" % status)

    *hnib, max_, min_, rel, rmr = split_bits(option1)
    if hnib != [0, 0, 1, 1]:
        raise ValueError("Corrupted option1: 0x%x" % option1)
    if rel:
        raise ValueError("Unsupported function: REL/Zero")
    if max_ or min_ or rmr:
        raise ValueError("Unsupported function: Max/Min")

    *hnib, underflow, p_max, p_min, zero = split_bits(option2)
    if hnib != [0, 0, 1, 1] or zero != 0:
        raise ValueError("Corrupted option2: 0x%x" % option2)
    if p_max or p_min:
        raise ValueError("Unsupported function: Pmin/Pmax")

    *hnib, dc, ac, auto, vahz = split_bits(option3)
    if hnib != [0, 0, 1, 1]:
        raise ValueError("Corrupted option3: 0x%x" % option3)

    *hnib, vbar, hold, lpf = split_bits(option4)
    if hnib != [0, 0, 1, 1, 0]:
        raise ValueError("Corrupted option3: 0x%x" % option3)
    if vbar:
        raise ValueError("Unsupported configuration: VBAR is high")

    if not bytes(digits_).isdigit():
        raise ValueError("Digits contain non-digits: %s" % digits_)
    digits = [x - ord("0") for x in digits_]

    if not bytes([range_]).isdigit():
        raise ValueError("Range is not a digit: 0x%x" % range_)
    range_idx = range_ - ord("0")
    try:
        mode_exponents = EXPONENTS[(Function(function), vahz, judge)]
    except KeyError:
        raise ValueError("Unsupported mode: function=0x%x, vahz=%d, judge=%d"
            % (function, vahz, judge))
    try:
        unit = UNITS[(Function(function), vahz, judge)]
    except KeyError:
        raise ValueError("Unsupported unit: function=0x%x, vahz=%d, judge=%d"
            % (function, vahz, judge))
    exponent = mode_exponents[range_idx]
    if exponent is None:
        raise ValueError("Unsupported range: range=0x%x, function=0x%x, vahz=%d, "
            "judge=%d" % (range_, function, vahz, judge))

    if underflow:
        return Decimal("-Inf"), unit
    if overflow:
        return Decimal("+Inf"), unit

    return Decimal((sign, digits, exponent)), unit


parser = argparse.ArgumentParser()
parser.add_argument("port")
parser.add_argument("--no-units", action="store_true")

if __name__ == "__main__":
    args = parser.parse_args()

    try:
        with Serial(args.port) as ser:
            for packet in ser:
                try:
                    value, unit = parse(packet)
                    if not args.no_units:
                        print(value, unit)
                    else:
                        print(value)
                except ValueError as e:
                    print(e, file=sys.stderr)
    except (KeyboardInterrupt, BrokenPipeError):
        pass