Skip to content

Commit 4f482af

Browse files
author
Robin Joseph
authored
feat(base): create first version of producer and consumer (robinjoseph08#2)
1 parent 7f50a6b commit 4f482af

24 files changed

+1735
-1
lines changed

.chglog/CHANGELOG.tpl.md

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
{{ range .Versions }}
2+
<a name="{{ .Tag.Name }}"></a>
3+
## {{ if .Tag.Previous }}[{{ .Tag.Name }}]({{ $.Info.RepositoryURL }}/compare/{{ .Tag.Previous.Name }}...{{ .Tag.Name }}){{ else }}{{ .Tag.Name }}{{ end }} ({{ datetime "2006-01-02" .Tag.Date }})
4+
5+
{{ range .CommitGroups -}}
6+
### {{ .Title }}
7+
8+
{{ range .Commits -}}
9+
* {{ if .Scope }}**{{ .Scope }}:** {{ end }}{{ .Subject }}
10+
{{ end }}
11+
{{ end -}}
12+
13+
{{- if .RevertCommits -}}
14+
### Reverts
15+
16+
{{ range .RevertCommits -}}
17+
* {{ .Revert.Header }}
18+
{{ end }}
19+
{{ end -}}
20+
21+
{{- if .NoteGroups -}}
22+
{{ range .NoteGroups -}}
23+
### {{ .Title }}
24+
25+
{{ range .Notes }}
26+
{{ .Body }}
27+
{{ end }}
28+
{{ end -}}
29+
{{ end -}}
30+
{{ end -}}

.chglog/config.yml

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
style: github
2+
template: CHANGELOG.tpl.md
3+
info:
4+
title: CHANGELOG
5+
repository_url: https://github.com/robinjoseph08/go-pg-migrations
6+
options:
7+
commit_groups:
8+
title_maps:
9+
docs: Documentation
10+
feat: Features
11+
fix: Bug Fixes
12+
perf: Performance Improvements
13+
refactor: Code Refactoring
14+
header:
15+
pattern: "^(\\w*)(?:\\(([\\w\\$\\.\\-\\*\\s]*)\\))?\\:\\s(.*)$"
16+
pattern_maps:
17+
- Type
18+
- Scope
19+
- Subject
20+
notes:
21+
keywords:
22+
- BREAKING CHANGE

.editorconfig

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
[*]
2+
charset = utf-8
3+
end_of_line = lf
4+
insert_final_newline = true
5+
trim_trailing_whitespace = true
6+
7+
[Makefile]
8+
indent_style = tab
9+
10+
[*.go]
11+
indent_style = tab

.gitignore

+67
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
# Created by https://www.gitignore.io/api/go,vim,macos
2+
3+
### Go ###
4+
# Binaries for programs and plugins
5+
*.exe
6+
*.dll
7+
*.so
8+
*.dylib
9+
10+
# Test binary, build with `go test -c`
11+
*.test
12+
13+
# Output of the go coverage tool, specifically when used with LiteIDE
14+
coverage.out
15+
coverage.html
16+
17+
# Project-local glide cache, RE: https://github.com/Masterminds/glide/issues/736
18+
.glide/
19+
20+
# Vendored dependencies
21+
vendor/*
22+
_vendor*/*
23+
24+
# Binary executables
25+
bin/
26+
27+
### macOS ###
28+
*.DS_Store
29+
.AppleDouble
30+
.LSOverride
31+
32+
# Icon must end with two \r
33+
Icon
34+
35+
# Thumbnails
36+
._*
37+
38+
# Files that might appear in the root of a volume
39+
.DocumentRevisions-V100
40+
.fseventsd
41+
.Spotlight-V100
42+
.TemporaryItems
43+
.Trashes
44+
.VolumeIcon.icns
45+
.com.apple.timemachine.donotpresent
46+
47+
# Directories potentially created on remote AFP share
48+
.AppleDB
49+
.AppleDesktop
50+
Network Trash Folder
51+
Temporary Items
52+
.apdisk
53+
54+
### Vim ###
55+
# swap
56+
.sw[a-p]
57+
.*.sw[a-p]
58+
# session
59+
Session.vim
60+
# temporary
61+
.netrwhist
62+
*~
63+
# auto-generated tag files
64+
tags
65+
66+
67+
# End of https://www.gitignore.io/api/go,vim,macos

.go-version

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
1.12.7

.golangci.yml

+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
output:
2+
format: tab
3+
linters:
4+
disable-all: true
5+
enable:
6+
- deadcode
7+
- depguard
8+
- dupl
9+
- goconst
10+
- gocritic
11+
- gocyclo
12+
- gofmt
13+
- goimports
14+
- golint
15+
- gosec
16+
- govet
17+
- ineffassign
18+
- maligned
19+
- misspell
20+
- prealloc
21+
- scopelint
22+
- structcheck
23+
- typecheck
24+
- unconvert
25+
- varcheck
26+
issues:
27+
exclude-use-default: false
28+
max-per-linter: 0
29+
max-same-issues: 0
30+
exclude-rules:
31+
- path: _test\.go
32+
linters:
33+
- dupl

.travis.yml

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
dist: trusty
2+
sudo: false
3+
language: go
4+
addons:
5+
apt:
6+
packages:
7+
- redis-server
8+
go: "1.12.7"
9+
env:
10+
- GO111MODULE=on
11+
install:
12+
- make setup
13+
- make install
14+
script:
15+
- make lint
16+
- make test
17+
- make enforce
18+
- make coveralls

LICENSE.txt

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
MIT License
2+
3+
Copyright (c) 2019 Robin Joseph
4+
5+
Permission is hereby granted, free of charge, to any person obtaining a copy
6+
of this software and associated documentation files (the "Software"), to deal
7+
in the Software without restriction, including without limitation the rights
8+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
copies of the Software, and to permit persons to whom the Software is
10+
furnished to do so, subject to the following conditions:
11+
12+
The above copyright notice and this permission notice shall be included in all
13+
copies or substantial portions of the Software.
14+
15+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+
SOFTWARE.

Makefile

+70
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
BIN_DIR ?= ./bin
2+
DIRS ?= $(shell find . -name '*.go' | grep --invert-match 'vendor' | xargs -n 1 dirname | sort --unique)
3+
GO_TOOLS := \
4+
github.com/git-chglog/git-chglog/cmd/git-chglog \
5+
github.com/mattn/goveralls \
6+
7+
TFLAGS ?=
8+
9+
COVERAGE_PROFILE ?= coverage.out
10+
HTML_OUTPUT ?= coverage.html
11+
12+
PSQL := $(shell command -v psql 2> /dev/null)
13+
14+
TEST_DATABASE_USER ?= go_pg_migrations_user
15+
TEST_DATABASE_NAME ?= go_pg_migrations
16+
17+
default: install
18+
19+
.PHONY: clean
20+
clean:
21+
@echo "---> Cleaning"
22+
go clean
23+
24+
coveralls:
25+
@echo "---> Sending coverage info to Coveralls"
26+
goveralls -coverprofile=$(COVERAGE_PROFILE) -service=travis-ci
27+
28+
.PHONY: enforce
29+
enforce:
30+
@echo "---> Enforcing coverage"
31+
./scripts/coverage.sh $(COVERAGE_PROFILE)
32+
33+
.PHONY: html
34+
html:
35+
@echo "---> Generating HTML coverage report"
36+
go tool cover -html $(COVERAGE_PROFILE) -o $(HTML_OUTPUT)
37+
open $(HTML_OUTPUT)
38+
39+
.PHONY: install
40+
install:
41+
@echo "---> Installing dependencies"
42+
go mod download
43+
44+
.PHONY: lint
45+
lint:
46+
@echo "---> Linting"
47+
$(BIN_DIR)/golangci-lint run
48+
49+
.PHONY: release
50+
release:
51+
@echo "---> Creating new release"
52+
ifndef tag
53+
$(error tag must be specified)
54+
endif
55+
git-chglog --output CHANGELOG.md --next-tag $(tag)
56+
git add CHANGELOG.md
57+
git commit -m $(tag)
58+
git tag $(tag)
59+
git push origin master --tags
60+
61+
.PHONY: setup
62+
setup: install
63+
@echo "--> Setting up"
64+
curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | sh -s -- -b $(BIN_DIR) v1.16.0
65+
go get $(GO_TOOLS) && GOBIN=$$(realpath $(BIN_DIR)) go install $(GO_TOOLS)
66+
67+
.PHONY: test
68+
test:
69+
@echo "---> Testing"
70+
go test ./... -coverprofile $(COVERAGE_PROFILE) $(TFLAGS)

README.md

+113-1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,114 @@
11
# redisqueue
2-
A Golang queue worker using Redis streams
2+
3+
[![GoDoc](https://godoc.org/github.com/robinjoseph08/redisqueue?status.svg)](https://godoc.org/github.com/robinjoseph08/redisqueue)
4+
[![Build Status](https://travis-ci.org/robinjoseph08/redisqueue.svg?branch=master)](https://travis-ci.org/robinjoseph08/redisqueue)
5+
[![Coverage Status](https://coveralls.io/repos/github/robinjoseph08/redisqueue/badge.svg?branch=master)](https://coveralls.io/github/robinjoseph08/redisqueue?branch=master)
6+
[![Go Report Card](https://goreportcard.com/badge/github.com/robinjoseph08/redisqueue)](https://goreportcard.com/report/github.com/robinjoseph08/redisqueue)
7+
![License](https://img.shields.io/github/license/robinjoseph08/redisqueue.svg)
8+
9+
`redisqueue` provides a producer and consumer of a queue that uses [Redis
10+
streams](https://redis.io/topics/streams-intro).
11+
12+
## Features
13+
14+
- A `Producer` struct to make enqueuing messages easy.
15+
- A `Consumer` struct to make processing messages concurrenly.
16+
- Claiming and acknowledging messages if there's no error, so that if a consumer
17+
dies while processing, the message it was working on isn't lost. This
18+
guarantees at least once delivery.
19+
- A "visibility timeout" so that if a message isn't processed in a designated
20+
time frame, it will be be processed by another consumer.
21+
- A max length on the stream so that it doesn't store the messages indefinitely
22+
and run out of memory.
23+
- Graceful handling of Unix signals (`SIGINT` and `SIGTERM`) to let in-flight
24+
messages complete.
25+
- A channel that will surface any errors so you can handle them centrally.
26+
- Graceful handling of panics to avoid crashing the whole process.
27+
- A concurrency setting to control how many goroutines are spawned to process
28+
messages.
29+
- A batch size setting to limit the total messages in flight.
30+
- Support for multiple streams.
31+
32+
## Example
33+
34+
Here's an example of a producer that inserts 1000 messages into a queue:
35+
36+
```go
37+
package main
38+
39+
import (
40+
"fmt"
41+
42+
"github.com/robinjoseph08/redisqueue"
43+
)
44+
45+
func main() {
46+
p, err := redisqueue.NewProducerWithOptions(&redisqueue.ProducerOptions{
47+
StreamMaxLength: 10000,
48+
ApproximateMaxLength: true,
49+
})
50+
if err != nil {
51+
panic(err)
52+
}
53+
54+
for i := 0; i < 1000; i++ {
55+
err := p.Enqueue(&redisqueue.Message{
56+
Stream: "redisqueue:test",
57+
Values: map[string]interface{}{
58+
"index": i,
59+
},
60+
})
61+
if err != nil {
62+
panic(err)
63+
}
64+
65+
if i%100 == 0 {
66+
fmt.Printf("enqueued %d\n", i)
67+
}
68+
}
69+
}
70+
```
71+
72+
And here's an example of a consumer that reads the messages off of that queue:
73+
74+
```go
75+
package main
76+
77+
import (
78+
"fmt"
79+
"time"
80+
81+
"github.com/robinjoseph08/redisqueue"
82+
)
83+
84+
func main() {
85+
c, err := redisqueue.NewConsumerWithOptions(&redisqueue.ConsumerOptions{
86+
VisibilityTimeout: 60 * time.Second,
87+
BlockingTimeout: 5 * time.Second,
88+
ReclaimInterval: 1 * time.Second,
89+
BufferSize: 100,
90+
Concurrency: 10,
91+
})
92+
if err != nil {
93+
panic(err)
94+
}
95+
96+
c.Register("redisqueue:test", process)
97+
98+
go func() {
99+
for err := range c.Errors {
100+
// handle errors accordingly
101+
fmt.Printf("err: %+v\n", err)
102+
}
103+
}()
104+
105+
fmt.Println("starting")
106+
c.Run()
107+
fmt.Println("stopped")
108+
}
109+
110+
func process(msg *redisqueue.Message) error {
111+
fmt.Printf("processing message: %v\n", msg.Values["index"])
112+
return nil
113+
}
114+
```

0 commit comments

Comments
 (0)