7 files changed, 0 insertions(+), 951 deletions(-)
D arrowlets.py
D arrowlets_test.py
D dataflow.py
D glue.py
D table.py
D table_test.py
D test_dataflow.py
D arrowlets.py => arrowlets.py +0 -164
@@ 1,164 0,0 @@
-from typing import Callable, Any
-from dataclasses import dataclass
-
-class _CpsA: # (x, k) -> ()
- @staticmethod
- def lift(f) -> '_CpsA':
- return _CpsA(lambda x, k: k(f(x)))
-
- def __init__(self, cps: Callable[[Any, Callable], None]):
- self.cps = cps
-
- def next(self, g):
- g = CpsA(g)
- # Basically cons-ing onto the next function functiong
- # (we build the stack backwards so when we pass `x` in during the run it gets
- # sent through the entire stack)
- # In other words, CPS can be viewed as turning a sequence of funcalls into a linked list
- return _CpsA(lambda x, k: self.cps(x, lambda y: g.cps(y, k)))
-
- def run(self, x):
- return self.cps(x, lambda y: y)
-
-def CpsA(f):
- if isinstance(f, _CpsA): # identity
- return f
- if callable(f): # function
- return _CpsA.lift(f) # lifted into CpsA
- assert False, f
-
-
-class SimpleEventA(_CpsA):
- def __init__(self, eventname):
- self.eventname = eventname
- def _f(target, k):
- def handler(event):
- # might have to try/catch unbind
- target.unbind(eventname, handler)
- k((target, event))
- target.bind(eventname, handler)
- super().__init__(_f)
-
-def AsyncA(f):
- if isinstance(f, _AsyncA):
- return f
- if callable(f): # function
- return _AsyncA.lift(f)
- assert False, f
-
-@dataclass
-class Repeat:
- value: Any
-
-@dataclass
-class Done:
- value: Any
-
-class _AsyncA: # (x, p, k) -> ()
- def __init__(self, cps):
- self.acps = cps
-
- def next(self, g):
- g = AsyncA(g)
- def inner(x, p, k):
- self.acps(x, p, lambda y, q: g.acps(y, q, k))
- return _AsyncA(inner)
-
- def run(self, x, p=None) -> 'ProgressA':
- p = p if p else ProgressA()
- self.acps(x, p, lambda y, p: y)
- return p
-
- def repeat(self):
- def repeatf(x, p, k):
- def inner_repeatf(y, q):
- if isinstance(y, Repeat):
- repeatf(y.value, q, k)
- elif isinstance(y, Done):
- k(y.value, q)
- else:
- raise TypeError("Unknown type to repeat on", type(y))
- self.acps(x, p, inner_repeatf)
- return _AsyncA(repeatf)
-
- def product(self, g):
- g = AsyncA(g)
- def productf(x, p, k):
- c = [2]
- out = [None, None]
- def set_out(i, y):
- out[i] = y
- # barrier
- c[0] -= 1
- if c[0] == 0:
- k((out[0], out[1]), p)
- self.next(lambda y1: set_out(0, y1)).run(x[0], p)
- g.next(lambda y2: set_out(1, y2)).run(x[1], p)
- return _AsyncA(productf)
-
- def or_(self, g):
- g = AsyncA(g)
- def or_f(x, p, k):
- prog = [ProgressA(), ProgressA()]
- def end(i):
- prog[i].cancel()
- prog[i] = None
- prog[0].next(lambda _: end(1)).run(None)
- prog[1].next(lambda _: end(0)).run(None)
-
- def cancel():
- if prog[0]: prog[0].cancel()
- if prog[1]: prog[1].cancel()
-
- def join(y, q):
- p.advance(cancel)
- k(y, q)
-
- p.add_canceller(cancel)
- self.acps(x, prog[0], join)
- g.acps(x, prog[1], join)
- return _AsyncA(or_f)
-
- @staticmethod
- def lift(f: Callable[[Any], Any]):
- return _AsyncA(lambda x, p, k: k(f(x), p))
-
-def ConstA(x):
- return AsyncA(lambda _: x)
-
-class ProgressA(_AsyncA):
- def __init__(self):
- self.cancellers = []
- self.observers = []
- super().__init__(lambda x, p, k: self.observers.append(lambda y: k(y, p)))
-
- def add_canceller(self, canceller):
- self.cancellers.append(canceller)
-
- def advance(self, canceller):
- try:
- i = self.cancellers.index(canceller)
- del self.cancellers[i]
- except:
- pass
- while self.observers:
- self.observers.pop()(None)
-
- def cancel(self):
- while self.cancellers:
- self.cancellers.pop()()
-
-class EventA(_AsyncA):
- def __init__(self, eventname):
- self.eventname = eventname
- def inner(target, p, k):
- def cancel():
- # might have to try/catch unbind
- target.unbind(eventname, handler)
- def handler(event):
- p.advance(cancel)
- cancel()
- k((target, event), p)
- p.add_canceller(cancel)
- target.bind(eventname, handler)
- super().__init__(inner)
D arrowlets_test.py => arrowlets_test.py +0 -245
@@ 1,245 0,0 @@
-import unittest
-from arrowlets import *
-
-class Dummy:
- def __init__(self):
- self.event_bindings = {}
-
- def bind(self, event, f):
- if event in self.event_bindings:
- self.event_bindings[event].add(f)
- else:
- self.event_bindings[event] = {f}
-
- def unbind(self, event, f):
- if event not in self.event_bindings or f not in self.event_bindings[event]:
- return
- self.event_bindings[event].remove(f)
-
- def trigger(self, event):
- assert event in self.event_bindings, f'"{event}" event not found'
- # we construct a list to force a copy of this to be created
- # so each function gets triggered, but we may remove it in the
- # middle of iteration
- for f in list(self.event_bindings[event]):
- f(event)
-
-class TestDummy(unittest.TestCase):
- def test_dummy(self):
- d = Dummy()
- flag = [False]
- def f(_):
- flag[0] = True
- d.bind('event', f)
- d.trigger('event')
- self.assertTrue(flag[0])
- flag[0] = False
- d.unbind('event', f)
- d.trigger('event')
- self.assertFalse(flag[0])
-
-class ArrowletsTest(unittest.TestCase):
- def test_CpsA(self):
- add1 = lambda x: x + 1
- add2 = CpsA(add1).next(CpsA(add1))
- self.assertEqual(add2.run(1), 3)
-
- def test_SimpleEventA(self):
- nclicks = [0]
- def clickTargetA(x):
- target, event = x
- nclicks[0] += 1
- return target
-
- d = Dummy()
- SimpleEventA('click').next(clickTargetA).run(d)
- d.trigger('click')
- self.assertEqual(nclicks[0], 1)
-
- nclicks = [0]
- SimpleEventA('click').next(clickTargetA)\
- .next(SimpleEventA('click'))\
- .next(clickTargetA).run(d)
- d.trigger('click')
- d.trigger('click') # unbinds after two clicks
- d.trigger('click')
- self.assertEqual(nclicks[0], 2)
-
- def test_EventA(self):
- def clickTargetA(x):
- target, event = x
- nclicks[0] += 1
- return target
-
- target = Dummy()
-
- ###
- nclicks = [0]
-
- p = EventA('click')\
- .next(clickTargetA)\
- .next(EventA('click'))\
- .next(clickTargetA)\
- .run(target)
-
- target.trigger('click')
- target.trigger('click')
- self.assertEqual(nclicks[0], 2)
-
- ###
- nclicks = [0]
-
- p = EventA('click')\
- .next(clickTargetA)\
- .next(EventA('click'))\
- .next(clickTargetA)\
- .run(target)
-
- target.trigger('click') # nclicks++
- p.cancel() # cancel the chain
- self.assertEqual(nclicks[0], 1)
- target.trigger('click') # nclicks shouldn't increment here
- self.assertEqual(nclicks[0], 1)
-
- ###
- nclicks = [0]
-
- t = Dummy()
- t.text = 'nothing here'
-
- p = EventA('click')\
- .next(clickTargetA)\
- .next(EventA('click'))\
- .next(clickTargetA)\
- .run(t)
-
- def f(_):
- t.text = 'clicked!'
- p.next(f).run(None)
-
- self.assertEqual(t.text, 'nothing here')
- self.assertEqual(nclicks[0], 0)
- t.trigger('click')
- self.assertEqual(nclicks[0], 1)
- self.assertEqual(t.text, 'clicked!')
- t.trigger('click')
- self.assertEqual(nclicks[0], 2)
- self.assertEqual(t.text, 'clicked!')
-
- def test_ConstA(self):
- ConstA(1).next(lambda x: self.assertEqual(x, 1)).run(2)
-
- def test_repeat(self):
- def bubblesort(x):
- lst, i, j = x
- if j + 1 < i:
- if lst[j] > lst[j + 1]:
- lst[j], lst[j+1] = lst[j+1], lst[j]
- return Repeat((lst, i, j+1))
- elif i > 0:
- return Repeat((lst, i-1, 0))
- else:
- return Done(lst)
-
- bubblesortA = AsyncA(bubblesort).repeat()
- lst = [1,3,2,0,4]
- bubblesortA.run((lst, 4, 0))
- self.assertEqual(lst, [0,1,2,3,4])
-
- def test_product(self):
- target1 = Dummy()
- target2 = Dummy()
- def clickTargetsA(a):
- t1, t2 = a[0][0], a[1][0]
- t1.text = 'clicked a'
- t2.text = 'clicked b'
-
- EventA('click').product(EventA('click'))\
- .next(clickTargetsA)\
- .run((target1, target2))
-
- target1.trigger('click')
- target2.trigger('click')
- self.assertEqual(target1.text, 'clicked a')
- self.assertEqual(target2.text, 'clicked b')
-
- def test_or(self):
- def WriteA(s):
- def f(x):
- target, event = x
- target.text = s
- return target
- return f
-
- heads_elem = Dummy()
- tails_elem = Dummy()
- heads_elem.text = ''
- tails_elem.text = ''
- heads = ConstA(heads_elem)
- tails = ConstA(tails_elem)
- heads.next(EventA('click')).next(WriteA('I win!'))\
- .or_(tails.next(EventA('click')).next(WriteA('You lose!'))).run(None)
-
- heads_elem.trigger('click')
- self.assertEqual(heads_elem.text, 'I win!')
- self.assertEqual(tails_elem.text, '')
-
- ###
-
- heads_elem.text = ''
- tails_elem.text = ''
- heads.next(EventA('click')).next(WriteA('I win!'))\
- .or_(tails.next(EventA('click')).next(WriteA('You lose!'))).run(None)
-
- tails_elem.trigger('click')
- self.assertEqual(heads_elem.text, '')
- self.assertEqual(tails_elem.text, 'You lose!')
-
- def test_dragndrop(self):
- target = Dummy()
- proxy = Dummy()
- proxy.target = target
- proxy.status = 'dropped'
-
- def DragElementA(target):
- # hack because I don't care anymore...
- return ConstA(target).next(lambda _: proxy)
-
- def setupA(x):
- proxy, event = x
- proxy.status = 'setup'
- return proxy
-
- def dragA(x):
- proxy, event = x
- proxy.status = 'dragging'
- return proxy
-
- def dropA(x):
- proxy, event = x
- proxy.status = 'dropped'
- return proxy
-
- def cancelA(x):
- proxy, event = x
- proxy.status = 'cancelled'
- return proxy
-
- dragOrDropA = (
- (EventA('mousemove').next(dragA)).next(Repeat)\
- .or_((EventA('mouseup').next(dropA).next(Done)))).repeat()
-
- dragDropOrCancelA = (
- (EventA('mousemove').next(dragA).next(dragOrDropA))\
- .or_((EventA('mouseup').next(cancelA))))
-
- dragAndDropA = (
- (EventA('mousedown').next(setupA).next(dragDropOrCancelA)))
-
- DragElementA('dragtarget').next(dragAndDropA).run(None)
- proxy.trigger('mousedown')
- proxy.trigger('mousemove')
- self.assertEqual(proxy.status, 'dragging')
-
-if __name__ == '__main__':
- unittest.main()
D dataflow.py => dataflow.py +0 -70
@@ 1,70 0,0 @@
-from dataclasses import dataclass
-from typing import *
-import pandas as pd
-
-nodes = pd.DataFrame(columns=['id', 'value'])
-edges = pd.DataFrame(columns=['from', 'to'])
-
-class Id(int):
- pass
-
-def node(val):
- global nodes
- i = len(nodes)
- nodes = nodes.append({'id': i, 'value': val}, ignore_index=True)
- return Id(i)
-
-def edge(*args):
- global edges
- for a, b in zip(args, args[1:]):
- edges = edges.append({'from': a, 'to': b}, ignore_index=True)
-
-def render_graph():
- from graphviz import Digraph
- import os
- dot = Digraph()
-
- for _, r in nodes.iterrows():
- id_, n = r
- dot.node(str(id_), label=str(n))
-
- for _, r in edges.iterrows():
- from_, to = r
- dot.edge(str(from_), str(to))
-
- dot.render('test', format='png', view=True)
-
-fib = node('fib')
-
-def lift(x):
- if isinstance(x, Id):
- return x
- return node(x)
-
-def fby(first, rest):
- nfirst = lift(first)
- nrest = lift(rest)
- nself = node('fby')
- edge(nfirst, nself, nrest)
- return nself
-
-def add(a, b):
- na = lift(a)
- nb = lift(b)
- nself = node('add')
- edge(na, nself)
- edge(nb, nself)
- return nself
-
-def next_(n):
- nn = lift(n)
- nself = node('next')
- edge(nn, nself)
- return nself
-
-def where(expr, **bindings):
- edge(bindings[list(bindings.keys())[0]], expr) # FIXME
-
-where(fib, fib=fby(0, fby(1, add(fib, next_(fib)))))
-
-render_graph()
D glue.py => glue.py +0 -105
@@ 1,105 0,0 @@
-import tkinter as tk
-import tkinter.dnd as dnd # type: ignore
-from dataclasses import dataclass
-from typing import Dict, Tuple
-import table
-
-def _cursor_pos(canvas, event):
- # where the corner of the canvas is relative to the screen:
- x_org = canvas.winfo_rootx()
- y_org = canvas.winfo_rooty()
- # XXX UTTER INSANITY -- x/y_root are relative to the *MONITOR* x/y mouse positions!
- # That's why we're getting the offset here:
- return event.x_root - x_org, event.y_root - y_org
-
-class DraggedNode:
- def __init__(self, orig_node):
- x1, y1, x2, y2 = orig_node.canvas.bbox(orig_node.id)
- self.orig_node = orig_node
- self.id = orig_node.canvas.create_rectangle(x1, y1, x2, y2)
-
- def move(self, x, y):
- x1, y1, _1, _2 = self.orig_node.canvas.bbox(self.id)
- self.orig_node.canvas.move(self.id, x-x1, y-y1)
-
- def dnd_end(self, target, event):
- self.orig_node.canvas.delete(self.id)
- self.orig_node.move(*_cursor_pos(self.orig_node.canvas, event))
-
-class DraggedEdge:
- def __init__(self, start_node, x, y):
- self.ox, self.oy = x, y
- self.start_node = start_node
- self.id = start_node.canvas.create_line(x, y, x, y)
-
- def move(self, x, y):
- self.start_node.canvas.coords(self.id, self.ox, self.oy, x, y)
-
- def dnd_end(self, target, event):
- self.start_node.canvas.delete(self.id)
- if isinstance(target, Node): # FIXME
- self.start_node.add_child(target)
- print('add child', target)
-
-class Node:
- def __init__(self, canvas, x=10, y=10):
- self.frame = tk.Frame(canvas, borderwidth=2, bg='#aaa', width=100, height=100, pady=20, padx=10)
- self.id = canvas.create_window(x, y, window=self.frame, anchor='nw')
- self.canvas = canvas
-
- # FIXME I really don't like that it's possible to add a child without adding a node if this array is appended to instead of calling add_child().
- # Figure out a better way of enforcing this.
- self.children = []
- self.frame.bind('<Button-1>', lambda ev: dnd.dnd_start(DraggedNode(self), ev))
- self.frame.bind('<Button-3>', lambda ev: dnd.dnd_start(DraggedEdge(self, *_cursor_pos(self.canvas, ev)), ev))
-
- def add_child(self, child):
- self.children.append(child)
- # TODO self.canvas.create_line()
-
- def move(self, x, y):
- self.canvas.coords(self.id, x, y)
-
- def dnd_accept(self, source, event):
- return self
-
-class NodeCanvas:
- def __init__(self, root):
- self.canvas = tk.Canvas(root, width=800, height=600, background='#444')
- self.canvas.pack(fill='both', expand=1)
- self.canvas.dnd_accept = self.dnd_accept
-
- def dnd_accept(self, source, event):
- pass
- #return self
-
- def dnd_motion(self, source, event):
- x, y = _cursor_pos(self.canvas, event)
- source.move(x, y)
-
- ### unused ###
- def dnd_enter(self, source, event):
- pass
-
- ### unused ###
- def dnd_leave(self, source, event):
- pass
-
- ### unused ###
- def dnd_commit(self, source, event):
- pass
-
-import numpy as np # type: ignore
-# TODO: make initialization notation easier...
-nodes = table.Table({'id': np.array([], dtype='int'), 'gui_handle': np.array([], dtype='object')})
-edges = table.Table({'n1_id': np.array([], dtype='int'), 'n2_id': np.array([], dtype='int')})
-
-root = tk.Tk()
-root.geometry("+1+1")
-canvas = NodeCanvas(root)
-n = Node(canvas.canvas).frame
-tk.Label(n, text='test').pack()
-n2 = Node(canvas.canvas).frame
-tk.Label(n2, text='test2').pack()
-root.mainloop()
-
D table.py => table.py +0 -107
@@ 1,107 0,0 @@
-from dataclasses import dataclass
-from typing import List, Dict, Tuple, Iterator, Optional, Union
-import numpy as np # type: ignore
-from collections.abc import Iterable
-
-# TODO: implement pure relational alebra version
-# TODO: implement tuple calculus version
-# TODO: add referential integrity...
-# TODO: add serialization/deserialization
-
-def _ensure_columns_match(a: List, b: List) -> None:
- if set(a) != set(b):
- raise ValueError('columns do not match', a, b)
-
-class Table:
- columns: List[str]
- def __init__(self, x: Union[List, Tuple, Dict], columns: Optional[List[str]] = None): # TODO: refactor this spaghetti
- '''
- pre: self.columns is not None or len(x) > 0
- post: self.columns is not None and len(self.columns) > 0
- '''
- if isinstance(x, list) or isinstance(x, tuple):
- if columns is not None:
- if len(columns) != len(x[0]):
- raise ValueError('Length of columns does not match length of data')
- self.columns = columns
- else:
- # we know x is nonempty, so mypy can ignore the type here
- self.columns = list(map(str, range(len(x[0]))))
- self.column_data = [np.array(xs) for xs in zip(*x)]
- elif isinstance(x, dict):
- if columns is not None:
- _ensure_columns_match(list(x.keys()), columns)
- self.columns = columns
- else:
- self.columns = list(x.keys())
- self.column_data = [np.array(x[k]) for k in self.columns]
- else:
- assert False, type(x)
-
- assert(self.columns)
- assert(self.column_data)
-
- def __repr__(self):
- return f'<Table {self.columns}>'
-
- def coli(self, name: str):
- try:
- return self.columns.index(name)
- except ValueError:
- raise AttributeError('No such column', name)
-
- def __len__(self):
- return len(self.column_data[0])
-
- def tuples(self) -> Iterator[Tuple]:
- for t in zip(*self.column_data):
- yield t
-
- def tuple(self):
- assert(self.column_data[0].shape == ())
- return tuple(self[c].item() for c in self.columns)
-
- def __getitem__(self, key):
- if isinstance(key, int):
- return self.column_data[key]
- if isinstance(key, str):
- return self.column_data[self.coli(key)]
-
- if isinstance(key, list):
- return Table({key: self[key] for key in columns})
- if isinstance(key, slice):
- return Table({key: self[key][key] for key in columns})
- if isinstance(key, tuple):
- colkey, rowkey = key
- if isinstance(colkey, int) or isinstance(colkey, str):
- return self[colkey][rowkey]
- elif isinstance(colkey, list):
- r = Table({col: self.column_data[self.coli(col)][rowkey] for col in colkey})
- if isinstance(rowkey, int):
- return r.tuple()
- return r
- elif isinstance(colkey, slice):
- return self[self.columns[colkey], rowkey]
-
- def vstack(self, other: 'Table'):
- '''
- post: len(__return__) == len(self) + len(other)
- '''
- _ensure_columns_match(self.columns, other.columns)
- return Table({c: np.hstack((self[c], other[c])) for c in self.columns}, columns=self.columns)
-
- def append(self, vals: dict):
- '''
- post: len(__return__) == len(self) + 1
- '''
- if isinstance(vals, dict):
- _ensure_columns_match(list(vals.keys()), self.columns)
- return self.vstack(Table({k: [v] for k, v in vals.items()}))
- elif isinstance(vals, tuple):
- if len(vals) != len(self.columns): # TODO: maybe typecheck here?
- raise ValueError("Not enough tuple values to map into columns")
- return self.vstack(Table([vals], columns=self.columns))
- else:
- assert False, type(vals)
-
-
D table_test.py => table_test.py +0 -61
@@ 1,61 0,0 @@
-from table import *
-import unittest
-
-class TestTable(unittest.TestCase):
- def test_initialize(self):
- t = Table([[1,2,'a'], [4,5,'b']], columns=['x', 'y', 'z'])
- self.assertEqual(list(t['x']), [1,4])
- self.assertEqual(list(t['z']), ['a','b'])
-
- t = Table({'x': [1,2,3], 'y': ['a', 'b', 'c'], 'z': [1, 1, 1]}, columns=['x', 'y', 'z'])
- self.assertEqual(list(t['x']), list(t[0]))
- self.assertEqual(list(t['x']), [1,2,3])
-
- t = Table([['a', 1, 1], ['b',2,1]])
- self.assertEqual(list(t[0]), ['a', 'b'])
-
- def test_coli(self):
- t = Table({'a': [1, 2, 3]})
- self.assertEqual(t.coli('a'), 0)
- with self.assertRaises(AttributeError) as ctx:
- t.coli('b')
-
- def test_subscript(self):
- t = Table({'a': np.array([1,2,3]),
- 'b': np.array(['a', 'b', 'c'])},
- columns=['a', 'b'])
-
- self.assertEqual(list(t[0]), list(t['a']))
-
- self.assertEqual(list(t[0]), [1,2,3])
- self.assertEqual(list(t['a']), [1,2,3])
- self.assertEqual(list(t['a', :2]), [1,2])
- self.assertEqual(list(t[0, :2]), [1,2])
- self.assertEqual(list(t[['a', 'b'], :2]['a']), [1,2])
- t2 = t[:, t['a'] == 2]
- self.assertEqual(set(t2.columns), set(t.columns))
-
- self.assertEqual(t[:, 1], (2, 'b'))
-
- def test_vstack(self):
- t1 = Table({'a': [1,2,3], 'b': [2,3,4], 'c': [1,2,3]})
- t2 = Table({'a': [2], 'b': [2], 'c': [2]})
-
- self.assertEqual(t1.vstack(t2).columns, ['a', 'b', 'c'])
- self.assertEqual(list(t1.vstack(t2)['a']), [1,2,3,2])
- self.assertEqual(list(t1.vstack(t2)['b']), [2,3,4,2])
- self.assertEqual(list(t1.vstack(t2)['c']), [1,2,3,2])
-
- def test_tuples(self):
- t1 = Table({'a': [1,2,3], 'b': ['x', 'y', 'z']})
- self.assertEqual(list(t1.tuples()), [(1, 'x'), (2, 'y'), (3, 'z')])
-
- def test_append(self):
- t1 = Table({'a': [1,2], 'b': ['x', 'y']})
- t2 = t1.append({'a': 4, 'b': 'q'})
- self.assertEqual(list(t2.tuples()), [(1, 'x'), (2, 'y'), (4, 'q')])
- t2 = t1.append((4, 'q'))
- self.assertEqual(list(t2.tuples()), [(1, 'x'), (2, 'y'), (4, 'q')])
-
-if __name__ == '__main__':
- unittest.main()
D test_dataflow.py => test_dataflow.py +0 -199
@@ 1,199 0,0 @@
-import unittest
-from dataflow import *
-import operator
-
-class TestDataflow2(unittest.TestCase):
- def test_stream(self):
- @stream
- def test_f():
- yield 1
- yield 2
- yield 3
-
- it = test_f()
-
- self.assertEqual(next(it), 1)
- self.assertEqual(next(it), 2)
- self.assertEqual(next(it), 3)
-
- def test_int(self):
- it1 = int_(1)
- self.assertEqual(next(it1), 1)
- self.assertEqual(next(it1), 1)
-
- it2 = int_(2)
- self.assertEqual(next(it2), 2)
- self.assertEqual(next(it1), 1)
-
- def test_fby(self):
- s = fby(1, 2)
- self.assertEqual(next(s), 1)
- self.assertEqual(next(s), 2)
- self.assertEqual(next(s), 2)
-
- def test_fby2(self):
- s = fby(int_(1), int_(2))
- self.assertEqual(next(s), 1)
- self.assertEqual(next(s), 2)
- self.assertEqual(next(s), 2)
-
- def test_fby3(self):
- s = fby(int_(1), fby(int_(2) + int_(2), int_(3)))
- self.assertEqual(next(s), 1)
- self.assertEqual(next(s), 4)
- self.assertEqual(next(s), 3)
- self.assertEqual(next(s), 3)
-
- def test_fby4_with_duplicate_streams(self):
- s1 = fby(int_(1), fby(int_(2), fby(int_(3), int_(4))))
- self.assertEqual(next(s1), 1)
- s2 = s1.copy()
- self.assertEqual(next(s1), 2)
- self.assertEqual(next(s1), 3)
- self.assertEqual(next(s2), 2)
- self.assertEqual(next(s1), 4)
- self.assertEqual(next(s2), 3)
- self.assertEqual(next(s2), 4)
-
- def test_var1(self):
- self.assertEqual(
- next(Var('x', {}).where(x = int_(1))), 1)
- p = Var('x', {}).where(x = fby(1, 2))
- self.assertEqual(next(p), 1)
- self.assertEqual(next(p), 2)
- self.assertEqual(next(p), 2)
-
- def test_var2(self):
- n = Var('n', {})
- p = n.where(n = fby(int_(1), n + int_(1)))
- self.assertEqual(next(p), 1)
- self.assertEqual(next(p), 2)
- self.assertEqual(next(p), 3)
- self.assertEqual(next(p), 4)
-
- def test_var3(self):
- env = {}
- a = Var('a', env)
- b = Var('b', env)
- p = b.where(a = fby(int_(1), a + int_(1)),
- b = a)
- self.assertEqual(next(p), 1)
- self.assertEqual(next(p), 2)
- self.assertEqual(next(p), 3)
- self.assertEqual(next(p), 4)
-
- def test_var3(self):
- n = Var('n', {})
- p = n.where(n = fby(int_(1), n + n))
- self.assertEqual(next(p), 1)
- self.assertEqual(next(p), 2)
- self.assertEqual(next(p), 4)
- self.assertEqual(next(p), 8)
-
- def test_fac(self):
- env = {}
- fac = Var('fac', env)
- n = Var('n', env)
- p = fac.where(n = fby(int_(0), n + int_(1)),
- fac = fby(int_(1), fac * (n + int_(1))))
- self.assertEqual(next(p), 1)
- self.assertEqual(next(p), 2)
- self.assertEqual(next(p), 6)
- self.assertEqual(next(p), 24)
- self.assertEqual(next(p), 120)
- self.assertEqual(next(p), 720)
-
- def test_next_(self):
- @stream
- def f():
- yield 1
- yield 2
- yield 3
- yield 4
-
- s = next_(f())
-
- self.assertEqual(next(s), 2)
- self.assertEqual(next(s), 3)
- self.assertEqual(next(s), 4)
-
- def test_next_1(self):
- p = next_(fby(1, fby(2, fby(3, 4))))
- self.assertEqual(next(p), 2)
- self.assertEqual(next(p), 3)
- self.assertEqual(next(p), 4)
-
- def test_next_2(self):
- env = {}
- n = Var('n', env)
- m = Var('m', env)
- p = m.where(n = fby(1, n + int_(1)),
- m = next_(next_(n)))
-
- self.assertEqual(next(p), 3)
- self.assertEqual(next(p), 4)
- self.assertEqual(next(p), 5)
-
- def test_fib(self):
- fib = Var('fib', {})
- p = fib.where(fib = fby(0, fby(1, fib + next_(fib))))
- self.assertEqual(next(p), 0)
- self.assertEqual(next(p), 1)
- self.assertEqual(next(p), 1)
- self.assertEqual(next(p), 2)
- self.assertEqual(next(p), 3)
-
-class TestDataflow:#(unittest.TestCase):
- def test_fby1(self):
- p = DataflowProgram()
- n = p.var('n')
- p(n).where(n = Int(0).fby(Int(1)))
- self.assertEqual(p.step(), 0)
- self.assertEqual(p.step(), 1)
- self.assertEqual(p.step(), 1)
-
- def test_fby2(self):
- p = DataflowProgram()
- n = p.var('n')
- p(n).where(n = Int(0).fby(Int(1).fby(Int(2))))
- self.assertEqual(p.step(), 0)
- self.assertEqual(p.step(), 1)
- self.assertEqual(p.step(), 2)
-
- def test_recursion(self):
- p = DataflowProgram()
- n = p.var('n')
- p(n).where(n = Int(0).fby(n + Int(1)))
- self.assertEqual(p.step(), 0)
- self.assertEqual(p.step(), 1)
- self.assertEqual(p.step(), 2)
- self.assertEqual(p.step(), 3)
- self.assertEqual(p.step(), 4)
-
- def test_fac(self):
- p = DataflowProgram()
- fac = p.var('fac')
- n = p.var('n')
- p(fac).where(
- n = Int(0).fby(n + Int(1)),
- fac = Int(1).fby(fac * (n + Int(1))))
- self.assertEqual(p.step(), 1)
- self.assertEqual(p.step(), 2)
- self.assertEqual(p.step(), 6)
- self.assertEqual(p.step(), 24)
- self.assertEqual(p.step(), 120)
- self.assertEqual(p.step(), 720)
-
- def test_fib(self): # FIXME
- p = DataflowProgram()
- fib = p.var('fib')
- p(fib).where(fib = Int(0).fby(Int(1).fby(fib + fib.next())))
-
- self.assertEqual(p.step(), 0)
- self.assertEqual(p.step(), 1)
- self.assertEqual(p.step(), 1)
- self.assertEqual(p.step(), 2)
- self.assertEqual(p.step(), 3)
-
-if __name__ == '__main__':
- unittest.main()