~subsetpark/bagatto

ref: d6a048f9703c9294c7d61e32cb3831cb6922becf bagatto/src/threads.janet -rw-r--r-- 2.6 KiB
d6a048f9 — Zach Smith More progress logging 10 months ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
### Two concurrency approaches using multithreading.
###
### `demand-pipeline`: given a list of sendable job specs, spin up a
### pool of demand-based consumers. The consumers will request new
### jobs as they're ready and process them. The pipeline will spin
### down the consumers when all the jobs have been processed.
###
### `distribute`: Given a list of threadable functions, spin up a new
### thread for each one, and collect the results.
###
### In both approaches the coordinating thread waits at most `timeout`
### (30) seconds for each message from a worker.

# Seconds handed to `thread/receive` by the coordinating thread in
# `demand-pipeline` and `distribute`.
(def timeout 30)

# Number of worker threads started by `demand-pipeline`.
(def pool-size 6)

# Value passed as the second argument to `thread/new` (the mailbox
# capacity) for each `demand-pipeline` worker.
(def worker-mailbox 16)

(defn- demand-worker
  ```
  Build the thread function for one demand-driven worker.

  The returned function takes the parent thread and loops forever:
  it sends `thread-id` to the parent to signal that it is ready, waits
  for the next message (using the default receive timeout, since none
  is given), and passes that message to `f`.
  ```
  [thread-id f]
  (fn [parent]
    (while true
      # Signal demand: tell the parent which worker is ready.
      (:send parent thread-id)
      # The loop has no exit condition of its own, so `f` is
      # responsible for ending the thread when it is handed `:die`;
      # otherwise the worker never terminates.
      (f (thread/receive)))))

(defn demand-pipeline
  ```
  Manage a demand-based worker thread pool.

  Spins up `pool-size` worker threads, each running `handler`, and
  enters a receive loop. Every worker sends its id to the main thread
  whenever it is ready for work, and the main thread answers with
  `[:consume spec]`. Specs are popped from the end of a copy of
  `specs`, so they are dispatched in reverse order and the caller's
  sequence is not mutated.

  Once every spec has been handed out, the main thread waits for each
  worker to report in one more time---a worker only asks for more work
  after `handler` has returned from its previous message---and answers
  that request with `:die`. The function therefore does not return
  while a dispatched job is still being handled.

  `handler` must end its thread when it is given `:die`.

  Every wait uses `timeout`; `thread/receive` raises an error if no
  worker reports in within that many seconds.
  ```
  [specs handler]

  (print "Starting worker pool with " pool-size " workers...")

  # Worker id -> thread handle, so demand messages can be answered.
  (def pool @{})
  (loop [t-id :range [0 pool-size]]
    (put pool t-id (thread/new (demand-worker t-id handler) worker-mailbox)))

  # Dispatch phase: answer each demand message with the next spec.
  (def q (array/slice specs))
  (while (not (empty? q))
    (let [t-id (thread/receive timeout)]
      (:send (pool t-id) [:consume (array/pop q)])))

  # Drain phase: each worker sends exactly one more demand message once
  # it is idle (its initial one, if it never got a job). Waiting for
  # that message before sending `:die` guarantees the worker has
  # finished its last job, instead of declaring the pool terminated
  # while jobs are still in flight.
  (var remaining (length pool))
  (while (pos? remaining)
    (let [t-id (thread/receive timeout)]
      (:send (pool t-id) :die)
      (-- remaining)))

  (print "Terminated worker pool."))

(defn- one-time-worker
  ```
  Start a new thread that calls `f` with its parent thread, and return
  the thread handle. The thread is created with a mailbox capacity of
  1 and the `:h` flag.
  ```
  [f]
  (let [entry (fn [parent] (f parent))
        capacity 1]
    (thread/new entry capacity :h)))

(defn distribute
  ```
  Run a sequence of jobs in parallel, one thread per job.

  Each element of `jobs` must be a thread function---a unary function
  receiving the parent thread---that reports its result by sending the
  parent a message of the form `[:res k v]`.

  Waits (up to `timeout` seconds per message) until one such message
  has been collected for every job; messages of any other shape are
  ignored. Returns a table mapping each `k` to its `v`.
  ```
  [jobs]

  (def total (length jobs))
  (print "Beginning " total " jobs...")

  # Launch every job up front; results arrive through this thread's
  # mailbox, so the thread handles themselves are not needed.
  (each job jobs (one-time-worker job))

  (def res @{})
  (var got-back 0)
  (while (< got-back total)
    (match (thread/receive timeout)
      [:res k v]
      (do
        (++ got-back)
        (put res k v))))

  (print "Finished jobs.")
  res)

(defn print
  ```
  Stand-in for the core `print`: concatenate all of `xs` together with
  a trailing newline into one string and emit it with a single `prin`
  call, rather than writing the text and the newline separately. This
  is meant to keep output from different threads from being
  interleaved.
  ```
  [& xs]
  (def line (string ;xs "\n"))
  (prin line))