~whereswaldon/sprout-go

b453c046837e8025efb91754c0f60b95c7eab3d0 — Chris Waldon 6 months ago 0db5bb2
feat: create supervisor loop to restart workers
2 files changed, 54 insertions(+), 36 deletions(-)

M cmd/relay/main.go
A supervisor.go
M cmd/relay/main.go => cmd/relay/main.go +7 -36
@@ 139,44 139,15 @@ and will establish Sprout connections to all addresses provided as arguments.
		}
	}()

	var peerTlsConfig *tls.Config
	if *insecure {
		peerTlsConfig = &tls.Config{
			InsecureSkipVerify: true,
		}
	}
	// dial peers mentioned in arguments
	for _, address := range flag.Args() {
		go func(addr string) {
			var tlsConfig *tls.Config
			if *insecure {
				tlsConfig = &tls.Config{
					InsecureSkipVerify: true,
				}
			}
			firstAttempt := true
			for {
				if !firstAttempt {
					log.Printf("Restarting worker for address %s", addr)
					time.Sleep(time.Second)
				}
				firstAttempt = false
				conn, err := tls.Dial("tcp", addr, tlsConfig)
				if err != nil {
					log.Printf("Failed to connect to %s: %v", addr, err)
					continue
				}
				worker, err := sprout.NewWorker(done, conn, messages)
				if err != nil {
					log.Printf("Failed launching worker to connect to address %s: %v", addr, err)
					continue
				}
				worker.Logger = log.New(log.Writer(), fmt.Sprintf("worker-%v ", addr), log.Flags())
				go worker.BootstrapLocalStore(1024)

				// block until the worker dies
				worker.Run()
				select {
				case <-done:
					return
				default:
				}
			}
		}(address)
		sprout.LaunchSupervisedWorker(done, address, messages, peerTlsConfig, log.New(log.Writer(), "", log.Flags()))
	}

	// Block until a signal is received.

A supervisor.go => supervisor.go +47 -0
@@ 0,0 1,47 @@
package sprout

import (
	"crypto/tls"
	"fmt"
	"log"
	"time"
)

// LaunchSupervisedWorker launches a worker in a new goroutine that will
// connect to `addr` and use `store` as its node storage. It will dial
// using the provided `tlsConfig`, and it will log errors on the given
// `logger`.
//
// BUG(whereswaldon): this interface is experimental and likely to change.
func LaunchSupervisedWorker(done <-chan struct{}, addr string, store SubscribableStore, tlsConfig *tls.Config, logger *log.Logger) {
	go func() {
		firstAttempt := true
		for {
			if !firstAttempt {
				logger.Printf("Restarting worker for address %s", addr)
				time.Sleep(time.Second)
			}
			firstAttempt = false
			conn, err := tls.Dial("tcp", addr, tlsConfig)
			if err != nil {
				logger.Printf("Failed to connect to %s: %v", addr, err)
				continue
			}
			worker, err := NewWorker(done, conn, store)
			if err != nil {
				logger.Printf("Failed launching worker to connect to address %s: %v", addr, err)
				continue
			}
			worker.Logger = log.New(logger.Writer(), fmt.Sprintf("worker-%v ", addr), log.Flags())
			go worker.BootstrapLocalStore(1024)

			// block until the worker dies
			worker.Run()
			select {
			case <-done:
				return
			default:
			}
		}
	}()
}