~egtann/sjs

c9f368d1c08e7c5ba938ac3c964845c7b5f574cd — Evan Tann 4 months ago 5662763 master
adopt go1.13 stdlib errors
7 files changed, 36 insertions(+), 48 deletions(-)

M error.go
M go.mod
M go.sum
M http/client.go
M http/service.go
M job.go
M worker.go
M error.go => error.go +1 -15
@@ 1,10 1,6 @@
package sjs

import (
	"sync"

	"github.com/pkg/errors"
)
import "sync"

// OptErr is a channel that we can modify in a threadsafe way. The mutex is
// required to read or modify the underlying channel, which can be created at


@@ 25,13 21,3 @@ func (o *OptErr) Send(err error) {
	}
	o.C <- err
}

type Missing error

func IsMissing(err error) bool {
	switch errors.Cause(err).(type) {
	case Missing:
		return true
	}
	return false
}

M go.mod => go.mod +1 -2
@@ 2,8 2,7 @@ module egt.run/sjs

require (
	github.com/hashicorp/go-cleanhttp v0.5.1
	github.com/justinas/alice v0.0.0-20171023064455-03f45bd4b7da
	github.com/pkg/errors v0.8.1
	github.com/justinas/alice v1.2.0
)

go 1.13

M go.sum => go.sum +2 -6
@@ 1,8 1,4 @@
github.com/hashicorp/go-cleanhttp v0.5.1 h1:dH3aiDG9Jvb5r5+bYHsikaOUIpcM0xvgMXVoDkXMzJM=
github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/justinas/alice v0.0.0-20171023064455-03f45bd4b7da h1:5y58+OCjoHCYB8182mpf/dEsq0vwTKPOo4zGfH0xW9A=
github.com/justinas/alice v0.0.0-20171023064455-03f45bd4b7da/go.mod h1:oLH0CmIaxCGXD67VKGR5AacGXZSMznlmeqM8RzPrcY8=
github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/justinas/alice v1.2.0 h1:+MHSA/vccVCF4Uq37S42jwlkvI2Xzl7zTPCN5BnZNVo=
github.com/justinas/alice v1.2.0/go.mod h1:fN5HRH/reO/zrUflLfTN43t3vXvKzvZIENsNEe7i7qA=

M http/client.go => http/client.go +6 -7
@@ 10,7 10,6 @@ import (

	"egt.run/sjs"
	"github.com/hashicorp/go-cleanhttp"
	"github.com/pkg/errors"
)

// Client communicates with SJS server through client.Notify, which continually


@@ 72,12 71,12 @@ func (c *Client) Heartbeat() {
	go func() {
		err := c.notify()
		if err != nil {
			c.errCh.Send(errors.Wrap(err, "notify"))
			c.errCh.Send(fmt.Errorf("notify: %w", err))
		}
		for range time.Tick(15 * time.Second) {
			err = c.notify()
			if err != nil {
				c.errCh.Send(errors.Wrap(err, "notify"))
				c.errCh.Send(fmt.Errorf("notify: %w", err))
			}
		}
	}()


@@ 90,25 89,25 @@ func (c *Client) notify() error {
	}
	body, err := json.Marshal(heartbeat)
	if err != nil {
		return errors.Wrap(err, "marshal worker")
		return fmt.Errorf("marshal worker: %w", err)
	}
	rdr := bytes.NewReader(body)
	req, err := http.NewRequest("POST", c.serverURL+"/workers", rdr)
	if err != nil {
		return errors.Wrap(err, "new request")
		return fmt.Errorf("new request: %w", err)
	}
	req.Header.Set("X-API-Key", c.apiKey)
	req.Header.Set("X-Host", c.host)
	req.Header.Set("X-Role", c.role)
	resp, err := c.client.Do(req)
	if err != nil {
		return errors.Wrap(err, "do")
		return fmt.Errorf("do: %w", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		byt, err := ioutil.ReadAll(resp.Body)
		if err != nil {
			return errors.Wrap(err, "read resp body")
			return fmt.Errorf("read resp body: %w", err)
		}
		return fmt.Errorf("%s responded with %d: %s", c.serverURL,
			resp.StatusCode, string(byt))

M http/service.go => http/service.go +6 -4
@@ 3,6 3,8 @@ package http
import (
	"crypto/subtle"
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"net/url"
	"path"


@@ 11,7 13,6 @@ import (

	"egt.run/sjs"
	"github.com/justinas/alice"
	"github.com/pkg/errors"
)

type Service struct {


@@ 110,17 111,18 @@ func (s *Service) addWorkerData(r *http.Request) error {
	host := r.Header.Get("X-Host")
	heartbeat := sjs.Heartbeat{}
	if err := json.NewDecoder(r.Body).Decode(&heartbeat); err != nil {
		return errors.Wrapf(err, "bad heartbeat (%s@%s): decode json", role, host)
		return fmt.Errorf("bad heartbeat (%s@%s): decode json: %w",
			role, host, err)
	}
	worker := s.workerData.GetOrCreateWorkerForNotifyURL(heartbeat.NotifyURL)
	_, err := url.Parse(worker.NotifyURL)
	if err != nil {
		return errors.Wrap(err, "invalid NotifyURL")
		return fmt.Errorf("invalid NotifyURL: %w", err)
	}
	for _, jd := range heartbeat.Jobs {
		job, err := sjs.JobFromData(jd)
		if err != nil {
			return errors.Wrap(err, "bad heartbeat: bad job: job from data")
			return fmt.Errorf("bad heartbeat: bad job: job from data: %w", err)
		}
		worker.Jobs = append(worker.Jobs, job)
	}

M job.go => job.go +14 -9
@@ 2,11 2,10 @@ package sjs

import (
	"context"
	"errors"
	"fmt"
	"strings"
	"time"

	"github.com/pkg/errors"
)

type Job struct {


@@ 145,7 144,7 @@ func jobTick(
	}
	err := scheduleJobWithTimeout(workerMap, j)
	if err != nil {
		errCh.Send(errors.Wrap(err, "schedule"))
		errCh.Send(fmt.Errorf("schedule: %w", err))
	}
}



@@ 159,8 158,10 @@ func scheduleJobWithTimeout(workerMap *WorkerMap, j *Job) error {
		ctx, cancel = context.WithTimeout(ctx, t)
	}
	defer cancel()
	err := run(ctx, workerMap, j)
	return errors.Wrap(err, "run")
	if err := run(ctx, workerMap, j); err != nil {
		return fmt.Errorf("run: %w", err)
	}
	return nil
}

// run a job. If no workers are available with that capability, then report an


@@ 173,8 174,10 @@ func run(ctx context.Context, m *WorkerMap, j *Job) error {
	if worker == nil {
		return fmt.Errorf("no workers capable of %s", j.Name)
	}
	err := worker.Run(ctx, j)
	return errors.Wrap(err, "run job")
	if err := worker.Run(ctx, j); err != nil {
		return fmt.Errorf("run job: %w", err)
	}
	return nil
}

// JobFromData converts JobData to a job and validates the job, reporting any


@@ 193,8 196,10 @@ func JobFromData(jd *JobData) (*Job, error) {
		JobStatus:        JobStatusRunning,
		TimeoutInSeconds: timeoutSecs,
	}
	err := job.Valid()
	return job, errors.Wrapf(err, "invalid job %s", job.Name)
	if err := job.Valid(); err != nil {
		return nil, fmt.Errorf("invalid job %s: %w", job.Name, err)
	}
	return job, nil
}

func jobDuration(j *Job) time.Duration {

M worker.go => worker.go +6 -5
@@ 9,7 9,7 @@ import (
	"sync"
	"time"

	"github.com/pkg/errors"
	"github.com/hashicorp/go-cleanhttp"
)

type Worker struct {


@@ 184,19 184,20 @@ func (m *WorkerMap) purgeWorkers(t time.Time) {
func (w *Worker) Run(ctx context.Context, j *Job) error {
	byt, err := json.Marshal(j)
	if err != nil {
		return errors.Wrap(err, "marshal job")
		return fmt.Errorf("marshal job: %w", err)
	}
	req, err := http.NewRequest("POST", w.NotifyURL, bytes.NewReader(byt))
	if err != nil {
		return errors.Wrap(err, "new request")
		return fmt.Errorf("new request: %w", err)
	}
	req = req.WithContext(ctx)
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-API-Key", w.APIKey)
	client := &http.Client{Timeout: 10 * time.Second}
	client := cleanhttp.DefaultClient()
	client.Timeout = 10 * time.Second
	resp, err := client.Do(req)
	if err != nil {
		return errors.Wrap(err, "post")
		return fmt.Errorf("post: %w", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {