~egtann/sjs

9807ab3093f263882731fb693b6c0b32dd1166b0 — Evan Tann 1 year, 10 months ago af2484b
remove database, keep state in memory
4 files changed, 10 insertions(+), 147 deletions(-)

D data_storage.go
M http/service.go
M job.go
M worker.go
D data_storage.go => data_storage.go +0 -23
@@ 1,23 0,0 @@
package sjs

import "context"

type DataStorage interface {
	// GetJobForID should report a Missing error if a job by that ID does
	// not exist.
	GetJobForID(context.Context, int) (*Job, error)

	// GetOrCreateJob reports the created ID and an error, if any.
	GetOrCreateJob(context.Context, *Job) (int, error)

	// GetJobs reports all jobs regardless of their status.
	GetJobs(context.Context) ([]*Job, error)

	// GetActiveJobs reports all incomplete, non-paused jobs.
	GetActiveJobs(context.Context) ([]*Job, error)

	UpdateJob(context.Context, *Job) error

	// CreateJobResult tracks successes and failures.
	CreateJobResult(context.Context, *JobResult) error
}

M http/service.go => http/service.go +2 -80
@@ 1,14 1,12 @@
package http

import (
	"context"
	"crypto/subtle"
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
	"path"
	"strconv"
	"strings"
	"time"



@@ 20,7 18,6 @@ import (
type Service struct {
	Mux *http.ServeMux

	db         sjs.DataStorage
	workerData *sjs.WorkerMap
	log        *sjs.OptLogger
	sjsURL     string


@@ 31,12 28,8 @@ type Service struct {
// NewService prepares the endpoints and starts the jobs. The error channel is
// optional; if the channel is not nil, the server will send errors encountered
// when running jobs. Jobs are distributed to workers round-robin.
func NewService(
	db sjs.DataStorage,
	apiKey string,
) (*Service, error) {
func NewService(apiKey string) (*Service, error) {
	srv := &Service{
		db:         db,
		apiKey:     apiKey,
		workerData: sjs.NewWorkerMap(),
		errCh:      &sjs.OptErr{},


@@ 50,12 43,8 @@ func NewService(
	)
	mux := http.NewServeMux()
	mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
		if srv.log != nil {
			srv.log.Printf("GET /health")
		}
		w.Write([]byte("OK"))
	})
	mux.Handle("/jobs", chain.Then(http.HandlerFunc(srv.handleJobs)))
	mux.Handle("/workers", chain.Then(http.HandlerFunc(srv.handleWorkers)))
	srv.Mux = mux



@@ 95,15 84,6 @@ func (s *Service) Err() <-chan error {
	return s.errCh.C
}

func (s *Service) handleJobs(w http.ResponseWriter, r *http.Request) {
	switch r.Method {
	case "GET":
		s.handleGetJobs(w, r)
	default:
		http.NotFound(w, r)
	}
}

func (s *Service) handleWorkers(w http.ResponseWriter, r *http.Request) {
	switch r.Method {
	case "GET":


@@ 151,14 131,7 @@ func (s *Service) addWorker(w http.ResponseWriter, r *http.Request) {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		job.ID, err = s.db.GetOrCreateJob(r.Context(), job)
		if err != nil {
			s := fmt.Sprintf("create job %s: %s", job.Name, err.Error())
			http.Error(w, s, http.StatusBadRequest)
			return
		}
		worker.Jobs = append(worker.Jobs, job)
		s.log.Printf("added job %s", job.Name)
	}
	if len(worker.Jobs) == 0 {
		s := "worker must have jobs. call WithJobs() on client."


@@ 166,61 139,10 @@ func (s *Service) addWorker(w http.ResponseWriter, r *http.Request) {
		return
	}
	worker.APIKey = s.apiKey
	s.workerData.AddWorker(r.Context(), s.db, worker, s.errCh)
	s.workerData.AddWorker(r.Context(), s.log, worker, s.errCh)
	w.WriteHeader(http.StatusOK)
}

func (s *Service) handleGetJobs(w http.ResponseWriter, r *http.Request) {
	var head string
	head, r.URL.Path = shiftPath(r.URL.Path) // jobs
	if r.URL.Path == "/" {
		s.getJobs(w, r)
		return
	}
	head, r.URL.Path = shiftPath(r.URL.Path) // id
	jobID, err := strconv.Atoi(head)
	if err != nil {
		s := fmt.Sprintf("parse job id: %s", err.Error())
		http.Error(w, s, http.StatusBadRequest)
		return
	}
	s.getJob(w, r, jobID)
}

func (s *Service) getJobsData(ctx context.Context) ([]byte, error) {
	jobs, err := s.db.GetJobs(ctx)
	if err != nil {
		return nil, errors.Wrap(err, "get jobs")
	}
	byt, err := json.Marshal(jobs)
	return byt, errors.Wrap(err, "marshal jobs")
}

func (s *Service) getJobs(w http.ResponseWriter, r *http.Request) {
	byt, err := s.getJobsData(r.Context())
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.Write(byt)
}

func (s *Service) getJob(w http.ResponseWriter, r *http.Request, id int) {
	s.log.Printf("retrieving job %d", id)
	ctx := r.Context()
	job, err := s.db.GetJobForID(ctx, id)
	if sjs.IsMissing(err) {
		http.NotFound(w, r)
		return
	}
	if err != nil {
		s := fmt.Sprintf("get job for id: %s", err.Error())
		http.Error(w, s, http.StatusInternalServerError)
		return
	}
	json.NewEncoder(w).Encode(job)
}

func setJSONContentType(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")

M job.go => job.go +5 -42
@@ 107,20 107,15 @@ func (j *Job) Valid() error {
// Schedule a job to run.
func Schedule(
	ctx context.Context,
	db DataStorage,
	workerMap *WorkerMap,
	j *Job,
	errCh *OptErr,
) {
	// Since the job is new, we update the job in the database.
	err := db.UpdateJob(ctx, j)
	if err != nil {
		errCh.Send(errors.Wrap(err, "updating job"))
		// Keep going; we can still run the timer correctly even if
		// updating the job failed
	}
	// Run the job once immediately. Jobs that require a specific day of
	// the month won't run unless on that day.
	jobTick(workerMap, j, time.Now(), errCh)

	// Now we have a timer that we need to listen to
	// And set up a ticker to continue running in the background
	go func() {
		for {
			workerMap.mu.RLock()


@@ 129,7 124,7 @@ func Schedule(

			select {
			case start := <-wg.ticker.C:
				jobTick(db, workerMap, j, start, errCh)
				jobTick(workerMap, j, start, errCh)
			case <-wg.doneCh:
				wg.ticker.Stop()
				return


@@ 139,7 134,6 @@ func Schedule(
}

func jobTick(
	db DataStorage,
	workerMap *WorkerMap,
	j *Job,
	start time.Time,


@@ 154,37 148,6 @@ func jobTick(
		errCh.Send(errors.Wrap(err, "schedule"))
		// Keep going; we want to record the job result.
	}
	err = recordJobResult(db, j, start, err)
	if err != nil {
		errCh.Send(errors.Wrap(err, "record"))
		return
	}
}

func recordJobResult(
	db DataStorage,
	j *Job,
	start time.Time,
	err error,
) error {
	result := &JobResult{
		JobID:     j.ID,
		Succeeded: err == nil,
		StartedAt: start,
		EndedAt:   time.Now(),
	}
	if err != nil {
		// Update our JobResult
		errMsg := err.Error()
		result.ErrMessage = &errMsg

		// Don't return in this error handling. We want to record the
		// failed job result below
	}
	if err = db.CreateJobResult(context.Background(), result); err != nil {
		return errors.Wrap(err, "create job result")
	}
	return nil
}

func scheduleJobWithTimeout(workerMap *WorkerMap, j *Job) error {

M worker.go => worker.go +3 -2
@@ 61,7 61,7 @@ func NewWorkerMap() *WorkerMap {

func (m *WorkerMap) AddWorker(
	ctx context.Context,
	db DataStorage,
	lg *OptLogger,
	w *Worker,
	errCh *OptErr,
) {


@@ 83,7 83,8 @@ func (m *WorkerMap) AddWorker(
		wg := newWorkerGroup(j)
		wg.workers = append(wg.workers, w)
		m.data[j.Name] = wg
		Schedule(ctx, db, m, j, errCh)
		Schedule(ctx, m, j, errCh)
		lg.Printf("added job %s (run every %d %s)", j.Name, j.RunEvery, j.RunEveryPeriod)
	}
}