Intro decorator way to start consumer (#37)
* Pin pydantic and redis versions

* Implement decorator and update README

* Allow specifying config in code

* Exclude daemon from coverage
Wh1isper authored Sep 4, 2024
1 parent 966c85f commit 0df145c
Showing 11 changed files with 354 additions and 15 deletions.
1 change: 1 addition & 0 deletions .coveragerc
@@ -4,3 +4,4 @@ omit =
brq/tools.py
brq/cli.py
brq/envs.py
brq/daemon.py
50 changes: 38 additions & 12 deletions README.md
@@ -32,6 +32,14 @@ Redis >= 6.2, tested with latest redis 6/7 docker image. Recommended to use redi
- Multiple consumers in one consumer group
- No scheduler needed, consumer handles itself

## Configuration

When using `BrqConfig`, brq can be configured through a `.env` file and environment variables. Environment variables use the `BRQ_` prefix.

> For example, `BRQ_REDIS_PORT=6379 python consumer.py` specifies the Redis port.

See [configs](./brq/configs.py) for more details.
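
For instance, a `.env` file might look like the sketch below (field names come from `brq/configs.py` in this commit; the values are illustrative):

```
BRQ_REDIS_HOST=localhost
BRQ_REDIS_PORT=6379
BRQ_REDIS_DB=0
BRQ_REDIS_PASSWORD=changeme
BRQ_CONSUMER_GROUP_NAME=default-workers
BRQ_DAEMON_CONCURRENCY=1
```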

## Echo job overview

### Producer
@@ -40,21 +48,18 @@ Redis >= 6.2, tested with latest redis 6/7 docker image. Recommended to use redi
import os

from brq.producer import Producer
from brq.tools import get_redis_client, get_redis_url
from brq.configs import BrqConfig


async def main():
redis_url = get_redis_url(
host=os.getenv("REDIS_HOST", "localhost"),
port=int(os.getenv("REDIS_PORT", 6379)),
db=int(os.getenv("REDIS_DB", 0)),
cluster=bool(os.getenv("REDIS_CLUSTER", "false") in ["True", "true", "1"]),
tls=bool(os.getenv("REDIS_TLS", "false") in ["True", "true", "1"]),
username=os.getenv("REDIS_USERNAME", ""),
password=os.getenv("REDIS_PASSWORD", ""),
)
async with get_redis_client(redis_url) as async_redis_client:
await Producer(async_redis_client).run_job("echo", ["hello"])
config = BrqConfig()
async with config.open_redis_client() as async_redis_client:
await Producer(
async_redis_client,
redis_prefix=config.redis_key_prefix,
redis_seperator=config.redis_key_seperator,
max_message_len=config.producer_max_message_length,
).run_job("echo", ["hello"])


if __name__ == "__main__":
@@ -65,6 +70,27 @@ if __name__ == "__main__":

### Consumer

The only thing you need is `@task`. The decorated function can be `sync` or `async`; a `sync` function is automatically converted to an `async` one and run in a thread.

```python
from brq import task


@task
def echo(message):
print(f"Received message: {message}")


if __name__ == "__main__":
# Run the task once, for local debug
# echo("hello")

# Run as a daemon
echo.serve()
```

This is equivalent to the classic way shown below, but more flexible.

```python
import os

# ... (rest of the classic example collapsed in the diff)
```
16 changes: 16 additions & 0 deletions brq/__init__.py
@@ -3,3 +3,19 @@
__author__ = "wh1isper"
__email__ = "[email protected]"
__version__ = "0.3.9.dev0"


from .configs import BrqConfig
from .consumer import Consumer
from .decorator import task
from .producer import Producer
from .tools import get_redis_client, get_redis_url

__all__ = [
"task",
"Consumer",
"Producer",
"get_redis_client",
"get_redis_url",
"BrqConfig",
]
71 changes: 71 additions & 0 deletions brq/configs.py
@@ -0,0 +1,71 @@
import uuid
from contextlib import asynccontextmanager
from typing import Any, AsyncGenerator

import redis.asyncio as redis
from pydantic_settings import BaseSettings, SettingsConfigDict

from brq.tools import get_redis_client, get_redis_url


class Settings(BaseSettings):
model_config = SettingsConfigDict(
env_prefix="brq_",
env_file=".env",
env_nested_delimiter="__",
env_file_encoding="utf-8",
case_sensitive=False,
)


class RedisSettingsMixin:
redis_host: str = "localhost"
redis_port: int = 6379
redis_db: int = 0
redis_cluster: bool = False
redis_tls: bool = False
redis_username: str = ""
redis_password: str = ""

@property
def redis_url(self) -> str:
return get_redis_url(
host=self.redis_host,
port=self.redis_port,
db=self.redis_db,
cluster=self.redis_cluster,
tls=self.redis_tls,
username=self.redis_username,
password=self.redis_password,
)

@asynccontextmanager
async def open_redis_client(
self,
) -> AsyncGenerator[Any, redis.Redis | redis.RedisCluster]:
async with get_redis_client(self.redis_url) as redis_client:
yield redis_client


class BrqConfig(Settings, RedisSettingsMixin):
redis_key_prefix: str = "brq"
redis_key_seperator: str = ":"

producer_max_message_length: int = 1000

consumer_group_name: str = "default-workers"
consumer_identifier: str = uuid.uuid4().hex
consumer_count_per_fetch: int = 1
consumer_block_time: int = 1
consumer_expire_time: int = 60 * 60
consumer_process_timeout: int = 60
consumer_retry_lock_time: int = 300
consumer_retry_cooldown_time: int = 60
consumer_enable_enque_deferred_job: bool = True
consumer_enable_reprocess_timeout_job: bool = True
consumer_enable_dead_queue: bool = True
consumer_max_message_len: int = 1000
consumer_delete_message_after_process: bool = False
consumer_run_parallel: bool = False

daemon_concurrency: int = 1
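
Because `BrqConfig` is a pydantic-settings model, it can also be constructed directly in code, with keyword arguments taking precedence over environment variables and the `.env` file. A minimal sketch (the hostname and group name are illustrative):

```python
from brq import BrqConfig

config = BrqConfig(
    redis_host="redis.internal",       # illustrative hostname
    consumer_group_name="my-workers",  # illustrative group name
    daemon_concurrency=2,
)
print(config.redis_url)  # assembled by the RedisSettingsMixin property
```
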
158 changes: 158 additions & 0 deletions brq/decorator.py
@@ -0,0 +1,158 @@
import asyncio
from functools import partial
from typing import Awaitable, Callable

from anyio import CapacityLimiter

from brq.configs import BrqConfig
from brq.consumer import Consumer
from brq.daemon import Daemon
from brq.log import logger
from brq.tools import ensure_awaitable


class BrqTaskWrapper:
def __init__(
self,
func: Callable | Awaitable,
config: BrqConfig,
register_function_name: str | None = None,
):
self._func = func
self.config = config
self.register_function_name = register_function_name or func.__name__

def __call__(self, *args, **kwargs):
return self._func(*args, **kwargs)

def serve(self):
asyncio.run(self._serve())

async def _serve(self):
async with self.config.open_redis_client() as async_redis_client:
awaitable_function = ensure_awaitable(
self._func,
limiter=CapacityLimiter(total_tokens=self.config.daemon_concurrency),
)
consumer_builder = partial(
Consumer,
redis_prefix=self.config.redis_key_prefix,
redis_seperator=self.config.redis_key_seperator,
redis=async_redis_client,
awaitable_function=awaitable_function,
register_function_name=self.register_function_name,
group_name=self.config.consumer_group_name,
consumer_identifier=self.config.consumer_identifier,
count_per_fetch=self.config.consumer_count_per_fetch,
block_time=self.config.consumer_block_time,
expire_time=self.config.consumer_expire_time,
process_timeout=self.config.consumer_process_timeout,
retry_lock_time=self.config.consumer_retry_lock_time,
retry_cooldown_time=self.config.consumer_retry_cooldown_time,
enable_enque_deferred_job=self.config.consumer_enable_enque_deferred_job,
enable_reprocess_timeout_job=self.config.consumer_enable_reprocess_timeout_job,
enable_dead_queue=self.config.consumer_enable_dead_queue,
max_message_len=self.config.consumer_max_message_len,
delete_message_after_process=self.config.consumer_delete_message_after_process,
run_parallel=self.config.consumer_run_parallel,
)
daemon = Daemon(*[consumer_builder() for _ in range(self.config.daemon_concurrency)])
await daemon.run_forever()


def task(
_func=None,
*,
config: BrqConfig | None = None,
register_function_name: str | None = None,
# Redis connection
redis_host: str | None = None,
redis_port: int | None = None,
redis_db: int | None = None,
redis_cluster: bool | None = None,
redis_tls: bool | None = None,
redis_username: str | None = None,
redis_password: str | None = None,
# Scope
redis_key_prefix: str | None = None,
redis_key_seperator: str | None = None,
# Consumer
group_name: str | None = None,
consumer_identifier: str | None = None,
count_per_fetch: int | None = None,
block_time: int | None = None,
expire_time: int | None = None,
process_timeout: int | None = None,
retry_lock_time: int | None = None,
retry_cooldown_time: int | None = None,
enable_enque_deferred_job: bool | None = None,
enable_reprocess_timeout_job: bool | None = None,
enable_dead_queue: bool | None = None,
max_message_len: int | None = None,
delete_message_after_process: bool | None = None,
run_parallel: bool | None = None,
# Daemon
daemon_concurrency: int | None = None,
):

def _wrapper(
func,
config: BrqConfig | None = None,
register_function_name: str | None = None,
):

return BrqTaskWrapper(func, config, register_function_name)

kwargs = {
k: v
for k, v in {
"redis_host": redis_host,
"redis_port": redis_port,
"redis_db": redis_db,
"redis_cluster": redis_cluster,
"redis_tls": redis_tls,
"redis_username": redis_username,
"redis_password": redis_password,
"redis_key_prefix": redis_key_prefix,
"redis_key_seperator": redis_key_seperator,
"consumer_group_name": group_name,
"consumer_identifier": consumer_identifier,
"consumer_count_per_fetch": count_per_fetch,
"consumer_block_time": block_time,
"consumer_expire_time": expire_time,
"consumer_process_timeout": process_timeout,
"consumer_retry_lock_time": retry_lock_time,
"consumer_retry_cooldown_time": retry_cooldown_time,
"consumer_enable_enque_deferred_job": enable_enque_deferred_job,
"consumer_enable_reprocess_timeout_job": enable_reprocess_timeout_job,
"consumer_enable_dead_queue": enable_dead_queue,
"consumer_max_message_len": max_message_len,
"consumer_delete_message_after_process": delete_message_after_process,
"consumer_run_parallel": run_parallel,
"daemon_concurrency": daemon_concurrency,
}.items()
if v is not None
}

if not config:
logger.info("Initializing config from environment variables and .env file")
config = BrqConfig()
else:
logger.info("Using custom config")
config = config

config = BrqConfig.model_validate(
{
**config.model_dump(),
**kwargs,
}
)

if _func is None:
return partial(
_wrapper,
config=config,
register_function_name=register_function_name,
)
else:
return _wrapper(_func, config=config, register_function_name=register_function_name)
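
Putting the pieces together, a usage sketch of the decorator with inline overrides, using parameter names from the signature above (the port, group name, and task body are illustrative):

```python
from brq import task


@task(
    redis_port=6380,             # illustrative override of the Redis port
    group_name="email-workers",  # illustrative consumer group name
    daemon_concurrency=2,        # run two consumers in the daemon
)
def send_email(address: str):
    print(f"Sending email to {address}")


if __name__ == "__main__":
    send_email.serve()  # blocks and consumes jobs registered as "send_email"
```
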
18 changes: 18 additions & 0 deletions brq/tools.py
@@ -1,10 +1,14 @@
import asyncio
import contextlib
import functools
import inspect
from contextlib import asynccontextmanager
from datetime import datetime
from typing import Any, AsyncGenerator

import anyio
import redis.asyncio as redis
from anyio import to_thread

from brq.log import logger

@@ -75,3 +79,17 @@ async def get_redis_client(
yield redis_client
finally:
await redis_client.aclose()


def ensure_awaitable(func, limiter=None):
if inspect.iscoroutinefunction(func):
return func

@functools.wraps(func)
async def wrapper(*args, **kwargs):
nonlocal func
if kwargs:
func = functools.partial(func, **kwargs)
return await anyio.to_thread.run_sync(func, *args, limiter=limiter)

return wrapper
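
As a quick illustration (the function and values here are made up): `ensure_awaitable` returns coroutine functions unchanged and wraps sync callables so they run in a worker thread via `anyio.to_thread.run_sync`:

```python
import asyncio

from brq.tools import ensure_awaitable


def blocking_add(a, b):
    # plain sync function; would block the event loop if called directly in a coroutine
    return a + b


async def main():
    add = ensure_awaitable(blocking_add)  # wrapped into an async function
    print(await add(1, 2))  # runs in a worker thread, prints 3


asyncio.run(main())
```
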
5 changes: 5 additions & 0 deletions examples/decorator/README.md
@@ -0,0 +1,5 @@
## Echo example

Use [start-redis.sh](../../dev/start-redis.sh) to start Redis in Docker.

Then run `python producer.py` and `python consumer.py` in separate terminals at the same time.
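
The `producer.py` for this example is not expanded in this diff; judging from the producer snippet in the main README above, it presumably looks roughly like this sketch (not the file's verbatim contents):

```python
import asyncio

from brq import BrqConfig, Producer


async def main():
    config = BrqConfig()
    async with config.open_redis_client() as async_redis_client:
        await Producer(
            async_redis_client,
            redis_prefix=config.redis_key_prefix,
            redis_seperator=config.redis_key_seperator,
            max_message_len=config.producer_max_message_length,
        ).run_job("echo", ["hello"])


if __name__ == "__main__":
    asyncio.run(main())
```
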
14 changes: 14 additions & 0 deletions examples/decorator/consumer.py
@@ -0,0 +1,14 @@
from brq import task


@task
def echo(message):
print(f"Received message: {message}")


if __name__ == "__main__":
# Run the task once, for local debug
# echo("hello")

# Run as a daemon
echo.serve()