1d7d484e76af17353032d95fff1e1638b3badb04 — Noah Pederson 8 months ago 1f294b3
Rename fields, add tests, fix bugs [see ext.]

Renames kwargs for Map to be keytype and valuetype instead of
overloading 'type', allowing it to be used to denote a wrapped value
like every other type

Adds some roundtrip tests for some of the more complex fields

Fixes bugs
- Optional fields were always encoded
- Optional fields did not encode the >0 value to indicate the following
  value is present
- Fixes to_dict for Struct types

Also renames some fields in Struct's pack to not overload 'type'
2 files changed, 128 insertions(+), 39 deletions(-)

M bare/encoder.py
M bare/test_encoder.py
M bare/encoder.py => bare/encoder.py +55 -27
@@ 152,10 152,10 @@ class Field(ABC):
    def to_dict(self, value=None):
        if value is None:
            value = self._value
        if isinstance(self._value, Field):
            return self._value.value
        if isinstance(value, Field):
            return value.value
            return self._value
            return value

class Struct(ABC):

@@ 195,9 195,9 @@ class Struct(ABC):
    def _pack(self, fp: typing.BinaryIO, value=None):
        if value is None:
            value = self
        for field, type in value.fields().items():
            val = getattr(value, field)  # this gets the underlying value
            type._pack(fp, value=val)
        for name, field in value.fields().items():
            val = getattr(value, name)  # this gets the underlying value
            field._pack(fp, value=val)

    def _unpack(cls, fp: typing.BinaryIO):

@@ 246,8 246,8 @@ class Struct(ABC):
        if value is None:
            value = self
        output = {}
        for name, field in self.fields().items():
            val = getattr(self, name)
        for name, field in value.fields().items():
            val = getattr(value, name)
            output[name] = field.to_dict(value=val)
        return output

@@ 360,6 360,19 @@ class Array(Field):
        return self.__class__(type=self._type, length=self._length, values=values)

    def to_dict(self, value=None):
        if value is None:
            value = self._value
        output = []

        for item in value:
            if isinstance(item, (Struct, Field)):
        return output

class _ValidatedMap(UserDict):
    def __init__(self, *args, instance: "Map" = None, **kwargs):
        if instance is None:

@@ 388,37 401,37 @@ class Map(Field):
    _valuetype: typing.Type[Field] = None
    _default = None

    def __init__(self, key: Field = None, value: Field = None, values=None):
        if key is not None:
            if inspect.isclass(key):
                self._keytype = key()
    def __init__(self, keytype: Field = None, valuetype: Field = None, value=None):
        if keytype is not None:
            if inspect.isclass(keytype):
                self._keytype = keytype()
                self._keytype = key
                self._keytype = keytype
        elif self.__class__._keytype is None:
            raise TypeError(
                "Must either specify key as an argument to init or  _keytype class field"
                "Must either specify keytype as an argument to init or  _keytype class field"
            self._keytype = self.__class__._keytype()
        if value is not None:
            if inspect.isclass(value):
                self._valuetype = value()
        if valuetype is not None:
            if inspect.isclass(valuetype):
                self._valuetype = valuetype()
                self._valuetype = value
                self._valuetype = valuetype
        elif self.__class__._valuetype is None:
            raise TypeError(
                "Must either specify value as an argument to init or  _valuetype class field"
                "Must either specify valuetype as an argument to init or  _valuetype class field"
            self._valuetype = self.__class__._valuetype()
        if values:
            for k, v in values.items():
        if value:
            for k, v in value.items():
                valid, message = self._validatekv(k, v)
                if not valid:
                    raise ValidationError(
                        f"Unable to assign value to key: {k}: {message}"
        self._value = _ValidatedMap(values, instance=self)
        self._value = _ValidatedMap(value, instance=self)

    def __set__(self, instance, value):
        if instance is None:

@@ 450,7 463,7 @@ class Map(Field):
        return True, None

    def _pack(self, fp: typing.BinaryIO, value=None):
        if value is not None:
        if value is None:
            value = self._value  # type: _ValidatedMap
        count = len(value)
        _write_varint(fp, count, signed=False)

@@ 465,7 478,18 @@ class Map(Field):
            key = self._keytype._unpack(fp)
            value = self._valuetype.unpack(fp)
            values[key] = value
        return self.__class__(key=self._keytype, value=self._valuetype, values=values)
        return self.__class__(keytype=self._keytype, valuetype=self._valuetype, value=values)

    def to_dict(self, value=None):
        if value is None:
            value = self._value
        output = {}
        for k, v in value.items():
            if isinstance(v, (Field, Struct)):
                k = v.to_dict()
                output[k] = v
        return output

class Optional(Field):

@@ 505,14 529,15 @@ class Optional(Field):
            value = self._value
        if value is None:
            fp.write(struct.pack('<B', 0))
        self._wrapped._pack(fp, value=value)
            fp.write(struct.pack('<B', 1))
            self._wrapped._pack(fp, value=value)

    def _unpack(self, fp: typing.BinaryIO) -> 'Optional':
        buf = fp.read(1)
        check = struct.unpack('<B', buf)[0]
        if check == 0:
            return self.__class__(wrapped=self._wrapped, value=None)
        value = self._wrapped._unpack(fp)
        return self.__class__(wrapped=self._wrapped, value=value)

@@ 609,7 634,10 @@ def _read_varint(fp: typing.BinaryIO, signed=True) -> int:
    output = 0
    offset = 0
    while True:
        b = fp.read(1)[0]
            b = fp.read(1)[0]
        except IndexError as e:
            raise RuntimeError("Not enough bytes in buffer to decode")
        if b < 0x80:
            value = output | b << offset
            if signed:

M bare/test_encoder.py => bare/test_encoder.py +73 -12
@@ 80,7 80,7 @@ class ArrayTest(Struct):
    n = Array(Nested, length=1)

def test_array():
def test_array_struct():
    ex = ArrayTest()
    ex.a = [1, 2, 3]
    ex.n = [Nested(s="test")]

@@ 179,7 179,7 @@ class Customer(Struct):
    email = Str()
    address = Address()
    orders = Array(Order)
    metadata: Map(Str, Data)
    metadata = Map(Str, Data)

class Employee(Struct):
    name = Str()

@@ 199,18 199,79 @@ class Person(Union):
@pytest.mark.parametrize('file', ['customer.bin', 'employee.bin', 'people.bin', 'terminated.bin'])
def test_people(file):
    with open(os.path.join(os.path.dirname(__file__), '_examples', file), 'br') as f:
        p = Person().unpack(f)
        people = []
        while True:
                p = Person().unpack(f)
            except RuntimeError:
        f = f.read()
        buf = io.BytesIO()
        for person in people:
        assert buf.getvalue() == f

def test_stream():
    with open(os.path.join(os.path.dirname(__file__), '_examples', 'people.bin'), 'br') as f:
        p = Person().unpack(f)
        buf = io.BytesIO()
    with open('./test.bin', 'bw') as f:
def test_varint():
    expected = b'\x18'
    i = Int(value=12)
    assert i.pack() == expected

    i = Int(value=12345)
    expected = b'\xf2\xc0\x01'
    assert i.pack() == expected
    i = Int(value=-12345678)
    expected = b'\x9b\x85\xe3\x0b'
    assert i.pack() == expected

def test_uvarint():
    expected = b'\xce\xc2\xf1\x05'
    i = UInt(value=12345678)
    assert i.pack() == expected

def test_string():
    expected = b'\x0d\x61\x20\x74\x65\x73\x74\x20\x73\x74\x72\x69\x6e\x67'
    s = Str(value="a test string")
    assert s.pack() == expected
    s = Str(value="")
    assert s.pack() == b'\x00'

@pytest.mark.parametrize('value', [
    (Str(value='a test string'),b'\x0d\x61\x20\x74\x65\x73\x74\x20\x73\x74\x72\x69\x6e\x67\x0d\x61\x20\x74\x65\x73\x74\x20\x73\x74\x72\x69\x6e\x67\x0d\x61\x20\x74\x65\x73\x74\x20\x73\x74\x72\x69\x6e\x67\x0d\x61\x20\x74\x65\x73\x74\x20\x73\x74\x72\x69\x6e\x67'),
    (Int(value=12345678), b'\x9c\x85\xe3\x0b\x9c\x85\xe3\x0b\x9c\x85\xe3\x0b\x9c\x85\xe3\x0b')
def test_fixed_array(value):
    a = Array(type=value[0].__class__, length=4, values=[value[0]] * 4)
    assert a.pack() == value[1]

    (Str(value='a test string'),b'\x04\x0d\x61\x20\x74\x65\x73\x74\x20\x73\x74\x72\x69\x6e\x67\x0d\x61\x20\x74\x65\x73\x74\x20\x73\x74\x72\x69\x6e\x67\x0d\x61\x20\x74\x65\x73\x74\x20\x73\x74\x72\x69\x6e\x67\x0d\x61\x20\x74\x65\x73\x74\x20\x73\x74\x72\x69\x6e\x67'),
def test_array(value):
    a = Array(type=value[0].__class__, values=[value[0]] * 4)
    packed = a.pack()
    assert a.pack() == value[1]
    buf = io.BytesIO(packed)
    unpacked = a.unpack(buf)
    assert unpacked.to_dict() == a.to_dict()

class B(Struct):
    c = Int()

class X(Struct):
    a = Str()
    b = B()
def test_struct():
    s = X(a='a test string', b=B(c=12345))
    expected = b'\x0d\x61\x20\x74\x65\x73\x74\x20\x73\x74\x72\x69\x6e\x67\xf2\xc0\x01'
    assert s.pack() == expected
    buf = io.BytesIO(expected)
    unpacked = s.unpack(buf)
    assert unpacked.to_dict() == s.to_dict()

def test_map():
    expected = b'\x02\x04\x74\x65\x73\x74\x04\x74\x65\x73\x74\x07\x61\x6e\x6f\x74\x68\x65\x72\x04\x63\x61\x73\x65'
    m = Map(Str, Str, value={'test': 'test', 'another': 'case'})
    assert m.pack() == expected