~nch/glue

423c67491a0db053998bb8165a01d0d195f03f66 — nc 1 year, 3 months ago 6503844
WIP got some basic Lucid-ish code running - one is still broken
6 files changed, 245 insertions(+), 218 deletions(-)

M arrowlets.py
A dataflow.py
D dataflow1.py
M table.py
A test_dataflow.py
D test_dataflow1.py
M arrowlets.py => arrowlets.py +1 -3
@@ 1,8 1,6 @@
from typing import Callable, TypeVar, Any, Generic
from typing import Callable, Any
from dataclasses import dataclass

A = TypeVar('A')

class _CpsA: # (x, k) -> ()
    @staticmethod
    def lift(f) -> '_CpsA':

A dataflow.py => dataflow.py +164 -0
@@ 0,0 1,164 @@
from typing import *

from functools import reduce
import operator

# Type variable used as the return annotation of Stream.compute_next.
T = TypeVar('T')

class DataflowProgram:
    """A dataflow program: one result expression plus named ``where`` bindings.

    The methods that configure the program return ``self`` so they can be
    chained, e.g. ``p(expr).where(name=stream)``; ``step()`` then advances
    the whole program by one tick.
    """

    def __init__(self):
        self.env = {}  # where expressions, keyed by variable name
        self.definition = None  # computes the final result

    def __call__(self, definition):
        """Set the expression whose value ``step()`` returns; return ``self``."""
        self.definition = definition
        return self

    def var(self, name):
        """Return a ``Var`` that looks ``name`` up in this program's env."""
        return Var(name, self.env)

    def where(self, **givens):
        """Bind each keyword name to its stream in the env; return ``self``."""
        for name, stream in givens.items():
            self.env[name] = stream
        return self

    def step(self):
        """Advance every reachable node once and return the result's value.

        Each node is stepped at most once per call, and a node's parents are
        stepped before the node itself.
        """
        seen = set()

        def advance(node):
            # Marking the node before descending is what keeps a recursive
            # definition (a binding that refers back to itself) from looping.
            if node not in seen:
                seen.add(node)
                for parent in node.parents:
                    advance(parent)
                node.step()

        roots = [*self.env.values(), self.definition]
        for root in roots:
            advance(root)

        return self.definition.value

class Stream:
    """Base class for a value that changes each time the program is stepped.

    Attributes set here:
      value -- result of the most recent ``step()`` (``None`` before any step)
      first -- the value observed on the very first ``step()``
      _i    -- number of ``step()`` calls completed so far
    """

    def __init__(self):
        self._i = 0
        self.first = None
        self.value = None

    def compute_next(self) -> T:
        """Produce the value for the next step; this base version yields None."""
        return None

    def step(self):
        """Compute and store the next value, remember the first one, return it."""
        self.value = self.compute_next()
        # ``self.value`` is re-read (not cached in a local) because subclasses
        # may turn ``value`` into a property.
        if not self._i:
            self.first = self.value
        self._i += 1
        return self.value

    def fby(self, expr):
        """Return a stream that is ``self`` first, followed by ``expr``."""
        return FbyStream(self, expr)

    @property
    def parents(self):
        """Streams that must be stepped before this one; none by default."""
        return []

class FbyStream(Stream):
    """``first fby rest``: takes its first value from one stream, then follows another."""

    def __init__(self, first, rest):
        self._first = first
        self._rest = rest
        super().__init__()

    def _source(self):
        # Before the first step the stream reads from ``_first``; afterwards
        # it reads from ``_rest``.
        if self._i == 0:
            return self._first
        return self._rest

    def compute_next(self): # FIXME just... yuck
        """Return the current value of whichever source is active."""
        return self._source().value

    def __str__(self):
        return f'{self._first} fby {self._rest}'

    @property
    def parents(self):
        """Only the currently active source; the other one is not stepped."""
        return [self._source()]

class BinOps:
    """Mixin that overloads ``+`` and ``*`` to build ``Expr`` nodes."""

    def _combine(self, op, x):
        # Nothing is evaluated here; the operator and operands are only
        # recorded in an Expr.
        return Expr(op, self, x)

    def __add__(self, x):
        """Return an ``Expr`` applying ``operator.add`` to ``self`` and ``x``."""
        return self._combine(operator.add, x)

    def __mul__(self, x):
        """Return an ``Expr`` applying ``operator.mul`` to ``self`` and ``x``."""
        return self._combine(operator.mul, x)

class Int(Stream, BinOps):
    """A constant stream: every step yields the same wrapped value ``i``."""

    def __init__(self, i):
        super().__init__()
        self.i = i

    def compute_next(self):
        """Return the wrapped constant."""
        return self.i

    def __str__(self):
        return str(self.i)

class Expr(Stream):
    """An operator applied to argument streams, folded left to right."""

    def __init__(self, op, *args):
        super().__init__()
        self.op, self.args = op, args

    @property
    def parents(self):
        """The argument streams; they are stepped before this node."""
        return self.args

    def compute_next(self):
        """Reduce the arguments' current values with ``op``."""
        operands = [arg.value for arg in self.args]
        # An argument's value must be a computed value, never a Var object.
        assert all(not isinstance(v, Var) for v in operands)
        return reduce(self.op, operands)

    def __str__(self):
        rendered = ' '.join(str(arg) for arg in self.args)
        return f'({self.op.__name__} {rendered})'

# Alias for an environment mapping variable names to their bound expressions.
# NOTE(review): not referenced elsewhere in the visible code, and the tests
# bind FbyStream objects rather than Expr -- confirm the intended value type.
Env = Dict[str, Expr]

class FuncStream(Stream):
    """A stream whose value is obtained by calling ``expr`` on every read."""

    def __init__(self, expr):
        self.expr = expr
        super().__init__()

    def _read_value(self):
        # Evaluated afresh on each access; nothing is cached.
        return self.expr()

    def _ignore_value(self, junk):
        # Stream.__init__ and Stream.step assign to ``value``; those writes
        # are deliberately discarded.
        pass

    value = property(_read_value, _ignore_value)

class Var(Stream, BinOps):
    """A named reference resolved through ``env`` each time it is used."""

    def __init__(self, name, env):
        self.name = name
        self.env = env
        super().__init__()

    def __str__(self):
        return self.name

    def _bound(self):
        # Looked up on every use, so a Var can be created before its name is
        # bound in the environment.
        return self.env[self.name]

    def next(self):
        """Return a ``FuncStream`` that calls the bound stream's ``compute_next``."""
        def peek():
            return self._bound().compute_next()

        return FuncStream(peek)

    @property
    def parents(self):
        """The stream currently bound to this name."""
        return [self._bound()]

    @property
    def value(self):
        """The bound stream's current value."""
        return self._bound().value

    @value.setter
    def value(self, junk):
        # Writes from Stream.__init__ / Stream.step are ignored; the value
        # always comes from the bound stream.
        pass


D dataflow1.py => dataflow1.py +0 -128
@@ 1,128 0,0 @@
from typing import TypeVar, Generic, Optional, Dict, Any, Iterable, Tuple, Callable

from functools import reduce
import operator

# XXX this code is total crap. Fix it at some point.

# Type variable for the element type of a Stream (used as Stream(Generic[T])).
T = TypeVar('T')

class Stream(Generic[T]):
    """Base class for a stepped value with explicitly listed dependencies.

    Attributes:
      value -- result of the most recent ``step()`` (``None`` before any step)
      first -- the value produced by the very first ``step()``
      deps  -- streams passed to the constructor, stepped before this one
    """

    # Class-level default so the flag exists even on instances whose
    # subclass never reaches Stream.__init__.
    _first_recorded = False

    def __init__(self, *deps): # this is a *BAD* idea -- deps tracking should be automatic
        self.value = None
        self.first = None
        self.deps = deps

    def next(self) -> T:
        """Produce the next value; subclasses override this (base returns None)."""
        ...

    def step(self):
        """Compute and store the next value, record the first one, return it."""
        self.value = self.next()
        # An explicit flag is used rather than ``self.first is None`` so that
        # a stream whose first value really is None keeps it as ``first``
        # instead of having it overwritten by a later value.
        if not self._first_recorded:
            self.first = self.value
            self._first_recorded = True
        return self.value

    def fby(self, expr):
        """Return a stream that is ``self`` first, followed by ``expr``."""
        return FbyStream(self, expr)

class FbyStream(Stream):
    """``first fby rest``: yields ``first``'s value once, then ``rest``'s values."""

    def __init__(self, first, rest):
        self._first = first
        self._rest = rest
        self._is_first = True
        # Only ``rest`` is registered as a dependency.
        super().__init__(self._rest)

    def next(self): # FIXME just... yuck
        """Return ``_first.value`` on the first call and ``_rest.value`` afterwards."""
        if not self._is_first:
            return self._rest.value
        self._is_first = False
        return self._first.value

class IterStream(Stream):
    """A stream that draws successive values from a Python iterable."""

    def __init__(self, i: Iterable):
        super().__init__()
        self.iter = iter(i)

    def next(self):
        """Return the next item; propagates StopIteration when exhausted."""
        source = self.iter
        return next(source)

class BinOps:
    """Mixin that overloads ``+`` and ``*`` to build ``Expr`` nodes."""

    def _combine(self, op, x):
        # Only records the operator and operands; no arithmetic happens here.
        return Expr(op, self, x)

    def __add__(self, x):
        """Return an ``Expr`` applying ``operator.add`` to ``self`` and ``x``."""
        return self._combine(operator.add, x)

    def __mul__(self, x):
        """Return an ``Expr`` applying ``operator.mul`` to ``self`` and ``x``."""
        return self._combine(operator.mul, x)

class Int(Stream[int], BinOps):
    """A constant stream: ``next()`` always returns the wrapped value ``i``."""

    def __init__(self, i):
        super().__init__()
        self.i = i

    def next(self):
        """Return the wrapped constant."""
        return self.i

    def __str__(self):
        return f'Int({self.i})'

class Expr(Stream):
    """An operator applied to argument streams; the arguments are its deps."""

    def __init__(self, op, *args):
        self.op = op
        self.args = args
        super().__init__(*self.args)

    def next(self):
        """Fold the arguments' current values with ``op``.

        Returns ``self`` instead when any argument's value is still a
        ``_Var`` object, i.e. the expression is not reducible yet.
        """
        # TODO: Hmm... determine if this should be a.value or a.next
        operands = [arg.value for arg in self.args]
        for operand in operands:
            if isinstance(operand, _Var):
                return self # not reducible yet
        return reduce(self.op, operands)

    def __str__(self):
        rendered = ' '.join(str(arg) for arg in self.args)
        return f'({self.op} {rendered})'

# Environment type: variable name -> bound expression (built by create_env).
# NOTE(review): the tests also bind Int and FbyStream objects, which are not
# Expr subclasses -- confirm the intended value type.
Env = Dict[str, Expr]

def step(env, v):
    """Step every stream in ``env`` once and return the value bound to ``v``.

    Each stream's ``deps`` are stepped before the stream itself, and no
    stream is stepped more than once per call.

    Parameters:
      env -- mapping from variable name to stream
      v   -- name of the variable whose current value is wanted

    Returns the ``value`` of ``env[v]`` after stepping.

    Raises KeyError if ``v`` is not bound in ``env``.
    """
    # TODO: register all nodes and topological sort then don't do the
    # semi-lazy graph compute thing that's happening here
    processed = set()

    def step_inner(x):
        if x in processed:
            return

        for d in x.deps:
            step_inner(d)

        x.step()
        processed.add(x)

    # The loop variable must not be called ``v``: that would shadow the
    # parameter and make the lookup below use a stream object as the key.
    for stream in env.values():
        step_inner(stream)

    return env[v].value


class _Var(Stream, BinOps):
    """A named variable that is bound to a stream through a shared ``env``.

    NOTE(review): ``Stream.step`` in this module calls ``next()``, not
    ``compute_next()``, so the method below is not reached by ``step()`` --
    confirm whether it was meant to be named ``next``.
    """

    def __init__(self, env, name):
        self.name = name
        self.env = env
        super().__init__()

    def __str__(self):
        return f'Var("{self.name}")'

    def bind(self, x):
        """Bind this variable's name to ``x`` and copy ``x``'s current value."""
        self.env[self.name] = x
        self.value = x.value

    def compute_next(self):
        """Return the bound stream's ``next()``, or ``self`` while unbound."""
        if self.name not in self.env:
            return self # is this even correct? I think it is...
        bound = self.env[self.name]
        return bound.next()

def create_env() -> Tuple[Env, Callable]: # -> env, Var class
    """Create a fresh environment and a factory for variables bound to it.

    Returns ``(bindings, make_var)``: ``bindings`` starts empty, and
    ``make_var(varname)`` builds a ``_Var`` that shares that same dict.
    """
    bindings: Env = {}

    def make_var(varname):
        return _Var(bindings, varname)

    return bindings, make_var

M table.py => table.py +23 -9
@@ 1,5 1,5 @@
from dataclasses import dataclass
from typing import List, Dict
from typing import List, Dict, Tuple, Iterator, Optional, Union
import numpy as np # type: ignore
from collections.abc import Iterable



@@ 8,24 8,29 @@ from collections.abc import Iterable
# TODO: add referential integrity...
# TODO: add serialization/deserialization

def _ensure_columns_match(a, b):
def _ensure_columns_match(a: List, b: List) -> None:
    if set(a) != set(b):
        raise ValueError('columns do not match', a, b)

class Table:
    def __init__(self, x, columns=None): # TODO: refactor this spaghetti
        self.columns = None
    columns: List[str]
    def __init__(self, x: Union[List, Tuple, Dict], columns: Optional[List[str]] = None): # TODO: refactor this spaghetti
        '''
        pre: self.columns is not None or len(x) > 0
        post: self.columns is not None and len(self.columns) > 0
        '''
        if isinstance(x, list) or isinstance(x, tuple):
            if columns is not None:
                if len(columns) != len(x[0]):
                    raise ValueError('Length of columns does not match length of data')
                self.columns = columns
            else:
                self.columns = range(len(x[0]))
                # we know x is nonempty, so mypy can ignore the type here
                self.columns = list(map(str, range(len(x[0]))))
            self.column_data = [np.array(xs) for xs in zip(*x)]
        elif isinstance(x, dict):
            if columns is not None:
                _ensure_columns_match(x.keys(), columns)
                _ensure_columns_match(list(x.keys()), columns)
                self.columns = columns
            else:
                self.columns = list(x.keys())


@@ 36,13 41,16 @@ class Table:
        assert(self.columns)
        assert(self.column_data)

    def __repr__(self):
        return f'<Table {self.columns}>'

    def coli(self, name: str):
        try:
            return self.columns.index(name)
        except ValueError:
            raise AttributeError('No such column', name)

    def tuples(self):
    def tuples(self) -> Iterator[Tuple]:
        for t in zip(*self.column_data):
            yield t



@@ 72,13 80,19 @@ class Table:
            elif isinstance(colkey, slice):
                return self[self.columns[colkey], rowkey]

    def vstack(self, other):
    def vstack(self, other: 'Table'):
        '''
        post: len(__return__) == len(self) + len(other)
        '''
        _ensure_columns_match(self.columns, other.columns)
        return Table({c: np.hstack((self[c], other[c])) for c in self.columns}, columns=self.columns)

    def append(self, vals: dict):
        '''
        post: len(__return__) == len(self) + 1
        '''
        if isinstance(vals, dict):
            _ensure_columns_match(vals.keys(), self.columns)
            _ensure_columns_match(list(vals.keys()), self.columns)
            return self.vstack(Table({k: [v] for k, v in vals.items()}))
        elif isinstance(vals, tuple):
            if len(vals) != len(self.columns): # TODO: maybe typecheck here?

A test_dataflow.py => test_dataflow.py +57 -0
@@ 0,0 1,57 @@
import unittest
from dataflow import *
import operator

class TestDataflow(unittest.TestCase):
    """Checks the values produced by successive ``DataflowProgram.step()`` calls."""

    def _expect_steps(self, program, expected):
        # Step the program once per expected value, comparing in order.
        for want in expected:
            self.assertEqual(program.step(), want)

    def test_fby1(self):
        prog = DataflowProgram()
        n = prog.var('n')
        prog(n).where(n=Int(0).fby(Int(1)))
        self._expect_steps(prog, [0, 1])

    def test_fby2(self):
        prog = DataflowProgram()
        n = prog.var('n')
        prog(n).where(n=Int(0).fby(Int(1).fby(Int(2))))
        self._expect_steps(prog, [0, 1, 2])

    def test_recursion(self):
        # n is defined in terms of itself: 0, then the previous n plus one.
        prog = DataflowProgram()
        n = prog.var('n')
        prog(n).where(n=Int(0).fby(n + Int(1)))
        self._expect_steps(prog, [0, 1, 2, 3, 4])

    def test_fac(self):
        prog = DataflowProgram()
        fac = prog.var('fac')
        n = prog.var('n')
        prog(fac).where(
            n=Int(0).fby(n + Int(1)),
            fac=Int(1).fby(fac * (n + Int(1))),
        )
        self._expect_steps(prog, [1, 2, 6, 24, 120, 720])

    def test_fib(self): # FIXME
        prog = DataflowProgram()
        fib = prog.var('fib')
        prog(fib).where(fib=Int(0).fby(Int(1).fby(fib + fib.next())))

        self._expect_steps(prog, [0, 1, 1, 2, 3])

if __name__ == '__main__':
    unittest.main()

D test_dataflow1.py => test_dataflow1.py +0 -78
@@ 1,78 0,0 @@
import unittest
from dataflow1 import *

class TestFRP(unittest.TestCase):
    """Tests for the dataflow1 stream classes, ``create_env`` and ``step``."""

    def _expect_next(self, stream, expected):
        # Call next() once per expected value, comparing in order.
        for want in expected:
            self.assertEqual(stream.next(), want)

    def test_int(self):
        one = Int(1)
        for _ in range(2):
            self.assertEqual(one.step(), 1)

    def test_var(self):
        env, make_var = create_env()
        v = make_var('v')
        self.assertEqual(v.step(), v)
        v.bind(Int(1))
        self.assertEqual(v.step(), 1)

    def test_expr(self):
        env, make_var = create_env()
        e = make_var('e')
        e.bind(Int(1) + Int(1))
        for _ in range(3):
            self.assertEqual(step(env, 'e'), 2)

    def test_expr2(self):
        env, make_var = create_env()
        n = make_var('n')
        n.bind(Int(1))

        total = n + Int(1)
        self.assertEqual(total.value, 2)
        self.assertEqual(total.next(), 2)

    def test_fby(self):
        env, make_var = create_env()
        n = make_var('n')
        n.bind(Int(1).fby(Int(2)))
        self.assertEqual(n.value, 1)
        self._expect_next(n, [2])

        n.bind(Int(1).fby(Int(2).fby(Int(3))))
        self.assertEqual(n.value, 1)
        self._expect_next(n, [2, 3])

    def test_basic_accessors(self):
        env, make_var = create_env()
        items = IterStream([1, 2])
        self.assertEqual(items.step(), 1)
        self.assertEqual(items.first, 1)
        self.assertEqual(items.value, 1)
        self.assertEqual(items.next(), 2)

    def test_recursive(self):
        env, make_var = create_env()
        n = make_var('n')
        n.bind(Int(1).fby(n + Int(1)))

        self.assertEqual(n.value, 1)
        self._expect_next(n, [2, 3, 4])

    def test_factorial(self):
        env, make_var = create_env()
        n = make_var('n')
        fac = make_var('fac')
        n.bind(Int(0).fby(n + Int(1)))
        fac.bind(Int(1).fby(fac * (n + Int(1))))

        self.assertEqual(fac.value, 1)
        self._expect_next(fac, [1, 2, 6, 24])

if __name__ == '__main__':
    unittest.main()