Skip to content

Commit 3d33ccf

Browse files
authored
Fix maxwait metric (#183)
Max wait was being reported as 0 after #159. This PR fixes that and adds a test.
1 parent 7987c5f commit 3d33ccf

File tree

4 files changed

+55
-23
lines changed

4 files changed

+55
-23
lines changed

src/admin.rs

+7-1
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,13 @@ where
228228
pool_config.pool_mode.to_string(),
229229
];
230230
for column in &columns[3..columns.len()] {
231-
let value = pool_stats.get(column.0).unwrap_or(&0).to_string();
231+
let value = match column.0 {
232+
"maxwait" => (pool_stats.get("maxwait_us").unwrap_or(&0) / 1_000_000).to_string(),
233+
"maxwait_us" => {
234+
(pool_stats.get("maxwait_us").unwrap_or(&0) % 1_000_000).to_string()
235+
}
236+
_other_values => pool_stats.get(column.0).unwrap_or(&0).to_string(),
237+
};
232238
row.push(value);
233239
}
234240
res.put(data_row(&row));

src/pool.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,6 @@ impl ConnectionPool {
333333
role: Option<Role>, // primary or replica
334334
process_id: i32, // client id
335335
) -> Result<(PooledConnection<'_, ServerPool>, Address), Error> {
336-
let now = Instant::now();
337336
let mut candidates: Vec<&Address> = self.addresses[shard]
338337
.iter()
339338
.filter(|address| address.role == role)
@@ -358,6 +357,7 @@ impl ConnectionPool {
358357
}
359358

360359
// Indicate we're waiting on a server connection from a pool.
360+
let now = Instant::now();
361361
self.stats.client_waiting(process_id);
362362

363363
// Check if we can connect
@@ -397,7 +397,7 @@ impl ConnectionPool {
397397

398398
match tokio::time::timeout(
399399
tokio::time::Duration::from_millis(healthcheck_timeout),
400-
server.query(";"), // Cheap query (query parser not used in PG)
400+
server.query(";"), // Cheap query as it skips the query planner
401401
)
402402
.await
403403
{

src/stats.rs

+24-20
Original file line numberDiff line numberDiff line change
@@ -580,15 +580,15 @@ impl Collector {
580580
server_info.query_count += stat.value as u64;
581581
server_info.application_name = app_name;
582582

583-
let pool_stats = address_stat_lookup
583+
let address_stats = address_stat_lookup
584584
.entry(server_info.address_id)
585585
.or_insert(HashMap::default());
586-
let counter = pool_stats
586+
let counter = address_stats
587587
.entry("total_query_count".to_string())
588588
.or_insert(0);
589589
*counter += stat.value;
590590

591-
let duration = pool_stats
591+
let duration = address_stats
592592
.entry("total_query_time".to_string())
593593
.or_insert(0);
594594
*duration += duration_ms as i64;
@@ -681,26 +681,26 @@ impl Collector {
681681
Some(server_info) => {
682682
server_info.application_name = app_name;
683683

684-
let pool_stats = address_stat_lookup
684+
let address_stats = address_stat_lookup
685685
.entry(server_info.address_id)
686686
.or_insert(HashMap::default());
687-
let counter =
688-
pool_stats.entry("total_wait_time".to_string()).or_insert(0);
687+
let counter = address_stats
688+
.entry("total_wait_time".to_string())
689+
.or_insert(0);
689690
*counter += stat.value;
690691

691-
let counter = pool_stats.entry("maxwait_us".to_string()).or_insert(0);
692-
let mic_part = stat.value % 1_000_000;
693-
694-
// Report max time here
695-
if mic_part > *counter {
696-
*counter = mic_part;
697-
}
698-
699-
let counter = pool_stats.entry("maxwait".to_string()).or_insert(0);
700-
let seconds = *counter / 1_000_000;
692+
let pool_stats = pool_stat_lookup
693+
.entry((
694+
server_info.pool_name.clone(),
695+
server_info.username.clone(),
696+
))
697+
.or_insert(HashMap::default());
701698

702-
if seconds > *counter {
703-
*counter = seconds;
699+
// We record max wait in microseconds; the pgbouncer-style seconds/microseconds split is done in the admin output (src/admin.rs)
700+
let old_microseconds =
701+
pool_stats.entry("maxwait_us".to_string()).or_insert(0);
702+
if stat.value > *old_microseconds {
703+
*old_microseconds = stat.value;
704704
}
705705
}
706706
None => (),
@@ -903,8 +903,6 @@ impl Collector {
903903
"sv_active",
904904
"sv_tested",
905905
"sv_login",
906-
"maxwait",
907-
"maxwait_us",
908906
] {
909907
pool_stats.insert(stat.to_string(), 0);
910908
}
@@ -962,6 +960,12 @@ impl Collector {
962960
LATEST_CLIENT_STATS.store(Arc::new(client_states.clone()));
963961
LATEST_SERVER_STATS.store(Arc::new(server_states.clone()));
964962
LATEST_POOL_STATS.store(Arc::new(pool_stat_lookup.clone()));
963+
964+
// Clear maxwait after reporting
965+
pool_stat_lookup
966+
.entry((pool_name.clone(), username.clone()))
967+
.or_insert(HashMap::default())
968+
.insert("maxwait_us".to_string(), 0);
965969
}
966970

967971
EventName::UpdateAverages { address_id } => {

tests/ruby/admin_spec.rb

+22
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,28 @@
208208
threads.map(&:join)
209209
connections.map(&:close)
210210
end
211+
212+
it "show correct max_wait" do
213+
threads = []
214+
connections = Array.new(4) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
215+
connections.each do |c|
216+
threads << Thread.new { c.async_exec("SELECT pg_sleep(1.5)") }
217+
end
218+
219+
sleep(2.5) # Allow time for stats to update
220+
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
221+
results = admin_conn.async_exec("SHOW POOLS")[0]
222+
223+
expect(results["maxwait"]).to eq("1")
224+
expect(results["maxwait_us"].to_i).to be_within(100_000).of(500_000)
225+
226+
sleep(4.5) # Allow time for stats to update
227+
results = admin_conn.async_exec("SHOW POOLS")[0]
228+
expect(results["maxwait"]).to eq("0")
229+
230+
threads.map(&:join)
231+
connections.map(&:close)
232+
end
211233
end
212234
end
213235

0 commit comments

Comments
 (0)