From 3693fe83ed79cf8b96ffafee21ffc843fff5c2f0 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 21 Jan 2025 20:08:01 +0100 Subject: [PATCH 1/2] ref(consumer): Remove async inserts This was an experiment that is no longer needed. In the future one could use writer-options as in https://github.com/getsentry/snuba/pull/6788 blocked on https://github.com/getsentry/ops/pull/13690 where we remove the usage of those args --- rust_snuba/benches/processors.rs | 1 - rust_snuba/src/consumer.rs | 4 ---- rust_snuba/src/factory.rs | 2 -- rust_snuba/src/strategies/clickhouse/batch.rs | 9 -------- snuba/cli/rust_consumer.py | 22 ++----------------- 5 files changed, 2 insertions(+), 36 deletions(-) diff --git a/rust_snuba/benches/processors.rs b/rust_snuba/benches/processors.rs index 58cf9a75fca..9ed424fa84e 100644 --- a/rust_snuba/benches/processors.rs +++ b/rust_snuba/benches/processors.rs @@ -82,7 +82,6 @@ fn create_factory( clickhouse_concurrency, commitlog_concurrency, replacements_concurrency, - async_inserts: false, python_max_queue_depth: None, use_rust_processor: true, health_check_file: None, diff --git a/rust_snuba/src/consumer.rs b/rust_snuba/src/consumer.rs index 9da36d0ff22..e95227753dd 100644 --- a/rust_snuba/src/consumer.rs +++ b/rust_snuba/src/consumer.rs @@ -39,7 +39,6 @@ pub fn consumer( use_rust_processor: bool, enforce_schema: bool, max_poll_interval_ms: usize, - async_inserts: bool, mutations_mode: bool, python_max_queue_depth: Option, health_check_file: Option<&str>, @@ -58,7 +57,6 @@ pub fn consumer( use_rust_processor, enforce_schema, max_poll_interval_ms, - async_inserts, python_max_queue_depth, health_check_file, stop_at_timestamp, @@ -80,7 +78,6 @@ pub fn consumer_impl( use_rust_processor: bool, enforce_schema: bool, max_poll_interval_ms: usize, - async_inserts: bool, python_max_queue_depth: Option, health_check_file: Option<&str>, stop_at_timestamp: Option, @@ -263,7 +260,6 @@ pub fn consumer_impl( clickhouse_concurrency: ConcurrencyConfig::new(clickhouse_concurrency), commitlog_concurrency: ConcurrencyConfig::new(2), replacements_concurrency: ConcurrencyConfig::new(4), - async_inserts, python_max_queue_depth, use_rust_processor, health_check_file: health_check_file.map(ToOwned::to_owned), diff --git a/rust_snuba/src/factory.rs b/rust_snuba/src/factory.rs index 0ac543bc1fe..43a75808ea5 100644 --- a/rust_snuba/src/factory.rs +++ b/rust_snuba/src/factory.rs @@ -45,7 +45,6 @@ pub struct ConsumerStrategyFactory { pub clickhouse_concurrency: ConcurrencyConfig, pub commitlog_concurrency: ConcurrencyConfig, pub replacements_concurrency: ConcurrencyConfig, - pub async_inserts: bool, pub python_max_queue_depth: Option, pub use_rust_processor: bool, pub health_check_file: Option, @@ -115,7 +114,6 @@ impl ProcessingStrategyFactory for ConsumerStrategyFactory { &self.clickhouse_concurrency, &self.storage_config.clickhouse_cluster.user, &self.storage_config.clickhouse_cluster.password, - self.async_inserts, self.batch_write_timeout, ); diff --git a/rust_snuba/src/strategies/clickhouse/batch.rs b/rust_snuba/src/strategies/clickhouse/batch.rs index b43e5acc495..c14dc59c4b1 100644 --- a/rust_snuba/src/strategies/clickhouse/batch.rs +++ b/rust_snuba/src/strategies/clickhouse/batch.rs @@ -9,7 +9,6 @@ use tokio::task::JoinHandle; use tokio::time::{sleep, Duration}; use tokio_stream::wrappers::ReceiverStream; -use crate::runtime_config::get_str_config; use crate::types::RowData; const CLICKHOUSE_HTTP_CHUNK_SIZE_BYTES: usize = 1_000_000; @@ -41,7 +40,6 @@ impl BatchFactory { concurrency: &ConcurrencyConfig, clickhouse_user: &str, clickhouse_password: &str, - async_inserts: bool, batch_write_timeout: Option, ) -> Self { let mut headers = HeaderMap::with_capacity(5); @@ -63,13 +61,6 @@ impl BatchFactory { let mut query_params = String::new(); query_params.push_str("load_balancing=in_order&insert_distributed_sync=1"); - if async_inserts { - let async_inserts_allowed = get_str_config("async_inserts_allowed").ok().flatten(); - if async_inserts_allowed == Some("1".to_string()) { - query_params.push_str("&async_insert=1&wait_for_async_insert=1"); - } - } - let url = format!("http://{hostname}:{http_port}?{query_params}"); let query = format!("INSERT INTO {table} FORMAT JSONEachRow"); diff --git a/snuba/cli/rust_consumer.py b/snuba/cli/rust_consumer.py index 3bdc74a1557..affad805ea6 100644 --- a/snuba/cli/rust_consumer.py +++ b/snuba/cli/rust_consumer.py @@ -105,11 +105,6 @@ "--concurrency", type=int, ) -@click.option( - "--clickhouse-concurrency", - type=int, - help="Number of concurrent clickhouse batches at one time.", -) @click.option( "--use-rust-processor/--use-python-processor", "use_rust_processor", @@ -134,12 +129,6 @@ type=int, default=30000, ) -@click.option( - "--async-inserts", - is_flag=True, - default=False, - help="Enable async inserts for ClickHouse", -) @click.option( "--mutations-mode", is_flag=True, @@ -197,11 +186,9 @@ def rust_consumer( max_batch_time_ms: int, log_level: str, concurrency: Optional[int], - clickhouse_concurrency: Optional[int], use_rust_processor: bool, group_instance_id: Optional[str], max_poll_interval_ms: int, - async_inserts: bool, python_max_queue_depth: Optional[int], health_check_file: Optional[str], enforce_schema: bool, @@ -238,11 +225,7 @@ def rust_consumer( os.environ["RUST_LOG"] = log_level.lower() - if not async_inserts: - # we don't want to allow increasing this if - # we aren't using async inserts since that will increase - # the number of inserts/sec on clickhouse - clickhouse_concurrency = 2 + clickhouse_concurrency = 2 exitcode = rust_snuba.consumer( # type: ignore consumer_group, @@ -250,11 +233,10 @@ def rust_consumer( no_strict_offset_reset, consumer_config_raw, concurrency or 1, - clickhouse_concurrency or 2, + clickhouse_concurrency, use_rust_processor, enforce_schema, max_poll_interval_ms, - async_inserts, mutations_mode, python_max_queue_depth, health_check_file, From 83d01c695902bef05f357a4ef15dce44dfd0328a Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 21 Jan 2025 21:41:54 +0100 Subject: [PATCH 2/2] remove unused arg --- rust_snuba/src/strategies/clickhouse/batch.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/rust_snuba/src/strategies/clickhouse/batch.rs b/rust_snuba/src/strategies/clickhouse/batch.rs index c14dc59c4b1..8a39ef589de 100644 --- a/rust_snuba/src/strategies/clickhouse/batch.rs +++ b/rust_snuba/src/strategies/clickhouse/batch.rs @@ -257,7 +257,6 @@ mod tests { &concurrency, "default", "", - false, None, ); @@ -292,7 +291,6 @@ mod tests { &concurrency, "default", "", - true, None, ); @@ -326,7 +324,6 @@ mod tests { &concurrency, "default", "", - false, None, ); @@ -358,7 +355,6 @@ mod tests { &concurrency, "default", "", - false, None, ); @@ -392,7 +388,6 @@ mod tests { &concurrency, "default", "", - true, // pass in an unreasonably short timeout // which prevents the client request from reaching Clickhouse Some(Duration::from_millis(0)), @@ -427,7 +422,6 @@ mod tests { &concurrency, "default", "", - true, // pass in a reasonable timeout Some(Duration::from_millis(1000)), );