~nch/glue

06e401b9efe872f4e12e307284c9bd9c0843a177 — nc 1 year, 4 months ago 0442bda
implemented rest of arrowlets
2 files changed, 318 insertions(+), 6 deletions(-)

M arrowlets.py
M arrowlets_test.py
M arrowlets.py => arrowlets.py +131 -5
@@ 1,12 1,14 @@
from typing import Callable, TypeVar, Any
from typing_extensions import Protocol
from typing import Callable, TypeVar, Any, Generic
from dataclasses import dataclass

class _CpsA:
A = TypeVar('A')

class _CpsA: # (x, k) -> ()
    @staticmethod
    def lift(f: Callable) -> '_CpsA':
    def lift(f) -> '_CpsA':
        return _CpsA(lambda x, k: k(f(x)))

    def __init__(self, cps: Callable[[Any, Callable], '_CpsA']):
    def __init__(self, cps: Callable[[Any, Callable], None]):
        self.cps = cps

    def next(self, g):


@@ 33,8 35,132 @@ class SimpleEventA(_CpsA):
        self.eventname = eventname
        def _f(target, k):
            def handler(event):
                # might have to try/catch unbind
                target.unbind(eventname, handler)
                k((target, event))
            target.bind(eventname, handler)
        super().__init__(_f)

def AsyncA(f):
    if isinstance(f, _AsyncA):
        return f
    if callable(f): # function
        return _AsyncA.lift(f)
    assert False, f

@dataclass
class Repeat:
    value: Any

@dataclass
class Done:
    value: Any

class _AsyncA: # (x, p, k) -> ()
    def __init__(self, cps):
        self.acps = cps

    def next(self, g):
        g = AsyncA(g)
        def inner(x, p, k):
            self.acps(x, p, lambda y, q: g.acps(y, q, k))
        return _AsyncA(inner)

    def run(self, x, p=None) -> 'ProgressA':
        p = p if p else ProgressA()
        self.acps(x, p, lambda y, p: y)
        return p

    def repeat(self):
        def repeatf(x, p, k):
            def inner_repeatf(y, q):
                if isinstance(y, Repeat):
                    repeatf(y.value, q, k)
                elif isinstance(y, Done):
                    k(y.value, q)
                else:
                    raise TypeError("Unknown type to repeat on", type(y))
            self.acps(x, p, inner_repeatf)
        return _AsyncA(repeatf)

    def product(self, g):
        g = AsyncA(g)
        def productf(x, p, k):
            c = [2]
            out = [None, None]
            def set_out(i, y):
                out[i] = y
                # barrier
                c[0] -= 1
                if c[0] == 0:
                    k((out[0], out[1]), p)
            self.next(lambda y1: set_out(0, y1)).run(x[0], p)
            g.next(lambda y2: set_out(1, y2)).run(x[1], p)
        return _AsyncA(productf)

    def or_(self, g):
        g = AsyncA(g)
        def or_f(x, p, k):
            prog = [ProgressA(), ProgressA()]
            def end(i):
                prog[i].cancel()
                prog[i] = None
            prog[0].next(lambda _: end(1)).run(None)
            prog[1].next(lambda _: end(0)).run(None)

            def cancel():
                if prog[0]: prog[0].cancel()
                if prog[1]: prog[1].cancel()

            def join(y, q):
                p.advance(cancel)
                k(y, q)

            p.add_canceller(cancel)
            self.acps(x, prog[0], join)
            g.acps(x, prog[1], join)
        return _AsyncA(or_f)

    @staticmethod
    def lift(f: Callable[[Any], Any]):
        return _AsyncA(lambda x, p, k: k(f(x), p))

def ConstA(x):
    return AsyncA(lambda _: x)

class ProgressA(_AsyncA):
    def __init__(self):
        self.cancellers = []
        self.observers = []
        super().__init__(lambda x, p, k: self.observers.append(lambda y: k(y, p)))

    def add_canceller(self, canceller):
        self.cancellers.append(canceller)

    def advance(self, canceller):
        try:
            i = self.cancellers.index(canceller)
            del self.cancellers[i]
        except:
            pass
        while self.observers:
            self.observers.pop()(None)

    def cancel(self):
        while self.cancellers:
            self.cancellers.pop()()

class EventA(_AsyncA):
    def __init__(self, eventname):
        self.eventname = eventname
        def inner(target, p, k):
            def cancel():
                # might have to try/catch unbind
                target.unbind(eventname, handler)
            def handler(event):
                p.advance(cancel)
                cancel()
                k((target, event), p)
            p.add_canceller(cancel)
            target.bind(eventname, handler)
        super().__init__(inner)

M arrowlets_test.py => arrowlets_test.py +187 -1
@@ 12,10 12,12 @@ class Dummy:
            self.event_bindings[event] = {f}

    def unbind(self, event, f):
        if event not in self.event_bindings or f not in self.event_bindings[event]:
            return
        self.event_bindings[event].remove(f)

    def trigger(self, event):
        assert event in self.event_bindings, f'"{event}" not found'
        assert event in self.event_bindings, f'"{event}" event not found'
        # we construct a list to force a copy of this to be created
        # so each function gets triggered, but we may remove it in the
        # middle of iteration


@@ 54,6 56,190 @@ class ArrowletsTest(unittest.TestCase):
        d.trigger('click')
        self.assertEqual(nclicks[0], 1)

        nclicks = [0]
        SimpleEventA('click').next(clickTargetA)\
                .next(SimpleEventA('click'))\
                .next(clickTargetA).run(d)
        d.trigger('click')
        d.trigger('click') # unbinds after two clicks
        d.trigger('click')
        self.assertEqual(nclicks[0], 2)

    def test_EventA(self):
        def clickTargetA(x):
            target, event = x
            nclicks[0] += 1
            return target

        target = Dummy()

        ###
        nclicks = [0]

        p = EventA('click')\
                .next(clickTargetA)\
                .next(EventA('click'))\
                .next(clickTargetA)\
                .run(target)

        target.trigger('click')
        target.trigger('click')
        self.assertEqual(nclicks[0], 2)

        ###
        nclicks = [0]

        p = EventA('click')\
                .next(clickTargetA)\
                .next(EventA('click'))\
                .next(clickTargetA)\
                .run(target)

        target.trigger('click') # nclicks++
        p.cancel() # cancel the chain
        self.assertEqual(nclicks[0], 1)
        target.trigger('click') # nclicks shouldn't increment here
        self.assertEqual(nclicks[0], 1)

        ###
        nclicks = [0]

        t = Dummy()
        t.text = 'nothing here'

        p = EventA('click')\
                .next(clickTargetA)\
                .next(EventA('click'))\
                .next(clickTargetA)\
                .run(t)

        def f(_):
            t.text = 'clicked!'
        p.next(f).run(None)

        self.assertEqual(t.text, 'nothing here')
        self.assertEqual(nclicks[0], 0)
        t.trigger('click')
        self.assertEqual(nclicks[0], 1)
        self.assertEqual(t.text, 'clicked!')
        t.trigger('click')
        self.assertEqual(nclicks[0], 2)
        self.assertEqual(t.text, 'clicked!')

    def test_ConstA(self):
        ConstA(1).next(lambda x: self.assertEqual(x, 1)).run(2)

    def test_repeat(self):
        def bubblesort(x):
            lst, i, j = x
            if j + 1 < i:
                if lst[j] > lst[j + 1]:
                    lst[j], lst[j+1] = lst[j+1], lst[j]
                return Repeat((lst, i, j+1))
            elif i > 0:
                return Repeat((lst, i-1, 0))
            else:
                return Done(lst)

        bubblesortA = AsyncA(bubblesort).repeat()
        lst = [1,3,2,0,4]
        bubblesortA.run((lst, 4, 0))
        self.assertEqual(lst, [0,1,2,3,4])

    def test_product(self):
        target1 = Dummy()
        target2 = Dummy()
        def clickTargetsA(a):
            t1, t2 = a[0][0], a[1][0]
            t1.text = 'clicked a'
            t2.text = 'clicked b'

        EventA('click').product(EventA('click'))\
                .next(clickTargetsA)\
                .run((target1, target2))

        target1.trigger('click')
        target2.trigger('click')
        self.assertEqual(target1.text, 'clicked a')
        self.assertEqual(target2.text, 'clicked b')

    def test_or(self):
        def WriteA(s):
            def f(x):
                target, event = x
                target.text = s
                return target
            return f

        heads_elem = Dummy()
        tails_elem = Dummy()
        heads_elem.text = ''
        tails_elem.text = ''
        heads = ConstA(heads_elem)
        tails = ConstA(tails_elem)
        heads.next(EventA('click')).next(WriteA('I win!'))\
            .or_(tails.next(EventA('click')).next(WriteA('You lose!'))).run(None)

        heads_elem.trigger('click')
        self.assertEqual(heads_elem.text, 'I win!')
        self.assertEqual(tails_elem.text, '')

        ###

        heads_elem.text = ''
        tails_elem.text = ''
        heads.next(EventA('click')).next(WriteA('I win!'))\
            .or_(tails.next(EventA('click')).next(WriteA('You lose!'))).run(None)

        tails_elem.trigger('click')
        self.assertEqual(heads_elem.text, '')
        self.assertEqual(tails_elem.text, 'You lose!')

    def test_dragndrop(self):
        target = Dummy()
        proxy = Dummy()
        proxy.target = target
        proxy.status = 'dropped'

        def DragElementA(target):
            # hack because I don't care anymore...
            return ConstA(target).next(lambda _: proxy)

        def setupA(x):
            proxy, event = x
            proxy.status = 'setup'
            return proxy

        def dragA(x):
            proxy, event = x
            proxy.status = 'dragging'
            return proxy

        def dropA(x):
            proxy, event = x
            proxy.status = 'dropped'
            return proxy

        def cancelA(x):
            proxy, event = x
            proxy.status = 'cancelled'
            return proxy

        dragOrDropA = (
             (EventA('mousemove').next(dragA)).next(Repeat)\
        .or_((EventA('mouseup').next(dropA).next(Done)))).repeat()

        dragDropOrCancelA = (
             (EventA('mousemove').next(dragA).next(dragOrDropA))\
        .or_((EventA('mouseup').next(cancelA))))

        dragAndDropA = (
             (EventA('mousedown').next(setupA).next(dragDropOrCancelA)))

        DragElementA('dragtarget').next(dragAndDropA).run(None)
        proxy.trigger('mousedown')
        proxy.trigger('mousemove')
        self.assertEqual(proxy.status, 'dragging')

if __name__ == '__main__':
    unittest.main()