36e91c110c61ae0cc37b7a3ee558c61a865b2aee — Zach Smith 10 months ago 335e1d8
Add thread pool for writers
4 files changed, 59 insertions(+), 24 deletions(-)

M bagatto.janet
M main.janet
M src/core.janet
A src/writers.janet
M bagatto.janet => bagatto.janet +2 -2
@@ 7,6 7,7 @@

(import src/multimarkdown)
(import src/core)
(import src/writers)


@@ 379,8 380,7 @@
  (-> writer-specs


M main.janet => main.janet +4 -3
@@ 3,6 3,7 @@

(import src/core)
(import src/error)
(import src/writers)

(def bagatto

@@ 62,9 63,9 @@

        (def data (let [data-spec (index-value env 'data index)]
                    (core/load-data data-spec)))
        (def writer-specs (let [site-spec (index-value env 'site index)]
                            (core/produce-writer-specs site-spec data)))

        (-> writer-specs
        (writers/fanout writer-specs)))))

M src/core.janet => src/core.janet +0 -19
@@ 129,22 129,3 @@
        _ (error/site-error with-defaults)))) 

(defn- new-writer [type x y]
  (let [f (case type
            :write (fn [path contents]
                     (let [ppath (path/dirname path)]
                       (print path)
                       (util/mkpath ppath)
                       (spit path contents)))
            :copy (fn [from to]
                    (let [ppath (path/dirname to)]
                      (print to)
                      (util/mkpath ppath)
                      (util/copy-file from to))))]
    (fiber/new (fn [] (f x y))))) 

(defn produce-writers [specs] (seq [spec :in specs] (new-writer ;spec)))

(defn resume-writers [writers] (each writer writers (resume writer)))

A src/writers.janet => src/writers.janet +53 -0
@@ 0,0 1,53 @@
(import path)
(import src/util)

(defn- new-writer
  (fn [parent]
    (while true
      (:send parent thread-id)
      (let [msg (thread/receive)]
        (match msg
          :die (thread/exit)

          [:write [:write path contents]]
          (let [ppath (path/dirname path)]
            (print path)
            (util/mkpath ppath)
            (spit path contents))
          [:write [:copy from to]]
          (let [ppath (path/dirname to)]
            (print to)
            (util/mkpath ppath)
            (util/copy-file from to)))))))

(def pool-size 6)
(def timeout 30)

(defn fanout
  Manage a demand-based worker thread pool for generating files.

  Spins up a pool of size `pool-size` and enters a receive loop. Each
  worker sends demand back to the main thread, and in response the
  main thread will send them a writer-spec to complete. When the queue
  is empty, the main thread sends a kill message to all writers.
  (def pool @{})
  (loop [t-id :range [0 pool-size]]
    (let [f (new-writer t-id)
          t (thread/new f 16)]
      (put pool t-id t)))
  (var q (array/slice writer-specs))
  (while (not (zero? (length q)))
    (let [t-id (thread/receive timeout)
          spec (array/pop q)]
      (:send (pool t-id) [:write spec])))

  (each t pool (:send t :die)))