~subsetpark/bagatto

9715c702da21ffcd16adee581cbbd025fd79de09 — Zach Smith 10 months ago ff39a3e
Use threads for data loading, too
6 files changed, 141 insertions(+), 97 deletions(-)

M bagatto.janet
M main.janet
M src/core.janet
A src/loaders.janet
A src/threads.janet
M src/writers.janet
M bagatto.janet => bagatto.janet +10 -15
@@ 8,6 8,7 @@
(import src/multimarkdown)
(import src/core)
(import src/writers)
(import src/threads)

#
# API


@@ 94,27 95,21 @@

(def- core-* *)

(defmacro- defeach
  [name docstring body]
  ~(defn ,name
     ,docstring
     [pattern]
     (let [filenames (sh/glob pattern :x)]
       (fiber/new (fn [] (loop [it :in filenames] ,body))))))

(defeach *
(defn *
  ```
  Generate a fiber that will return all the filenames that match a given
  Generate a function that will return all the filenames that match a given
  file blob.
  ```
  (yield it))
  [pattern]
  (fn [] {:each (sh/glob pattern :x)}))

(defeach slurp-*
(defn slurp-*
  ```
  Generate a fiber that will slurp all the files that match a given
  Generate a function that will slurp all the files that match a given
  file blob.
  ```
  (yield [it (slurp it)]))
  [pattern]
  (fn [] {:each (map |[$0 (slurp $0)] (sh/glob pattern :x))}))

#
# Attribute Parsers


@@ 380,7 375,7 @@
  ```
  [writer-specs]
  (-> writer-specs
     (writers/fanout writers/handle-writes)))
     (threads/demand-pipeline writers/handle-writes)))

#
# TEMPLATE

M main.janet => main.janet +2 -1
@@ 4,6 4,7 @@
(import src/core)
(import src/error)
(import src/writers)
(import src/threads)

(def bagatto
  ```


@@ 68,4 69,4 @@
                            (core/produce-writer-specs site-spec data)))

        
        (writers/fanout writer-specs writers/handle-writes)))))
        (threads/demand-pipeline writer-specs writers/handle-writes)))))

M src/core.janet => src/core.janet +18 -36
@@ 2,6 2,8 @@

(import src/util)
(import src/error)
(import src/loaders)
(import src/threads)

(defn- struct->table [s]
  (->> (or s @{}) (kvs) (splice) (table)))


@@ 39,43 41,23 @@
  function that will be called on each file-contents.
  ```
  [data-spec]
  (def res @{})
  
  (defn make-attrs [filename &opt file-contents parser]
    (default parser (fn [_ x] x))
    (try
      (->> @{:path filename :src file-contents}
           (parser file-contents))
      ([err fib] (error/attrs-error err parser))))
  
  (loop [[spec-name spec] :pairs data-spec]
    (let [with-defaults (set-defaults spec)
          transform-f (or (spec :transform) identity)
          data (match with-defaults
                 
                 ({:src loader :attrs parser} (fiber? loader))
                 (-> (seq [loader-out :generate loader]
                         (match loader-out
                           [filename file-contents]
                           (make-attrs filename file-contents parser)
                           filename
                           (make-attrs filename)))
                    (transform-f))
                 
                 ({:src loader :attrs parser} (function? loader))
                 (let [[filename file-contents] (loader)]
                   (make-attrs filename file-contents parser))
                 
                 ({:src path :attrs parser} (string? path))
                 (let [file-contents (slurp path)]
                   (make-attrs path file-contents parser))
                 
                 {:attrs attrs}
                 attrs
                 
                 _ (error/data-error with-defaults))]
      (put res spec-name data)))
  res)
  (let [data-pairs (pairs data-spec)
        jobs (seq [[spec-name spec] :in data-pairs]
                  (let [with-defaults (set-defaults spec)
                        transform-f (or (spec :transform) identity)]
                    (match with-defaults
                      ({:src loader :attrs parser} (function? loader))
                      (loaders/from-file-spec-loader spec-name loader parser transform-f)
                                
                      ({:src path :attrs parser} (string? path))
                      (loaders/from-path-loader spec-name path parser)
                                
                      {:attrs attrs}
                      (loaders/bare-attr-loader spec-name attrs)
                                
                      _ (error/data-error with-defaults))))]
    (threads/distribute jobs)))

(defn produce-writer-specs 
  ```

A src/loaders.janet => src/loaders.janet +36 -0
@@ 0,0 1,36 @@
(import src/error)

(defn- make-attrs
  ```
  Build the attribute table for a single source file.

  The base table holds the file's `:path` and, when present, its
  contents under `:src`. When `file-contents` is given, `parser` is
  called with the contents and the base table, and its result is
  returned; an error raised by the parser is passed, together with the
  parser, to `error/attrs-error`. Without `file-contents` the base
  table is returned untouched and the parser is never called.
  ```
  [parser filename &opt file-contents]
  (def base-attrs @{:path filename :src file-contents})
  (if-not file-contents
    base-attrs
    (try
      (parser file-contents base-attrs)
      ([err] (error/attrs-error err parser)))))

(defn from-file-spec-loader
  ```
  Build a thread function that runs `loader` and reports the resulting
  attributes to its parent.

  `loader` is a nullary function returning one of:

  - `{:each specs}`: a collection of file specs. Each one is turned
    into an attribute table, and the whole array is then passed
    through `transform-f`.
  - `{:some spec}`: a single file spec, turned into one attribute
    table (`transform-f` is not applied).

  A file spec is either a bare filename or an indexed
  `[filename file-contents]` pair; both shapes are accepted in both
  branches.

  The returned function takes the parent thread and sends it
  `[:res spec-name result]`. If the loader's return value matches
  neither shape, `result` is nil.
  ```
  [spec-name loader parser transform-f]
  (fn [parent]
    (let [# Accept both `[filename contents]` pairs and bare filenames.
          # Only indexed values can be spliced, so a bare filename
          # must be passed through as a single argument.
          attrs-for (fn [spec]
                      (if (indexed? spec)
                        (make-attrs parser ;spec)
                        (make-attrs parser spec)))
          res (match (loader)
                {:each ind}
                (transform-f (seq [spec :in ind] (attrs-for spec)))

                {:some spec}
                (attrs-for spec))]
      (:send parent [:res spec-name res]))))

(defn from-path-loader
  ```
  Build a thread function that loads the single file at `path`.

  When run, the returned function slurps the file, builds its
  attributes with `parser`, and sends `[:res spec-name attrs]` to the
  parent thread it was given.
  ```
  [spec-name path parser]
  (fn [parent]
    (def attrs (make-attrs parser path (slurp path)))
    (:send parent [:res spec-name attrs])))

(defn bare-attr-loader
  ```
  Build a thread function for a spec that already carries its
  attributes.

  No file is read: the returned function just sends
  `[:res spec-name attrs]` to the parent thread it was given.
  ```
  [spec-name attrs]
  (fn [parent]
    (let [message [:res spec-name attrs]]
      (:send parent message))))

A src/threads.janet => src/threads.janet +73 -0
@@ 0,0 1,73 @@
### Two concurrency approaches using multithreading.
###
### `demand-pipeline`: given a list of sendable job specs, spin up a
### pool of demand-based consumers. The consumers will request new
### jobs as they're ready and process them. The pipeline will spin
### down the consumers when all the jobs have been processed.
###
### `distribute`: Given a list of threadable functions, spin up a new
### thread for each one, and collect the results.
###
### Both producers have a fixed job timeout of 30 seconds.

# Number of worker threads that `demand-pipeline` spawns.
(def pool-size 6)
# Mailbox capacity passed to `thread/new` for each pool worker.
(def worker-mailbox 16)
# Timeout, in seconds, passed to `thread/receive` by the parent-side
# receive loops in `demand-pipeline` and `distribute`.
(def timeout 30)

(defn- demand-worker
  ```
  Build the thread function for one pool worker.

  The worker loops forever: it announces that it is ready by sending
  its `thread-id` to the parent, waits for the next message, and hands
  that message to `f`. The loop has no exit of its own, so `f` is
  responsible for ending the thread when it is told to.
  ```
  [thread-id f]
  (fn [parent]
    (while true
      # Signal demand first, then block until the parent answers.
      (:send parent thread-id)
      (f (thread/receive)))))

(defn demand-pipeline
  ```
  Run `specs` through a demand-driven pool of worker threads.

  Starts `pool-size` workers, each of which calls `handler` on every
  message it receives. A worker signals that it is ready by sending its
  id to the main thread; the main thread answers each signal with
  `[:consume spec]`, taking specs from the end of the queue. Once the
  queue is empty, every worker is sent `:die`.

  Waiting for a worker's signal is bounded by `timeout`.
  ```
  [specs handler]

  # Worker id -> thread handle.
  (def pool @{})
  (for t-id 0 pool-size
    (let [worker-fn (demand-worker t-id handler)]
      (put pool t-id (thread/new worker-fn worker-mailbox))))

  # Work on a copy so the caller's sequence is left untouched.
  (def q (array/slice specs))
  (while (not (empty? q))
    (let [t-id (thread/receive timeout)
          spec (array/pop q)]
      (:send (pool t-id) [:consume spec])))

  (each t pool (:send t :die)))

(defn- one-time-worker
  ```
  Start a new thread that calls `f` with its parent thread.

  The thread is created with a mailbox capacity of 1 and the `:h`
  flag. Returns the new thread.
  ```
  [f]
  (let [run (fn [parent] (f parent))]
    (thread/new run 1 :h)))

(defn distribute
  ```
  Run a sequence of jobs in parallel and gather their results.

  Each element of `jobs` must be a thread function: a unary function
  that receives the parent thread and sends back one message of the
  form `[:res k v]`. One thread is started per job.

  Returns a table mapping each `k` to its `v`. Messages of any other
  shape are ignored. Each wait for a message is bounded by `timeout`.
  ```
  [jobs]
  # Keep the thread handles bound for the duration of the collection loop.
  (def workers (seq [job :in jobs] (one-time-worker job)))
  (def res @{})
  (var pending (length jobs))

  (while (pos? pending)
    (match (thread/receive timeout)
      [:res k v]
      (do
        (put res k v)
        (-- pending))))
  res)

M src/writers.janet => src/writers.janet +2 -45
@@ 6,57 6,14 @@
  (match msg
    :die (thread/exit)
    
    [:write [:write path contents]]
    [:consume [:write path contents]]
    (let [ppath (path/dirname path)]
      (print path)
      (util/mkpath ppath)
      (spit path contents))
    
    [:write [:copy from to]]
    [:consume [:copy from to]]
    (let [ppath (path/dirname to)]
      (print to)
      (util/mkpath ppath)
      (util/copy-file from to))))

(defn- worker
  [thread-id f]
  (fn [parent]
    (while true
      (:send parent thread-id)
      (let [msg (thread/receive)]
        (f msg)))))

(def pool-size 6)
(def worker-mailbox 16)
(def timeout 30)

(defn handle-demand
  [pool q]  
  (while (not (zero? (length q)))
    (let [t-id (thread/receive timeout)
          spec (array/pop q)]
      (:send (pool t-id) [:write spec])))

  (each t pool (:send t :die)))

(defn fanout
  ```
  Manage a demand-based worker thread pool for generating files.

  Spins up a pool of size `pool-size` and enters a receive loop. Each
  worker sends demand back to the main thread, and in response the
  main thread will send them a writer-spec to complete. When the queue
  is empty, the main thread sends a kill message to all writers.
 
  ```
  [specs handler]
  
  (def pool @{})
  (loop [t-id :range [0 pool-size]]
    (let [f (worker t-id handler)
          t (thread/new f worker-mailbox)]
      (put pool t-id t)))
  
  (var q (array/slice specs))
  
  (handle-demand pool q))