~nch/glue

f0ec6f8f1f8095c010e438f5792fa349210c2ecf — nc 1 year, 2 months ago 5033317
yet another attempt... still broken with fib
2 files changed, 156 insertions(+), 29 deletions(-)

M dataflow.py
M test_dataflow.py
M dataflow.py => dataflow.py +69 -29
@@ 1,5 1,57 @@
from inspect import isgenerator
import functools
from dataclasses import dataclass
from typing import *

def stream(f):
    @functools.wraps(f)
    def g(*args):
        return _Stream(f, *args).copy()
    return g

@stream
def add(a, b):
    while True:
        yield next(a) + next(b)

@stream
def mul(a, b):
    while True:
        yield next(a) * next(b)


@stream
def var_stream(env, v):
    while True:
        yield env[v]

@stream
def bind_stream(env, result_var, bindings):
    while True:
        for k, v in bindings.items():
            env[k] = next(v)
        yield env[result_var]

@stream
def int_(i):
    while True:
        yield i

@stream
def fby(x, y):
    yield x
    while True:
        yield y

@stream
def var(name):
    pass

@stream
def next_(s):
    next(s) # advance one step further
    while True:
        yield next(s)

class _StreamHandle:
    def __init__(self, stream, i):


@@ 30,18 82,12 @@ class _StreamHandle:
    def copy(self):
        return _StreamHandle(self.stream, self.i)

    def __add__(self, other):
        def addf(a, b):
            while True:
                yield next(a) + next(b)
        return stream(addf)(self, other)

    def where(self, **kwargs):
        pass # TODO
    __add__ = add

class _Stream:
    def __init__(self, f, *args):
        self.gen = f(*args)
        self.deps = [a for a in args if isinstance(a, _Stream) or isinstance(a, _StreamHandle)]
        self.f = f
        self.vals = []



@@ 58,28 104,22 @@ class _Stream:
    def copy(self):
        return _StreamHandle(self, len(self.vals))

def stream(f):
    @functools.wraps(f)
    def g(*args):
        return _Stream(f, *args).copy()
    return g
@dataclass
class Var:
    name: str
    env: Dict

@stream
def int_(i):
    while True:
        yield i
    def __next__(self): # Uuhhhhh does this actually work?
        return self.env[self.name]

@stream
def fby(x, y):
    yield x
    while True:
        yield y
    def __call__(self):
        return var_stream(self.env, self.name)

@stream
def var(name):
    pass
    def where(self, **kwargs):
        return bind_stream(self.env, self.name, kwargs)

@stream
def next_(s): # TODO test
    while True:
        yield next(s)
    def __add__(self, other):
        return add(var_stream(self.env, self.name), other)

    def __mul__(self, other):
        return mul(var_stream(self.env, self.name), other)

M test_dataflow.py => test_dataflow.py +87 -0
@@ 55,6 55,93 @@ class TestDataflow2(unittest.TestCase):
        self.assertEqual(next(s2), 3)
        self.assertEqual(next(s2), 4)

    def test_var1(self):
        self.assertEqual(
            next(Var('x', {}).where(x = int_(1))), 1)
        p = Var('x', {}).where(x = fby(1, 2))
        self.assertEqual(next(p), 1)
        self.assertEqual(next(p), 2)
        self.assertEqual(next(p), 2)

    def test_var2(self):
        n = Var('n', {})
        p = n.where(n = fby(int_(1), n + int_(1)))
        self.assertEqual(next(p), 1)
        self.assertEqual(next(p), 2)
        self.assertEqual(next(p), 3)
        self.assertEqual(next(p), 4)

    def test_var3(self):
        env = {}
        a = Var('a', env)
        b = Var('b', env)
        p = b.where(a = fby(int_(1), a + int_(1)),
                    b = a)
        self.assertEqual(next(p), 1)
        self.assertEqual(next(p), 2)
        self.assertEqual(next(p), 3)
        self.assertEqual(next(p), 4)

    def test_var3(self):
        n = Var('n', {})
        p = n.where(n = fby(int_(1), n + n))
        self.assertEqual(next(p), 1)
        self.assertEqual(next(p), 2)
        self.assertEqual(next(p), 4)
        self.assertEqual(next(p), 8)

    def test_fac(self):
        env = {}
        fac = Var('fac', env)
        n = Var('n', env)
        p = fac.where(n = fby(int_(0), n + int_(1)),
                      fac = fby(int_(1), fac * (n + int_(1))))
        self.assertEqual(next(p), 1)
        self.assertEqual(next(p), 2)
        self.assertEqual(next(p), 6)
        self.assertEqual(next(p), 24)
        self.assertEqual(next(p), 120)
        self.assertEqual(next(p), 720)

    def test_next_(self):
        @stream
        def f():
            yield 1
            yield 2
            yield 3
            yield 4

        s = next_(f())

        self.assertEqual(next(s), 2)
        self.assertEqual(next(s), 3)
        self.assertEqual(next(s), 4)

    def test_next_1(self):
        p = next_(fby(1, fby(2, fby(3, 4))))
        self.assertEqual(next(p), 2)
        self.assertEqual(next(p), 3)
        self.assertEqual(next(p), 4)

    def test_next_2(self):
        env = {}
        n = Var('n', env)
        m = Var('m', env)
        p = m.where(n = fby(1, n + int_(1)),
                    m = next_(next_(n)))

        self.assertEqual(next(p), 3)
        self.assertEqual(next(p), 4)
        self.assertEqual(next(p), 5)

    def test_fib(self):
        fib = Var('fib', {})
        p = fib.where(fib = fby(0, fby(1, fib + next_(fib))))
        self.assertEqual(next(p), 0)
        self.assertEqual(next(p), 1)
        self.assertEqual(next(p), 1)
        self.assertEqual(next(p), 2)
        self.assertEqual(next(p), 3)

class TestDataflow:#(unittest.TestCase):
    def test_fby1(self):