~welt/murse

ref: v0.2.0 murse/concurrent.go -rw-r--r-- 840 bytes
08790b5bwelt Bump from patch to minor, there's a breaking change. 4 months ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package main

import (
	"context"
)

type Pool struct {
	Errch  chan error
	Jobch  chan func() error
	donech chan bool
	done   bool
	num    int
	ctx    context.Context
	ctxf   context.CancelFunc
}

func NewPool(workers int) *Pool {
	pool := &Pool{
		Errch:  make(chan error),
		Jobch:  make(chan func() error),
		donech: make(chan bool),
		num:    workers,
	}

	pool.ctx, pool.ctxf = context.WithCancel(context.Background())

	for i := 0; i != workers+1; i++ {
		go func() {
			for {
				select {
				case job := <-pool.Jobch:
					err := job()
					if err != nil {
						go func() { pool.Errch <- err }()
					}

				case <-pool.ctx.Done():
					pool.donech <- true
					return
				}
			}
		}()
	}

	return pool

}

func (p *Pool) Stop() {
	if !p.done {
		p.ctxf()
		for i := 0; i != p.num+1; i++ {
			<-p.donech
		}

		p.done = true
	}
}