-
Notifications
You must be signed in to change notification settings - Fork 347
/
Copy pathrequeuer_test.go
81 lines (62 loc) · 2.2 KB
/
requeuer_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package work
import (
"testing"
"github.com/stretchr/testify/assert"
)
// TestRequeue enqueues five scheduled jobs against a clock mocked 10 seconds
// into the past, runs a requeuer over the scheduled zset, and checks how many
// jobs ended up on each job queue versus how many stayed scheduled.
//
// The test talks to whatever newTestPool(":6379") connects to (presumably a
// local Redis instance) and wipes the "work" namespace before running.
func TestRequeue(t *testing.T) {
	pool := newTestPool(":6379")
	ns := "work"
	cleanKeyspace(ns, pool)

	// Pin the clock 10 seconds in the past while the jobs are enqueued.
	tMock := nowEpochSeconds() - 10
	setNowEpochSecondsMock(tMock)
	defer resetNowEpochSecondsMock()

	enqueuer := NewEnqueuer(ns, pool)
	_, err := enqueuer.EnqueueIn("wat", -9, nil)
	assert.NoError(t, err)
	_, err = enqueuer.EnqueueIn("wat", -9, nil)
	assert.NoError(t, err)
	_, err = enqueuer.EnqueueIn("foo", 10, nil)
	assert.NoError(t, err)
	_, err = enqueuer.EnqueueIn("foo", 14, nil)
	assert.NoError(t, err)
	_, err = enqueuer.EnqueueIn("bar", 19, nil)
	assert.NoError(t, err)

	// Drop the mock before requeueing so the requeuer sees the real clock.
	resetNowEpochSecondsMock()

	re := newRequeuer(ns, pool, redisKeyScheduled(ns), []string{"wat", "foo", "bar"})
	re.start()
	re.drain()
	re.stop()

	// Expected outcome: both "wat" jobs and one "foo" job were moved to their
	// queues; the remaining "foo" job and the "bar" job are still scheduled.
	assert.EqualValues(t, 2, listSize(pool, redisKeyJobs(ns, "wat")))
	assert.EqualValues(t, 1, listSize(pool, redisKeyJobs(ns, "foo")))
	assert.EqualValues(t, 0, listSize(pool, redisKeyJobs(ns, "bar")))
	assert.EqualValues(t, 2, zsetSize(pool, redisKeyScheduled(ns)))

	j := jobOnQueue(pool, redisKeyJobs(ns, "foo"))
	// testify takes (expected, actual); keeping that order makes a failure
	// message label the two values correctly.
	assert.Equal(t, "foo", j.Name)

	// Because we mocked time to 10 seconds ago above, the job was put on the zset with t=10 secs ago.
	// We want to ensure it's requeued with t=now.
	// On boundary conditions with the VM, nowEpochSeconds() might be 1 or 2 secs ahead of EnqueuedAt.
	assert.True(t, (j.EnqueuedAt+2) >= nowEpochSeconds())
}
// TestRequeueUnknown schedules a "wat" job and then runs a requeuer that was
// only told about the job name "bar". It asserts that the scheduled zset ends
// up empty and that the dead zset holds exactly one entry whose rank and
// FailedAt equal the pinned clock value and whose LastErr is
// "unknown job when requeueing".
func TestRequeueUnknown(t *testing.T) {
	ns := "work"
	pool := newTestPool(":6379")
	cleanKeyspace(ns, pool)

	// Enqueue with the clock pinned 10 seconds in the past.
	setNowEpochSecondsMock(nowEpochSeconds() - 10)
	defer resetNowEpochSecondsMock()

	_, err := NewEnqueuer(ns, pool).EnqueueIn("wat", -9, nil)
	assert.NoError(t, err)

	// Read the clock once and pin it to that reading, so the timestamps
	// written while requeueing can be compared for exact equality below.
	nowish := nowEpochSeconds()
	setNowEpochSecondsMock(nowish)

	scheduledKey := redisKeyScheduled(ns)
	requeuer := newRequeuer(ns, pool, scheduledKey, []string{"bar"})
	requeuer.start()
	requeuer.drain()
	requeuer.stop()

	deadKey := redisKeyDead(ns)
	assert.EqualValues(t, 0, zsetSize(pool, scheduledKey))
	assert.EqualValues(t, 1, zsetSize(pool, deadKey))

	score, deadJob := jobOnZset(pool, deadKey)
	assert.Equal(t, nowish, score)
	assert.Equal(t, nowish, deadJob.FailedAt)
	assert.Equal(t, "unknown job when requeueing", deadJob.LastErr)
}