Skip to content

Commit f5116c1

Browse files
authoredNov 15, 2024··
feat(consumers): rust consumers quantized rebalance (#6561)
Add the ability to do quantized rebalancing for consumers, toggleable by runtime config. ### What is quantized rebalancing? Rebalancing events are synchronized to the tick of the clock. This is controlled by the runtime config `quantized_rebalance_consumer_group_delay_secs__{consumer_group}`. For example, with `SET quantized_rebalance_consumer_group_delay_secs__spans=15`: * When a consumer starts up, it delays subscribing to a topic until `timestamp % 15 == 0`. * When a consumer gets a shutdown signal (ctrl-c), it delays leaving the group until `timestamp % 15 == 0`. ### What else is in this PR? This PR adds the ability to read runtime config from Redis directly. This was mostly done because I couldn't get the Python bindings to work on my machine; it still falls back to the old implementation if there is an error in the direct implementation.
1 parent 83aa790 commit f5116c1

File tree

6 files changed

+203
-4
lines changed

6 files changed

+203
-4
lines changed
 

‎rust_snuba/Cargo.lock

+40
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎rust_snuba/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ data-encoding = "2.5.0"
5454
zstd = "0.12.3"
5555
serde_with = "3.8.1"
5656
seq-macro = "0.3"
57+
redis = "0.27.5"
5758

5859

5960
[patch.crates-io]

‎rust_snuba/src/consumer.rs

+21-4
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use crate::metrics::global_tags::set_global_tag;
2323
use crate::metrics::statsd::StatsDBackend;
2424
use crate::mutations::factory::MutConsumerStrategyFactory;
2525
use crate::processors;
26+
use crate::rebalancing;
2627
use crate::types::{InsertOrReplacement, KafkaMessageMetadata};
2728

2829
#[pyfunction]
@@ -234,6 +235,11 @@ pub fn consumer_impl(
234235

235236
let topic = Topic::new(&consumer_config.raw_topic.physical_topic_name);
236237

238+
let rebalance_delay_secs = rebalancing::get_rebalance_delay_secs(consumer_group);
239+
if let Some(secs) = rebalance_delay_secs {
240+
rebalancing::delay_kafka_rebalance(secs)
241+
}
242+
237243
let processor = if mutations_mode {
238244
let mut_factory = MutConsumerStrategyFactory {
239245
storage_config: first_storage,
@@ -286,10 +292,21 @@ pub fn consumer_impl(
286292

287293
let mut handle = processor.get_handle();
288294

289-
ctrlc::set_handler(move || {
290-
handle.signal_shutdown();
291-
})
292-
.expect("Error setting Ctrl-C handler");
295+
match rebalance_delay_secs {
296+
Some(secs) => {
297+
ctrlc::set_handler(move || {
298+
rebalancing::delay_kafka_rebalance(secs);
299+
handle.signal_shutdown();
300+
})
301+
.expect("Error setting Ctrl-C handler");
302+
}
303+
None => {
304+
ctrlc::set_handler(move || {
305+
handle.signal_shutdown();
306+
})
307+
.expect("Error setting Ctrl-C handler");
308+
}
309+
}
293310

294311
if let Err(error) = processor.run() {
295312
let error: &dyn std::error::Error = &error;

‎rust_snuba/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ mod logging;
66
mod metrics;
77
mod mutations;
88
mod processors;
9+
mod rebalancing;
910
mod runtime_config;
1011
mod strategies;
1112
mod types;

‎rust_snuba/src/rebalancing.rs

+81
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
use crate::runtime_config;
2+
use std::thread;
3+
use std::time::{Duration, SystemTime, UNIX_EPOCH};
4+
5+
pub fn delay_kafka_rebalance(configured_delay_secs: u64) {
6+
/*
7+
* Introduces a configurable delay to the consumer topic
8+
* subscription and consumer shutdown steps (handled by the
9+
* StreamProcessor). The idea behind is that by forcing
10+
* these steps to occur at certain time "ticks" (for example, at
11+
* every 15 second tick in a minute), we can reduce the number of
12+
* rebalances that are triggered during a deploy. This means
13+
* fewer "stop the world rebalancing" occurrences and more time
14+
* for the consumer group to stabilize and make progress.
15+
*/
16+
let current_time = SystemTime::now();
17+
let time_elapsed_in_slot = match current_time.duration_since(UNIX_EPOCH) {
18+
Ok(duration) => duration.as_secs(),
19+
Err(_) => 0,
20+
} % configured_delay_secs;
21+
tracing::info!(
22+
"Delaying rebalance by {} seconds",
23+
configured_delay_secs - time_elapsed_in_slot
24+
);
25+
26+
thread::sleep(Duration::from_secs(
27+
configured_delay_secs - time_elapsed_in_slot,
28+
));
29+
}
30+
31+
pub fn get_rebalance_delay_secs(consumer_group: &str) -> Option<u64> {
32+
match runtime_config::get_str_config(
33+
format!(
34+
"quantized_rebalance_consumer_group_delay_secs__{}",
35+
consumer_group
36+
)
37+
.as_str(),
38+
) {
39+
Ok(delay_secs) => match delay_secs {
40+
Some(secs) => match secs.parse() {
41+
Ok(v) => Some(v),
42+
Err(_) => None,
43+
},
44+
None => None,
45+
},
46+
Err(_) => None,
47+
}
48+
}
49+
50+
#[cfg(test)]
51+
mod tests {
52+
use super::*;
53+
54+
#[test]
55+
fn test_delay_config() {
56+
runtime_config::del_str_config_direct(
57+
"quantized_rebalance_consumer_group_delay_secs__spans",
58+
)
59+
.unwrap();
60+
let delay_secs = get_rebalance_delay_secs("spans");
61+
assert_eq!(delay_secs, None);
62+
runtime_config::set_str_config_direct(
63+
"quantized_rebalance_consumer_group_delay_secs__spans",
64+
"420",
65+
)
66+
.unwrap();
67+
let delay_secs = get_rebalance_delay_secs("spans");
68+
assert_eq!(delay_secs, Some(420));
69+
runtime_config::set_str_config_direct(
70+
"quantized_rebalance_consumer_group_delay_secs__spans",
71+
"garbage",
72+
)
73+
.unwrap();
74+
let delay_secs = get_rebalance_delay_secs("spans");
75+
assert_eq!(delay_secs, None);
76+
runtime_config::del_str_config_direct(
77+
"quantized_rebalance_consumer_group_delay_secs__spans",
78+
)
79+
.unwrap();
80+
}
81+
}

‎rust_snuba/src/runtime_config.rs

+59
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,77 @@
11
use anyhow::Error;
22
use parking_lot::RwLock;
33
use pyo3::prelude::{PyModule, Python};
4+
use redis::Commands;
45
use std::collections::BTreeMap;
6+
use std::collections::HashMap;
57
use std::time::Duration;
68

79
use rust_arroyo::timer;
810
use rust_arroyo::utils::timing::Deadline;
911

1012
static CONFIG: RwLock<BTreeMap<String, (Option<String>, Deadline)>> = RwLock::new(BTreeMap::new());
1113

14+
static CONFIG_HASHSET_KEY: &str = "snuba-config";
15+
16+
fn get_redis_client() -> Result<redis::Client, redis::RedisError> {
17+
let redis_host = std::env::var("REDIS_HOST").unwrap_or(String::from("127.0.0.1"));
18+
let redis_port = std::env::var("REDIS_PORT").unwrap_or(String::from("6379"));
19+
let redis_password = std::env::var("REDIS_PASSWORD").unwrap_or(String::from(""));
20+
let redis_db = std::env::var("REDIS_DB").unwrap_or(String::from("1"));
21+
// TODO: handle SSL?
22+
let url = format!(
23+
"redis://{}:{}@{}:{}/{}",
24+
"default", redis_password, redis_host, redis_port, redis_db
25+
);
26+
redis::Client::open(url)
27+
}
28+
29+
fn get_str_config_direct(key: &str) -> Result<Option<String>, Error> {
30+
let deadline = Deadline::new(Duration::from_secs(10));
31+
32+
let client = get_redis_client()?;
33+
let mut con = client.get_connection()?;
34+
35+
let configmap: HashMap<String, String> = con.hgetall(CONFIG_HASHSET_KEY)?;
36+
let val = match configmap.get(key) {
37+
Some(val) => Some(val.clone()),
38+
None => return Ok(None),
39+
};
40+
41+
CONFIG
42+
.write()
43+
.insert(key.to_string(), (val.clone(), deadline));
44+
Ok(CONFIG.read().get(key).unwrap().0.clone())
45+
}
46+
47+
#[allow(dead_code)]
48+
pub fn set_str_config_direct(key: &str, val: &str) -> Result<(), Error> {
49+
let client = get_redis_client()?;
50+
let mut con = client.get_connection()?;
51+
con.hset(CONFIG_HASHSET_KEY, key, val)?;
52+
Ok(())
53+
}
54+
55+
#[allow(dead_code)]
56+
pub fn del_str_config_direct(key: &str) -> Result<(), Error> {
57+
let client = get_redis_client()?;
58+
let mut con = client.get_connection()?;
59+
con.hdel(CONFIG_HASHSET_KEY, key)?;
60+
Ok(())
61+
}
62+
1263
/// Runtime config is cached for 10 seconds
1364
pub fn get_str_config(key: &str) -> Result<Option<String>, Error> {
1465
let deadline = Deadline::new(Duration::from_secs(10));
1566

67+
match get_str_config_direct(key) {
68+
Ok(val) => return Ok(val),
69+
Err(error) => tracing::error!(
70+
"Could not get config from redis directly, falling back to python {}",
71+
error
72+
),
73+
}
74+
1675
if let Some(value) = CONFIG.read().get(key) {
1776
let (config, deadline) = value;
1877
if !deadline.has_elapsed() {

0 commit comments

Comments
 (0)
Please sign in to comment.