~chiefnoah/pybare

743a76f762a161b7b31ec1a01df72e050d9b9cc5 — Noah Pederson 8 months ago 592bbb8
Adds more tests, fixes some bugs
A bare/_examples/customer.bin => bare/_examples/customer.bin +0 -0
A bare/_examples/employee.bin => bare/_examples/employee.bin +0 -0
A bare/_examples/people.bin => bare/_examples/people.bin +0 -0
A bare/_examples/terminated.bin => bare/_examples/terminated.bin +1 -0
@@ 0,0 1,1 @@

\ No newline at end of file

M bare/encoder.py => bare/encoder.py +74 -35
@@ 96,7 96,7 @@ class Field(ABC):
        valid, message = self.validate(value)
        if not valid:
            raise ValidationError(
                f"value is invalid for BARE type {self._type}: {message}"
                f"value is invalid for BARE type {self._type.__class__.__name__}: {message}"
            )
        setattr(instance, f"_{self.name}", value)



@@ 109,6 109,10 @@ class Field(ABC):
        return self.__class__._default == None  # This is valid for BareType.Void

    @property
    def valid(self) -> typing.Tuple[bool, str]:
        return self.validate(self._value)

    @property
    def type(self):
        return self.__class__._type



@@ 126,9 130,8 @@ class Field(ABC):
    def _pack(self, fp, value=None):
        pass

    @classmethod
    @abstractmethod
    def _unpack(cls, fp: typing.BinaryIO) -> "Field":
    def _unpack(self, fp: typing.BinaryIO) -> "Field":
        pass

    def pack(self, fp=None) -> typing.Optional[bytes]:


@@ 140,20 143,26 @@ class Field(ABC):
        if buffered:
            return fp

    @classmethod
    def unpack(cls, fp: typing.BinaryIO):
    def unpack(self, fp: typing.BinaryIO):
        # If it's a bytes-like, wrap it in a io buffer
        if hasattr(fp, "decode"):
            fp = io.BytesIO(fp)
        return cls._unpack(fp)
        return self._unpack(fp)

    def to_dict(self, value=None):
        if value is None:
            value = self._value
        if isinstance(self._value, Field):
            return self._value.value
        else:
            return self._value


class Struct(ABC):

    _type = BareType.Struct

    def __init__(self, *args, optional=False, **kwargs):
        self._optional = optional
    def __init__(self, *args, **kwargs):
        # loop through defined fields, if they have a corresponding kwarg entry, set the value
        for name, field in filter(
            lambda x: isinstance(x[1], (Field, Struct)), self.__class__.__dict__.items()


@@ 187,17 196,14 @@ class Struct(ABC):
        if value is None:
            value = self
        for field, type in value.fields().items():
            val = getattr(self, field)  # this gets the underlying value
            val = getattr(value, field)  # this gets the underlying value
            type._pack(fp, value=val)

    @classmethod
    def _unpack(cls, fp: typing.BinaryIO):
        vals = {}
        for field, type in cls.fields().items():
            if isinstance(type, Array):
                val = type._unpack(fp, length=type._length)
            else:
                val = type._unpack(fp)
            val = type._unpack(fp)
            vals[field] = val.value
        return cls(**vals)



@@ 228,6 234,22 @@ class Struct(ABC):
            return False, f"{type(s)} is not a valid struct {type(self)}"
        return True, None

    @property
    def valid(self) -> typing.Tuple[bool, str]:
        for name, field in self.fields().items():
            valid, message = field.validate(getattr(self, name))
            if not valid:
                return False, message
        return True, None

    def to_dict(self, value=None) -> dict:
        if value is None:
            value = self
        output = {}
        for name, field in self.fields().items():
            val = getattr(self, name)
            output[name] = field.to_dict(value=val)
        return output

class _ValidatedList(UserList):
    def __init__(self, *args, instance: "Array" = None, **kwargs):


@@ 239,7 261,7 @@ class _ValidatedList(UserList):
        super().__init__(*args, **kwargs)

    def append(self, item):
        valid, message = self._instance.validate(len(self.data) + 1, item)
        valid, message = self._instance._validateitem(item, length=len(self.data) + 1)
        if not valid:
            raise ValidationError(message)
        self.data.append(item)


@@ 283,22 305,27 @@ class Array(Field):
            self._length = length
        if values:
            self.validate(values)
            self._value = values
            self._value = _ValidatedList(values, instance=self)
        else:
            self._value = []
            self._value = _ValidatedList(instance=self)

    def _validateitem(self, item) -> typing.Tuple[bool, str]:
    def _validateitem(self, item, length=0) -> typing.Tuple[bool, str]:
        if length > self._length:
            return False, f"length {length} larger than array max: {self._length}"
        if self._length > 0 and len(self._value) + 1 > self._length:
            return False, "outside of length bounds"
        return self._type.validate(item)

    def validate(self, items: typing.Collection) -> typing.Tuple[bool, str]:
        if self._length > 0 and len(items) > self._length:
            return False, f"lenth {len(items)} larger than array max: {cls._length}"
            return False, f"lenth {len(items)} larger than array max: {self._length}"
        if self._length > 0 and len(items) > self._length:
            return False, f"length {len(items)} greater than max length {self._length}"
        for item in items:
            valid, message = self._type.validate(item)
            if type(item) == type(self._type):
                valid, message = item.valid
            else:
                valid, message = self._type.validate(item)
            if not valid:
                return False, message
        return True, None


@@ 306,20 333,24 @@ class Array(Field):
    def _pack(self, fp: typing.BinaryIO, value=None):
        if value is None:
            value = self._value
        if self.__class__._length == 0:
        if self._length == 0:
            length = len(value)
            _write_varint(fp, length, signed=False)
        else:
            length = self.__class__.__length
            value = value.extend([self._type._default] * (length - len(value))) # pad with default values
            default = None
            if isinstance(self._type, Field):
                default = self._type._default
            elif isinstance(self._type, Struct):
                default = self._type.__class__()
            value.extend([default] * (self._length - len(value))) # pad with default values
        for item in value:
            self._type._pack(fp, item)
            self._type._pack(fp, item.value)

    def _unpack(self, fp: typing.BinaryIO, length=None) -> 'Array':
        if length is None:
            length = cls._length
        if length == 0:
    def _unpack(self, fp: typing.BinaryIO) -> 'Array':
        if self._length == 0:
            length = _read_varint(fp, signed=False)
        else:
            length = self._length
        values = []
        for _ in range(length):
            val = self._type._unpack(fp)


@@ 424,15 455,14 @@ class Map(Field):
            self._keytype._pack(fp, value=k)
            self._valuetype._pack(fp, value=v)

    @classmethod
    def _unpack(cls, fp: typing.BinaryIO) -> "Map":
    def _unpack(self, fp: typing.BinaryIO) -> "Map":
        count = _read_varint(fp, signed=False)
        values = {}
        for _ in range(count):
            key = cls._keytype._unpack(fp)
            value = cls._valuetype.unpack(fp)
            key = self._keytype._unpack(fp)
            value = self._valuetype.unpack(fp)
            values[key] = value
        return cls(key=cls._keytype, value=cls._valuetype, values=values)
        return self.__class__(key=self._keytype, value=self._valuetype, values=values)


class Optional(Field):


@@ 496,6 526,10 @@ class Union(Field):
                self._members.append(member(value=value))
            else:
                self._members.append(member)
        if value is not None:
            valid, message = self.validate(value)
            if not valid:
                raise ValidationError(f"Attempting to set incorrect value to Union type: {type(value)}")
        self._value = value

    @property


@@ 531,9 565,14 @@ class Union(Field):
        raise TypeError("Unable to determine Union member type for value.")

    def _unpack(self, fp: typing.BinaryIO):
        id = _read_varint(fp, signed=False)
        value = self._members[id]._unpack(fp)
        return self.__class__(value=self._members[id].__class__(value=value.value))
        uid = _read_varint(fp, signed=False)
        value = self._members[uid]._unpack(fp)
        return self.__class__(members=self._members, value=value)

    def to_dict(self, value=None):
        if value is None:
            value = self._value
        return value.to_dict()


def _write_string(fp: typing.BinaryIO, val: str):

M bare/test_encoder.py => bare/test_encoder.py +63 -1
@@ 5,6 5,8 @@ from .encoder import Struct, Map, Array, _ValidatedMap, ValidationError, Optiona
from collections import OrderedDict
import pytest
import enum
import io
import os


class Nested(Struct):


@@ 119,7 121,6 @@ class UnionTest(Struct):
    e = ExampleUnion()
    b = Union(members=(Str, Int))
    c = Union(members=(OptionalStruct,ArrayTest))

def test_union():
    ex = UnionTest(e=1, b="test", c=ArrayTest(a=[1], n=[Nested(s='s')])) # MUST specify values for union types when creating an object
    assert ex.e == 1


@@ 147,3 148,64 @@ def test_enum():
    assert ex.e == 0
    with pytest.raises(ValidationError):
        ex.e = 100


class PublicKey(DataFixed):
    _length = 128

class Time(Str):
    pass

class Department(enum.Enum):
    ACCOUNTING = 0
    ADMINISTRATION = 1
    CUSTOMER_SERVICE = 2
    DEVELOPMENT = 3

    JSMITH = 99

class Address(Struct):
    address = Array(Str, length=4)
    city = Str()
    state = Str()
    country = Str()

class Order(Struct):
    orderID = I64()
    quantity = I32()

class Customer(Struct):
    name = Str()
    email = Str()
    address = Address()
    orders = Array(Order)
    metadata: Map(Str, Data)

class Employee(Struct):
    name = Str()
    email = Str()
    address = Address()
    department = Enum(Department)
    hireDate = Time()
    publicKey = Optional(PublicKey)
    metadata = Map(Str, Data)

class TerminatedEmployee(Void):
    pass

class Person(Union):
    _members = (Customer, Employee, TerminatedEmployee)

@pytest.mark.parametrize('file', ['customer.bin', 'employee.bin', 'people.bin', 'terminated.bin'])
def test_people(file):
    with open(os.path.join(os.path.dirname(__file__), '_examples', file), 'br') as f:
        p = Person().unpack(f)
        p.to_dict()
        f.seek(0)
        f = f.read()
        buf = io.BytesIO()
        p.pack(buf)
        #assert buf.getvalue() == f
    with open('./test.bin', 'bw') as f:
        p.pack(fp=f)


M bare/types.py => bare/types.py +20 -23
@@ 22,10 22,9 @@ class Simple(Field):
            value = self._value
        fp.write(struct.pack(self.__class__._fmt, value))

    @classmethod
    def _unpack(cls, fp: typing.BinaryIO):
        buf = fp.read(cls._bytesize)
        return cls(value=(struct.unpack(cls._fmt, buf)[0]))
    def _unpack(self, fp: typing.BinaryIO):
        buf = fp.read(self._bytesize)
        return self.__class__(value=(struct.unpack(self._fmt, buf)[0]))


class U8(Simple):


@@ 212,9 211,8 @@ class Void(Field):
    def _pack(self, fp: typing.BinaryIO, value=None):
        pass  # NO OP

    @classmethod
    def _unpack(cls, fp: typing.BinaryIO):
        return cls(value=None)
    def _unpack(self, fp: typing.BinaryIO):
        return self.__class__(value=None)

    def validate(self, value):
        if value is not None:


@@ 237,10 235,9 @@ class Int(Field):
            value = self._value
        _write_varint(fp, value, signed=True)

    @classmethod
    def _unpack(cls, fp: typing.BinaryIO) -> "Int":
    def _unpack(self, fp: typing.BinaryIO) -> "Int":
        val = _read_varint(fp, signed=True)
        return cls(value=val)
        return self.__class__(value=val)

class UInt(Field):



@@ 259,10 256,9 @@ class UInt(Field):
            value = self._value
        _write_varint(fp, value, signed=False)

    @classmethod
    def _unpack(cls, fp: typing.BinaryIO) -> "UInt":
    def _unpack(self, fp: typing.BinaryIO) -> "UInt":
        val = _read_varint(fp, signed=False)
        return cls(value=val)
        return self.__class__(value=val)


class Str(Field):


@@ 280,10 276,9 @@ class Str(Field):
            value = self._value
        _write_string(fp, value)

    @classmethod
    def _unpack(cls, fp: typing.BinaryIO) -> "Str":
    def _unpack(self, fp: typing.BinaryIO) -> "Str":
        val = _read_string(fp)
        return cls(value=val)
        return self.__class__(value=val)


class Data(Field):


@@ 306,11 301,10 @@ class Data(Field):
        _write_varint(fp, val=length, signed=False)
        fp.write(fp.write(struct.pack(f"<{len(value)}s", value)))

    @classmethod
    def _unpack(cls, fp: typing.BinaryIO) -> "Data":
    def _unpack(self, fp: typing.BinaryIO) -> "Data":
        length = _read_varint(fp, signed=False)
        val = struct.unpack("<{length}s", fp)[0]
        return cls(value=val)
        return self.__class__(value=val)


class DataFixed(Field):


@@ 344,12 338,11 @@ class DataFixed(Field):
            value = self._value
        fp.write(struct.pack(f"<{self._length}s", value))

    @classmethod
    def _unpack(cls, fp: typing.BinaryIO, length=None) -> "DataFixed":
    def _unpack(self, fp: typing.BinaryIO, length=None) -> "DataFixed":
        if length is None:
            length = cls._length
            length = self._length
        val = struct.unpack(f"<{length}s", fp)[0]
        return cls(value=val)
        return self.__class__(value=val)

class Enum(UInt):



@@ 366,3 359,7 @@ class Enum(UInt):
        if value not in values:
            return False, f"value {value} is not a valid Enum type for {self.__class__.__name__}"
        return True, None

    def _unpack(self, fp: typing.BinaryIO) -> 'UInt':
        val = _read_varint(fp, signed=False)
        return self.__class__(self._enum, val)