~welt/murse

80dfaa0664529fd2201e466357c0053807f410e5 — welt 3 months ago e9db565
Fix and improve concurrency.
2 files changed, 39 insertions(+), 22 deletions(-)

M concurrent.go
M upgrade.go
M concurrent.go => concurrent.go +9 -4
@@ 8,6 8,7 @@ type Pool struct {
	Errch  chan error
	Jobch  chan func() error
	donech chan bool
	done   bool
	num    int
	ctx    context.Context
	ctxf   context.CancelFunc


@@ 23,7 24,7 @@ func NewPool(workers int) *Pool {

	pool.ctx, pool.ctxf = context.WithCancel(context.Background())

	for i := 1; i != workers; i++ {
	for i := 0; i != workers+1; i++ {
		go func() {
			for {
				select {


@@ 46,8 47,12 @@ func NewPool(workers int) *Pool {
}

func (p *Pool) Stop() {
	p.ctxf()
	for i := 1; i != p.num; i++ {
		<-p.donech
	if !p.done {
		p.ctxf()
		for i := 0; i != p.num+1; i++ {
			<-p.donech
		}

		p.done = true
	}
}

M upgrade.go => upgrade.go +30 -18
@@ 75,6 75,29 @@ func upgradeMain(dir string, url string, threads int, http2 bool) int {

	pool := NewPool(threads)

	dealErr := func() bool {
		select {
		case err := <-pool.Errch:
			if err != nil {
				pool.Stop()
				for {
					select {
					case err := <-pool.Errch:
						errPrintln(err)
					default:
						goto postloop
					}
				}
			postloop:
				return false
			}

		default:
		}

		return true
	}

	for _, v := range writes {
		change := v
		f := func() error {


@@ 105,29 128,18 @@ func upgradeMain(dir string, url string, threads int, http2 bool) int {

		pool.Jobch <- f

		select {
		case err := <-pool.Errch:
			if err != nil {
				pool.Stop()
				errPrintln(err)
				for {
					select {
					case err := <-pool.Errch:
						errPrintln(err)
					default:
						goto postloop
					}
				}
			postloop:
				return 1
			}
		default:
			continue
		ok := dealErr()
		if !ok {
			return 1
		}
	}

	pool.Stop()

	if ok := dealErr(); !ok {
		return 1
	}

	file, err := os.Create(filepath.Join(dir, ".revision"))
	if err != nil {
		errPrintln(err)