export Processor
export newslot, getslot, setslot!
global const SLOTS = Dict{UUID, Observable}()
function newslot(::Type{T}=Any, init=nothing) where T
uuid = uuid4()
obs = Observable{T}(init)
SLOTS[uuid] = obs
return uuid, obs
end
getslot(uuid::UUID) = SLOTS[uuid][]
setslot!(uuid::UUID, x) = (SLOTS[uuid][] = x;)
mutable struct Processor
func
in_map::Dict{String,Union{UUID,Nothing}}
out_map::Dict{String,UUID}
update_cond::Threads.Condition
end
function Processor(func, in_map::Dict{String,Union{UUID,Nothing}}, out_slots::Vector{String})
out_map = Dict{String,UUID}()
for name in out_slots
uuid, _ = newslot()
out_map[name] = uuid
end
return Processor(func, in_map, out_map, Threads.Condition())
end
Processor(func, in_map::Vector{String}, out_slots::Vector{String}) =
Processor(func, Dict{String,Union{UUID,Nothing}}(map(x->x=>nothing, in_map)), out_slots)
Processor(func, in_map::Nothing, out_slots) =
Processor(func, Dict{String,Union{UUID,Nothing}}(), out_slots)
Processor(func, in_map, out_slots::Nothing) =
Processor(func, in_map, String[])
Processor(func, in_map::Nothing, out_slots::Nothing) =
Processor(func, Dict{String,Union{UUID,Nothing}}(), String[])
Processor() = Processor(p -> (), Dict{String,Union{UUID,Nothing}}(), Dict{String,UUID}(), Threads.Condition())
(p::Processor)() = allinputsmapped(p) && p.func(p)
getslot(p::Processor, name::String) = getslot(p.in_map[name])
setslot!(p::Processor, name::String, x) = setslot!(p.out_map[name], x)
# FIXME: Retain original slot ordering
inputslots(p::Processor) = p.in_map
inputnames(p::Processor) = sort(collect(keys(p.in_map)))
outputslots(p::Processor) = p.out_map
outputnames(p::Processor) = sort(collect(keys(p.out_map)))
function attach!(pdest::Processor, in_name::String, psrc::Processor, out_name::String)
pdest.in_map[in_name] = psrc.out_map[out_name]
end
detach!(proc::Processor, name::String) = (proc.in_map[name] = nothing;)
allinputsmapped(p::Processor) = all(x->x!==nothing, values(p.in_map))
global const PROCESSORS = Dict{UUID, Processor}()
global const PROCESSOR_TASKS = Dict{UUID, Task}()
function register_processor!(p::Processor)
@assert allinputsmapped(p) "Processor is not fully input mapped"
uuid = uuid4()
PROCESSORS[uuid] = p
tsk = Task(()->processor_worker(p))
schedule(tsk)
PROCESSOR_TASKS[uuid] = tsk
return uuid, tsk
end
function processor_worker(p::Processor, name::String, kind::Symbol)
schedule(@task begin
while true
select([(:take, SLOTS[inp]) for inp in values(p.in_map)])
p()
end
end)
end
### Demo ###
#=
A1 = Processor(nothing, ["output"]) do p
setslot!(p, "output", rand())
end
A2 = Processor(nothing, ["output"]) do p
setslot!(p, "output", rand())
end
B = Processor(["slot_a","slot_b"], ["slot_c"]) do p
a = getslot(p, "slot_a")
b = getslot(p, "slot_b")
c = a + b
setslot!(p, "slot_c", c)
end
C1 = Processor(["input"], nothing) do p
println("I do say, ", getslot(p, "input"), ", sir!")
end
C2 = Processor(["input"], nothing) do p
println("I do say, ", getslot(p, "input"), ", sir!")
end
C = Processor(["input"], nothing) do p
println("I do say, ", getslot(p, "input"), ", sir!")
end
attach!(B, "slot_a", A1, "output")
attach!(B, "slot_b", A2, "output")
attach!(C, "input", B, "slot_c")
A1()
A2()
B()
C()
attach!(C1, "input", A1, "output")
attach!(C2, "input", A2, "output")
C1()
C2()
=#