~scooter/queue

0cb837b0e697575702b4540f229f2ed7ff461c00 — Bryan Burke 3 years ago 82fa371
auto requeue expired on database action
2 files changed, 46 insertions(+), 0 deletions(-)

M queue.sql
M queue_test.go
M queue.sql => queue.sql +5 -0
@@ 11,3 11,8 @@ CREATE TABLE IF NOT EXISTS queue (
);

CREATE INDEX IF NOT EXISTS expires_idx ON queue(expires); 

CREATE TRIGGER IF NOT EXISTS requeue_expired BEFORE UPDATE ON queue BEGIN
  UPDATE queue SET expires = 0, tries = tries + 1 WHERE expires > 0 AND expires < (SELECT strftime('%s', 'now'));
  UPDATE queue SET expires = -1 WHERE max_tries > 0 AND tries = max_tries;
END

M queue_test.go => queue_test.go +41 -0
@@ 99,6 99,47 @@ func TestRequeue(t *testing.T) {
	assert.Equal(t, 1, nAfterRQ)
}

func TestAutoRequeue(t *testing.T) {
	db, err := newDB()
	require.NoError(t, err)

	assert.NoError(t, MigrateDB(db))

	// enqueue, check length, dequeue, set expires to past, requeue
	if _, err := Enqueue(db, "test", makePayload(11)); err != nil {
		require.NoError(t, err)
	}
	if _, err := Enqueue(db, "test2", makePayload(12)); err != nil {
		require.NoError(t, err)
	}

	// should have one in queue for test
	nBefore, err := Length(db, "test")
	require.NoError(t, err)
	assert.Equal(t, 1, nBefore)

	// dequeue and have TTL be 0
	if _, _, err := Dequeue(db, "test", Timeout(0)); err != nil {
		assert.NoError(t, err)
	}
	// must sleep at least one second due to unix resolution
	time.Sleep(1 * time.Second)

	nAfter, err := Length(db, "test")
	require.NoError(t, err)
	assert.Equal(t, 0, nAfter)

	// dequeue other task to trigger requeue
	if _, _, err := Dequeue(db, "test2"); err != nil {
		assert.NoError(t, err)
	}

	// should auto requeue
	nAfterRQ, err := Length(db, "test")
	require.NoError(t, err)
	assert.Equal(t, 1, nAfterRQ)
}

func TestOptions(t *testing.T) {
	db, err := newDB()
	require.NoError(t, err)