~subsetpark/bagatto

ff39a3e8aff3004798428f910a82826637ddab9d — Zach Smith 10 months ago 36e91c1
Factor out worker code a little
3 files changed, 37 insertions(+), 28 deletions(-)

M bagatto.janet
M main.janet
M src/writers.janet
M bagatto.janet => bagatto.janet +1 -1
@@ 380,7 380,7 @@
  ```
  [writer-specs]
  (-> writer-specs
     (writers/fanout)))
     (writers/fanout writers/handle-writes)))

#
# TEMPLATE

M main.janet => main.janet +1 -1
@@ 68,4 68,4 @@
                            (core/produce-writer-specs site-spec data)))

        
        (writers/fanout writer-specs)))))
        (writers/fanout writer-specs writers/handle-writes)))))

M src/writers.janet => src/writers.janet +35 -26
@@ 1,30 1,44 @@
(import path)
(import src/util)

(defn- new-writer
  [thread-id]
(defn handle-writes
  [msg]
  (match msg
    :die (thread/exit)
    
    [:write [:write path contents]]
    (let [ppath (path/dirname path)]
      (print path)
      (util/mkpath ppath)
      (spit path contents))
    
    [:write [:copy from to]]
    (let [ppath (path/dirname to)]
      (print to)
      (util/mkpath ppath)
      (util/copy-file from to))))

(defn- worker
  [thread-id f]
  (fn [parent]
    (while true
      (:send parent thread-id)
      (let [msg (thread/receive)]
        (match msg
          :die (thread/exit)

          [:write [:write path contents]]
          (let [ppath (path/dirname path)]
            (print path)
            (util/mkpath ppath)
            (spit path contents))
            
          [:write [:copy from to]]
          (let [ppath (path/dirname to)]
            (print to)
            (util/mkpath ppath)
            (util/copy-file from to)))))))
        (f msg)))))

(def pool-size 6)
(def worker-mailbox 16)
(def timeout 30)

(defn handle-demand
  [pool q]  
  (while (not (zero? (length q)))
    (let [t-id (thread/receive timeout)
          spec (array/pop q)]
      (:send (pool t-id) [:write spec])))

  (each t pool (:send t :die)))

(defn fanout
  ```
  Manage a demand-based worker thread pool for generating files.


@@ 35,19 49,14 @@
  is empty, the main thread sends a kill message to all writers.
 
  ```
  [writer-specs]
  [specs handler]
  
  (def pool @{})
  (loop [t-id :range [0 pool-size]]
    (let [f (new-writer t-id)
          t (thread/new f 16)]
    (let [f (worker t-id handler)
          t (thread/new f worker-mailbox)]
      (put pool t-id t)))
  
  (var q (array/slice writer-specs))
  (var q (array/slice specs))
  
  (while (not (zero? (length q)))
    (let [t-id (thread/receive timeout)
          spec (array/pop q)]
      (:send (pool t-id) [:write spec])))

  (each t pool (:send t :die)))
  (handle-demand pool q))