~welt/murse

40e049dd726a64b001ef7c193cda19d46307c577 — welt 4 months ago 6732aed
Improve threading, fix memory leak, major refactor
6 files changed, 178 insertions(+), 97 deletions(-)

A Makefile
M README
M download.go
M flags.go
A jobs.go
M main.go
A Makefile => Makefile +4 -0
@@ 0,0 1,4 @@
static:
	CGO_ENABLED=0 go build -tags netgo -ldflags '-s -w' -o murse
cgo:
	go build  -ldflags "-s -w" ./ 

M README => README +9 -8
@@ 3,9 3,9 @@ includes new (non-)features, less bugs, and speedier downloads. I
created this because if you know me, you know I hate deploying Python.
Additionally ofatomic makes some poor design decisions that makes
downloads slow and unreliable. Because this is a reimplementation it's
shares limitations with ofatomic (such as using an unecessarily slow &
unusual hash for updates/verification). One of the things I want to do
is rewrite the backend to get around these limitations.
shares limitations with ofatomic (such as using a slow hash for
updates/verification). One of the things I want to do is rewrite the
backend to get around these limitations.

FEATURES:



@@ 15,15 15,16 @@ which means that deploying is simply a matter of copying the binary to
your server, and running it.

Lightweight:
Since this is written in a compiled, statically typed language, murse
is much faster than the Python implementation. It's been designed to be
faster too, adjustments have been made that make is significantly
faster.
Since this is written in a compiled, lower level language, murse is
much faster than the Python implementation. It's been designed to be
quicker too, adjustments have been made that make is significantly
speedier.

Cross-platform:
Of course, we have the usual suspects (Windows, Linux) but we also
support other operating systems, like FreeBSD. Hypothetically this will
run anywhere Go can.
run anywhere Go can. Although I'm not sure why you would want this
on NetBSD or something.

Compatability:
We try to include flags such that murse is backwards compatible with

M download.go => download.go +17 -20
@@ 42,73 42,70 @@ func get(url string) (*[]byte, error) {

	if mimetype.Detect(b).String() == "application/zstd" {
		d, _ := zstd.NewReader(nil)
		defer d.Close()
		bd, err := d.DecodeAll(b, nil)
		if err != nil {
			return nil, err
		}

		defer d.Close()

		return &bd, nil
	}
	return &b, nil
}

func downloadAndVerify(path string, checksum string, errc chan error) {
func downloadAndVerify(path string, checksum string, lchecksum *string) error {
	log.Println(path)
	ff := opt.Directory + "/" + path
	if _, err := os.Stat(ff); !os.IsNotExist(err) {
		b, err := os.ReadFile(ff)
		if err != nil {
			errc <- err
			return
			return err
		}

		h := sha512.Sum384(b)

		if isProtected(path) && !install && !opt.OverwriteProtected {
			dh, err := base16ToHash(localFiles[path]) // <^ if protected and modified
		if isProtected(path) && !install && !opt.OverwriteProtected && lchecksum != nil {
			dh, err := base16ToHash(*lchecksum) // <^ if protected and modified
			if err != nil {
				errc <- err
				return
				return err
			}

			if *dh == h {
				log.Println("File is protected, will not overwrite: " + ff)
				return
				return nil
			}
		}

		dh, err := base16ToHash(newFiles[path])
		dh, err := base16ToHash(checksum)
		if err != nil {
			errc <- err
			return
			return err
		}
		if *dh == h {
			log.Println("File exists and matches hash, skipping: " + ff)
			return
			return nil
		}
	}

	err := os.MkdirAll(filepath.Dir(ff), 0777)
	if err != nil && !errors.Is(err, os.ErrExist) {
		errc <- err
		return
		return err
	}

	b, err := get(opt.URL + "/" + path)
	if err != nil {
		errc <- err
		return
		return err
	}

	t, err := compareBytesHashSum(checksum, b)
	if err != nil {
		errc <- err
		return
		return err
	}
	if !t {
		errc <- errors.New("file failed integrity check:" + path)
		return
		return errors.New("file failed integrity check:" + path)
	}

	os.WriteFile(ff, *b, 0666)
	return nil
}

M flags.go => flags.go +29 -0
@@ 1,7 1,9 @@
package main

import (
	"log"
	"os"
	"runtime"

	"github.com/jessevdk/go-flags"
)


@@ 26,3 28,30 @@ func init() {
	lmd = opt.Directory + "/.murse/local.json"
	lfd = opt.Directory + "/.murse/lockfile"
}

func init() {
	// We subtract one from the CPU count to leave thread(s) for the OS.
	// We set it to one on tri-core-or-less systems so pablo.gonzales2003's
	// Gateway netbook doesn't burn to a crisp.

	// We limit the automatic count to 6 so we don't burn OF's servers
	// to a crisp as well.

	if opt.Threads < 1 {
		switch {
		case runtime.NumCPU() > 6:
			opt.Threads = 6
		case runtime.NumCPU() < 3:
			opt.Threads = 1
		}
	}

	if opt.Threads > 6 {
		log.Println("!")
		log.Printf("WARNING: You are using %v threads. ", opt.Threads)
		log.Println("Please practice proper netiquette and make sure the")
		log.Println("place you're fetching from is okay with that many")
		log.Println("concurrent downloads going at once.")
		log.Println("!")
	}
}

A jobs.go => jobs.go +106 -0
@@ 0,0 1,106 @@
package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	"github.com/gammazero/workerpool"
)

func startDownloadVerify(files map[string]string, lfiles map[string]string) (bool, []error) {
	sigc := make(chan os.Signal, 1)
	signal.Notify(sigc, syscall.SIGINT, syscall.SIGTERM)

	wp := workerpool.New(opt.Threads)

	completed := true // will be set to false by the functions below

	// Errors, mutex because workers report their own errors concurrently.
	errMutex := &sync.Mutex{}
	var errors []error

	errc := make(chan bool, opt.Threads) // used to signal an error occured, buffered in case multiple errors occured
	endc := make(chan bool, 1)           // Thread to tell this goroutine to stop checking for signals
	go func() {
		for {
			select {
			case <-sigc:
				log.Println("Signal recieved, gracefully shutting down...")
				completed = false
				wp.Stop()
				return
			case <-errc:
				log.Println("Error occured, gracefully stopping...")
				completed = false
				wp.Stop()
				return
			case <-endc:
				return
			}
		}
	}()

	// We're going to pause so that all jobs will be submitted by the time
	// any errors will occur
	pctx, c := context.WithCancel(context.Background())
	wp.Pause(pctx)

	for k, v := range files {
		var lv *string
		if _, ok := lfiles[k]; ok {
			s := lfiles[k]
			lv = &s
		}

		{
			// new scope since we're dealing with a closure
			// set variables within our scope to the outside one.
			k := k
			v := v
			lv := lv
			f := func() {
				err := downloadAndVerify(k, v, lv)
				if err != nil {
					errMutex.Lock()
					defer errMutex.Unlock()
					errors = append(errors, err)
					if len(errors) > 0 {
						completed = false
						errc <- true
						return
					}
				}
			}
			wp.Submit(f)
			// end scope
		}
	}

	// Unpause
	c()
	// New channel, we'll wait for a signal to go ahead and exit
	// the function
	donec := make(chan bool)

	go func() {
		// hack, I'm going to fork the worker library so that
		// we can wait for a worker pool to stop instead of finish
		// its tasks using a function like WaitStop
		for {
			if wp.Stopped() {
				donec <- true
				return
			}
			time.Sleep(8 * time.Millisecond)
		}

	}()

	<-donec
	return completed, errors
}

M main.go => main.go +13 -69
@@ 1,22 1,12 @@
package main

import (
	"context"
	"crypto/rsa"
	"database/sql"
	"log"
	"os"
	"runtime"

	"github.com/gammazero/workerpool"
)

// This is used for detecting old files to clean up.
var localFiles map[string]string

// A map of revised files to download
var newFiles map[string]string

// If this is set we're doing a first-time installation
var install bool



@@ 29,7 19,7 @@ type file struct {

func main() {
	if fileExists(lfd) && !opt.IgnoreLockfile {
		log.Fatal("Lockfile exists, are you running another installation? Re-run with -b to delete the lockfile.")
		log.Fatal("Lockfile exists, are you running another installation? Re-run with -b to run regardless of the lockfile.")
	}

	if !fileExists(lmd) {


@@ 70,6 60,7 @@ func main() {
	defer db.Close()

	var lrev int
	var localFiles map[string]string
	if !install {
		err = manifest.read()
		if err != nil {


@@ 88,7 79,7 @@ func main() {
		log.Fatal(err)
	}

	newFiles, err = getRevisedFiles(db, lrev, nrev)
	newFiles, err := getRevisedFiles(db, lrev, nrev)
	if err != nil {
		log.Fatal(err)
	}


@@ 112,67 103,20 @@ func main() {
	lockInstall()
	defer unlockInstall()

	// We subtract one from the CPU count to leave thread(s) for the OS.
	// We set it to one on tri-core-or-less systems so pablo.gonzales2003's
	// Gateway netbook doesn't burn to a crisp.
	if opt.Threads > 1 {
		if runtime.NumCPU() > 3 {
			opt.Threads = 1
	s, errs := startDownloadVerify(newFiles, localFiles)
	if !s {
		if len(errs) != 0 {
			for _, v := range errs {
				log.Println(v)
			}
			log.Fatal("Operation failed due to errors. Include this log if you file a bug report.")
		} else {
			opt.Threads = runtime.NumCPU() - 1
		}
	}

	wp := workerpool.New(opt.Threads)

	errc := make(chan error, opt.Threads)

	// Create context for if the operation is cancelled due to an error
	// or signal.
	ctx, c := context.WithCancel(context.Background())
	// Channel that reports when the error handling is done.
	donec := make(chan bool)

	// Begin listening for an error, and if so wait for all workers to
	// stop before closing the error channel
	go func() {
		err := <-errc
		c()
		ctx.Done()
		wp.Stop()
		log.Println(err)
		for err := range errc {
			log.Println(err)
			log.Fatal("Download not completed.")
		}
		close(donec)
		wp.StopWait()
		close(donec)
	}()

	// Create context to wait for all the jobs to submitted
	pctx, pc := context.WithCancel(context.Background())
	wp.Pause(pctx)

	for k, v := range newFiles {
		p, c := k, v
		wp.Submit(func() {
			downloadAndVerify(p, c, errc)
		})
	}
	// Cancel pause context and start jobs.
	pc()

	// Wait for jobs to finish
	wp.StopWait()

	// If the context was cancelled, wait for a signal from the error
	// goroutine to finish then exit the program.
	if ctx.Err() != nil {
		<-donec
		log.Fatal("Errors occured while downloading!")
		unlockInstall()
		os.Exit(1)
	}

	}
	// Sadly ofatomic has no sense of files being removed
	//	if !install {
	//		remOutdated()