diff --git a/brq/configs.py b/brq/configs.py index 63d4f41..6953fec 100644 --- a/brq/configs.py +++ b/brq/configs.py @@ -1,10 +1,8 @@ -import os import uuid from contextlib import asynccontextmanager from typing import Any, AsyncGenerator import redis.asyncio as redis -from pydantic import BaseModel from pydantic_settings import BaseSettings, SettingsConfigDict from brq.tools import get_redis_client, get_redis_url diff --git a/brq/decorator.py b/brq/decorator.py index 334ee14..8a835c6 100644 --- a/brq/decorator.py +++ b/brq/decorator.py @@ -65,7 +65,75 @@ def task( *, config: BrqConfig | None = None, register_function_name: str | None = None, + # Redis connection + redis_host: str | None = None, + redis_port: int | None = None, + redis_db: int | None = None, + redis_cluster: bool | None = None, + redis_tls: bool | None = None, + redis_username: str | None = None, + redis_password: str | None = None, + # Scope + redis_key_prefix: str | None = None, + redis_key_seperator: str | None = None, + # Consumer + group_name: str | None = None, + consumer_identifier: str | None = None, + count_per_fetch: int | None = None, + block_time: int | None = None, + expire_time: int | None = None, + process_timeout: int | None = None, + retry_lock_time: int | None = None, + retry_cooldown_time: int | None = None, + enable_enque_deferred_job: bool | None = None, + enable_reprocess_timeout_job: bool | None = None, + enable_dead_queue: bool | None = None, + max_message_len: int | None = None, + delete_message_after_process: bool | None = None, + run_parallel: bool | None = None, + # Daemon + daemon_concurrency: int | None = None, ): + + def _wrapper( + func, + config: BrqConfig | None = None, + register_function_name: str | None = None, + ): + + return BrqTaskWrapper(func, config, register_function_name) + + kwargs = { + k: v + for k, v in { + "redis_host": redis_host, + "redis_port": redis_port, + "redis_db": redis_db, + "redis_cluster": redis_cluster, + "redis_tls": redis_tls, + "redis_username": redis_username, + "redis_password": redis_password, + "redis_key_prefix": redis_key_prefix, + "redis_key_seperator": redis_key_seperator, + "consumer_group_name": group_name, + "consumer_identifier": consumer_identifier, + "consumer_count_per_fetch": count_per_fetch, + "consumer_block_time": block_time, + "consumer_expire_time": expire_time, + "consumer_process_timeout": process_timeout, + "consumer_retry_lock_time": retry_lock_time, + "consumer_retry_cooldown_time": retry_cooldown_time, + "consumer_enable_enque_deferred_job": enable_enque_deferred_job, + "consumer_enable_reprocess_timeout_job": enable_reprocess_timeout_job, + "consumer_enable_dead_queue": enable_dead_queue, + "consumer_max_message_len": max_message_len, + "consumer_delete_message_after_process": delete_message_after_process, + "consumer_run_parallel": run_parallel, + "daemon_concurrency": daemon_concurrency, + }.items() + if v is not None + } + if not config: logger.info("Initializing config from environment variables and .env file") config = BrqConfig() @@ -73,17 +141,18 @@ def task( logger.info("Using custom config") config = config - def _wrapper(func, config, register_function_name): - if not config: - logger.info("Initializing config from environment variables and .env file") - config = BrqConfig() - else: - logger.info("Using custom config") - config = config - - return BrqTaskWrapper(func, config, register_function_name) + config = BrqConfig.model_validate( + { + **config.model_dump(), + **kwargs, + } + ) if _func is None: - return _wrapper + return partial( + _wrapper, + config=config, + register_function_name=register_function_name, + ) else: return _wrapper(_func, config=config, register_function_name=register_function_name)