Skip to content

Commit 8e874ca

Browse files
committedMay 2, 2020
base version
0 parents  commit 8e874ca

File tree

7 files changed

+417
-0
lines changed

7 files changed

+417
-0
lines changed
 

‎Makefile

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
export GO111MODULE=on
2+
3+
LINT_VERSION := v1.16.0
4+
HAS_LINT := $(shell command -v golangci-lint 2> /dev/null)
5+
6+
test-redis-queue:
7+
docker-compose up --build --abort-on-container-exit
8+
docker-compose down
9+
10+
lint:
11+
ifndef HAS_LINT
12+
curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | sh -s -- -b $(GOPATH)/bin $(LINT_VERSION)
13+
endif
14+
golangci-lint run \
15+
--disable-all=true \
16+
--enable=errcheck \
17+
./...
18+
19+
format:
20+
go fmt ./...

‎README.md

+71
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
# Redis delayed queue
2+
3+
Based on redis doc: [delayed tasks](https://redislabs.com/ebook/part-2-core-concepts/chapter-6-application-components-in-redis/6-4-task-queues/6-4-2-delayed-tasks/), [distributed locks](https://redislabs.com/ebook/part-2-core-concepts/chapter-6-application-components-in-redis/6-2-distributed-locking/)
4+
5+
## Start
6+
7+
For start we use redis cluster instance and go code adds/removes to/from [ZSET](https://redis.io/commands#sorted_set) and SETNX for [distributed locks](https://redislabs.com/ebook/part-2-core-concepts/chapter-6-application-components-in-redis/6-2-distributed-locking/)
8+
9+
### Docker redis cluster instance
10+
11+
[Docker Hub](https://hub.docker.com/r/grokzen/redis-cluster)
12+
[Github](https://github.com/Grokzen/docker-redis-cluster)
13+
14+
### Go code
15+
16+
Libs
17+
18+
- [Go redis client](https://github.com/go-redis/redis)
19+
- [Go redis distributed lock](https://github.com/bsm/redislock)
20+
21+
## Implementation
22+
23+
Normally when we talk about times in Redis, we usually start talking about ZSETs. What if, for any item we wanted to execute in the future, we added it to a ZSET with its score being the time when we want it to execute? Then we have to check for items that should be executed now.
24+
25+
Enqueue:
26+
```
27+
func (rq *RedisQueue) Enqueue(uuid string, delay time.Duration) {
28+
_ = rq.client.ZAdd(rq.key, &redis.Z{Member: uuid, Score: float64(time.Now().Add(delay).Unix())})
29+
}
30+
```
31+
Dequeue:
32+
```
33+
func (rq *RedisQueue) Dequeue() ([]Message, error) {
34+
var ms []Message
35+
start := int64(0)
36+
for i := rq.batch; i >= 0; {
37+
vals, err := rq.client.ZRangeWithScores(rq.key, start, start).Result()
38+
if err != nil {
39+
return nil, errors.Wrap(err, "cannot get range from zset")
40+
}
41+
if len(vals) == 0 || vals[0].Score > float64(time.Now().Unix()) {
42+
break
43+
}
44+
45+
id := vals[0].Member.(string)
46+
lock := rq.acquireLock(id)
47+
if lock == nil {
48+
start++
49+
continue
50+
}
51+
ms = append(ms, Message{Message: id, OnProcessed: func() {
52+
_ = rq.client.ZRem(rq.key, id)
53+
if err := lock.Release(); err != nil {
54+
fmt.Printf("release lock erros = %+v\n", err)
55+
}
56+
}})
57+
start++
58+
i--
59+
60+
}
61+
62+
return ms, nil
63+
}
64+
65+
```
66+
67+
## Test
68+
To run test:
69+
```
70+
make test-redis-queue
71+
```

‎docker-compose.yml

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
version: "2.1"
2+
services:
3+
tests:
4+
image: golang:1.12
5+
working_dir: /go/src/github.com/sergii4/redis-delayed-queue
6+
volumes:
7+
- $PWD:/go/src/github.com/sergii4/redis-delayed-queue
8+
- go-modules:/go/pkg/mod # Put modules cache into a separate volume
9+
depends_on:
10+
- testredis
11+
command: ["/bin/sh", "-c", "GO111MODULE=on go test -v -timeout 30s"]
12+
13+
testredis:
14+
image: grokzen/redis-cluster:latest
15+
logging:
16+
driver: "none"
17+
18+
volumes:
19+
go-modules: # Define the volume

‎go.mod

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
module github.com/sergii4/redis-delayed-queue
2+
3+
go 1.12
4+
5+
require (
6+
github.com/bsm/redislock v0.5.0
7+
github.com/go-redis/redis/v7 v7.2.0
8+
github.com/pkg/errors v0.9.1
9+
github.com/satori/go.uuid v1.2.0
10+
github.com/stretchr/testify v1.5.1
11+
)

‎go.sum

+57
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
github.com/bsm/redislock v0.5.0 h1:ODM11/cbuUXQqLgZWK6XQnufaTjsBE2UcwBc2EAFNDA=
2+
github.com/bsm/redislock v0.5.0/go.mod h1:qagqKlV+xiLy26iV34Y3zRPxRcJjQYbV7pZfWFeSZ8M=
3+
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
4+
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
5+
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
6+
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
7+
github.com/go-redis/redis/v7 v7.2.0 h1:CrCexy/jYWZjW0AyVoHlcJUeZN19VWlbepTh1Vq6dJs=
8+
github.com/go-redis/redis/v7 v7.2.0/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg=
9+
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
10+
github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
11+
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
12+
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
13+
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
14+
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
15+
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
16+
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
17+
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
18+
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
19+
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
20+
github.com/onsi/ginkgo v1.10.1 h1:q/mM8GF/n0shIN8SaAZ0V+jnLPzen6WIVZdiwrRlMlo=
21+
github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
22+
github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME=
23+
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
24+
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
25+
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
26+
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
27+
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
28+
github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
29+
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
30+
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
31+
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
32+
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
33+
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
34+
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
35+
golang.org/x/net v0.0.0-20190923162816-aa69164e4478 h1:l5EDrHhldLYb3ZRHDUhXF7Om7MvYXnkV9/iQNo1lX6g=
36+
golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
37+
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA=
38+
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
39+
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
40+
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
41+
golang.org/x/sys v0.0.0-20191010194322-b09406accb47 h1:/XfQ9z7ib8eEJX2hdgFTZJ/ntt0swNk5oYBziWeTCvY=
42+
golang.org/x/sys v0.0.0-20191010194322-b09406accb47/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
43+
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
44+
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
45+
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
46+
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
47+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
48+
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
49+
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
50+
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
51+
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
52+
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
53+
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
54+
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
55+
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
56+
gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I=
57+
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

‎queue.go

+81
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package redisqueue
2+
3+
import (
4+
"fmt"
5+
"time"
6+
7+
"github.com/bsm/redislock"
8+
"github.com/go-redis/redis/v7"
9+
"github.com/pkg/errors"
10+
)
11+
12+
type Message struct {
13+
Message string
14+
OnProcessed func()
15+
}
16+
17+
type Queue interface {
18+
Enqueue(message string, after time.Duration)
19+
Dequeue() ([]Message, error)
20+
}
21+
22+
// RedisQueue implements Redis Delayed tasks algorith: https://redislabs.com/ebook/part-2-core-concepts/chapter-6-application-components-in-redis/6-4-task-queues/6-4-2-delayed-tasks/
23+
type RedisQueue struct {
24+
client *redis.ClusterClient
25+
locker *redislock.Client
26+
key string
27+
batch int
28+
ttl time.Duration
29+
}
30+
31+
func NewQueue(client *redis.ClusterClient, locker *redislock.Client, key string, batch int, ttl time.Duration) Queue {
32+
return &RedisQueue{client: client, locker: locker, key: key, batch: batch, ttl: ttl}
33+
}
34+
35+
// Enqueue puts task/item with uuid to the queue/container with delay
36+
func (rq *RedisQueue) Enqueue(uuid string, delay time.Duration) {
37+
_ = rq.client.ZAdd(rq.key, &redis.Z{Member: uuid, Score: float64(time.Now().Add(delay).Unix())})
38+
}
39+
40+
// Dequeue receives batch of messages from the queue
41+
func (rq *RedisQueue) Dequeue() ([]Message, error) {
42+
var ms []Message
43+
start := int64(0)
44+
for i := rq.batch; i >= 0; {
45+
vals, err := rq.client.ZRangeWithScores(rq.key, start, start).Result()
46+
if err != nil {
47+
return nil, errors.Wrap(err, "cannot get range from zset")
48+
}
49+
if len(vals) == 0 || vals[0].Score > float64(time.Now().Unix()) {
50+
break
51+
}
52+
53+
id := vals[0].Member.(string)
54+
lock := rq.acquireLock(id)
55+
if lock == nil {
56+
start++
57+
continue
58+
}
59+
ms = append(ms, Message{Message: id, OnProcessed: func() {
60+
_ = rq.client.ZRem(rq.key, id)
61+
if err := lock.Release(); err != nil {
62+
fmt.Printf("release lock erros = %+v\n", err)
63+
}
64+
}})
65+
start++
66+
i--
67+
68+
}
69+
70+
return ms, nil
71+
72+
}
73+
74+
// acquireLock gets the lock for the item with key
75+
func (rq *RedisQueue) acquireLock(key string) *redislock.Lock {
76+
lock, err := rq.locker.Obtain(key, 1000*time.Millisecond, nil)
77+
if err != nil {
78+
return nil
79+
}
80+
return lock
81+
}

‎queue_test.go

+158
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
package redisqueue
2+
3+
import (
4+
"strings"
5+
"sync"
6+
"testing"
7+
"time"
8+
9+
"github.com/bsm/redislock"
10+
"github.com/go-redis/redis/v7"
11+
uuid "github.com/satori/go.uuid"
12+
"github.com/stretchr/testify/require"
13+
)
14+
15+
func TestQueueLock(t *testing.T) {
16+
t.Run("same number enqueued/dequeued, 0 delayed task left in queue", func(t *testing.T) {
17+
batch := 20
18+
numItems := 100
19+
ttl := 100 * time.Millisecond
20+
q := setUpQueue(t, uuid.NewV4().String(), batch, ttl)
21+
// arange
22+
for i := 0; i < numItems; i++ {
23+
q.Enqueue("item"+uuid.NewV4().String(), 0)
24+
}
25+
ch := make(chan Message, numItems)
26+
var wg sync.WaitGroup
27+
numOfWorkers := 8
28+
wg.Add(numOfWorkers)
29+
fail := make(chan error)
30+
// act
31+
// run concurrently the same number goroutines as items
32+
go func() {
33+
for i := 0; i < numOfWorkers; i++ {
34+
// allows goroutins deque enough messages
35+
time.Sleep(5 * ttl)
36+
go func() {
37+
defer wg.Done()
38+
ms, err := q.Dequeue()
39+
if err != nil {
40+
fail <- err
41+
return
42+
}
43+
for _, m := range ms {
44+
ch <- m
45+
}
46+
}()
47+
}
48+
}()
49+
go func() {
50+
// close items when every goroutine is executed
51+
wg.Wait()
52+
close(ch)
53+
}()
54+
55+
count := 0
56+
done := make(chan struct{})
57+
// read from message channel
58+
go func() {
59+
for m := range ch {
60+
m.OnProcessed()
61+
count++
62+
}
63+
close(done)
64+
}()
65+
// wait for done, fail or timeout
66+
select {
67+
case <-done:
68+
case err := <-fail:
69+
t.Fatal(err)
70+
case <-time.After(10 * time.Second):
71+
t.Fatal("timeout")
72+
}
73+
// assert
74+
require.Equal(t, numItems, count)
75+
})
76+
t.Run("first routine processed all messeges last none", func(t *testing.T) {
77+
numItems := 5
78+
ttl := 100 * time.Millisecond
79+
q := setUpQueue(t, uuid.NewV4().String(), numItems, ttl)
80+
// arange
81+
for i := 0; i < numItems; i++ {
82+
q.Enqueue("item"+uuid.NewV4().String(), 0)
83+
}
84+
for i := 0; i < numItems; i++ {
85+
q.Enqueue("item"+uuid.NewV4().String(), 5*time.Second)
86+
}
87+
ms, err := q.Dequeue()
88+
if err != nil {
89+
require.NoError(t, err)
90+
}
91+
require.Len(t, ms, numItems)
92+
})
93+
94+
}
95+
96+
func TestQueueuBecameEmpty(t *testing.T) {
97+
numItems := 10
98+
ttl := 100 * time.Millisecond
99+
q := setUpQueue(t, uuid.NewV4().String(), numItems, ttl)
100+
// arange
101+
for i := 0; i < numItems; i++ {
102+
q.Enqueue("item"+uuid.NewV4().String(), 0)
103+
}
104+
105+
ms, err := q.Dequeue()
106+
require.NoError(t, err)
107+
require.Equal(t, numItems, len(ms))
108+
for _, msg := range ms {
109+
msg.OnProcessed()
110+
}
111+
112+
ms, err = q.Dequeue()
113+
require.NoError(t, err)
114+
require.Equal(t, 0, len(ms))
115+
}
116+
117+
func setUpQueue(t *testing.T, key string, batch int, ttl time.Duration) Queue {
118+
client := setRedisClusterClient(t)
119+
locker := redislock.New(client)
120+
121+
return NewQueue(client, locker, key, batch, ttl)
122+
}
123+
124+
func setRedisClusterClient(t *testing.T) *redis.ClusterClient {
125+
client := redis.NewClusterClient(&redis.ClusterOptions{
126+
Addrs: []string{"testredis:7000", "testredis:7001"},
127+
})
128+
129+
waitOKState(t, client)
130+
131+
return client
132+
}
133+
134+
func waitOKState(t *testing.T, client *redis.ClusterClient) {
135+
done := make(chan struct{})
136+
fail := make(chan error)
137+
go func() {
138+
for {
139+
info, err := client.ClusterInfo().Result()
140+
if err != nil {
141+
fail <- err
142+
return
143+
}
144+
if strings.HasPrefix(info, "cluster_state:ok") {
145+
// fmt.Println(info)
146+
close(done)
147+
return
148+
}
149+
}
150+
}()
151+
select {
152+
case err := <-fail:
153+
t.Fatalf("cannot connect to redis instance: %s", err)
154+
case <-time.After(20 * time.Second):
155+
t.Fatal("cluster state: fail")
156+
case <-done:
157+
}
158+
}

0 commit comments

Comments
 (0)
Please sign in to comment.