~jpsamaroo/Adjutant.jl

ref: 8a8229fe1ed910ea963b69840af6fd91fcdb76c9 Adjutant.jl/src/processing.jl -rw-r--r-- 3.4 KiB
8a8229feJulian P Samaroo Allow loading user's project 2 years ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# Registry of slots keyed by UUID; `newslot` adds an entry for every slot it creates.
const SLOTS = Dict{UUID,Observable}()

# Create a new slot and register it in `SLOTS`.
#
# `T` is the type parameter of the backing `Observable` (defaults to `Any`) and
# `init` is the value it is constructed with (defaults to `nothing`).
# Returns the tuple `(uuid, observable)`: the freshly generated UUID the slot
# was stored under, and the observable itself.
function newslot(::Type{T}=Any, init=nothing) where {T}
    id = uuid4()
    slot = Observable{T}(init)
    SLOTS[id] = slot
    return (id, slot)
end
# Return the value currently stored in the slot registered under `uuid`.
# Looking up a UUID that is not in `SLOTS` raises the dictionary's `KeyError`.
function getslot(uuid::UUID)
    slot = SLOTS[uuid]
    return slot[]
end
# Store `x` in the slot registered under `uuid` and return `x`.
function setslot!(uuid::UUID, x)
    slot = SLOTS[uuid]
    slot[] = x
    return x
end

# A processing node: a callable together with the named slots it reads from
# (`in_map`) and writes to (`out_map`).
mutable struct Processor
    # Callable invoked with the processor itself as its only argument.
    func
    # Input name => UUID of the attached slot, or `nothing` while unattached.
    in_map::Dict{String,Union{UUID,Nothing}}
    # Output name => UUID of the slot that output writes to.
    out_map::Dict{String,UUID}
    # Condition stored with the processor; not referenced by the code visible
    # in this section of the file.
    update_cond::Threads.Condition
end
# Build a processor from an explicit input map and a list of output names.
# One new slot is created through `newslot` for every entry of `out_slots`, and
# the output map records the UUID of the slot created for each name.
function Processor(func, in_map::Dict{String,Union{UUID,Nothing}}, out_slots::Vector{String})
    out_map = Dict{String,UUID}()
    foreach(out_slots) do slot_name
        out_map[slot_name] = first(newslot())
    end
    return Processor(func, in_map, out_map, Threads.Condition())
end
# Convenience constructor taking only input names: every listed input starts
# unattached, i.e. mapped to `nothing`.
function Processor(func, in_map::Vector{String}, out_slots::Vector{String})
    unattached = [input_name => nothing for input_name in in_map]
    return Processor(func, Dict{String,Union{UUID,Nothing}}(unattached), out_slots)
end
# `nothing` in place of the inputs: construct the processor with an empty input map.
function Processor(func, in_map::Nothing, out_slots)
    no_inputs = Dict{String,Union{UUID,Nothing}}()
    return Processor(func, no_inputs, out_slots)
end
# `nothing` in place of the outputs: construct the processor with no output names.
function Processor(func, in_map, out_slots::Nothing)
    no_outputs = String[]
    return Processor(func, in_map, no_outputs)
end
# Both inputs and outputs given as `nothing`: empty input map, no output names.
# Being more specific than either single-`nothing` method, this one is what a
# call with two `nothing`s dispatches to.
function Processor(func, in_map::Nothing, out_slots::Nothing)
    no_inputs = Dict{String,Union{UUID,Nothing}}()
    no_outputs = String[]
    return Processor(func, no_inputs, no_outputs)
end
# Default processor: empty input and output maps, and a function that ignores
# its argument and returns the empty tuple.
function Processor()
    noop = _ -> ()
    inputs = Dict{String,Union{UUID,Nothing}}()
    outputs = Dict{String,UUID}()
    return Processor(noop, inputs, outputs, Threads.Condition())
end

# Run the processor. When every input is attached, `func` is called with the
# processor and its result is returned; otherwise `false` is returned and
# `func` is not called.
function (p::Processor)()
    allinputsmapped(p) || return false
    return p.func(p)
end
# Read the value of the slot attached to the input called `name`.
function getslot(p::Processor, name::String)
    slot_id = p.in_map[name]
    return getslot(slot_id)
end
# Write `x` to the slot backing the output called `name`.
function setslot!(p::Processor, name::String, x)
    slot_id = p.out_map[name]
    return setslot!(slot_id, x)
end
# FIXME: Retain original slot ordering
# Return the processor's input map itself (the stored dictionary, not a copy).
function inputslots(p::Processor)
    return p.in_map
end
# Return the processor's input names as a new, sorted vector.
function inputnames(p::Processor)
    names = collect(keys(p.in_map))
    return sort!(names)
end
# Return the processor's output map itself (the stored dictionary, not a copy).
function outputslots(p::Processor)
    return p.out_map
end
# Return the processor's output names as a new, sorted vector.
function outputnames(p::Processor)
    names = collect(keys(p.out_map))
    return sort!(names)
end
# Connect input `in_name` of `pdest` to the slot behind output `out_name` of
# `psrc`. Returns the UUID of that slot.
function attach!(pdest::Processor, in_name::String, psrc::Processor, out_name::String)
    slot_id = psrc.out_map[out_name]
    pdest.in_map[in_name] = slot_id
    return slot_id
end
# Mark input `name` of `proc` as unattached by mapping it to `nothing`.
# Returns `nothing`.
function detach!(proc::Processor, name::String)
    proc.in_map[name] = nothing
    return nothing
end
# `true` when no input of `p` is mapped to `nothing` (vacuously `true` for a
# processor without inputs).
function allinputsmapped(p::Processor)
    return !any(isnothing, values(p.in_map))
end

# Processors added by `register_processor!`, keyed by the UUID assigned at registration.
const PROCESSORS = Dict{UUID,Processor}()
# The task scheduled for each registered processor, stored under the same UUID.
const PROCESSOR_TASKS = Dict{UUID,Task}()

# Register `p` under a freshly generated UUID and schedule a task that calls
# `processor_worker(p)`.
#
# Throws an `AssertionError` when some input of `p` is still unattached.
# Returns the tuple `(uuid, task)`; the same pair is recorded in `PROCESSORS`
# and `PROCESSOR_TASKS`.
function register_processor!(p::Processor)
    allinputsmapped(p) || throw(AssertionError("Processor is not fully input mapped"))
    id = uuid4()
    PROCESSORS[id] = p
    worker = @task processor_worker(p)
    schedule(worker)
    PROCESSOR_TASKS[id] = worker
    return (id, worker)
end

# Schedule a task that loops forever: call `select` on the processor's input
# slots, then run the processor. Returns the scheduled `Task`.
#
# `name` and `kind` are not used by the body. They are optional so that the
# one-argument call `processor_worker(p)` made by `register_processor!`
# resolves to this method instead of failing with a `MethodError`; existing
# three-argument calls behave exactly as before.
#
# NOTE(review): `select` is not defined in the code visible here — confirm
# which package provides it and what the `(:take, slot)` pairs mean to it.
function processor_worker(p::Processor, name::String="", kind::Symbol=:take)
    return schedule(@task begin
        while true
            # Presumably blocks until one of the input slots yields a value —
            # TODO confirm against the `select` implementation in use.
            select([(:take, SLOTS[inp]) for inp in values(p.in_map)])
            p()
        end
    end)
end

### Demo ###

#=
A1 = Processor(nothing, ["output"]) do p
    setslot!(p, "output", rand())
end
A2 = Processor(nothing, ["output"]) do p
    setslot!(p, "output", rand())
end
B = Processor(["slot_a","slot_b"], ["slot_c"]) do p
    a = getslot(p, "slot_a")
    b = getslot(p, "slot_b")
    c = a + b
    setslot!(p, "slot_c", c)
end
C1 = Processor(["input"], nothing) do p
    println("I do say, ", getslot(p, "input"), ", sir!")
end
C2 = Processor(["input"], nothing) do p
    println("I do say, ", getslot(p, "input"), ", sir!")
end
C = Processor(["input"], nothing) do p
    println("I do say, ", getslot(p, "input"), ", sir!")
end
attach!(B, "slot_a", A1, "output")
attach!(B, "slot_b", A2, "output")
attach!(C, "input", B, "slot_c")
A1()
A2()
B()
C()
attach!(C1, "input", A1, "output")
attach!(C2, "input", A2, "output")
C1()
C2()
=#