
Initial stream support. #77


Merged · 4 commits · Mar 12, 2025

2 changes: 1 addition & 1 deletion .github/workflows/test.yml
@@ -32,7 +32,7 @@ jobs:
pytest:
strategy:
matrix:
py_version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
py_version: ["3.9", "3.10", "3.11", "3.12", "3.13"]
runs-on: "ubuntu-latest"
steps:
- uses: actions/checkout@v4
89 changes: 57 additions & 32 deletions README.md
@@ -55,25 +55,48 @@ Launch the workers:
Then run the main code:
`python3 broker.py`

## PubSubBroker and ListQueueBroker configuration

We have two brokers with similar interfaces, but with different logic.
The PubSubBroker uses Redis' PUBSUB mechanism and is very powerful,
but it executes every task on all workers, because PUBSUB broadcasts every message
to all subscribers.

If you want your messages to be processed only once, please use ListQueueBroker.
It uses redis' [LPUSH](https://redis.io/commands/lpush/) and [BRPOP](https://redis.io/commands/brpop/) commands to deal with messages.

Brokers parameters:
* `url` - url to redis.
* `task_id_generator` - custom task_id generator.
* `result_backend` - custom result backend.
* `queue_name` - name of the pub/sub channel in redis.
* `max_connection_pool_size` - maximum number of connections in pool.
* Any other keyword arguments are passed to `redis.asyncio.BlockingConnectionPool`.
Notably, you can use `timeout` to set custom timeout in seconds for reconnects
(or set it to `None` to try reconnects indefinitely).
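
For illustration, here is a minimal sketch of constructing both brokers with the parameters above. The import path and the queue/channel names are assumptions for the example, not taken from this diff:

```python
from taskiq_redis import ListQueueBroker, PubSubBroker  # import path assumed

# Every worker subscribed to the channel receives every message.
pubsub_broker = PubSubBroker(
    url="redis://localhost:6379",
    queue_name="my_pubsub_channel",  # hypothetical channel name
)

# Each message is consumed by exactly one worker.
list_queue_broker = ListQueueBroker(
    url="redis://localhost:6379",
    queue_name="my_task_list",  # hypothetical list key
)
```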

## Brokers

This package contains 6 broker implementations.
There are 3 broker types:
* PubSub broker
* ListQueue broker
* Stream broker

Each type is implemented for each Redis architecture:
* Single node
* Cluster
* Sentinel

Here's a small breakdown of how they differ from each other.


### PubSub

On older Redis versions, PUBSUB was the standard way of turning Redis into a queue.
However, PUBSUB delivers every message to all subscribed consumers.

> [!WARNING]
> This broker doesn't support acknowledgements. If a worker is killed while
> processing a message, that message is lost.
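
To make the broadcast behavior concrete, here is a minimal sketch of the underlying mechanism using `redis.asyncio` directly (the channel name is hypothetical; the broker handles serialization and channel naming itself):

```python
import asyncio

from redis.asyncio import Redis


async def main() -> None:
    redis = Redis.from_url("redis://localhost:6379")
    pubsub = redis.pubsub()
    # Every consumer subscribed to this channel gets its own copy of the message.
    await pubsub.subscribe("taskiq_channel")  # hypothetical channel name
    await redis.publish("taskiq_channel", b"task-payload")
    message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=5.0)
    print(message)  # {'type': 'message', ..., 'data': b'task-payload'}
    await pubsub.aclose()
    await redis.aclose()


asyncio.run(main())
```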

### ListQueue

This broker stores messages in a list at a single key. New tasks are appended
to the left side of the list with `lpush` and consumed from the right side with `brpop`.

> [!WARNING]
> This broker doesn't support acknowledgements. If a worker is killed while
> processing a message, that message is lost.
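
To make the mechanism concrete, here is a minimal sketch of the underlying commands using `redis.asyncio` directly (the key name is hypothetical; the broker manages serialization and key naming itself):

```python
import asyncio

from redis.asyncio import Redis


async def main() -> None:
    redis = Redis.from_url("redis://localhost:6379")
    # Producer side: push the task payload onto the left end of the list.
    await redis.lpush("taskiq_list", b"task-payload")  # hypothetical key name
    # Consumer side: block until a payload can be popped from the right end.
    _key, payload = await redis.brpop("taskiq_list")
    print(payload)  # b"task-payload"
    await redis.aclose()


asyncio.run(main())
```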

### Stream

Stream brokers use the Redis [stream type](https://redis.io/docs/latest/develop/data-types/streams/) to store and fetch messages.

> [!TIP]
> This broker **supports** acknowledgements, so it is safe to use when data durability
> is required.
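
As a quick illustration, a stream broker can be combined with the result backend along these lines. The `RedisStreamBroker` class name and its `url` parameter are assumptions based on this PR's description, not verified API:

```python
import asyncio

from taskiq_redis import RedisAsyncResultBackend, RedisStreamBroker  # names assumed

broker = RedisStreamBroker(url="redis://localhost:6379").with_result_backend(
    RedisAsyncResultBackend(redis_url="redis://localhost:6379", result_ex_time=600),
)


@broker.task
async def add(a: int, b: int) -> int:
    return a + b


async def main() -> None:
    await broker.startup()
    # The message stays in the Redis stream until a worker acknowledges it,
    # so a worker crash does not silently lose the task.
    task = await add.kiq(1, 2)
    result = await task.wait_result(timeout=30)
    print(result.return_value)
    await broker.shutdown()


asyncio.run(main())
```

Running the snippet still requires a separate worker process (for example `taskiq worker your_module:broker`) to actually execute the task.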

## RedisAsyncResultBackend configuration

@@ -85,18 +108,20 @@ RedisAsyncResultBackend parameters:
* Any other keyword arguments are passed to `redis.asyncio.BlockingConnectionPool`.
Notably, you can use `timeout` to set custom timeout in seconds for reconnects
(or set it to `None` to try reconnects indefinitely).
> IMPORTANT: **It is highly recommended to use expire time in RedisAsyncResultBackend**

> [!WARNING]
> **It is highly recommended to use expire time in RedisAsyncResultBackend**
> If you want to add expiration, either `result_ex_time` or `result_px_time` must be set.
> ```python
> # First variant
> redis_async_result = RedisAsyncResultBackend(
>     redis_url="redis://localhost:6379",
>     result_ex_time=1000,
> )
>
> # Second variant
> redis_async_result = RedisAsyncResultBackend(
>     redis_url="redis://localhost:6379",
>     result_px_time=1000000,
> )
> ```
12 changes: 5 additions & 7 deletions docker-compose.yml
@@ -1,8 +1,6 @@
version: '3.2'

services:
redis:
image: bitnami/redis:6.2.5
image: bitnami/redis:7.4.2
environment:
ALLOW_EMPTY_PASSWORD: "yes"
healthcheck:
@@ -14,7 +12,7 @@ services:
ports:
- 7000:6379
redis-node-0: &redis-node
image: docker.io/bitnami/redis-cluster:7.2
image: docker.io/bitnami/redis-cluster:7.4.2
environment:
ALLOW_EMPTY_PASSWORD: "yes"
REDIS_NODES: "redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5"
@@ -38,7 +36,7 @@ services:
<<: *redis-node

redis-node-5:
image: docker.io/bitnami/redis-cluster:7.2
image: docker.io/bitnami/redis-cluster:7.4.2
depends_on:
- redis-node-0
- redis-node-1
@@ -60,7 +58,7 @@ services:
- 7001:6379

redis-master:
image: bitnami/redis:6.2.5
image: bitnami/redis:7.4.2
environment:
ALLOW_EMPTY_PASSWORD: "yes"
healthcheck:
@@ -71,7 +69,7 @@ services:
start_period: 10s

redis-sentinel:
image: bitnami/redis-sentinel:latest
image: bitnami/redis-sentinel:7.4.2
depends_on:
- redis-master
environment: