feat(hubble): support event filter for tendermint chains (#2581)
KaiserKarel authored Jul 27, 2024
2 parents 3b0b710 + 76ae7fa commit 1812ee6
Showing 5 changed files with 73 additions and 82 deletions.
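The change threads an optional regex from the indexer config down to event insertion: when a `filter` is set, it is compiled once into a `regex::Regex`, and any transaction event whose kind matches is skipped before it reaches Postgres. A minimal standalone sketch of the drop rule (pattern and event kinds are hypothetical):

```rust
use regex::Regex;

/// Mirrors the check introduced in hubble/src/tm/indexer.rs: drop an event
/// iff a filter is configured and it matches the event's kind.
fn should_drop(filter: Option<&Regex>, event_kind: &str) -> bool {
    filter.is_some_and(|f| f.is_match(event_kind))
}

fn main() {
    let filter = Regex::new("coin_received|coin_spent").unwrap(); // hypothetical pattern
    assert!(should_drop(Some(&filter), "coin_received"));
    assert!(!should_drop(Some(&filter), "send_packet")); // non-matching kinds are kept
    assert!(!should_drop(None, "coin_received")); // no filter configured: keep everything
}
```

The same `is_some_and` shape appears verbatim in the indexer diff below.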
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions hubble/Cargo.toml
@@ -35,6 +35,7 @@ num-traits = "0.2.19"
 prometheus = { version = "0.13.3", features = ["process"] }
 prost.workspace = true
 protos = { workspace = true, features = ["client"] }
+regex = "1.10.5"
 reqwest = { workspace = true, features = ["json", "blocking"] }
 serde = { workspace = true, features = ["derive"] }
 serde-aux = "4.5.0"
6 changes: 6 additions & 0 deletions hubble/hubble.nix
@@ -56,6 +56,12 @@
         type = types.listOf (
           types.submodule {
             options.label = mkOption { type = types.str; example = "something-custom"; };
+            options.filter = mkOption {
+              type = types.nullOr types.str;
+              description = "A regex; events whose type matches it are excluded from insertion.";
+              example = "coin_received";
+              default = null;
+            };
             options.urls = mkOption { type = types.listOf types.str; example = [ "https://rpc.example.com" ]; };
             options.chain_id = mkOption { type = types.nullOr types.str; example = "union-testnet-8"; default = null; };
             options.grpc_url = mkOption { type = types.nullOr types.str; example = "https://grpc.example.com"; default = null; };
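With this module option, an indexer entry can set, for example, `filter = "coin_received"` (the option's own example value) to drop those events at ingest time; leaving it at the default `null` keeps every event.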
29 changes: 0 additions & 29 deletions hubble/src/postgres.rs
@@ -470,35 +470,6 @@ pub async fn get_chain_id<'a, A: Acquire<'a, Database = Postgres>>(
     Ok(id)
 }
 
-#[allow(dead_code)]
-pub async fn get_batch_of_unmapped_execution_heights<'a, A: Acquire<'a, Database = Postgres>>(
-    db: A,
-    chain_id: ChainId,
-) -> sqlx::Result<Vec<i64>> {
-    use num_traits::cast::ToPrimitive;
-
-    let mut conn = db.acquire().await?;
-    let heights = sqlx::query!(
-        "
-        SELECT DISTINCT revision_height FROM v0.lightclient_updates_mat
-        WHERE counterparty_chain_id = $1
-        AND revision_height > coalesce((
-            SELECT MAX(consensus_height) from v0.consensus_heights
-            WHERE chain_id = $1
-        ), 0)
-        ORDER BY revision_height ASC
-        LIMIT 200
-        ",
-        chain_id.db
-    )
-    .fetch_all(&mut *conn)
-    .await?
-    .into_iter()
-    .map(|record| record.revision_height.unwrap().to_u128().unwrap() as i64)
-    .collect();
-    Ok(heights)
-}
-
 pub async fn insert_mapped_execution_heights<'a, A: Acquire<'a, Database = Postgres>>(
     db: A,
     execution_heights: Vec<i64>,
114 changes: 63 additions & 51 deletions hubble/src/tm/indexer.rs
@@ -1,6 +1,7 @@
 use backon::Retryable;
 use color_eyre::eyre::{bail, eyre, Report};
 use futures::{stream, stream::TryStreamExt, TryFutureExt};
+use regex::Regex;
 use sqlx::{Acquire, Postgres};
 use tendermint::block::Height;
 use tendermint_rpc::{
@@ -30,6 +31,9 @@ pub struct Config {
     /// The height from which we start indexing
     pub start_height: Option<u32>,
 
+    #[serde(default)]
+    pub filter: Option<String>,
+
     /// Attempt to retry and fix bad states. This makes the process less responsive, as any call may take longer
     /// since retries are happening. Best for systemd services and long-running jobs.
     #[serde(default)]
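Because the new field is `#[serde(default)]`, existing configs without a `filter` key keep deserializing. A sketch of that behavior with a trimmed stand-in for the struct (the real `Config` has more fields, and hubble's actual config encoding isn't shown in this diff; JSON via `serde_json` is assumed here for illustration):

```rust
use serde::Deserialize;

// Trimmed stand-in for hubble's indexer Config, showing only the new field.
#[derive(Deserialize)]
struct Config {
    #[serde(default)]
    filter: Option<String>,
}

fn main() {
    // Field present:
    let cfg: Config = serde_json::from_str(r#"{ "filter": "coin_received" }"#).unwrap();
    assert_eq!(cfg.filter.as_deref(), Some("coin_received"));

    // Field absent: `#[serde(default)]` falls back to the default, i.e. None.
    let cfg: Config = serde_json::from_str("{}").unwrap();
    assert!(cfg.filter.is_none());
}
```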
@@ -68,6 +72,10 @@ impl Config {
             .collect(),
         );
 
+        let filter = self
+            .filter
+            .map(|filter| Regex::new(&filter).expect("should get valid regex"));
+
         let mode = self.mode;
         let (chain_id, height) = if self.harden {
             (|| fetch_meta(&client, &pool).inspect_err(|e| debug!(?e, "error fetching meta")))
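Compiling the pattern once here, with `expect`, means an invalid regex aborts at startup instead of surfacing mid-indexing, and avoids recompiling per event. A sketch of that fail-fast shape (pattern hypothetical):

```rust
use regex::Regex;

fn main() {
    // Mirrors the diff: Option<String> from config -> Option<Regex>, compiled once.
    let configured: Option<String> = Some("coin_(received|spent)".to_string()); // hypothetical
    let filter = configured.map(|f| Regex::new(&f).expect("should get valid regex"));
    assert!(filter.unwrap().is_match("coin_spent"));

    // An unbalanced pattern would have panicked at the `expect` above:
    assert!(Regex::new("coin_(").is_err());
}
```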
@@ -77,6 +85,7 @@
             fetch_meta(&client, &pool).await?
         };
         let indexing_span = info_span!("indexer", chain_id = chain_id.canonical);
+
         async move {
             // Determine from which height we should start indexing if we haven't
             // indexed any blocks yet. If start_height > current_height, we jump to the new start height
@@ -102,7 +111,7 @@
                     "fast syncing for batch: {}..{}", height, batch_end
                 );
                 let mut tx = pool.begin().await?;
-                let next_height = fetch_and_insert_blocks(&client, &mut tx, chain_id, Self::BATCH_SIZE, height, mode).await?.expect("batch sync with batch > 1 should error or succeed, but never reach head of chain");
+                let next_height = fetch_and_insert_blocks(&client, &mut tx, chain_id, Self::BATCH_SIZE, height, mode, filter.as_ref()).await?.expect("batch sync with batch > 1 should error or succeed, but never reach head of chain");
                 tx.commit().await?;
                 info!(
                     "indexed blocks {}..{}",
@@ -120,7 +129,7 @@
                 // Regular sync protocol. This fetches blocks one-by-one.
                 retry_count += 1;
                 let mut tx = pool.begin().await?;
-                match fetch_and_insert_blocks(&client, &mut tx, chain_id, 1, height, mode).await? {
+                match fetch_and_insert_blocks(&client, &mut tx, chain_id, 1, height, mode, filter.as_ref()).await? {
                     Some(h) => {
                         info!("indexed block {}", &height);
                         height = h;
@@ -238,6 +247,7 @@ async fn fetch_and_insert_blocks(
     batch_size: u32,
     from: Height,
     mode: postgres::InsertMode,
+    filter: Option<&Regex>,
 ) -> Result<Option<Height>, Report> {
     use itertools::Either;
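Note that `fetch_and_insert_blocks` borrows the regex as `Option<&Regex>` (callers pass `filter.as_ref()`), so the single compiled regex is reused across every batch and single-block call instead of being cloned or recompiled.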

@@ -323,59 +333,61 @@
     // Initial capacity is a bit of an estimate, but shouldn't need to resize too often.
     let mut events = Vec::with_capacity(block_results.len() * 4 * 10);
 
-    let transactions =
-        block_results.into_iter().flat_map(|(header, block, txs)| {
-            let block_height: i32 = block.height.value().try_into().unwrap();
-            let block_hash = header.hash().to_string();
-            let time: OffsetDateTime = header.time.into();
-            let mut block_index = 0;
-            let finalize_block_events = block.events(chain_id, block_hash.clone(), time);
-
-            let txs =
-                txs.into_iter()
-                    .map(|tx| {
-                        let transaction_hash = tx.hash.to_string();
-                        let data = serde_json::to_value(&tx).unwrap().replace_escape_chars();
-                        events.extend(tx.tx_result.events.into_iter().enumerate().map(
-                            |(i, event)| {
-                                let event = PgEvent {
-                                    chain_id,
-                                    block_hash: block_hash.clone(),
-                                    block_height,
-                                    time,
-                                    data: serde_json::to_value(event)
-                                        .unwrap()
-                                        .replace_escape_chars(),
-                                    transaction_hash: Some(transaction_hash.clone()),
-                                    transaction_index: Some(i.try_into().unwrap()),
-                                    block_index,
-                                };
-                                block_index += 1;
-                                event
-                            },
-                        ));
-                        PgTransaction {
-                            chain_id,
-                            block_hash: block_hash.clone(),
-                            block_height,
-                            time,
-                            data,
-                            hash: transaction_hash,
-                            index: tx.index.try_into().unwrap(),
-                        }
-                    })
-                    .collect::<Vec<_>>();
-            events.extend(
-                finalize_block_events
-                    .into_iter()
-                    .enumerate()
-                    .map(|(i, e)| PgEvent {
-                        block_index: i as i32 + block_index,
-                        ..e
-                    }),
-            );
-            txs
-        });
+    let transactions = block_results.into_iter().flat_map(|(header, block, txs)| {
+        let block_height: i32 = block.height.value().try_into().unwrap();
+        let block_hash = header.hash().to_string();
+        let time: OffsetDateTime = header.time.into();
+        let mut block_index = 0;
+        let finalize_block_events = block.events(chain_id, block_hash.clone(), time);
+
+        let txs = txs
+            .into_iter()
+            .map(|tx| {
+                let transaction_hash = tx.hash.to_string();
+                let data = serde_json::to_value(&tx).unwrap().replace_escape_chars();
+                events.extend(tx.tx_result.events.into_iter().enumerate().filter_map(
+                    |(i, event)| {
+                        block_index += 1;
+
+                        if filter.is_some_and(|filter| filter.is_match(event.kind.as_str())) {
+                            return None;
+                        }
+
+                        let event = PgEvent {
+                            chain_id,
+                            block_hash: block_hash.clone(),
+                            block_height,
+                            time,
+                            data: serde_json::to_value(event).unwrap().replace_escape_chars(),
+                            transaction_hash: Some(transaction_hash.clone()),
+                            transaction_index: Some(i.try_into().unwrap()),
+                            block_index,
+                        };
+                        Some(event)
+                    },
+                ));
+                PgTransaction {
+                    chain_id,
+                    block_hash: block_hash.clone(),
+                    block_height,
+                    time,
+                    data,
+                    hash: transaction_hash,
+                    index: tx.index.try_into().unwrap(),
+                }
+            })
+            .collect::<Vec<_>>();
+        events.extend(
+            finalize_block_events
+                .into_iter()
+                .enumerate()
+                .map(|(i, e)| PgEvent {
+                    block_index: i as i32 + block_index,
+                    ..e
+                }),
+        );
+        txs
+    });
postgres::insert_batch_transactions(tx, transactions, mode).await?;
postgres::insert_batch_events(tx, events, mode).await?;
Ok(Some((from.value() as u32 + headers.len() as u32).into()))
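The reworked loop switches from `map` to `filter_map` and increments `block_index` before the filter check, so a dropped event still consumes an index and surviving events keep stable ordinals; note also that in this diff the filter applies only to per-transaction events, while `finalize_block_events` are inserted unfiltered. A simplified, self-contained model of that indexing behavior (names hypothetical):

```rust
use regex::Regex;

// Models the new filter_map loop: every event advances block_index,
// but events whose kind matches the filter produce no row.
fn index_events(kinds: &[&str], filter: Option<&Regex>) -> Vec<(i32, String)> {
    let mut block_index = 0;
    kinds
        .iter()
        .filter_map(|kind| {
            block_index += 1;
            if filter.is_some_and(|f| f.is_match(kind)) {
                return None;
            }
            Some((block_index, kind.to_string()))
        })
        .collect()
}

fn main() {
    let filter = Regex::new("coin_received").unwrap();
    let indexed = index_events(
        &["coin_received", "transfer", "coin_received", "message"],
        Some(&filter),
    );
    // Dropped events still consume indices, leaving gaps (ordinals 2 and 4 survive):
    assert_eq!(
        indexed,
        vec![(2, "transfer".to_string()), (4, "message".to_string())]
    );
}
```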
