~subsetpark/bagatto

22683bebdf00ea054405e41739fc0ef5fdeb7666 — Zach Smith 2 years ago 09181f7
Demand pool management: wait for all threads to exit before main thread exits
4 files changed, 12 insertions(+), 7 deletions(-)

M lockfile.jdn
M src/threads.janet
M src/util.janet
M src/writers.janet
M lockfile.jdn => lockfile.jdn +1 -2
@@ 6,5 6,4 @@
  {:sha "b6dd2f3dcfebd26b027234a84029ee099b83b1f0" :repo "https://github.com/janet-lang/path.git"}
  {:sha "61437d96b5df6eb7e524f88847e7d7521201662d" :repo "https://github.com/janet-lang/json.git"}
  {:sha "4226a4cae548fde38358aa1fdb6d6a90bf5fe175" :repo "https://github.com/andrewchambers/janet-sh.git"}
  {:sha "3bd3376a3eb838201738a6c8945069b72591f7fa" :repo "https://github.com/joy-framework/moondown"}
  {:sha "4ed0fefd236be165d601e93e683749188666b5b9" :repo "git@git.sr.ht:~subsetpark/bagatto"}]
  {:sha "3bd3376a3eb838201738a6c8945069b72591f7fa" :repo "https://github.com/joy-framework/moondown"}]

M src/threads.janet => src/threads.janet +6 -1
@@ 32,7 32,7 @@
      (let [msg (thread/receive)]
        # `f` has to handle a `:die` message or this will never
        # terminate.
        (f msg state)))))
        (f parent thread-id msg state)))))

(defn demand-pipeline
  ```


@@ 63,6 63,11 @@
      (:send (pool t-id) dispatch)))

  (each t pool (:send t :die))

  (while (not (zero? (length pool)))
    (let [exiter (thread/receive timeout)]
      (put pool exiter nil)))

  (print "Terminated worker pool."))

(defn- one-time-worker

M src/util.janet => src/util.janet +2 -2
@@ 16,5 16,5 @@
(defn copy-file
  [source dest cache]
  (case ((os/stat source) :mode)
    :file (spit dest (slurp source))
    :directory (mkpath dest cache)))
    :directory (mkpath dest cache)
    (spit dest (slurp source))))

M src/writers.janet => src/writers.janet +3 -2
@@ 9,7 9,7 @@

(defn handle-writes
  [output-dir]
  (fn [msg path-cache]
  (fn [parent self msg path-cache]
    (defn ensure-path [path]
      (let [s (string path)]
        (unless (in path-cache s)


@@ 17,7 17,8 @@
          (put path-cache s true))))

    (match msg
      :die (thread/exit)
      :die (do (:send parent self)
             (thread/exit))

      [:consume [:write path contents]]
      (let [path (if output-dir (path/join output-dir path) path)