~shulhan/karajo

d7c6a414bcf5fe58952fb6494c11cb227da4151a — Shulhan 7 months ago ddfbf77
all: changes on how the job queued using channel

Previously, a job ran using the following flow:

* interval/scheduler timer kicking in
* send the job finish to finished channel

If the job was triggered from an HTTP request, it would run on its own goroutine.

This change adds a third channel, startq, to JobBase that queues the Job.
When the timer kicks in, or an HTTP request is received, a signal is
pushed to startq.
The startq consumer executes the job and signals the completed job using
finishq.
M client.go => client.go +3 -4
@@ 188,11 188,10 @@ func (cl *Client) JobRun(jobPath string) (job *Job, err error) {
	if err != nil {
		return nil, fmt.Errorf(`%s: %w`, logp, err)
	}
	if res.Code == 200 {
		return job, nil
	if res.Code >= 400 {
		return nil, res
	}
	res.Data = nil
	return nil, res
	return job, nil
}

// JobLog get the Job log by its ID and counter.

M go.mod => go.mod +1 -1
@@ 4,7 4,7 @@ go 1.19

require (
	git.sr.ht/~shulhan/ciigo v0.9.3
	github.com/shuLhan/share v0.44.0
	github.com/shuLhan/share v0.46.0
)

require (

M go.sum => go.sum +2 -2
@@ 2,8 2,8 @@ git.sr.ht/~shulhan/asciidoctor-go v0.4.1 h1:Zev0L5HyMjH43sPaoJal8E/Hmbel/akoGOxN
git.sr.ht/~shulhan/asciidoctor-go v0.4.1/go.mod h1:vRHDUl3o3UzDkvVR9dEFYQ0JDqOh0TKpOZWvOh/CGZU=
git.sr.ht/~shulhan/ciigo v0.9.3 h1:q6EqGVvIU8ymkPqBS4HEyHIhbfVhJn6urwvGDg83TAY=
git.sr.ht/~shulhan/ciigo v0.9.3/go.mod h1:SsRnbqnBo+9jWDDZD/uc9IkdgGpBfp39vV0JPXNro9c=
github.com/shuLhan/share v0.44.0 h1:Afom8pQrzNYtUZM53y+eqlZw5lkFm7bgl3QjZ3ARsgg=
github.com/shuLhan/share v0.44.0/go.mod h1:BnjohSsgDFMeYQ0/ws7kzb1oABZdVbEwDNGbUhOLee4=
github.com/shuLhan/share v0.46.0 h1:cF0Ngj7wVA6TIcdSmfrqxOwMB3hZ+4df5cJf4GGCun4=
github.com/shuLhan/share v0.46.0/go.mod h1:BhnIWJxq84BTOs3Z2gLFAN8ih9mBfhZbRIjqGupGJag=
golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=

M job.go => job.go +84 -51
@@ 404,6 404,12 @@ func (job *Job) initTimer() (err error) {
		if job.Interval < time.Minute {
			job.Interval = time.Minute
		}

		var (
			now          = TimeNow().UTC().Round(time.Second)
			nextInterval = job.computeNextInterval(now)
		)
		job.NextRun = now.Add(nextInterval)
	}
	return nil
}


@@ 431,9 437,7 @@ func (job *Job) logsPrune() {
// Once the signature is verified it will response immediately and run the
// actual process in the new goroutine.
func (job *Job) handleHttp(epr *libhttp.EndpointRequest) (resbody []byte, err error) {
	var (
		logp = `handleHttp`
	)
	var logp = `handleHttp`

	// Authenticated request by checking the request body.
	err = job.authorize(epr.HttpRequest.Header, epr.RequestBody)


@@ 441,28 445,14 @@ func (job *Job) handleHttp(epr *libhttp.EndpointRequest) (resbody []byte, err er
		return nil, fmt.Errorf(`%s: %s: %w`, logp, job.ID, err)
	}

	err = job.start()
	err = job.canStart()
	if err != nil {
		return nil, fmt.Errorf(`%s: %s: %w`, logp, job.ID, err)
	}

	go func() {
		var (
			jlog *JobLog
			err  error
		)
		jlog, err = job.execute(epr)
		if err != nil {
			mlog.Errf(`!!! job: %s: failed: %s.`, job.ID, err)
		} else {
			mlog.Outf(`job: %s: finished.`, job.ID)
		}
		job.finish(jlog, err)
	}()

	var res libhttp.EndpointResponse

	res.Code = http.StatusOK
	res.Code = http.StatusAccepted
	res.Message = `OK`
	res.Data = job



@@ 470,6 460,10 @@ func (job *Job) handleHttp(epr *libhttp.EndpointRequest) (resbody []byte, err er
	resbody, err = json.Marshal(&res)
	job.Unlock()

	go func() {
		job.startq <- struct{}{}
	}()

	return resbody, err
}



@@ 481,23 475,62 @@ func (job *Job) Start() {
	}
	if job.Interval > 0 {
		job.startInterval()
		return
	}
	job.startQueue()
}

// startQueue start Job queue that triggered only by HTTP request.
//
// It blocks waiting on startq; each received signal runs the job once.
// The loop terminates when a signal is received on stopq.
func (job *Job) startQueue() {
	var (
		jlog *JobLog
		err  error
	)

	for {
		select {
		case <-job.startq:
			// start fails when the job is paused or has reached
			// MaxRunning; skip this run but keep consuming startq.
			err = job.start()
			if err != nil {
				mlog.Errf(`!!! job: %s: %s`, job.ID, err)
				continue
			}

			jlog, err = job.execute(nil)
			if err != nil {
				mlog.Errf(`!!! job: %s: failed: %s.`, job.ID, err)
			} else {
				mlog.Outf(`job: %s: finished.`, job.ID)
			}
			job.finish(jlog, err)

			// Non-blocking send: finishq has capacity 1 and may
			// have no reader; do not stall the queue waiting on it.
			select {
			case job.finishq <- struct{}{}:
			default:
			}

		case <-job.stopq:
			return
		}
	}
}

func (job *Job) startScheduler() {
	var (
		jlog *JobLog
		err  error
	)

	for {
		select {
		case <-job.scheduler.C:
			var (
				jlog *JobLog
				err  error
			)
			job.startq <- struct{}{}

		case <-job.startq:
			err = job.start()
			if err != nil {
				mlog.Errf(`!!! job: %s: %s`, job.ID, err)
				job.scheduler.Stop()
				return
				continue
			}

			jlog, err = job.execute(nil)


@@ 507,19 540,13 @@ func (job *Job) startScheduler() {
				mlog.Outf(`job: %s: finished.`, job.ID)
			}
			job.finish(jlog, err)
			// The finish will trigger the finished channel.

		case <-job.finished:
			go func() {
				// Make sure the Next has been updated on
				// scheduler.
				time.Sleep(time.Second)
				job.Lock()
				job.NextRun = job.scheduler.Next()
				job.Unlock()
			}()

		case <-job.stopped:

			select {
			case job.finishq <- struct{}{}:
			default:
			}

		case <-job.stopq:
			job.scheduler.Stop()
			return
		}


@@ 543,13 570,16 @@ func (job *Job) startInterval() {
		job.NextRun = now.Add(nextInterval)
		job.Unlock()

		mlog.Outf(`job: %s: next running in %s ...`, job.ID, nextInterval)
		mlog.Outf(`job: %s: next running in %s.`, job.ID, nextInterval)

		timer = time.NewTimer(nextInterval)
		ever = true
		for ever {
			select {
			case <-timer.C:
				job.startq <- struct{}{}

			case <-job.startq:
				err = job.start()
				if err != nil {
					mlog.Errf(`!!! job: %s: %s`, job.ID, err)


@@ 565,13 595,16 @@ func (job *Job) startInterval() {
					mlog.Outf(`job: %s: finished.`, job.ID)
				}
				job.finish(jlog, err)
				// The finish will trigger the finished channel.

			case <-job.finished:
				timer.Stop()
				ever = false

			case <-job.stopped:
				select {
				case job.finishq <- struct{}{}:
				default:
				}

			case <-job.stopq:
				timer.Stop()
				return
			}


@@ 581,14 614,6 @@ func (job *Job) startInterval() {

// execute the job Call or commands.
func (job *Job) execute(epr *libhttp.EndpointRequest) (jlog *JobLog, err error) {
	var (
		now     time.Time
		execCmd exec.Cmd
		logTime string
		cmd     string
		x       int
	)

	job.env.jobq <- struct{}{}
	mlog.Outf(`job: %s: started ...`, job.ID)
	defer func() {


@@ 611,6 636,14 @@ func (job *Job) execute(epr *libhttp.EndpointRequest) (jlog *JobLog, err error) 
		return jlog, err
	}

	var (
		now     time.Time
		execCmd exec.Cmd
		logTime string
		cmd     string
		x       int
	)

	// Run commands.
	for x, cmd = range job.Commands {
		now = TimeNow().UTC().Round(time.Second)


@@ 640,7 673,7 @@ func (job *Job) Stop() {
	mlog.Outf(`job: %s: stopping ...`, job.ID)

	select {
	case job.stopped <- true:
	case job.stopq <- struct{}{}:
	default:
	}
}

M job_base.go => job_base.go +29 -21
@@ 31,8 31,9 @@ const DefaultJobMaxRunning = 1
type JobBase struct {
	scheduler *libtime.Scheduler

	finished chan bool
	stopped  chan bool
	finishq chan struct{}
	startq  chan struct{}
	stopq   chan struct{}

	// The last time the job is finished running, in UTC.
	LastRun time.Time `ini:"-" json:"last_run,omitempty"`


@@ 84,14 85,30 @@ type JobBase struct {
}

func (job *JobBase) init() {
	job.finished = make(chan bool, 1)
	job.stopped = make(chan bool, 1)
	job.finishq = make(chan struct{}, 1)
	job.startq = make(chan struct{}, 1)
	job.stopq = make(chan struct{}, 1)

	if job.MaxRunning == 0 {
		job.MaxRunning = DefaultJobMaxRunning
	}
}

// canStart check if the job can be started.
//
// It returns nil when the job is allowed to start, ErrJobPaused when the
// job status is paused, or ErrJobMaxReached when one more run would exceed
// MaxRunning.
// Unlike start, it does not modify the job state.
func (job *JobBase) canStart() (err error) {
	job.Lock()
	defer job.Unlock()

	if job.Status == JobStatusPaused {
		return ErrJobPaused
	}
	if job.NumRunning+1 > job.MaxRunning {
		return ErrJobMaxReached
	}
	return nil
}

// start check if the job can run, the job is not paused and has not reach
// maximum run.
// If its can run, the status changes to `started` and its NumRunning


@@ 101,21 118,15 @@ func (job *JobBase) init() {
// ErrJobPaused.
// if the max running has reached it will return ErrJobMaxReached.
func (job *JobBase) start() (err error) {
	job.Lock()
	defer job.Unlock()

	if job.Status == JobStatusPaused {
		// Always set the LastRun to the current time, otherwise it
		// will run with 0s duration for interval based job.
		job.LastRun = TimeNow().UTC().Round(time.Second)
		return ErrJobPaused
	}
	if job.NumRunning+1 > job.MaxRunning {
		return ErrJobMaxReached
	err = job.canStart()
	if err != nil {
		return err
	}

	job.Lock()
	job.NumRunning++
	job.Status = JobStatusStarted
	job.Unlock()

	return nil
}


@@ 146,14 157,11 @@ func (job *JobBase) finish(jlog *JobLog, err error) {

	job.NumRunning--
	job.LastRun = TimeNow().UTC().Round(time.Second)
	if job.Interval > 0 {
	if job.scheduler != nil {
		job.NextRun = job.scheduler.Next()
	} else if job.Interval > 0 {
		job.NextRun = job.LastRun.Add(job.Interval)
	}

	select {
	case job.finished <- true:
	default:
	}
}

// computeNextInterval compute the duration when the job will be running based

M job_http.go => job_http.go +22 -22
@@ 129,18 129,18 @@ func (job *JobHttp) Start() {
}

func (job *JobHttp) startScheduler() {
	var err error

	for {
		select {
		case <-job.scheduler.C:
			var (
				err error
			)
			job.startq <- struct{}{}

		case <-job.startq:
			err = job.start()
			if err != nil {
				job.mlog.Errf(`!!! job_http: %s: %s`, job.ID, err)
				job.scheduler.Stop()
				return
				continue
			}

			_, err = job.execute()


@@ 150,19 150,13 @@ func (job *JobHttp) startScheduler() {
				job.mlog.Outf(`job_http: %s: finished.`, job.ID)
			}
			job.finish(nil, err)
			// The finish will trigger the finished channel.

		case <-job.finished:
			go func() {
				// Make sure the Next has been updated on
				// scheduler.
				time.Sleep(time.Second)
				job.Lock()
				job.NextRun = job.scheduler.Next()
				job.Unlock()
			}()

		case <-job.stopped:

			select {
			case job.finishq <- struct{}{}:
			default:
			}

		case <-job.stopq:
			job.scheduler.Stop()
			return
		}


@@ 192,6 186,9 @@ func (job *JobHttp) startInterval() {
		for ever {
			select {
			case <-timer.C:
				job.startq <- struct{}{}

			case <-job.startq:
				err = job.start()
				if err != nil {
					job.mlog.Errf(`!!! %s`, err)


@@ 207,13 204,16 @@ func (job *JobHttp) startInterval() {
					job.mlog.Outf(`finished`)
				}
				job.finish(nil, err)
				// The finish will trigger the finished channel.

			case <-job.finished:
				timer.Stop()
				ever = false

			case <-job.stopped:
				select {
				case job.finishq <- struct{}{}:
				default:
				}

			case <-job.stopq:
				timer.Stop()
				return
			}


@@ 224,7 224,7 @@ func (job *JobHttp) startInterval() {
// Stop the job.
func (job *JobHttp) Stop() {
	job.mlog.Outf(`stopping HTTP job ...`)
	job.stopped <- true
	job.stopq <- struct{}{}

	job.mlog.Flush()
	var err error = job.flog.Close()

M job_test.go => job_test.go +10 -5
@@ 203,7 203,9 @@ func TestJob_handleHttp(t *testing.T) {
		}
		job = Job{
			JobBase: JobBase{
				Name: `Test job handle HTTP`,
				Name:     `Test job handle HTTP`,
				Interval: 60 * time.Minute,
				LastRun:  TimeNow().Add(-30 * time.Minute),
			},
			Path:   `/test-job-handle-http`,
			Secret: `s3cret`,


@@ 232,9 234,12 @@ func TestJob_handleHttp(t *testing.T) {
		t.Fatal(err)
	}

	go job.Start()
	defer job.Stop()

	var (
		jobReq = JobHttpRequest{
			Epoch: testTimeNow.Unix(),
			Epoch: TimeNow().Unix(),
		}
		epr = libhttp.EndpointRequest{
			HttpRequest: &http.Request{


@@ 272,7 277,7 @@ func TestJob_handleHttp(t *testing.T) {
	exp = tdata.Output[`handleHttp_response.json`]
	test.Assert(t, `handleHttp_response`, string(exp), string(got))

	<-job.finished
	<-job.finishq

	job.Lock()
	got, err = json.MarshalIndent(&job, ``, `  `)


@@ 283,7 288,7 @@ func TestJob_handleHttp(t *testing.T) {
	job.Unlock()

	exp = tdata.Output[`job_after.json`]
	test.Assert(t, `TestJob_Call`, string(exp), string(got))
	test.Assert(t, `TestJob_handleHttp`, string(exp), string(got))
}

// TestJob_Start test Job's Call with timer.


@@ 331,7 336,7 @@ func TestJob_Start(t *testing.T) {
	go job.Start()
	defer job.Stop()

	<-job.finished
	<-job.finishq

	job.Lock()
	got, err = json.MarshalIndent(&job, ``, `  `)

M karajo.go => karajo.go +5 -4
@@ 462,14 462,15 @@ func (k *Karajo) apiJobPause(epr *libhttp.EndpointRequest) (resb []byte, err err

	job.pause()

	job.Lock()
	defer job.Unlock()

	res = &libhttp.EndpointResponse{}
	res.Code = http.StatusOK
	res.Data = job

	return json.Marshal(res)
	job.Lock()
	resb, err = json.Marshal(res)
	job.Unlock()

	return resb, err
}

// apiJobResume resume the paused Job.

M karajo_test.go => karajo_test.go +3 -1
@@ 103,6 103,7 @@ func TestKarajo_apis(t *testing.T) {
	t.Run(`apiJobRun_success`, func(tt *testing.T) {
		testKarajo_apiJobRun_success(tt, tdata, testClient)
	})

	t.Run(`apiJobRun_notfound`, func(tt *testing.T) {
		testKarajo_apiJobRun_notfound(tt, tdata, testClient)
	})


@@ 233,6 234,7 @@ func testKarajo_apiJobRun_success(t *testing.T, tdata *test.Data, cl *Client) {
	if err != nil {
		t.Fatal(err)
	}

	test.Assert(t, `apiJobRun_success`, string(exp), string(got))
}



@@ 283,7 285,7 @@ func testKarajo_apiJobLog(t *testing.T, tdata *test.Data, cl *Client) {
		t.Fatal(err)
	}

	test.Assert(t, `apiJobLog`, string(exp), string(got))
	test.Assert(t, `apiJobLog.json`, string(exp), string(got))
}

func testKarajo_apiJobResume(t *testing.T, tdata *test.Data, cl *Client) {

M testdata/api_test.txt => testdata/api_test.txt +4 -5
@@ 99,7 99,8 @@ command = x=$(($RANDOM%10)) && echo "sleep in ${x}s" && sleep $x
{
  "code": 412,
  "message": "job is paused",
  "name": "ERR_JOB_PAUSED"
  "name": "ERR_JOB_PAUSED",
  "data": null
}

<<< apiJobResume.json


@@ 112,7 113,7 @@ command = x=$(($RANDOM%10)) && echo "sleep in ${x}s" && sleep $x
    "echo Counter is $KARAJO_JOB_COUNTER",
    "x=$(($RANDOM%10)) \u0026\u0026 echo sleep in ${x}s \u0026\u0026 sleep $x"
  ],
  "last_run": "2023-01-09T00:00:00Z",
  "last_run": "0001-01-01T00:00:00Z",
  "next_run": "0001-01-01T00:00:00Z",
  "id": "test_job_success",
  "name": "Test job success",


@@ 130,13 131,11 @@ command = x=$(($RANDOM%10)) && echo "sleep in ${x}s" && sleep $x
    "echo Counter is $KARAJO_JOB_COUNTER",
    "x=$(($RANDOM%10)) \u0026\u0026 echo sleep in ${x}s \u0026\u0026 sleep $x"
  ],
  "last_run": "2023-01-09T00:00:00Z",
  "last_run": "0001-01-01T00:00:00Z",
  "next_run": "0001-01-01T00:00:00Z",
  "id": "test_job_success",
  "name": "Test job success",
  "status": "started",
  "max_running": 1,
  "num_running": 1,
  "log_retention": 5
}


M testdata/job_handleHttp_test.txt => testdata/job_handleHttp_test.txt +6 -6
@@ 2,19 2,18 @@ Test running Job with handleHttp.

<<< handleHttp_response.json
{
  "code": 200,
  "code": 202,
  "message": "OK",
  "data": {
    "path": "/test-job-handle-http",
    "auth_kind": "hmac-sha256",
    "header_sign": "X-Karajo-Sign",
    "last_run": "0001-01-01T00:00:00Z",
    "next_run": "0001-01-01T00:00:00Z",
    "last_run": "2023-01-08T23:30:00Z",
    "next_run": "2023-01-09T00:30:00Z",
    "id": "test_job_handle_http",
    "name": "Test job handle HTTP",
    "status": "started",
    "interval": 3600000000000,
    "max_running": 1,
    "num_running": 1,
    "log_retention": 5
  }
}


@@ 34,10 33,11 @@ Test running Job with handleHttp.
  "auth_kind": "hmac-sha256",
  "header_sign": "X-Karajo-Sign",
  "last_run": "2023-01-09T00:00:00Z",
  "next_run": "0001-01-01T00:00:00Z",
  "next_run": "2023-01-09T01:00:00Z",
  "id": "test_job_handle_http",
  "name": "Test job handle HTTP",
  "status": "success",
  "interval": 3600000000000,
  "max_running": 1,
  "log_retention": 5
}