M src/threads.janet => src/threads.janet +6 -9
@@ 83,17 83,14 @@
[jobs thread-init]
(print "Beginning " (length jobs) " jobs...")
-
- (let [workers (seq [job :in jobs] (thread-init job))
- res @{}]
- (var got-back 0)
-
- (while (< got-back (length jobs))
+
+ (map thread-init jobs)
+
+ (let [res @{}]
+ (while (< (length res) (length jobs))
(match (thread/receive timeout)
[:res k v]
- (do
- (set got-back (inc got-back))
- (put res k v))))
+ (put res k v)))
(print "Finished jobs.")
res))
A test/threads.janet => test/threads.janet +31 -0
@@ 0,0 1,31 @@
+(import testament :prefix "" :exit true)
+(import src/threads)
+
+(defn thread-init [f]
+ (thread/new (fn [parent] (f parent))))
+
+(defn dist-job [i]
+ (fn [parent] (:send parent [:res i true])))
+
+(deftest distribute
+ (let [jobs (seq [x :range [0 10]] (dist-job x))
+ out (threads/distribute jobs thread-init)]
+ (is (== @{0 true 2 true 1 true 3 true 5 true 6 true 4 true 8 true 7 true 9 true} out))))
+
+(defn pool-init [] ())
+(defn do-work
+ [parent self msg _state]
+ (match msg
+ :die (do (:send parent self)
+ (thread/exit))
+ [:consume n] (spit (string "test/support/pool/" n) "ok")))
+
+(deftest pool
+ (os/shell "rm test/support/pool/*")
+
+ (threads/demand-pipeline (range 0 10)
+ pool-init
+ do-work)
+ (is (= (length (os/dir "test/support/pool")) 10)))
+
+(run-tests!)