~egtann/sjs

a6cb5a0b33f4f9829f384b98478e7b123c650dc7 — Evan Tann 2 years ago 9788454
add features, fix bugs
9 files changed, 334 insertions(+), 100 deletions(-)

M data_storage.go
M error.go
A go.mod
A go.sum
A http/client.go
R http/{server.go => service.go}
M job.go
D logger.go
M worker.go
M data_storage.go => data_storage.go +9 -2
@@ 1,13 1,20 @@
package sjs

import "context"

type DataStorage interface {
	// GetJobForID should report a Missing error if a job by that ID does
	// not exist.
	GetJobForID(context.Context, int) (*Job, error)

	// CreateJob reports the created ID and an error, if any.
	CreateJob(context.Context, *Job) (int, error)
	// GetOrCreateJob reports the created ID and an error, if any.
	GetOrCreateJob(context.Context, *Job) (int, error)

	// GetActiveJobs reports all incomplete, non-paused jobs.
	GetActiveJobs(context.Context) ([]*Job, error)

	// CreateJobResult tracks successes and failures.
	CreateJobResult(context.Context, *JobResult) error

	UpdateJob(context.Context, *Job) error
}

M error.go => error.go +2 -0
@@ 1,5 1,7 @@
package sjs

import "github.com/pkg/errors"

type Missing error

func IsMissing(err error) bool {

A go.mod => go.mod +6 -0
@@ 0,0 1,6 @@
module github.com/egtann/sjs

require (
	github.com/justinas/alice v0.0.0-20171023064455-03f45bd4b7da
	github.com/pkg/errors v0.8.0
)

A go.sum => go.sum +3 -0
@@ 0,0 1,3 @@
github.com/justinas/alice v0.0.0-20171023064455-03f45bd4b7da h1:5y58+OCjoHCYB8182mpf/dEsq0vwTKPOo4zGfH0xW9A=
github.com/justinas/alice v0.0.0-20171023064455-03f45bd4b7da/go.mod h1:oLH0CmIaxCGXD67VKGR5AacGXZSMznlmeqM8RzPrcY8=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=

A http/client.go => http/client.go +88 -0
@@ 0,0 1,88 @@
package http

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"time"

	"github.com/egtann/sjs"
	"github.com/pkg/errors"
)

// Client communicates with SJS server through client.Notify, which continually
// sends heartbeats.
type Client struct {
	Capabilities []sjs.JobName

	client *http.Client
	url    string
	apiKey string

	// errCh is nil until Err() is called.
	errCh chan error
}

func NewClient(
	sjsURL, apiKey string,
	capabilities []sjs.JobName,
) *Client {
	return &Client{
		Capabilities: capabilities,
		url:          sjsURL + "/servers",
		apiKey:       apiKey,
		client:       &http.Client{Timeout: 10 * time.Second},
	}
}

// Notify sjs of some calling server's existence and capabilities. This
// function is called by the client and automatically performs a heartbeat with
// the job server.
func (c *Client) Notify() {
	go func() {
		err := c.notify()
		if err != nil && c.errCh != nil {
			c.errCh <- errors.Wrap(err, "notify")
		}
		for range time.Tick(15 * time.Second) {
			err = c.notify()
			if err != nil && c.errCh != nil {
				c.errCh <- errors.Wrap(err, "notify")
			}
		}
	}()
}

func (c *Client) notify() error {
	worker := &sjs.Worker{
		NotifyURL:    c.url,
		Capabilities: c.Capabilities,
	}
	body, err := json.Marshal(worker)
	if err != nil && c.errCh != nil {
		return errors.Wrap(err, "marshal worker")
	}
	req, err := http.NewRequest("POST", c.url, bytes.NewReader(body))
	if err != nil {
		return errors.Wrap(err, "new request")
	}
	req.Header.Set("X-API-Key", c.apiKey)
	resp, err := c.client.Do(req)
	if err != nil {
		return errors.Wrap(err, "do")
	}
	defer resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return fmt.Errorf("%s responded with %d", c.url, resp.StatusCode)
	}
	return nil
}

// Err is a convenience wrapper for handling errors.
func (c *Client) Err() <-chan error {
	if c.errCh == nil {
		c.errCh = make(chan error, 1)
	}
	return c.errCh
}

R http/server.go => http/service.go +61 -62
@@ 4,7 4,9 @@ import (
	"context"
	"crypto/subtle"
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
	"path"
	"strconv"
	"strings"


@@ 15,30 17,27 @@ import (
	"github.com/pkg/errors"
)

type Server struct {
	mux     *http.ServeMux
type Service struct {
	Mux *http.ServeMux

	db      sjs.DataStorage
	log     sjs.Logger
	client  *http.Client
	workers *sjs.WorkerMap
	sjsURL  string
	apiKey  string
	workers sjs.WorkerMap
	errCh   chan error // Nil unless *service.Err() is called
}

// NewServer prepares the endpoints and starts the jobs. The error channel is
// NewService prepares the endpoints and starts the jobs. The error channel is
// optional; if the channel is not nil, the server will send errors encountered
// when running jobs. Jobs are distributed to workers round-robin.
func NewServer(
	log sjs.Logger,
func NewService(
	db sjs.DataStorage,
	apiKey string,
	version []byte,
	errCh chan<- error,
) (*Server, error) {
) (*Service, error) {
	srv := &Service{
		log:     log,
		db:      db,
		apiKey:  apiKey,
		client:  &http.Client{Timeout: 10 * time.Second},
		workers: sjs.NewWorkerMap(),
	}



@@ 50,16 49,14 @@ func NewServer(
	)
	mux := http.NewServeMux()
	mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
		log.Printf("health checked\n")
		w.Write([]byte("OK"))
	})
	mux.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) {
		log.Printf("version checked\n")
		w.Write(version)
	})
	mux.Handle("/jobs", chain.Then(http.HandlerFunc(srv.handleJobs)))
	mux.Handle("/servers", chain.Then(http.HandlerFunc(srv.handleServers)))
	srv.mux = mux
	mux.Handle("/servers", chain.Then(http.HandlerFunc(srv.handleServices)))
	srv.Mux = mux

	// Fetch and start running active jobs
	jobs, err := db.GetActiveJobs(context.Background())


@@ 67,12 64,23 @@ func NewServer(
		return nil, errors.Wrap(err, "get active jobs")
	}
	for _, job := range jobs {
		sjs.Schedule(job)
		sjs.Schedule(db, srv.workers, job, srv.errCh)
	}

	// Remove workers from rotation when they stop sending heartbeats.
	srv.workers.PurgeWorkersEvery(30 * time.Second)
	return srv, nil
}

func (s *Server) handleJobs(w http.ResponseWriter, r *http.Request) {
// Err is a convenience wrapper for handling errors.
func (s *Service) Err() <-chan error {
	if s.errCh == nil {
		s.errCh = make(chan error, 1)
	}
	return s.errCh
}

func (s *Service) handleJobs(w http.ResponseWriter, r *http.Request) {
	switch r.Method {
	case "GET":
		s.handleGetJobs(w, r)


@@ 83,44 91,44 @@ func (s *Server) handleJobs(w http.ResponseWriter, r *http.Request) {
	}
}

func (s *Server) handleServers(w http.ResponseWriter, r *http.Request) {
func (s *Service) handleServices(w http.ResponseWriter, r *http.Request) {
	switch r.Method {
	case "POST":
		s.addServer(w, r)
		s.addService(w, r)
	default:
		http.NotFound(w, r)
	}
}

// addServer notifies the job service of a new endpoint and its capabilities.
func (s *Server) addServer(w http.ResponseWriter, r *http.Request) {
	data := sjs.Worker{}
	if err := json.NewDecoder(r.Body).Decode(&data); err != nil {
		err = errors.Wrap(err, "decode json")
		http.Error(w, err, http.StatusBadRequest)
// addService notifies the job service of a new endpoint and its capabilities.
func (s *Service) addService(w http.ResponseWriter, r *http.Request) {
	worker := &sjs.Worker{}
	if err := json.NewDecoder(r.Body).Decode(worker); err != nil {
		s := fmt.Sprintf("decode json: %s", err.Error())
		http.Error(w, s, http.StatusBadRequest)
		return
	}
	ul, err := url.Parse(w.NotifyURL)
	ul, err := url.Parse(worker.NotifyURL)
	if err != nil {
		err = errors.Wrap(err, "invalid NotifyURL")
		http.Error(w, err, http.StatusBadRequest)
		s := fmt.Sprintf("invalid NotifyURL: %s", err.Error())
		http.Error(w, s, http.StatusBadRequest)
		return
	}
	if ul.Scheme == "" || ul.Host == "" || ul.Path == "" {
		err = errors.New("invalid NotifyURL")
		http.Error(w, err, http.StatusBadRequest)
		s := fmt.Sprintf("invalid NotifyURL: %s", err.Error())
		http.Error(w, s, http.StatusBadRequest)
		return
	}
	if len(w.Capabilities) == 0 {
		err = errors.New("worker must have capabilities")
		http.Error(w, err, http.StatusBadRequest)
	if len(worker.Capabilities) == 0 {
		s := "worker must have capabilities"
		http.Error(w, s, http.StatusBadRequest)
		return
	}
	s.workers.AddWorker(w)
	s.workers.AddWorker(worker)
	w.WriteHeader(http.StatusOK)
}

func (s *Server) handleGetJobs(w http.ResponseWriter, r *http.Request) {
func (s *Service) handleGetJobs(w http.ResponseWriter, r *http.Request) {
	var head string
	head, r.URL.Path = shiftPath(r.URL.Path) // jobs
	if r.URL.Path == "/" {


@@ 130,18 138,18 @@ func (s *Server) handleGetJobs(w http.ResponseWriter, r *http.Request) {
	head, r.URL.Path = shiftPath(r.URL.Path) // id
	jobID, err := strconv.Atoi(head)
	if err != nil {
		err = errors.Wrap(err, "parse job id")
		http.Error(w, err, http.StatusBadRequest)
		s := fmt.Sprintf("parse job id: %s", err.Error())
		http.Error(w, s, http.StatusBadRequest)
		return
	}
	s.getJobForID(w, r, jobID)
	s.getJob(w, r, jobID)
}

func (s *Server) getJobs(w http.ResponseWriter, r *http.Request) {
func (s *Service) getJobs(w http.ResponseWriter, r *http.Request) {
	// TODO - do we need this?
}

func (s *Server) getJob(w http.ResponseWriter, r *http.Request, id int) {
func (s *Service) getJob(w http.ResponseWriter, r *http.Request, id int) {
	ctx := r.Context()
	job, err := s.db.GetJobForID(ctx, id)
	if sjs.IsMissing(err) {


@@ 149,39 157,30 @@ func (s *Server) getJob(w http.ResponseWriter, r *http.Request, id int) {
		return
	}
	if err != nil {
		err = errors.Wrap(err, "get job for id")
		http.Error(w, err, http.StatusInternalServerError)
		s := fmt.Sprintf("get job for id: %s", err.Error())
		http.Error(w, s, http.StatusInternalServerError)
		return
	}
	json.NewEncoder(w).Encode(job)
}

func (s *Server) postJob(w http.ResponseWriter, r *http.Request) {
	data := struct {
		Name              string
		RunEveryInSeconds int
		TimeoutInSeconds  *int
	}{}
	err := json.NewDecoder(r.Body).Decode(&data)
func (s *Service) postJob(w http.ResponseWriter, r *http.Request) {
	job := &sjs.Job{}
	err := json.NewDecoder(r.Body).Decode(job)
	if err != nil {
		err = errors.Wrap(err, "decode job data")
		http.Error(w, err, http.StatusBadRequest)
		s := fmt.Sprintf("decode job: %s", err.Error())
		http.Error(w, s, http.StatusBadRequest)
		return
	}
	job := &Job{
		Name:             data.Name,
		RunEvery:         data.RunEveryInSeconds * time.Duration,
		TimeoutInSeconds: data.TimeoutInSeconds * time.Duration,
	}
	if err = job.Valid(); err != nil {
		err = errors.Wrap(err, "invalid job")
		http.Error(w, err, http.StatusBadRequest)
		s := fmt.Sprintf("invalid job: %s", err.Error())
		http.Error(w, s, http.StatusBadRequest)
		return
	}
	job.ID, err = s.db.CreateJob(r.Context(), job)
	job.ID, err = s.db.GetOrCreateJob(r.Context(), job)
	if err != nil {
		err = errors.Wrap(err, "create job")
		http.Error(w, err, http.StatusBadRequest)
		s := fmt.Sprintf("create job: %s", err.Error())
		http.Error(w, s, http.StatusBadRequest)
		return
	}
	w.WriteHeader(http.StatusCreated)

M job.go => job.go +81 -20
@@ 1,5 1,13 @@
package sjs

import (
	"context"
	"fmt"
	"time"

	"github.com/pkg/errors"
)

type Job struct {
	ID        int
	CreatedAt time.Time


@@ 12,15 20,31 @@ type Job struct {
	// run, this is nil.
	LastRun *time.Time

	// RunEvery describes the interval on which to run the job.
	RunEvery time.Duration
	// RunEveryInSeconds describes the interval on which to run the job.
	RunEveryInSeconds int

	// TimeoutInSeconds is the max length of time that a specific job's
	// execution is allowed before it's canceled. If nil, the job may run
	// forever.
	TimeoutInSeconds *int

	// JobStatus indicates whether a job is complete, paused, or running.
	JobStatus JobStatus

	// PayloadData included every time sjs notifies a worker.
	PayloadData []byte
}

	// Timeout is the max length of time that a specific job's execution is
	// allowed before it's canceled. If nil, the job may run forever.
	Timeout *time.Duration
// JobResult represents the result of a particular job. Any job will have 1 or
// more JobResults from prior runs.
type JobResult struct {
	JobID     int
	Succeeded bool
	StartedAt time.Time
	EndedAt   time.Time

	Status   JobStatus
	RunCount int
	// ErrMessage is nil if the job succeeded.
	ErrMessage *string
}

type JobName string


@@ 37,38 61,75 @@ const (
// error describing the validation issue.
func (j *Job) Valid() error {
	if j == nil {
		return false, errors.New("Job cannot be nil")
		return errors.New("Job cannot be nil")
	}
	if j.Name == "" {
		return false, errors.New("Name cannot be empty")
		return errors.New("Name cannot be empty")
	}
	if j.RunEvery < 1*time.Second {
		return false, errors.New("RunEvery must be >= 1 second")
	if j.RunEveryInSeconds < 1 {
		return errors.New("RunEveryInSeconds must be >= 1 second")
	}
	return nil
}

// Schedule a job to run in a goroutine.
func Schedule(workerMap map[WorkerCapability][]*Worker, j *Job) {
	go schedule(j)
func Schedule(
	db DataStorage,
	workerMap *WorkerMap,
	j *Job,
	errCh chan<- error,
) {
	go schedule(db, workerMap, j, errCh)
}

func schedule(
	db DataStorage,
	workerMap map[WorkerCapability][]*Worker,
	workerMap *WorkerMap,
	j *Job,
	errCh chan<- error,
) {
	for range time.Tick(j.RunEvery) {
		// Update job in data storage
		run(workerMap, j)
	dur := time.Duration(j.RunEveryInSeconds) * time.Second
	for start := range time.Tick(dur) {
		ctx := context.Background()
		var cancel context.CancelFunc
		if j.TimeoutInSeconds != nil {
			t := time.Duration(*j.TimeoutInSeconds) * time.Second
			ctx, cancel = context.WithTimeout(ctx, t)
		}
		err := run(ctx, workerMap, j)
		result := &JobResult{
			JobID:     j.ID,
			Succeeded: err == nil,
			StartedAt: start,
			EndedAt:   time.Now(),
		}
		if err != nil {
			errCh <- errors.Wrap(err, "run")

			// Update our JobResult
			errMsg := err.Error()
			result.ErrMessage = &errMsg

			// Don't return or continue in this error handling. We
			// want to record the failed job result below and
			// cancel the context to free up resources
		}
		if err = db.CreateJobResult(ctx, result); err != nil {
			errCh <- errors.Wrap(err, "create job result")
		}
		if j.TimeoutInSeconds != nil {
			cancel()
		}
	}
}

// run a job. If no workers are available with that capability, then report an
// error.
func run(wm *workerMap, j *Job) error {
	worker := wm.GetWorkerForJobName(j.Name)
func run(ctx context.Context, m *WorkerMap, j *Job) error {
	worker := m.GetWorkerForJobName(j.Name)
	if worker == nil {
		return fmt.Errorf("no workers capable of %s", j.Name)
	}
	worker := workers[rand.Intn(len(workers))]
	err := worker.Run(ctx, j)
	return errors.Wrap(err, "run job")
}

D logger.go => logger.go +0 -7
@@ 1,7 0,0 @@
package sjs

// Logger logs error messages for the caller where those errors don't require
// returning, i.e. the logging itself constitutes handling the error.
type Logger interface {
	Printf(format string, vals ...interface{})
}

M worker.go => worker.go +84 -9
@@ 1,14 1,28 @@
package sjs

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"sync"
	"time"

	"github.com/pkg/errors"
)

type Worker struct {
	NotifyURL    string
	Capabilities []JobName
	NotifyURL     string
	Capabilities  []JobName
	lastHeartbeat time.Time
}

// WorkerMap maps job names to workers. It is concurrent-safe.
type WorkerMap struct {
	data map[JobName]workerGroup
	mu   sync.Mutex
	workers []*Worker
	data    map[JobName]*workerGroup
	mu      sync.Mutex
}

type workerGroup struct {


@@ 21,16 35,29 @@ type workerGroup struct {
}

func NewWorkerMap() *WorkerMap {
	return &WorkerMap{data: map[JobName]workerGroup{}}
	return &WorkerMap{data: map[JobName]*workerGroup{}}
}

// addWorkerToMap in a non-concurrent way. It's assumed that the caller has
// guarded access.
func (m *WorkerMap) addWorkerToMap(w *Worker) {
	for _, jobName := range w.Capabilities {
		wg, ok := m.data[jobName]
		if !ok {
			wg = &workerGroup{}
			m.data[jobName] = wg
		}
		wg.workers = append(wg.workers, w)
	}
}

func (m *WorkerMap) AddWorker(w *Worker) {
	m.mu.Lock()
	defer m.mu.Unlock()

	for _, jobName := range w.Capabilities {
		m.data[jobName] = append(m.data[jobData], w)
	}
	w.lastHeartbeat = time.Now()
	m.workers = append(m.workers, w)
	m.addWorkerToMap(w)
}

func (m *WorkerMap) GetWorkerForJobName(name JobName) *Worker {


@@ 42,5 69,53 @@ func (m *WorkerMap) GetWorkerForJobName(name JobName) *Worker {
		return nil
	}
	wg.modIdx++
	return wg.workers[workers.modIdx%len(wg.workers)]
	return wg.workers[wg.modIdx%len(wg.workers)]
}

func (m *WorkerMap) PurgeWorkersEvery(dur time.Duration) {
	go func() {
		for t := range time.Tick(dur) {
			m.purgeWorkers(t)
		}
	}()
}

func (m *WorkerMap) purgeWorkers(t time.Time) {
	newWorkers := []*Worker{}
	for _, worker := range m.workers {
		if worker.lastHeartbeat.Add(30 * time.Second).After(t) {
			newWorkers = append(newWorkers, worker)
		}
	}
	m.mu.Lock()
	defer m.mu.Unlock()

	m.workers = newWorkers
	m.data = map[JobName]*workerGroup{}
	for _, w := range m.workers {
		m.addWorkerToMap(w)
	}
}

func (w *Worker) Run(ctx context.Context, j *Job) error {
	byt, err := json.Marshal(j)
	if err != nil {
		return errors.Wrap(err, "marshal job")
	}
	req, err := http.NewRequest("POST", w.NotifyURL, bytes.NewReader(byt))
	if err != nil {
		return errors.Wrap(err, "new request")
	}
	req = req.WithContext(ctx)
	req.Header.Set("Content-Type", "application/json")
	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		return errors.Wrap(err, "post")
	}
	defer resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return fmt.Errorf("worker responded %d", resp.StatusCode)
	}
	return nil
}