~egtann/sjs

8b4fdaaabd731b264d48614d368afb7056416490 — Evan Tann 1 year, 9 months ago 607b1fe
fix deadlock, simplify service error handling
3 files changed, 29 insertions(+), 42 deletions(-)

M http/client.go
M http/service.go
M worker.go
M http/client.go => http/client.go +2 -1
@@ 27,11 27,12 @@ type Client struct {
}

func NewClient(selfURL, sjsURL, apiKey, host, role string) *Client {
	client := &http.Client{Timeout: 10 * time.Second}
	return &Client{
		selfURL:   selfURL,
		serverURL: sjsURL,
		apiKey:    apiKey,
		client:    &http.Client{Timeout: 10 * time.Second},
		client:    client,
		errCh:     &sjs.OptErr{},
		host:      host,
		role:      role,

M http/service.go => http/service.go +18 -39
@@ 3,7 3,6 @@ package http
import (
	"crypto/subtle"
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
	"path"


@@ 45,7 44,7 @@ func NewService(apiKey string) (*Service, error) {
	mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte("OK"))
	})
	mux.Handle("/workers", chain.Then(http.HandlerFunc(srv.handleWorkers)))
	mux.Handle("/workers", chain.Then(http.HandlerFunc(srv.addWorker)))
	srv.Mux = mux

	// Remove workers from rotation when they stop sending heartbeats.


@@ 72,7 71,6 @@ func (s *Service) WithLogger(log sjs.Logger) *Service {
// Err is a convenience wrapper for handling errors.
func (s *Service) Err() <-chan error {
	s.errCh.RLock()

	if s.errCh.C == nil {
		s.errCh.RUnlock()
		s.errCh.Lock()


@@ 84,67 82,48 @@ func (s *Service) Err() <-chan error {
	return s.errCh.C
}

func (s *Service) handleWorkers(w http.ResponseWriter, r *http.Request) {
	switch r.Method {
	case "GET":
		s.getWorkers(w, r)
	case "POST":
		s.addWorker(w, r)
	default:
// addWorker notifies the job service of a new worker and its capabilities.
// This is the heartbeat URL. Combining the two prevents us from needing to
// maintain worker state. If this service goes down and is restarted, workers
// are added again in seconds automatically due to this heartbeat.
func (s *Service) addWorker(w http.ResponseWriter, r *http.Request) {
	if r.Method != "POST" {
		http.NotFound(w, r)
		return
	}
}

func (s *Service) getWorkers(w http.ResponseWriter, r *http.Request) {
	byt, err := json.Marshal(s.workerData.Workers())
	if err != nil {
		err = errors.Wrap(err, "marshal workers")
		http.Error(w, err.Error(), http.StatusInternalServerError)
	if err := s.addWorkerData(r); err != nil {
		s.log.Printf("%s", err)
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	w.Write(byt)
	w.WriteHeader(http.StatusOK)
}

// addWorker notifies the job service of a new worker and its capabilities.
// This is the heartbeat URL. Combining the two prevents us from needing to
// maintain worker state. If this service goes down and is restarted, workers
// are added again in seconds automatically due to this heartbeat.
func (s *Service) addWorker(w http.ResponseWriter, r *http.Request) {
func (s *Service) addWorkerData(r *http.Request) error {
	role := r.Header.Get("X-Role")
	host := r.Header.Get("X-Host")
	heartbeat := sjs.Heartbeat{}
	if err := json.NewDecoder(r.Body).Decode(&heartbeat); err != nil {
		msg := fmt.Sprintf("%s %s: decode json: %s", role, host, err.Error())
		s.log.Printf("bad heartbeat: %s", msg)
		http.Error(w, msg, http.StatusBadRequest)
		return
		return errors.Wrapf(err, "bad heartbeat (%s@%s): decode json", role, host)
	}
	worker := s.workerData.GetOrCreateWorkerForNotifyURL(heartbeat.NotifyURL)
	_, err := url.Parse(worker.NotifyURL)
	if err != nil {
		s := fmt.Sprintf("invalid NotifyURL: %s", err.Error())
		http.Error(w, s, http.StatusBadRequest)
		return
		return errors.Wrap(err, "invalid NotifyURL")
	}
	for _, jd := range heartbeat.Jobs {
		job, err := sjs.JobFromData(jd)
		if err != nil {
			err = errors.Wrap(err, "job from data")
			http.Error(w, err.Error(), http.StatusBadRequest)
			s.log.Printf("bad heartbeat: bad job: %s", err.Error())
			return
			return errors.Wrap(err, "bad heartbeat: bad job: job from data")
		}
		worker.Jobs = append(worker.Jobs, job)
	}
	if len(worker.Jobs) == 0 {
		msg := "worker must have jobs. call WithJobs() on client."
		s.log.Printf("bad heartbeat: no jobs: %+v", worker)
		http.Error(w, msg, http.StatusBadRequest)
		return
		return errors.New("worker must have jobs. call WithJobs() on client.")
	}
	worker.APIKey = s.apiKey
	s.workerData.AddWorker(r.Context(), s.log, worker, s.errCh)
	w.WriteHeader(http.StatusOK)
	return nil
}

func setJSONContentType(next http.Handler) http.Handler {

M worker.go => worker.go +9 -2
@@ 66,23 66,30 @@ func (m *WorkerMap) AddWorker(
	errCh *OptErr,
) {
	m.mu.Lock()
	defer m.mu.Unlock()

	w.lastHeartbeat = time.Now()
	if _, exist := m.set[w.NotifyURL]; exist {
		// We already have this worker. Nothing else to do
		m.mu.Unlock()
		return
	}
	m.set[w.NotifyURL] = w
	m.mu.Unlock()

	for _, j := range w.Jobs {
		m.mu.Lock()
		if _, exist := m.data[j.Name]; exist {
			// There's nothing else we need to do. A ticker is
			// already running.
			m.mu.Unlock()
			return
		}
		wg := newWorkerGroup(j)
		wg.workers = append(wg.workers, w)
		m.data[j.Name] = wg

		// Unlock ahead of Schedule, which acquires its own lock on a
		// timed loop
		m.mu.Unlock()
		Schedule(ctx, m, j, errCh)
		lg.Printf("added job %s (run every %d %s)", j.Name, j.RunEvery, j.RunEveryPeriod)
	}