~subsetpark/bagatto

ca25d620bc210f4cdd69cb81e580a8079453338d — Zach Smith 20 days ago fb138c6
Threads test
2 files changed, 37 insertions(+), 9 deletions(-)

M src/threads.janet
A test/threads.janet
M src/threads.janet => src/threads.janet +6 -9
@@ 83,17 83,14 @@
  [jobs thread-init]

  (print "Beginning " (length jobs) " jobs...")

  (let [workers (seq [job :in jobs] (thread-init job))
        res @{}]
    (var got-back 0)

    (while (< got-back (length jobs))
  
  (map thread-init jobs)
  
  (let [res @{}]
    (while (< (length res) (length jobs))
      (match (thread/receive timeout)
        [:res k v]
        (do
          (set got-back (inc got-back))
          (put res k v))))
        (put res k v)))

    (print "Finished jobs.")
    res))

A test/threads.janet => test/threads.janet +31 -0
@@ 0,0 1,31 @@
(import testament :prefix "" :exit true)
(import src/threads)

(defn thread-init [f]
  (thread/new (fn [parent] (f parent))))

(defn dist-job [i]
  (fn [parent] (:send parent [:res i true])))

(deftest distribute
  (let [jobs (seq [x :range [0 10]] (dist-job x))
        out (threads/distribute jobs thread-init)]
    (is (==  @{0 true 2 true 1 true 3 true 5 true 6 true 4 true 8 true 7 true 9 true} out))))

(defn pool-init [] ())
(defn do-work
    [parent self msg _state]
    (match msg
      :die (do (:send parent self)
               (thread/exit))
      [:consume n] (spit (string "test/support/pool/" n) "ok")))

(deftest pool
  (os/shell "rm test/support/pool/*")
  
  (threads/demand-pipeline (range 0 10)
                           pool-init
                           do-work)
  (is (= (length (os/dir "test/support/pool")) 10)))

(run-tests!)