Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(drip): improve logging #3984

Merged
merged 6 commits into from
Mar 8, 2025
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
260 changes: 137 additions & 123 deletions drip/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use cosmos_client::{
use prost::{Message, Name};
use serde::{Deserialize, Serialize};
use tokio::net::TcpListener;
use tracing::{debug, error, info, info_span, warn};
use tracing::{debug, error, info, info_span, instrument, warn, Instrument};
use tracing_subscriber::EnvFilter;
use unionlabs::{
primitives::{encoding::HexUnprefixed, H256},
Expand Down Expand Up @@ -105,143 +105,148 @@ async fn main() {
let config = config.clone();

for chain in config.chains.clone() {
let _chain_polling_span = info_span!("chain_polling", chain_id = chain.id).entered();
let chain_polling_span = info_span!("chain_polling", chain_id = chain.id);
let _span = chain_polling_span.enter();
info!("spawning worker for chain");
let pool = pool.clone();
tokio::spawn(async move {
loop {
let pool = pool.clone();

info!("spawning worker thread");
let chain = chain.clone();
let handle = tokio::spawn(async move {
// recreate each time so that if this task panics, the keyring gets rebuilt
// make sure to panic *here* so that the tokio task will catch the panic!
info!("creating chain client");
let chain_client = ChainClient::new(&chain).await;
info!("entering polling loop");
loop {
let chain = chain.clone();
let requests: Vec<SendRequest> = pool
.conn(move |conn| {
tokio::spawn(
async move {
loop {
let pool = pool.clone();

info!("spawning worker thread");
let chain = chain.clone();
let handle = tokio::spawn(async move {
// recreate each time so that if this task panics, the keyring gets rebuilt
// make sure to panic *here* so that the tokio task will catch the panic!
info!("creating chain client");
let chain_client = ChainClient::new(&chain).await;
info!("entering polling loop");
loop {
let chain = chain.clone();
let requests: Vec<SendRequest> = pool
.conn(move |conn| {
let mut stmt = conn
.prepare_cached(
"SELECT id, denom, address FROM requests
WHERE tx_hash IS NULL AND chain_id IS ?1 LIMIT ?2",
)
.expect("SQL statement is valid");

let mut rows = stmt
.query((&chain.id, batch_size as i64))
.expect("can't query rows");

let mut requests = vec![];

while let Some(row) = rows.next().expect("could not read row") {
let id: i64 = row.get(0).expect("could not read id");
let denom: String =
row.get(1).expect("could not read denom");
let receiver: String =
row.get(2).expect("could not read address");

let Some(coin) =
chain.coins.iter().find(|coin| coin.denom == denom)
else {
error!(
%denom,
"dropping request for unknown denom");
break;
};

requests.push(SendRequest {
id,
receiver,
denom,
amount: coin.amount,
});
}

Ok(requests)
})
.await
.expect("pool error");

if requests.is_empty() {
debug!("no requests in queue");
tokio::time::sleep(Duration::from_millis(1000)).await;
continue;
}
let mut i = 0;

// try sending batch 5 times
let result = loop {
let send_res = chain_client.send(&requests).await;

match send_res {
Err(err) => {
if i >= 5 {
break format!("ERROR: {}", ErrorReporter(&*err));
}
warn!(
err = %ErrorReporter(&*err),
attempt = i,
"unable to submit transaction"
);
i += 1;
}
// this will be displayed to users, print the hash in the same way that cosmos sdk does
Ok(tx_hash) => {
break tx_hash
.into_encoding::<HexUnprefixed>()
.to_string()
.to_uppercase()
}
};
};

pool.conn(move |conn| {
debug!("loading vtab array module required for `IN (1,42,76,...)`");
rusqlite::vtab::array::load_module(conn)
.expect("error loading vtab array module");

let mut stmt = conn
.prepare_cached(
"SELECT id, denom, address FROM requests
WHERE tx_hash IS NULL AND chain_id IS ?1 LIMIT ?2",
"UPDATE requests SET tx_hash = ?1 WHERE id IN rarray(?2)",
)
.expect("SQL statement is valid");

let mut rows = stmt
.query((&chain.id, batch_size as i64))
.expect("???");

// https://docs.rs/rusqlite/latest/rusqlite/vtab/array/index.html
let rows_modified = stmt
.execute((
&result,
Rc::new(
requests
.iter()
.map(|req| req.id)
.map(rusqlite::types::Value::from)
.collect::<Vec<rusqlite::types::Value>>(),
),
))
.expect("can't query rows");

let mut requests = vec![];

while let Some(row) = rows.next().expect("could not read row") {
let id: i64 = row.get(0).expect("could not read id");
let denom: String = row.get(1).expect("could not read denom");
let receiver: String =
row.get(2).expect("could not read address");
info!(rows_modified, "updated requests");

let Some(coin) =
chain.coins.iter().find(|coin| coin.denom == denom)
else {
error!(
%denom,
"dropping request for unknown denom");
break;
};

requests.push(SendRequest {
id,
receiver,
denom,
amount: coin.amount,
});
}

Ok(requests)
Ok(())
})
.await
.expect("pool error");

if requests.is_empty() {
debug!("no requests in queue");
tokio::time::sleep(Duration::from_millis(1000)).await;
continue;
}
let mut i = 0;

// try sending batch 5 times
let result = loop {
let send_res = chain_client.send(&requests).await;

match send_res {
Err(err) => {
if i >= 5 {
break format!("ERROR: {}", ErrorReporter(&*err));
}
warn!(
err = %ErrorReporter(&*err),
attempt = i,
"unable to submit transaction"
);
i += 1;
}
// this will be displayed to users, print the hash in the same way that cosmos sdk does
Ok(tx_hash) => {
break tx_hash
.into_encoding::<HexUnprefixed>()
.to_string()
.to_uppercase()
}
};
};

pool.conn(move |conn| {
debug!("loading vtab array module required for `IN (1,42,76,...)`");
rusqlite::vtab::array::load_module(conn)
.expect("error loading vtab array module");

let mut stmt = conn
.prepare_cached(
"UPDATE requests SET tx_hash = ?1 WHERE id IN rarray(?2)",
)
.expect("???");

// https://docs.rs/rusqlite/latest/rusqlite/vtab/array/index.html
let rows_modified = stmt
.execute((
&result,
Rc::new(
requests
.iter()
.map(|req| req.id)
.map(rusqlite::types::Value::from)
.collect::<Vec<rusqlite::types::Value>>(),
),
))
.expect("can't query rows");

info!(rows_modified, "updated requests");

Ok(())
})
.await
.expect("pool error");
}
})
.await;
})
.await;

match handle {
Ok(()) => {}
Err(err) => {
error!(err = %ErrorReporter(err), "handler panicked");
tokio::time::sleep(Duration::from_secs(1)).await;
match handle {
Ok(()) => {}
Err(err) => {
error!(err = %ErrorReporter(err), "handler panicked");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
}
});
.instrument(chain_polling_span.clone()),
);
}

let router = Router::new().route("/", get(graphiql).post_service(GraphQL::new(schema)));
Expand Down Expand Up @@ -324,6 +329,7 @@ struct ChainClient {
}

impl ChainClient {
#[instrument(skip_all, fields(chain_id = chain.id))]
pub async fn new(chain: &Chain) -> Self {
let rpc = Rpc::new(chain.rpc_url.clone()).await.unwrap();

Expand Down Expand Up @@ -353,7 +359,7 @@ impl ChainClient {
// Check if we are connected to a chain with the correct chain_id
assert_eq!(
chain_id, chain.id,
"ws_url {} is not for chain {}",
"rpc_url {} is not for chain {}",
chain.rpc_url, chain.id
);

Expand Down Expand Up @@ -406,6 +412,13 @@ impl SendRequestAggregator for Vec<SendRequest> {

impl ChainClient {
/// `MultiSend` to the specified addresses. Will return `None` if there are no signers available.
#[instrument(
skip_all,
fields(
chain_id = self.chain.id,
requests.len = requests.len()
)
)]
async fn send(&self, requests: &Vec<SendRequest>) -> anyhow::Result<H256> {
let agg_reqs = requests.aggregate_by_denom();

Expand Down Expand Up @@ -464,6 +477,7 @@ pub struct CaptchaSecret(pub String);

#[Object]
impl Mutation {
#[instrument(skip_all, fields(chain_id, address, denom))]
async fn send<'ctx>(
&self,
ctx: &Context<'ctx>,
Expand Down