~subsetpark/bagatto

bagatto/src/threads.janet -rw-r--r-- 2.7 KiB
4fc04b3a — Zach Smith Add render some 2 months ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
### Two concurrency approaches using multithreading.
###
### `demand-pipeline`: given a list of sendable job specs, spin up a
### pool of demand-based consumers. The consumers will request new
### jobs as they're ready and process them. The pipeline will spin
### down the consumers when all the jobs have been processed.
###
### `distribute`: Given a list of threadable functions, spin up a new
### thread for each one, and collect the results.
###
### Both producers wait at most `timeout` seconds (currently 10) for
### each message they receive from a worker.

# Number of workers `demand-pipeline` starts when the caller does not
# pass `pool-size`.
(def default-pool-size 6)
# Passed as the second argument to `thread/new` for every pool worker
# (presumably the worker's mailbox capacity -- confirm against the
# Janet `thread/new` documentation).
(def worker-mailbox 16)
# Passed to each `thread/receive` call made by the parent thread in
# `demand-pipeline` and `distribute` (presumably seconds -- confirm
# against the Janet `thread/receive` documentation).
(def timeout 10)

(defn print
  ```
  Stand-in for the core `print` that emits its output in one piece.

  Every element of `xs` and the trailing newline are first joined
  into a single string, and that string is handed to `prin` in one
  call, instead of writing the arguments and the newline separately.
  The intent is that lines printed from different threads won't be
  interleaved.
  ```
  [& xs]
  (let [line (string ;xs "\n")]
    (prin line)))

(defn- demand-worker
  ```
  Build the thread function for a single pool worker.

  The returned function takes the `parent` thread. It calls `init`
  once to create the worker's state, then loops forever: it sends
  `thread-id` to the parent to signal demand, waits for the next
  message, and calls `f` with `parent`, `thread-id`, the message and
  the state.
  ```
  [thread-id init f]
  (fn [parent]
    (def state (init))
    # The loop has no exit condition of its own: `f` has to handle a
    # `:die` message or this worker will never terminate.
    (while true
      # Ask for work first, then block until the parent replies.
      (:send parent thread-id)
      (f parent thread-id (thread/receive) state))))

(defn demand-pipeline
  ```
  Run `specs` through a demand-driven pool of worker threads.

  `pool-size` workers are started (`default-pool-size` when omitted),
  each built from `init` and `handler` by `demand-worker`. The calling
  thread then loops: every message it receives is taken to be the id
  of a worker asking for work, and that worker is sent
  `[:consume spec]` with a spec popped from the end of a copy of
  `specs`. Once no specs remain, every worker is sent `:die`, and the
  caller keeps receiving worker ids, dropping each from the pool,
  until the pool is empty.

  Each receive on the calling thread is bounded by `timeout`.
  ```
  [specs init handler &opt pool-size]

  (default pool-size default-pool-size)
  (def pool @{})

  (print "Starting worker pool with " pool-size " workers...")

  # One thread per id in [0, pool-size), keyed by id so that demand
  # messages can be routed back to the worker that sent them.
  (for t-id 0 pool-size
    (put pool
         t-id
         (thread/new (demand-worker t-id init handler) worker-mailbox)))

  # Work on a copy so the caller's `specs` is left untouched.
  (def pending (array/slice specs))
  (while (not (empty? pending))
    # Wait for demand before taking a spec off the queue.
    (let [t-id (thread/receive timeout)
          spec (array/pop pending)]
      (:send (pool t-id) [:consume spec])))

  (each worker pool (:send worker :die))

  # Any message received from here on is treated as the id of a
  # worker to remove from the pool.
  (while (not (empty? pool))
    (put pool (thread/receive timeout) nil))

  (print "Terminated worker pool."))

(defn distribute
  ```
  Run a sequence of jobs and gather their results.

  `thread-init` is called once with each element of `jobs`. Going by
  the original description, each job is a thread function -- a unary
  function of the parent thread -- that sends back a message of the
  form `[:res k v]`.

  The calling thread receives messages, each bounded by `timeout`,
  and stores `v` under `k` for every `[:res k v]` it sees, until it
  holds as many entries as there are jobs. Messages of any other
  shape are ignored.

  Returns the table mapping each `k` to its `v`.
  ```
  [jobs thread-init]

  (print "Beginning " (length jobs) " jobs...")

  # Called for its side effect only; the return values are not used.
  (each job jobs (thread-init job))

  (def results @{})
  (while (< (length results) (length jobs))
    (match (thread/receive timeout)
      [:res k v] (put results k v)))

  (print "Finished jobs.")
  results)