ea5658a3d24a8d9a8423c30ff31c35e6fda189b9 — Zach Smith 2 months ago 1c91492
Use fibers not threads for writing files
5 files changed, 10 insertions(+), 81 deletions(-)

M main.janet
M repl.janet
M src/core.janet
M src/threads.janet
M src/writers.janet
M main.janet => main.janet +2 -1
@@ 54,4 54,5 @@
          (def writer-specs (let [site-spec (index-value env 'site index)]
                              (core/produce-writer-specs site-spec data env)))

          (core/evaluate-writer-specs env writer-specs))))))
          (def output-dir (env :bagatto-output-dir))
          (core/evaluate-writer-specs output-dir writer-specs))))))

M repl.janet => repl.janet +1 -12
@@ 23,15 23,4 @@
  [site data]
  (core/produce-writer-specs site data @{}))

(defn write-site
  Given the output of a site generation specification, trigger the
  actual file generation.

  Not necessary to define a module, but can be useful to debug your
  configuration from within the REPL.
  [writer-specs output-dir]
  (-> writer-specs
      (threads/side-effects-pipeline writers/writer-init
                                     (writers/handle-writes output-dir))))
(def write-site core/evaluate-writer-specs)

M src/core.janet => src/core.janet +5 -6
@@ 76,11 76,10 @@
  Third phase of business logic : given a list of writer specs, render
  them into new files.
  [env writer-specs]
  [output-dir writer-specs]

  (def output-dir (env :bagatto-output-dir))
  (def handler (writers/handle-writes output-dir))
  (def cache @{})

  (threads/side-effects-pipeline writer-specs
                                 (writers/handle-writes output-dir)
                                 (min threads/default-pool-size (length writer-specs))))
  (each spec writer-specs
    (ev/spawn (handler spec cache))))

M src/threads.janet => src/threads.janet +1 -58
@@ 1,17 1,7 @@
### Two concurrency approaches using multithreading.
### `demand-pipeline`: given a list of sendable job specs, spin up a
### pool of demand-based consumers. The consumers will request new
### jobs as they're ready and process them. The pipeline will spin
### down the consumers when all the jobs have been processed.
### `distribute`: Given a list of threadable functions, spin up a new
### `distribute-gather`: Given a list of threadable functions, spin up a new
### thread for each one, and collect the results.
### Both producers have a fixed job timeout of 30 seconds.

(def default-pool-size 6)
(def timeout 10)

(defn print

@@ 22,53 12,6 @@
  [& xs]
  (prin (string ;xs "\n")))

(defn- demand-worker
  [[id init handler demand]]
  (def state (init))
    (match (ev/select demand)
      [:take _ msg] (handler msg state)
      [:close _] (do
                   (ev/give-supervisor id)
      other (errorf "Received unknown pipeline msg: %q" other))))

(defn side-effects-pipeline
  Manage a thread pool of workers.

  Spins up a pool of size `pool-size` and begins to feed `specs` into a demand
  channel. When the jobs have all been handled, closes the pool and exits.
  [specs init handler &opt pool-size]

  (default pool-size default-pool-size)
  (def demand (ev/thread-chan pool-size))
  (def results (ev/thread-chan pool-size))

  (print "Starting worker pool with " pool-size " workers...")

  (def worker-fib (fiber/new demand-worker :t))
  (repeat pool-size
    (ev/thread worker-fib [(gensym) init handler demand] :n results))

  (var q (array/slice specs))
  (while (not (zero? (length q)))
    (let [spec (array/pop q)]
      (ev/give demand spec)))

  (while (not (zero? (ev/count demand)))

  (ev/chan-close demand)

  (var remaining pool-size)
  (while (not (zero? remaining))
    (ev/take results)
    (-- remaining))

  (print "Terminated worker pool."))

(defn distribute-gather
  Process, in parallel, a sequence of jobs.

M src/writers.janet => src/writers.janet +1 -4
@@ 1,7 1,6 @@
(import spork/path)

(import /src/util)
(import /src/threads)

(defn writer-init
  "Return an empty table to be used as path cache"

@@ 10,7 9,7 @@

(defn handle-writes
  (fn [msg path-cache]
  (fn write-handler [msg path-cache]
    (defn ensure-path [path]
      (let [s (string path)]
        (unless (in path-cache s)

@@ 22,7 21,6 @@
      (let [path (if output-dir (path/join output-dir path) path)
            ppath (path/dirname path)]

        (threads/print "[WRITE] " path)
        (ensure-path ppath)
        (spit path contents))

@@ 30,6 28,5 @@
      (let [to (if output-dir (path/join output-dir to) to)
            ppath (path/dirname to)]

        (threads/print "[COPY] " to)
        (ensure-path ppath)
        (util/copy-file from to path-cache)))))