fix a link in the shootout toc
it's a new year
commit updated changelog
good programs should be made out of processes and queues.
-- Rich Hickey introducing core.async
This is a library for coordinating actions between threads of control. It provides similar functionality to parts of core.async and manifold. One way to look at it would be as a channels only version of core.async, without the go macro.
The machinate programming model is inspired by The Concurrent ML Reference Manual and a new concurrent ml.
com.manigfeald/machinate {:mvn/version "0.0.119"}
or
com.manigfeald/machinate {:git/url "https://git.sr.ht/~hiredman/machinate"
:git/sha "0a202e6eeca791b8a81e6f34bed1c362d3df3b7c"}
(require '[com.manigfeald.machinate :as m])
(def c (m/channel))
(m/sync! (m/wrap (m/receive c) println))
(m/sync!! (m/send c "Hello World"))
A version of dining philosophers. Uses virtual threads.
clojure -X:philosophers
A take on the SWIM group membership protocol. Uses the core.async interop.
clojure -X:swim
A power of two random choices load balancing example. Uses manifold's let-flow.
clojure -X:twochoices
The code for the runnable examples is browsable here.
I like Communicating Sequential Processes and the π-calculus a lot. I have used core.async a fair bit. I think some of the CML features (particularly the nack event combinator) are very powerful for building systems. I wish core.async had those features. core.async also has resource usage issues, and I cannot seem to get my patches that fix those merged.
This code eventually causes an out of memory error. It prints out the size of the ever increasing queue of timers.
(require '[clojure.core.async :as a])
(require '[clojure.core.async.impl.timers :as t])
(while true
(when (zero? (rand-int 100))
(prn (count @#'t/timeouts-queue)))
(a/timeout Integer/MAX_VALUE)
(Thread/sleep 100))
This happens because calling timeout immediately puts a timer in a queue, so it cannot be gc'ed until the timer completes. Machinate instead only notes the duration and creation time of timeouts and only puts them in the queue to be processed if they are syncrhonized on. Additionally machinate removes timers from the queue if no one is actively attempting to synchronize on them.
This code throws because alt over the defers returned from the take returns only one of the values, but a value is consumed from each stream.
(require '[manifold.stream :as s])
(require '[manifold.deferred :as d])
(let [s1 (s/stream)
s2 (s/stream)]
(future
(try
(doseq [i (range 5)]
(s/put! s1 [1 i])
(s/put! s2 [2 i]))
(s/close! s1)
(s/close! s2)
(catch Throwable t
(prn t))))
(assert (= 0 (second @(d/alt (s/take! s1)
(s/take! s2)))))
(assert (not= (count (s/stream->seq s1))
(count (s/stream->seq s2)))))
Manifold s/take!
from a stream here can be seen as something like
(future (.take some-queue))
, with d/alt
then reutrning whichever
completes first, but both of them run and complete. The rough
equivalent in machinate is sort of like transactional mutual exclusion
of delays. m/receive
returns something like (delay (.take some-queue))
and m/choice
can be thought of as forcing each delay,
returning the first that completes, and rolling back the other(as a
model, rollback is not the actual mechanism used).
I have started a set of benchmarks for tracking machinates performance: Shootout 0001
I am @hiredman and there is a #machinate channel.
@hiredman@downey.family
You can give me money to encourage me to work on projects like this here. Or even better: I am looking for work, so you could hire me to work on whatever you want me to.
Copyright © 2024-2025 Kevin Downey
Distributed under the Eclipse Public License version 1.0.