Skip to content

Commit 35828a0

Browse files
authored
Per-shard statistics (#57)
* per shard stats * aight * cleaner * fix show lists * comments * more friendly * case-insensitive * test all shards * ok * HUH?
1 parent 1e8fa11 commit 35828a0

12 files changed

+285
-141
lines changed

.circleci/run_tests.sh

+6-3
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ function start_pgcat() {
1313

1414
# Setup the database with shards and user
1515
psql -e -h 127.0.0.1 -p 5432 -U postgres -f tests/sharding/query_routing_setup.sql
16+
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard0 -i
17+
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard1 -i
18+
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard2 -i
1619

1720
# Install Toxiproxy to simulate a downed/slow database
1821
wget -O toxiproxy-2.1.4.deb https://github.com/Shopify/toxiproxy/releases/download/v2.1.4/toxiproxy_2.1.4_amd64.deb
@@ -28,9 +31,9 @@ toxiproxy-cli create -l 127.0.0.1:5433 -u 127.0.0.1:5432 postgres_replica
2831
start_pgcat "info"
2932

3033
# pgbench test
31-
pgbench -i -h 127.0.0.1 -p 6432 && \
32-
pgbench -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol simple && \
33-
pgbench -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol extended
34+
pgbench -i -h 127.0.0.1 -p 6432
35+
pgbench -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol simple -f tests/pgbench/simple.sql
36+
pgbench -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol extended
3437

3538
# COPY TO STDOUT test
3639
psql -h 127.0.0.1 -p 6432 -c 'COPY (SELECT * FROM pgbench_accounts LIMIT 15) TO STDOUT;' > /dev/null

Cargo.lock

-10
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ serde_derive = "1"
2020
regex = "1"
2121
num_cpus = "1"
2222
once_cell = "1"
23-
statsd = "0.15"
2423
sqlparser = "0.14"
2524
log = "0.4"
2625
arc-swap = "1"

src/admin.rs

+55-20
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ pub async fn handle_admin(
3131

3232
if query.starts_with("SHOW STATS") {
3333
trace!("SHOW STATS");
34-
show_stats(stream).await
34+
show_stats(stream, &pool).await
3535
} else if query.starts_with("RELOAD") {
3636
trace!("RELOAD");
3737
reload(stream).await
@@ -77,23 +77,39 @@ async fn show_lists(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Resul
7777
])); // but admin tools that work with pgbouncer want this
7878
res.put(data_row(&vec![
7979
"free_clients".to_string(),
80-
stats["cl_idle"].to_string(),
80+
stats
81+
.keys()
82+
.map(|address_id| stats[&address_id]["cl_idle"])
83+
.sum::<i64>()
84+
.to_string(),
8185
]));
8286
res.put(data_row(&vec![
8387
"used_clients".to_string(),
84-
stats["cl_active"].to_string(),
88+
stats
89+
.keys()
90+
.map(|address_id| stats[&address_id]["cl_active"])
91+
.sum::<i64>()
92+
.to_string(),
8593
]));
8694
res.put(data_row(&vec![
8795
"login_clients".to_string(),
8896
"0".to_string(),
8997
]));
9098
res.put(data_row(&vec![
9199
"free_servers".to_string(),
92-
stats["sv_idle"].to_string(),
100+
stats
101+
.keys()
102+
.map(|address_id| stats[&address_id]["sv_idle"])
103+
.sum::<i64>()
104+
.to_string(),
93105
]));
94106
res.put(data_row(&vec![
95107
"used_servers".to_string(),
96-
stats["sv_active"].to_string(),
108+
stats
109+
.keys()
110+
.map(|address_id| stats[&address_id]["sv_active"])
111+
.sum::<i64>()
112+
.to_string(),
97113
]));
98114
res.put(data_row(&vec!["dns_names".to_string(), "0".to_string()]));
99115
res.put(data_row(&vec!["dns_zones".to_string(), "0".to_string()]));
@@ -125,7 +141,7 @@ async fn show_version(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
125141
}
126142

127143
/// SHOW POOLS
128-
async fn show_pools(stream: &mut OwnedWriteHalf, _pool: &ConnectionPool) -> Result<(), Error> {
144+
async fn show_pools(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> {
129145
let stats = get_stats();
130146
let config = {
131147
let guard = get_config();
@@ -151,16 +167,26 @@ async fn show_pools(stream: &mut OwnedWriteHalf, _pool: &ConnectionPool) -> Resu
151167
let mut res = BytesMut::new();
152168
res.put(row_description(&columns));
153169

154-
let mut row = vec![String::from("all"), config.user.name.clone()];
170+
for shard in 0..pool.shards() {
171+
for server in 0..pool.servers(shard) {
172+
let address = pool.address(shard, server);
173+
let stats = match stats.get(&address.id) {
174+
Some(stats) => stats.clone(),
175+
None => HashMap::new(),
176+
};
155177

156-
for column in &columns[2..columns.len() - 1] {
157-
let value = stats.get(column.0).unwrap_or(&0).to_string();
158-
row.push(value);
159-
}
178+
let mut row = vec![address.name(), config.user.name.clone()];
160179

161-
row.push(config.general.pool_mode.to_string());
180+
for column in &columns[2..columns.len() - 1] {
181+
let value = stats.get(column.0).unwrap_or(&0).to_string();
182+
row.push(value);
183+
}
184+
185+
row.push(config.general.pool_mode.to_string());
186+
res.put(data_row(&row));
187+
}
188+
}
162189

163-
res.put(data_row(&row));
164190
res.put(command_complete("SHOW"));
165191

166192
res.put_u8(b'Z');
@@ -309,7 +335,7 @@ async fn show_config(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
309335
}
310336

311337
/// SHOW STATS
312-
async fn show_stats(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
338+
async fn show_stats(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> {
313339
let columns = vec![
314340
("database", DataType::Text),
315341
("total_xact_count", DataType::Numeric),
@@ -332,15 +358,24 @@ async fn show_stats(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
332358
let mut res = BytesMut::new();
333359
res.put(row_description(&columns));
334360

335-
let mut row = vec![
336-
String::from("all"), // TODO: per-database stats,
337-
];
361+
for shard in 0..pool.shards() {
362+
for server in 0..pool.servers(shard) {
363+
let address = pool.address(shard, server);
364+
let stats = match stats.get(&address.id) {
365+
Some(stats) => stats.clone(),
366+
None => HashMap::new(),
367+
};
338368

339-
for column in &columns[1..] {
340-
row.push(stats.get(column.0).unwrap_or(&0).to_string());
369+
let mut row = vec![address.name()];
370+
371+
for column in &columns[1..] {
372+
row.push(stats.get(column.0).unwrap_or(&0).to_string());
373+
}
374+
375+
res.put(data_row(&row));
376+
}
341377
}
342378

343-
res.put(data_row(&row));
344379
res.put(command_complete("SHOW"));
345380

346381
res.put_u8(b'Z');

src/client.rs

+43-19
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,12 @@ pub struct Client {
5858

5959
// Clients want to talk to admin
6060
admin: bool,
61+
62+
// Last address the client talked to
63+
last_address_id: Option<usize>,
64+
65+
// Last server process id we talked to
66+
last_server_id: Option<i32>,
6167
}
6268

6369
impl Client {
@@ -147,6 +153,8 @@ impl Client {
147153
parameters: parameters,
148154
stats: stats,
149155
admin: admin,
156+
last_address_id: None,
157+
last_server_id: None,
150158
});
151159
}
152160

@@ -169,6 +177,8 @@ impl Client {
169177
parameters: HashMap::new(),
170178
stats: stats,
171179
admin: false,
180+
last_address_id: None,
181+
last_server_id: None,
172182
});
173183
}
174184

@@ -219,9 +229,6 @@ impl Client {
219229
loop {
220230
trace!("Client idle, waiting for message");
221231

222-
// Client idle, waiting for messages.
223-
self.stats.client_idle(self.process_id);
224-
225232
// Read a complete message from the client, which normally would be
226233
// either a `Q` (query) or `P` (prepare, extended protocol).
227234
// We can parse it here before grabbing a server from the pool,
@@ -292,13 +299,13 @@ impl Client {
292299
continue;
293300
}
294301

295-
// Waiting for server connection.
296-
self.stats.client_waiting(self.process_id);
297-
298302
debug!("Waiting for connection from pool");
299303

300304
// Grab a server from the pool: the client issued a regular query.
301-
let connection = match pool.get(query_router.shard(), query_router.role()).await {
305+
let connection = match pool
306+
.get(query_router.shard(), query_router.role(), self.process_id)
307+
.await
308+
{
302309
Ok(conn) => {
303310
debug!("Got connection from pool");
304311
conn
@@ -312,15 +319,23 @@ impl Client {
312319
};
313320

314321
let mut reference = connection.0;
315-
let _address = connection.1;
322+
let address = connection.1;
316323
let server = &mut *reference;
317324

318325
// Claim this server as mine for query cancellation.
319326
server.claim(self.process_id, self.secret_key);
320327

328+
// "disconnect" from the previous server stats-wise
329+
if let Some(last_address_id) = self.last_address_id {
330+
self.stats
331+
.client_disconnecting(self.process_id, last_address_id);
332+
}
333+
321334
// Client active & server active
322-
self.stats.client_active(self.process_id);
323-
self.stats.server_active(server.process_id());
335+
self.stats.client_active(self.process_id, address.id);
336+
self.stats.server_active(server.process_id(), address.id);
337+
self.last_address_id = Some(address.id);
338+
self.last_server_id = Some(server.process_id());
324339

325340
debug!(
326341
"Client {:?} talking to server {:?}",
@@ -392,17 +407,17 @@ impl Client {
392407
}
393408

394409
// Report query executed statistics.
395-
self.stats.query();
410+
self.stats.query(address.id);
396411

397412
// The transaction is over, we can release the connection back to the pool.
398413
if !server.in_transaction() {
399414
// Report transaction executed statistics.
400-
self.stats.transaction();
415+
self.stats.transaction(address.id);
401416

402417
// Release server back to the pool if we are in transaction mode.
403418
// If we are in session mode, we keep the server until the client disconnects.
404419
if self.transaction_mode {
405-
self.stats.server_idle(server.process_id());
420+
self.stats.server_idle(server.process_id(), address.id);
406421
break;
407422
}
408423
}
@@ -478,15 +493,15 @@ impl Client {
478493
}
479494

480495
// Report query executed statistics.
481-
self.stats.query();
496+
self.stats.query(address.id);
482497

483498
// Release server back to the pool if we are in transaction mode.
484499
// If we are in session mode, we keep the server until the client disconnects.
485500
if !server.in_transaction() {
486-
self.stats.transaction();
501+
self.stats.transaction(address.id);
487502

488503
if self.transaction_mode {
489-
self.stats.server_idle(server.process_id());
504+
self.stats.server_idle(server.process_id(), address.id);
490505
break;
491506
}
492507
}
@@ -517,10 +532,10 @@ impl Client {
517532
// Release server back to the pool if we are in transaction mode.
518533
// If we are in session mode, we keep the server until the client disconnects.
519534
if !server.in_transaction() {
520-
self.stats.transaction();
535+
self.stats.transaction(address.id);
521536

522537
if self.transaction_mode {
523-
self.stats.server_idle(server.process_id());
538+
self.stats.server_idle(server.process_id(), address.id);
524539
break;
525540
}
526541
}
@@ -537,6 +552,7 @@ impl Client {
537552
// The server is no longer bound to us, we can't cancel it's queries anymore.
538553
debug!("Releasing server back into the pool");
539554
self.release();
555+
self.stats.client_idle(self.process_id, address.id);
540556
}
541557
}
542558

@@ -549,6 +565,14 @@ impl Client {
549565

550566
impl Drop for Client {
551567
fn drop(&mut self) {
552-
self.stats.client_disconnecting(self.process_id);
568+
// Disconnect the client
569+
if let Some(address_id) = self.last_address_id {
570+
self.stats.client_disconnecting(self.process_id, address_id);
571+
572+
// The server is now idle
573+
if let Some(process_id) = self.last_server_id {
574+
self.stats.server_idle(process_id, address_id);
575+
}
576+
}
553577
}
554578
}

src/config.rs

+2
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ impl PartialEq<Role> for Option<Role> {
4848

4949
#[derive(Clone, PartialEq, Hash, std::cmp::Eq, Debug)]
5050
pub struct Address {
51+
pub id: usize,
5152
pub host: String,
5253
pub port: String,
5354
pub shard: usize,
@@ -58,6 +59,7 @@ pub struct Address {
5859
impl Default for Address {
5960
fn default() -> Address {
6061
Address {
62+
id: 0,
6163
host: String::from("127.0.0.1"),
6264
port: String::from("5432"),
6365
shard: 0,

0 commit comments

Comments
 (0)