~welt/murse

ref: db85e43da90177a9ed7275eb7f2d6952407c1466 murse/jobs.go -rw-r--r-- 2.3 KiB
db85e43dwelt guiprep: adjust logging 4 months ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	"github.com/gammazero/workerpool"
)

// startDownloadVerify runs downloadAndVerify for every entry of files on a
// worker pool of opt.Threads workers and blocks until the pool has stopped.
//
// For each key k in files the task is called with k, files[k] and a pointer
// to a copy of lfiles[k] (nil when lfiles has no entry for k).
//
// The pool is stopped early, abandoning tasks that have not started yet, when
// a task returns an error or when SIGINT/SIGTERM is received. The function
// only returns after the pool's Stop call has returned, so no task is still
// appending to the error slice when the caller reads it.
//
// It returns false if any task failed or a shutdown signal was handled, and
// true otherwise, together with every error reported by the tasks that ran.
func startDownloadVerify(files map[string]string, lfiles map[string]string) (bool, []error) {
	sigc := make(chan os.Signal, 1)
	signal.Notify(sigc, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
	// Unregister the channel on the way out so later signals are not
	// delivered to a channel nobody reads anymore.
	defer signal.Stop(sigc)

	wp := workerpool.New(opt.Threads)

	// mu guards completed and errs: both are written by the workers and by
	// the watcher goroutine, and read when the function returns.
	var mu sync.Mutex
	completed := true
	var errs []error

	// errc carries at most one pending "a task failed" notification; the
	// watcher only needs to learn about the first failure.
	errc := make(chan struct{}, 1)

	// Pause the pool so that every job is queued before any of them runs,
	// i.e. before any error can trigger a stop.
	pauseCtx, unpause := context.WithCancel(context.Background())
	wp.Pause(pauseCtx)

	for k, v := range files {
		// Per-iteration copies, since the closure below outlives the iteration.
		k, v := k, v

		var lv *string
		if s, ok := lfiles[k]; ok {
			lv = &s
		}

		wp.Submit(func() {
			err := downloadAndVerify(k, v, lv)
			if err == nil {
				return
			}

			mu.Lock()
			errs = append(errs, err)
			completed = false
			mu.Unlock()

			// Non-blocking and outside the lock: if a notification is already
			// pending (or the watcher is gone) there is nothing more to signal.
			select {
			case errc <- struct{}{}:
			default:
			}
		})
	}

	// The watcher is started only after all jobs are submitted, so it can
	// never stop the pool while Submit is still being called. A signal that
	// arrived in the meantime is buffered in sigc and handled right away.
	endc := make(chan struct{})        // closed to tell the watcher to exit
	watcherDone := make(chan struct{}) // closed once the watcher has exited
	go func() {
		defer close(watcherDone)

		select {
		case <-sigc:
			log.Println("Signal recieved, gracefully shutting down...")
		case <-errc:
			log.Println("Error occured, gracefully stopping...")
		case <-endc:
			return
		}

		mu.Lock()
		completed = false
		mu.Unlock()
		wp.Stop()
	}()

	unpause()

	// Hack: the worker pool has no way to wait for "queue drained OR pool
	// stopped", so poll until one of the two is true.
	for wp.WaitingQueueSize() > 0 && !wp.Stopped() {
		time.Sleep(8 * time.Millisecond)
	}

	// Stop blocks until the tasks that are currently running have finished.
	// It is called here even if the watcher already initiated the stop, so
	// that this function never returns while a worker is still running.
	wp.Stop()

	// Shut the watcher down and wait for it, so it neither leaks nor writes
	// to completed after the result has been read.
	close(endc)
	<-watcherDone

	mu.Lock()
	defer mu.Unlock()
	return completed, errs
}