~nch/glue

ea3a590e496ffd97d17b67879f3072aae73bc1c3 — nc 1 year, 2 months ago 423c674
WIP attempt with generators
3 files changed, 113 insertions(+), 266 deletions(-)

M dataflow.py
D dataflow2.py
M test_dataflow.py
M dataflow.py => dataflow.py +57 -153
@@ 1,164 1,68 @@
from typing import *
from inspect import isgenerator
import functools

from functools import reduce
import operator

T = TypeVar('T')

class DataflowProgram:
    def __init__(self):
        self.definition = None # computes the final result
        self.env = {} # where expressions

    def __call__(self, definition):
        self.definition = definition
        return self

    def var(self, name):
        return Var(name, self.env)
class _StreamHandle:
    def __init__(self, stream, i):
        self.stream = stream
        self.i = i

    def where(self, **givens):
        self.env.update(givens)
    def __iter__(self):
        return self

    def step(self):
        visited_nodes = set()
        def step_inner(node):
            if node in visited_nodes:
                return

            visited_nodes.add(node)

            for d in node.parents:
                step_inner(d)

            #print('node', node)
            #print('before', node.value)
            node.step()
            #print('after', node.value)

        for n in list(self.env.values()) + [self.definition]:
            step_inner(n)

        return self.definition.value

class Stream:
    def __init__(self):
        self.value = None
        self.first = None
        self._i = 0

    def compute_next(self) -> T:
        ...

    def step(self):
        self.value = self.compute_next()
        if self._i == 0:
            self.first = self.value
        self._i += 1
        return self.value

    def fby(self, expr):
        return FbyStream(self, expr)

    @property
    def parents(self):
        return []

class FbyStream(Stream):
    def __init__(self, first, rest):
        self._first = first
        self._rest = rest
        super().__init__()

    def compute_next(self): # FIXME just... yuck
        if self._i == 0:
            return self._first.value
    def __next__(self):
        i = self.i
        self.i += 1 # stream advances...
        if i > len(self.stream.vals) - 1:
            return next(self.stream)
        else:
            return self._rest.value
            return self.stream.vals[i]

    def __str__(self):
        return f'{self._first} fby {self._rest}'

    @property
    def parents(self):
        if self._i == 0:
            return [self._first]
    def __repr__(self):
        if self.stream.f:
            return f'<streamhandle {self.stream.f.__name__}>'
        else:
            return [self._rest]

class BinOps:
    def __add__(self, x):
        return Expr(operator.add, self, x)
    def __mul__(self, x):
        return Expr(operator.mul, self, x)

class Int(Stream, BinOps):
    def __init__(self, i):
        self.i = i
        super().__init__()

    def __str__(self):
        return str(self.i)

    def compute_next(self):
        return self.i

class Expr(Stream):
    def __init__(self, op, *args):
        self.op = op
        self.args = args
        super().__init__()

    def compute_next(self):
        vals = [a.value for a in self.args]
        assert not any(isinstance(x, Var) for x in vals)
        return reduce(self.op, vals)

    def __str__(self):
        s = ' '.join(map(str, self.args))
        return f'({self.op.__name__} {s})'
            return '<streamhandle>'

    @property
    def parents(self):
        return self.args
    def copy(self):
        return _StreamHandle(self.stream, self.i)

Env = Dict[str, Expr]
class _Stream:
    def __init__(self, f, *args):
        self.gen = f(*args)
        self.f = f
        self.vals = []

class FuncStream(Stream):
    def __init__(self, expr):
        self.expr = expr
        super().__init__()

    @property
    def value(self):
        return self.expr()

    @value.setter
    def value(self, junk):
        pass

class Var(Stream, BinOps):
    def __init__(self, name, env):
        self.name = name
        self.env = env
        super().__init__()

    def __str__(self):
        return self.name

    def next(self):
        return FuncStream(lambda: self.env[self.name].compute_next())

    @property
    def parents(self):
        return [self.env[self.name]]

    @property
    def value(self):
        return self.env[self.name].value

    @value.setter
    def value(self, junk):
        pass
    def __iter__(self):
        return self

    def __next__(self):
        r = next(self.gen)
        if isinstance(r, _Stream) or isinstance(r, _StreamHandle):
            r = next(r)
        self.vals.append(r)
        return r

    def copy(self):
        return _StreamHandle(self, len(self.vals))

def stream(f):
    @functools.wraps(f)
    def g(*args):
        return _Stream(f, *args).copy()
    return g

@stream
def int_(i):
    while True:
        yield i

@stream
def fby(x, y):
    yield x
    while True:
        yield y

@stream
def next_(s):
    while True:
        yield next(s)

D dataflow2.py => dataflow2.py +0 -112
@@ 1,112 0,0 @@
import operator

PRINT_STYLE = 'lisp'

def node_initializer(initf):
    def f(self, *args, **kwargs):
        self.deps = []

        if not hasattr(node_initializer, 'all_nodes'):
            node_initializer.all_nodes = set()

        node_initializer.all_nodes.add(self)

        for a in args:
            if isinstance(a, Val):
                self.deps.append(a)

        return initf(self, *args, **kwargs)
    return f

def step_everything():
    stepped = set()
    def step(x):
        if x in stepped:
            return
        for y in x.deps:
            step(y)
        x.value = x.next()
        stepped.add(x)

    for node in node_initializer.all_nodes:
        step(node)

class Val: # just used for type checking
    pass

class BinOps:
    def __add__(self, x):
        return Expr(operator.add, self, x)
    def __mul__(self, x):
        return Expr(operator.mul, self, x)
    def __sub__(self, x):
        return Expr(operator.sub, self, x)

class Expr(Val, BinOps):
    @node_initializer
    def __init__(self, op, *args):
        self.op = op
        self.args = args

    def __str__(self):
        if PRINT_STYLE == 'py':
            s = ', '.join(map(str, self.args))
            return f'{self.op.__name__}({s})'
        elif PRINT_STYLE == 'lisp':
            s = ' '.join(map(str, self.args))
            return f'({self.op.__name__} {s})'

class Int(Val, BinOps):
    @node_initializer
    def __init__(self, i):
        self.i = i

    def __str__(self):
        if PRINT_STYLE == 'py':
            return f'Int({self.i})'
        elif PRINT_STYLE == 'lisp':
            return str(self.i)

class Var(Val, BinOps):
    @node_initializer
    def __init__(self, name):
        self.name = name

    def __str__(self):
        if PRINT_STYLE == 'py':
            return f"Var('{self.name}')"
        elif PRINT_STYLE == 'lisp':
            return self.name


'''
def lift(x):
    if isinstance(x, Val):
        return x # already lifted
    if isinstance(x, int):
        return Int(x)
    assert False, type(x)

def eval_expr(tree, bindings):
    def chase(x):
        if isinstance(x, Var) and x.name in bindings:
            return chase(bindings[x.name])
        else:
            return x

    if isinstance(tree, Expr):
        args = [eval_expr(a, bindings) for a in tree.args]
        if any(isinstance(a, Var) for a in args): # can't evaluate yet
            return Expr(tree.op, *[lift(a) for a in args])
        else:
            return tree.op(*args)
    if isinstance(tree, Int):
        return tree.i
    if isinstance(tree, Var):
        v = chase(tree)
        if isinstance(v, Expr):
            return eval_expr(v, bindings)
        else:
            return v
    assert False, f"shouldn't get here: {type(tree)}"
'''

M test_dataflow.py => test_dataflow.py +56 -1
@@ 2,13 2,68 @@ import unittest
from dataflow import *
import operator

class TestDataflow(unittest.TestCase):
class TestDataflow2(unittest.TestCase):
    def test_stream(self):
        @stream
        def test_f():
            yield 1
            yield 2
            yield 3

        it = test_f()

        self.assertEqual(next(it), 1)
        self.assertEqual(next(it), 2)
        self.assertEqual(next(it), 3)

    def test_int(self):
        it1 = int_(1)
        self.assertEqual(next(it1), 1)
        self.assertEqual(next(it1), 1)

        it2 = int_(2)
        self.assertEqual(next(it2), 2)
        self.assertEqual(next(it1), 1)

    def test_fby(self):
        s = fby(1, 2)
        self.assertEqual(next(s), 1)
        self.assertEqual(next(s), 2)
        self.assertEqual(next(s), 2)

    def test_fby2(self):
        s = fby(int_(1), int_(2))
        self.assertEqual(next(s), 1)
        self.assertEqual(next(s), 2)
        self.assertEqual(next(s), 2)

    def test_fby3(self):
        s = fby(int_(1), fby(int_(2), int_(3)))
        self.assertEqual(next(s), 1)
        self.assertEqual(next(s), 2)
        self.assertEqual(next(s), 3)
        self.assertEqual(next(s), 3)

    def test_fby4_with_duplicate_streams(self):
        s1 = fby(int_(1), fby(int_(2), fby(int_(3), int_(4))))
        self.assertEqual(next(s1), 1)
        s2 = s1.copy()
        self.assertEqual(next(s1), 2)
        self.assertEqual(next(s1), 3)
        self.assertEqual(next(s2), 2)
        self.assertEqual(next(s1), 4)
        self.assertEqual(next(s2), 3)
        self.assertEqual(next(s2), 4)


class TestDataflow:#(unittest.TestCase):
    def test_fby1(self):
        p = DataflowProgram()
        n = p.var('n')
        p(n).where(n = Int(0).fby(Int(1)))
        self.assertEqual(p.step(), 0)
        self.assertEqual(p.step(), 1)
        self.assertEqual(p.step(), 1)

    def test_fby2(self):
        p = DataflowProgram()