Skip to content

Commit 8f6fa26

Browse files
committed
refactor(server): improve statement timeout and parameter handling
- Add statement timeout configuration at pool level (pools.<pool_name>.statement_timeout) - Add logging for statement timeout configuration during pool initialization - Improve sync_parameters method to handle statement timeout consistently - Document statement timeout configuration in CONFIG.md - Change string initialization from String::from() to String::new() - Maintain cleanup state until connection check-in for proper connection pooling The statement timeout setting leverages PostgreSQL's native statement_timeout parameter, allowing the database to handle query termination automatically. This ensures reliable timeout enforcement directly at the database level, rather than managing timeouts in the connection pooler. Pool-level settings take precedence over user-level settings for consistent timeout enforcement across all pool users.
1 parent f8e2fcd commit 8f6fa26

File tree

4 files changed

+48
-6
lines changed

4 files changed

+48
-6
lines changed

CONFIG.md

+10
Original file line numberDiff line numberDiff line change
@@ -458,6 +458,16 @@ default: 3000
458458

459459
Connect timeout can be overwritten in the pool
460460

461+
### statement_timeout
462+
463+
```
464+
path: pools.<pool_name>.statement_timeout
465+
default: 0
466+
```
467+
468+
Statement Timeout.
469+
The `statement_timeout` setting controls the maximum amount of time (in milliseconds) that any query is allowed to run before being cancelled. When set, this allows Postgres to manage the statement_timeout. [See Postgres Docs](https://www.postgresql.org/docs/13/runtime-config-client.html#RUNTIME-CONFIG-CLIENT-STATEMENT)
470+
461471
## `pools.<pool_name>.users.<user_index>` Section
462472

463473
### username

examples/docker/pgcat.toml

+3
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,9 @@ primary_reads_enabled = true
9090
#
9191
sharding_function = "pg_bigint_hash"
9292

93+
# Maximum query duration in milliseconds. This value is relayed to Postgres for managing query timeouts.
94+
statement_timeout = 30000 # 30 seconds
95+
9396
# Credentials for users that may connect to this cluster
9497
[pools.postgres.users.0]
9598
username = "postgres"

src/config.rs

+18
Original file line numberDiff line numberDiff line change
@@ -541,6 +541,9 @@ pub struct Pool {
541541
#[serde(default = "Pool::default_default_role")]
542542
pub default_role: String,
543543

544+
#[serde(default = "Pool::default_statement_timeout")]
545+
pub statement_timeout: u64,
546+
544547
#[serde(default)] // False
545548
pub query_parser_enabled: bool,
546549

@@ -674,6 +677,10 @@ impl Pool {
674677
50
675678
}
676679

680+
pub fn default_statement_timeout() -> u64 {
681+
0 // Default to no timeout
682+
}
683+
677684
pub fn validate(&mut self) -> Result<(), Error> {
678685
match self.default_role.as_ref() {
679686
"any" => (),
@@ -783,6 +790,7 @@ impl Default for Pool {
783790
pool_mode: Self::default_pool_mode(),
784791
load_balancing_mode: Self::default_load_balancing_mode(),
785792
default_role: String::from("any"),
793+
statement_timeout: Self::default_statement_timeout(),
786794
query_parser_enabled: false,
787795
query_parser_max_length: None,
788796
query_parser_read_write_splitting: false,
@@ -1276,6 +1284,16 @@ impl Config {
12761284
.sum::<u32>()
12771285
.to_string()
12781286
);
1287+
info!(
1288+
"[pool: {}] Statement timeout: {}{}",
1289+
pool_name,
1290+
pool_config.statement_timeout,
1291+
if pool_config.statement_timeout > 0 {
1292+
"ms"
1293+
} else {
1294+
""
1295+
}
1296+
);
12791297
info!(
12801298
"[pool: {}] Default pool mode: {}",
12811299
pool_name,

src/server.rs

+17-6
Original file line numberDiff line numberDiff line change
@@ -1259,18 +1259,29 @@ impl Server {
12591259
}
12601260

12611261
pub async fn sync_parameters(&mut self, parameters: &ServerParameters) -> Result<(), Error> {
1262-
let parameter_diff = self.server_parameters.compare_params(parameters);
1262+
let mut queries = Vec::new();
12631263

1264-
if parameter_diff.is_empty() {
1265-
return Ok(());
1264+
let config = get_config();
1265+
if let Some(pool) = config.pools.get(&self.address.database) {
1266+
if pool.statement_timeout > 0 {
1267+
queries.push(format!(
1268+
"SET statement_timeout TO {}",
1269+
pool.statement_timeout
1270+
));
1271+
self.cleanup_state.needs_cleanup_set = true;
1272+
}
12661273
}
12671274

1268-
let mut query = String::from("");
1269-
1275+
let parameter_diff = self.server_parameters.compare_params(parameters);
12701276
for (key, value) in parameter_diff {
1271-
query.push_str(&format!("SET {} TO '{}';", key, value));
1277+
queries.push(format!("SET {} TO '{}'", key, value));
1278+
}
1279+
1280+
if queries.is_empty() {
1281+
return Ok(()); // No changes needed
12721282
}
12731283

1284+
let query = queries.join("; ");
12741285
let res = self.query(&query).await;
12751286

12761287
self.cleanup_state.reset();

0 commit comments

Comments
 (0)