~subsetpark/bagatto

389a085594c1d5b9c39366eebac03733cd23a6db — Zach Smith 2 months ago 3491892 ev
threads->ev
M bagatto.janet => bagatto.janet +2 -1
@@ 6,6 6,7 @@
(import json)

(import /src/multimarkdown)
(import /src/util)

(defn- set-error-context!
  [k v]


@@ 130,7 131,7 @@
  file blob.
  ```
  [pattern]
  (fn [] {:each (map |[$0 (slurp $0)] (glob pattern))}))
  (fn [] {:each (map |[$0 (util/stream-file $0)] (glob pattern))}))

#
# Attribute Parsers

M src/core.janet => src/core.janet +2 -2
@@ 61,7 61,7 @@
               (setdyn :error-context {:spec-name spec-name})
               (loaders/from-spec (set-defaults spec) spec-name))]

    (threads/distribute jobs thread-init)))
    (threads/distribute jobs)))

(defn produce-writer-specs
  ```


@@ 78,7 78,7 @@
                                     spec-name
                                     data))]

    (let [segments (-> (threads/distribute jobs thread-init) (values))]
    (let [segments (-> (threads/distribute jobs) (values))]
      (array/concat ;segments))))

(defn evaluate-writer-specs

M src/generators.janet => src/generators.janet +8 -8
@@ 23,7 23,7 @@

(defn- render-each-generator
  [data spec-name filter site-selector path-generator renderer]
  (fn [parent]
  (fn [chan]
    (set-cxt! spec-name)
    (def res @[])



@@ 32,11 32,11 @@
        (let [path (apply-path path-generator [data item] :di)
              contents (apply-renderer renderer [data item] :di)]
          (array/push res [:write path contents]))))
    (:send parent [:res spec-name res])))
    (ev/give chan [:res spec-name res])))

(defn- render-generator
  [data spec-name path-generator renderer]
  (fn [parent]
  (fn [chan]
    (set-cxt! spec-name)

    (threads/print "Rendering " spec-name "...")


@@ 45,11 45,11 @@
    (if-let [path (apply-path path-generator [data] :d)
             contents (apply-renderer renderer [data] :d)]
      (array/push res [:write path contents]))
    (:send parent [:res spec-name res])))
    (ev/give chan [:res spec-name res])))

(defn- copy-each-generator
  [data spec-name filter site-selector path-generator]
  (fn [parent]
  (fn [chan]
    (set-cxt! spec-name)

    (threads/print "Generating paths for " spec-name "...")


@@ 62,11 62,11 @@
        (let [from (item :path)
              to (apply-path path-generator [data item] :di)]
          (array/push res [:copy from to]))))
    (:send parent [:res spec-name res])))
    (ev/give chan [:res spec-name res])))

(defn- copy-some-generator
  [data spec-name site-selector path-generator]
  (fn [parent]
  (fn [chan]
    (set-cxt! spec-name)

    (threads/print "Generating path for " spec-name "...")


@@ 78,7 78,7 @@
      (let [from (item :path)
            to (apply-path path-generator [data] :d)]
        (array/push res [:copy from to])))
    (:send parent [:res spec-name res])))
    (ev/give chan [:res spec-name res])))

(defn from-spec
  [spec spec-name data]

M src/loaders.janet => src/loaders.janet +8 -7
@@ 1,6 1,7 @@
(import /src/error)
(import /src/threads)
(import /src/env)
(import /src/util)

(defn- set-cxt! [spec-name]
  (setdyn :error-context {:spec-name spec-name}))


@@ 16,7 17,7 @@
  specs and sends back a list of attributes.
  ```
  [spec-name loader parser transform-f]
  (fn [parent]
  (fn [chan]
    (set-cxt! spec-name)
    (threads/print "Loading " spec-name "...")



@@ 36,7 37,7 @@
                (make-attrs parser ;spec)

                _ (error/loader-spec-error spec-name loader))]
      (:send parent [:res spec-name res]))))
      (ev/give chan [:res spec-name res]))))

(defn- from-path-loader
  ```


@@ 44,13 45,13 @@
  attributes.
  ```
  [spec-name path parser]
  (fn [parent]
  (fn [chan]
    (set-cxt! spec-name)
    (threads/print "Loading " spec-name " (" path ")...")

    (let [file-contents (slurp path)
    (let [file-contents (util/stream-file path)
          res (make-attrs parser path file-contents)]
      (:send parent [:res spec-name res]))))
      (ev/give chan [:res spec-name res]))))

(defn- bare-attr-loader
  ```


@@ 59,9 60,9 @@
  [spec-name attrs]
  (set-cxt! spec-name)

  (fn [parent]
  (fn [chan]
    (threads/print "Loaded " spec-name)
    (:send parent [:res spec-name attrs])))
    (ev/give chan [:res spec-name attrs])))

(defn from-spec
  [spec spec-name]

M src/threads.janet => src/threads.janet +28 -28
@@ 24,15 24,18 @@
  (prin (string ;xs "\n")))

(defn- demand-worker
  [thread-id init f]
  (fn [parent]
  [init f]
  (fn [chan]
    (def state (init))
    (while true
      (:send parent thread-id)
      (let [msg (thread/receive)]

    (forever
      (let [msg (ev/take chan)]
        # `f` has to handle a `:die` message or this will never
        # terminate.
        (f parent thread-id msg state)))))
        (case (f msg state)
          :die (do
                 (ev/give chan :die)
                 (break)))))))

(defn demand-pipeline
  ```


@@ 45,28 48,24 @@
  ```
  [specs init handler &opt pool-size]

  (def pool @{})
  (default pool-size default-pool-size)
  (def chan (ev/chan 10))

  (print "Starting worker pool with " pool-size " workers...")
  (ev/spawn
    (each spec specs (ev/give chan [:consume spec]))
    (ev/give chan :die))

  (loop [t-id :range [0 pool-size]]
    (let [f (demand-worker t-id init handler)
          t (thread/new f worker-mailbox)]
      (put pool t-id t)))
  (default pool-size default-pool-size)

  (var q (array/slice specs))
  (while (not (zero? (length q)))
    (let [t-id (thread/receive timeout)
          spec (array/pop q)
          dispatch [:consume spec]]
      (:send (pool t-id) dispatch)))
  (print "Starting worker pool with " pool-size " workers...")

  (each t pool (:send t :die))
  (def pool (seq [_ :range [0 pool-size]]
              (let [f (demand-worker init handler)]
                (ev/call f chan))))

  (while (not (zero? (length pool)))
    (let [exiter (thread/receive timeout)]
      (put pool exiter nil)))
  (forever
    (if (any? (map fiber/can-resume? pool))
      (ev/sleep 0.1)
      (break)))

  (print "Terminated worker pool."))



@@ 80,15 79,16 @@

  Returns a mapping from `k` to `v`.
  ```
  [jobs thread-init]
  [jobs]

  (print "Beginning " (length jobs) " jobs...")
  
  (map thread-init jobs)
  
  (def chan (ev/chan))

  (map |(ev/call $ chan) jobs)

  (let [res @{}]
    (while (< (length res) (length jobs))
      (match (thread/receive timeout)
      (match (ev/take chan)
        [:res k v]
        (put res k v)))


M src/util.janet => src/util.janet +18 -1
@@ 13,8 13,25 @@
            (os/mkdir s)
            (put cache s true)))))))

(defn stream-file
  [path]
  (:read (os/open path) :all))

(defn write-file
  [path contents]
  (ev/write (os/open path :wc) contents))

(defn pipe-stream
  [from to]
  (ev/spawn
    (while (def b (ev/read from 1024))
      (ev/write to b))
    (ev/close to)))

(defn copy-file
  [source dest cache]
  (case ((os/stat source) :mode)
    :directory (mkpath dest cache)
    (spit dest (slurp source))))
    (let [from (os/open source)
          to (os/open dest :wc)]
      (pipe-stream from to))))

M src/writers.janet => src/writers.janet +3 -4
@@ 9,7 9,7 @@

(defn handle-writes
  [output-dir]
  (fn [parent self msg path-cache]
  (fn [msg path-cache]
    (defn ensure-path [path]
      (let [s (string path)]
        (unless (in path-cache s)


@@ 17,8 17,7 @@
          (put path-cache s true))))

    (match msg
      :die (do (:send parent self)
             (thread/exit))
      :die :die

      [:consume [:write path contents]]
      (let [path (if output-dir (path/join output-dir path) path)


@@ 26,7 25,7 @@

        (threads/print "[WRITE] " path)
        (ensure-path ppath)
        (spit path contents))
        (util/write-file path contents))

      [:consume [:copy from to]]
      (let [to (if output-dir (path/join output-dir to) to)

M test/generators.janet => test/generators.janet +1 -1
@@ 3,7 3,7 @@
(import /src/generators)

(defn- parent []
  @{:send (fn [self res] (put self :res res))})
  (ev/chan))

(defn path-builder [data item] (string "test/out/" (item :topic)))
(defn exclaim [data item] (string (item :topic) "!"))

M test/loaders.janet => test/loaders.janet +1 -2
@@ 3,8 3,7 @@
(import /bagatto)
(import /src/loaders)

(defn- parent []
  @{:send (fn [self res] (put self :res res))})
(defn- parent [] (ev/chan))

(deftest path-loader
  (let [loader (loaders/from-spec {:src "test/support/test.txt"

M test/threads.janet => test/threads.janet +9 -12
@@ 1,28 1,25 @@
(import testament :prefix "" :exit true)
(import /src/threads)

(defn thread-init [f]
  (thread/new (fn [parent] (f parent))))
(import /src/util)

(defn dist-job [i]
  (fn [parent] (:send parent [:res i true])))
  (fn [chan] (ev/give chan [:res i true])))

(deftest distribute
  (let [jobs (seq [x :range [0 10]] (dist-job x))
        out (threads/distribute jobs thread-init)]
    (is (==  @{0 true 2 true 1 true 3 true 5 true 6 true 4 true 8 true 7 true 9 true} out))))
        out (threads/distribute jobs)]
    (is (== @{0 true 2 true 1 true 3 true 5 true 6 true 4 true 8 true 7 true 9 true} out))))

(defn pool-init [] ())
(defn do-work
    [parent self msg _state]
    (match msg
      :die (do (:send parent self)
               (thread/exit))
      [:consume n] (spit (string "test/support/pool/" n) "ok")))
  [msg _state]
  (match msg
    :die :die
    [:consume n] (util/write-file (string "test/support/pool/" n) "ok")))

(deftest pool
  (os/shell "rm test/support/pool/*")
  

  (threads/demand-pipeline (range 0 10)
                           pool-init
                           do-work)