diff --git a/lib/ibc-vm-rs/src/lib.rs b/lib/ibc-vm-rs/src/lib.rs index 0e60e71073..dee22805aa 100644 --- a/lib/ibc-vm-rs/src/lib.rs +++ b/lib/ibc-vm-rs/src/lib.rs @@ -1,4 +1,3 @@ -use frame_support_procedural::PartialEqNoBound; use ibc_events::IbcEvent; use serde::{Deserialize, Serialize}; use states::{ @@ -31,7 +30,7 @@ lazy_static::lazy_static! { pub static ref DEFAULT_MERKLE_PREFIX: MerklePrefix = MerklePrefix { key_prefix: b"ibc".into() }; } -#[derive(thiserror::Error, PartialEqNoBound, Debug)] +#[derive(thiserror::Error, PartialEq, Debug)] pub enum IbcError { #[error("client {0} is not active ({1})")] NotActive(ClientId, Status), diff --git a/lib/pg-queue/src/lib.rs b/lib/pg-queue/src/lib.rs index 88d44310a7..b1e12e9841 100644 --- a/lib/pg-queue/src/lib.rs +++ b/lib/pg-queue/src/lib.rs @@ -3,7 +3,6 @@ use std::{ marker::PhantomData, time::Duration, }; -use frame_support_procedural::{CloneNoBound, DebugNoBound}; use futures_util::TryStreamExt; use itertools::Itertools; use schemars::JsonSchema; @@ -33,10 +32,11 @@ pub mod metrics; /// item JSONB /// error TEXT /// ``` -#[derive(DebugNoBound, CloneNoBound)] +#[derive(Debug, Clone)] pub struct PgQueue { client: PgPool, optimize_batch_limit: Option, + retryable_error_priority_decrease: Option, __marker: PhantomData T>, } @@ -48,7 +48,10 @@ pub struct PgQueueConfig { pub min_connections: Option, pub idle_timeout: Option, pub max_lifetime: Option, + #[serde(default)] pub optimize_batch_limit: Option, + #[serde(default)] + pub retryable_error_priority_decrease: Option, } impl PgQueueConfig { @@ -69,11 +72,22 @@ struct Id { } #[derive(Debug, FromRow)] -struct Record { +struct QueueRecord { id: i64, parents: Vec, item: String, created_at: sqlx::types::time::OffsetDateTime, + priority: i64, +} + +#[derive(Debug, FromRow)] +struct OptimizeRecord { + id: i64, + #[allow(dead_code)] + parents: Vec, + item: String, + #[allow(dead_code)] + created_at: sqlx::types::time::OffsetDateTime, } #[derive(Debug, FromRow, Serialize)] @@ -177,6 +191,7 @@ impl voyager_vm::Queue for PgQueue { // }); let optimize_batch_limit = config.optimize_batch_limit; + let retryable_error_priority_decrease = config.retryable_error_priority_decrease; let pool = config.into_pg_pool().await?; @@ -186,7 +201,8 @@ impl voyager_vm::Queue for PgQueue { id BIGSERIAL PRIMARY KEY, item JSONB NOT NULL, parents BIGINT[] DEFAULT '{}', - created_at timestamptz NOT NULL DEFAULT now() + created_at timestamptz NOT NULL DEFAULT now(), + priority int8 NOT NULL DEFAULT 0 ); CREATE TABLE IF NOT EXISTS optimize( @@ -216,7 +232,9 @@ impl voyager_vm::Queue for PgQueue { CREATE INDEX IF NOT EXISTS index_queue_id ON queue(id); - CREATE INDEX IF NOT EXISTS index_queue_created_at ON queue(created_at) include (id); + CREATE INDEX IF NOT EXISTS index_queue_created_at ON queue(created_at ASC) INCLUDE (id); + + CREATE INDEX IF NOT EXISTS index_queue_priority_created_at ON queue(priority DESC, created_at ASC) INCLUDE (id); "#, ) .try_for_each(|result| async move { @@ -229,6 +247,7 @@ impl voyager_vm::Queue for PgQueue { Ok(Self { client: pool, optimize_batch_limit, + retryable_error_priority_decrease, __marker: PhantomData, }) } @@ -329,6 +348,7 @@ impl voyager_vm::Queue for PgQueue { FROM queue ORDER BY + priority DESC, created_at ASC FOR UPDATE SKIP LOCKED @@ -337,130 +357,31 @@ impl voyager_vm::Queue for PgQueue { id, parents, item::text, + priority, created_at "#, ) - .try_map(|x| Record::from_row(&x)) + .try_map(|x| QueueRecord::from_row(&x)) .fetch_optional(tx.as_mut()) .await?; - match row { + let res = match row { Some(record) => { - let span = info_span!("processing item", item_id = record.id); - - trace!(%record.item); - - // really don't feel like defining a new error type right now - let op = de(&record.item).map_err(|e| sqlx::Error::Decode(Box::new(e)))?; - - let timer = ITEM_PROCESSING_DURATION.start_timer(); - let (r, res) = f(op, ItemId::new(record.id).unwrap()) - .instrument(span.clone()) - .await; - let _ = timer.stop_and_record(); - - async move { - match res { - Err(QueueError::Fatal(error)) => { - let error = full_error_string(error); - error!(%error, "fatal error"); - insert_error(record, error, tx).await?; - } - Err(QueueError::Unprocessable(error)) => { - let error = full_error_string(error); - info!(%error, "unprocessable message"); - insert_error(record, error, tx).await?; - } - Err(QueueError::Retry(error)) => { - warn!(error = %full_error_string(error), "retryable error"); - sqlx::query( - " - INSERT INTO - queue (id, item, parents) - VALUES ($1, $2::JSONB, $3 ) - ", - ) - .bind(record.id) - .bind(record.item) - .bind(record.parents) - .execute(tx.as_mut()) - .await?; - - tokio::time::sleep(Duration::from_millis(500)).await; - } - Ok(ops) => { - 'block: { - // insert the op we just processed into done - sqlx::query( - " - INSERT INTO - done (id, parents, item, created_at) - VALUES ($1, $2, $3::JSONB, $4 ) - ", - ) - .bind(record.id) - .bind(record.parents) - .bind(record.item) - .bind(record.created_at) - .execute(tx.as_mut()) - .await?; - - if ops.is_empty() { - break 'block; - } - - let (optimize, ready): (Vec<_>, Vec<_>) = ops - .into_iter() - .flat_map(Op::normalize) - .partition_map(|op| match filter.check_interest(&op) { - FilterResult::Interest(tag) => Either::Left((op, tag)), - FilterResult::NoInterest => Either::Right(op), - }); - - sqlx::query( - " - INSERT INTO queue (item, parents) - SELECT *, $1 as parents FROM UNNEST($2::JSONB[]) - ", - ) - .bind(vec![record.id]) - .bind(ready.into_iter().map(Json).collect::>()) - .execute(tx.as_mut()) - .await?; - - sqlx::query( - " - INSERT INTO optimize (item, tag, parents) - SELECT *, $1 as parents FROM UNNEST($2::JSONB[], $3::TEXT[]) - ", - ) - .bind(vec![record.id]) - .bind(optimize.iter().map(|(op, _)| Json(op)).collect::>()) - .bind(optimize.iter().map(|(_, tag)| *tag).collect::>()) - .execute(tx.as_mut()) - .await?; - } - - tx.commit().await?; - } - } - - Ok::<_, Self::Error>(()) - } - .instrument(span) - .await?; - - Ok(Some(r)) + process_item( + &mut tx, + record, + f, + filter, + self.retryable_error_priority_decrease, + ) + .await? } - None => { - // trace!("queue is empty"); + None => None, + }; - // self.lock.store(true, Ordering::SeqCst); - // tokio::time::sleep(std::time::Duration::from_millis(2000)).await; + tx.commit().await?; - Ok(None) - } - } + Ok(res) } async fn optimize<'a, O: Pass>( @@ -470,11 +391,6 @@ impl voyager_vm::Queue for PgQueue { ) -> Result<(), Either> { trace!(%tag, "optimize"); - // if self.lock.swap(false, Ordering::SeqCst) { - // debug!("queue is locked"); - // tokio::time::sleep(std::time::Duration::from_millis(1000)).await; - // } - let mut tx = self.client.begin().await.map_err(Either::Left)?; let msgs = sqlx::query( @@ -503,7 +419,7 @@ impl voyager_vm::Queue for PgQueue { ) .bind(tag) .bind(self.optimize_batch_limit) - .try_map(|x| Record::from_row(&x)) + .try_map(|x| OptimizeRecord::from_row(&x)) .fetch_all(tx.as_mut()) .await .map_err(Either::Left)?; @@ -609,10 +525,129 @@ impl voyager_vm::Queue for PgQueue { } } +#[instrument( + skip_all, + fields( + item_id = record.id, + priority = record.priority + ) +)] +async fn process_item<'a, T: QueueMessage, F, Fut, R>( + tx: &mut Transaction<'static, Postgres>, + record: QueueRecord, + f: F, + filter: &'a T::Filter, + retryable_error_priority_decrease: Option, +) -> Result, sqlx::Error> +where + F: (FnOnce(Op, ItemId) -> Fut) + Send + Captures<'a>, + Fut: Future>, QueueError>)> + Send + Captures<'a>, + R: Send + Sync + 'static, +{ + trace!(%record.item); + + // really don't feel like defining a new error type right now + let op = de::>(&record.item).map_err(|e| sqlx::Error::Decode(Box::new(e)))?; + + let timer = ITEM_PROCESSING_DURATION.start_timer(); + let (r, res) = f(op.clone(), ItemId::new(record.id).unwrap()).await; + let _ = timer.stop_and_record(); + + match res { + Err(QueueError::Fatal(error)) => { + let error = full_error_string(error); + error!(%error, "fatal error"); + insert_error(record, error, tx).await?; + } + Err(QueueError::Unprocessable(error)) => { + let error = full_error_string(error); + info!(%error, "unprocessable message"); + insert_error(record, error, tx).await?; + } + Err(QueueError::Retry(error)) => { + warn!(error = %full_error_string(error), "retryable error"); + sqlx::query( + " + INSERT INTO + queue (id, item, parents, priority) + VALUES ($1, $2::JSONB, $3, $4 ) + ", + ) + .bind(record.id) + .bind(record.item) + .bind(record.parents) + .bind( + record + .priority + .saturating_sub(retryable_error_priority_decrease.unwrap_or_default()), + ) + .execute(tx.as_mut()) + .await?; + + tokio::time::sleep(Duration::from_millis(500)).await; + } + Ok(ops) => { + 'block: { + // insert the op we just processed into done + sqlx::query( + " + INSERT INTO + done (id, parents, item, created_at) + VALUES ($1, $2, $3::JSONB, $4 ) + ", + ) + .bind(record.id) + .bind(record.parents) + .bind(record.item) + .bind(record.created_at) + .execute(tx.as_mut()) + .await?; + + if ops.is_empty() { + break 'block; + } + + let (optimize, ready): (Vec<_>, Vec<_>) = ops + .into_iter() + .flat_map(Op::normalize) + .partition_map(|op| match filter.check_interest(&op) { + FilterResult::Interest(tag) => Either::Left((op, tag)), + FilterResult::NoInterest => Either::Right(op), + }); + + sqlx::query( + " + INSERT INTO queue (item, parents) + SELECT *, $1 as parents FROM UNNEST($2::JSONB[]) + ", + ) + .bind(vec![record.id]) + .bind(ready.into_iter().map(Json).collect::>()) + .execute(tx.as_mut()) + .await?; + + sqlx::query( + " + INSERT INTO optimize (item, tag, parents) + SELECT *, $1 as parents FROM UNNEST($2::JSONB[], $3::TEXT[]) + ", + ) + .bind(vec![record.id]) + .bind(optimize.iter().map(|(op, _)| Json(op)).collect::>()) + .bind(optimize.iter().map(|(_, tag)| *tag).collect::>()) + .execute(tx.as_mut()) + .await?; + } + } + } + + Ok(Some(r)) +} + async fn insert_error( - record: Record, + record: QueueRecord, error: String, - mut tx: Transaction<'static, Postgres>, + tx: &mut Transaction<'static, Postgres>, ) -> Result<(), sqlx::Error> { // insert error message and the op into failed @@ -631,8 +666,6 @@ async fn insert_error( .execute(tx.as_mut()) .await?; - tx.commit().await?; - Ok(()) } diff --git a/lib/unionlabs-primitives/src/fixed_bytes.rs b/lib/unionlabs-primitives/src/fixed_bytes.rs index 3f9bc5de49..e9ff05e73d 100644 --- a/lib/unionlabs-primitives/src/fixed_bytes.rs +++ b/lib/unionlabs-primitives/src/fixed_bytes.rs @@ -249,14 +249,6 @@ impl FromStr for FixedBytes { } } -// #[derive(DebugNoBound, thiserror::Error)] -// pub enum HashDecodeError { -// #[error("invalid encoding")] -// InvalidEncoding(#[source] E::Error), -// #[error("invalid length")] -// FixedBytesError(#[from] FixedBytesError), -// } - impl Default for FixedBytes { fn default() -> Self { Self::new([0_u8; BYTES]) diff --git a/lib/voyager-message/src/lib.rs b/lib/voyager-message/src/lib.rs index 889139a5b9..804d966824 100644 --- a/lib/voyager-message/src/lib.rs +++ b/lib/voyager-message/src/lib.rs @@ -77,6 +77,7 @@ pub use reth_ipc; pub use voyager_core as core; pub use voyager_vm as vm; +#[derive(Debug, Clone, PartialEq)] pub enum VoyagerMessage {} impl QueueMessage for VoyagerMessage { diff --git a/lib/voyager-vm/src/in_memory.rs b/lib/voyager-vm/src/in_memory.rs index 888be11515..0fea3ae966 100644 --- a/lib/voyager-vm/src/in_memory.rs +++ b/lib/voyager-vm/src/in_memory.rs @@ -8,7 +8,6 @@ use std::{ }; use either::Either; -use frame_support_procedural::{CloneNoBound, DebugNoBound}; use tracing::{debug, error, info, info_span, trace, Instrument}; use unionlabs::ErrorReporter; @@ -18,7 +17,7 @@ use crate::{ Captures, EnqueueResult, ItemId, Op, Queue, QueueError, QueueMessage, }; -#[derive(DebugNoBound, CloneNoBound)] +#[derive(Debug, Clone)] pub struct InMemoryQueue { idx: Arc, ready: Arc>>>, @@ -27,7 +26,7 @@ pub struct InMemoryQueue { optimizer_queue: Arc>>>>, } -#[derive(DebugNoBound, CloneNoBound)] +#[derive(Debug, Clone)] pub(crate) struct Item { #[allow(dead_code)] // used in debug parents: Vec, diff --git a/lib/voyager-vm/src/lib.rs b/lib/voyager-vm/src/lib.rs index 2674bc8b01..6b655ec9a4 100644 --- a/lib/voyager-vm/src/lib.rs +++ b/lib/voyager-vm/src/lib.rs @@ -100,13 +100,7 @@ impl ItemId { } } -#[derive( - ::macros::Debug, - ::frame_support_procedural::CloneNoBound, - ::frame_support_procedural::PartialEqNoBound, - ::serde::Serialize, - ::serde::Deserialize, -)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde( tag = "@type", content = "@value", @@ -114,7 +108,6 @@ impl ItemId { bound(serialize = "", deserialize = ""), deny_unknown_fields )] -#[debug(bound())] pub enum Op { /// Inert data that will either be used in an [`Op::Promise`] or bubbled up to the top and sent as /// an output. @@ -152,15 +145,8 @@ pub enum Op { Noop, } -#[derive( - ::macros::Debug, - ::frame_support_procedural::CloneNoBound, - ::frame_support_procedural::PartialEqNoBound, - ::serde::Serialize, - ::serde::Deserialize, -)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(bound(serialize = "", deserialize = ""), deny_unknown_fields)] -#[debug(bound())] pub struct Promise { /// Messages that are expected to resolve to [`Op::Data`]. pub queue: VecDeque>, @@ -202,7 +188,8 @@ pub trait Visit { pub trait OpT = Debug + Clone + PartialEq + Serialize + for<'a> Deserialize<'a> + Send + Sync + Unpin; -pub trait QueueMessage: Sized + 'static { +// NOTE: Extra bounds are just for ease of use for derives +pub trait QueueMessage: Debug + Clone + PartialEq + Sized + 'static { type Data: OpT; type Call: CallT + OpT; type Callback: CallbackT + OpT; diff --git a/lib/voyager-vm/src/pass.rs b/lib/voyager-vm/src/pass.rs index b53d072432..a9dcd5360b 100644 --- a/lib/voyager-vm/src/pass.rs +++ b/lib/voyager-vm/src/pass.rs @@ -1,6 +1,5 @@ use std::error::Error; -use frame_support_procedural::{CloneNoBound, DebugNoBound, DefaultNoBound}; use futures::Future; use serde::{Deserialize, Serialize}; @@ -19,7 +18,7 @@ pub trait Pass: Send + Sync + Sized { /// The result of running an optimization pass. Both `optimize_further` and `ready` are lists of /// `(parents, op)`, allowing for correlating new messages with multiple parents (i.e. combining /// messages). -#[derive(DebugNoBound, CloneNoBound, DefaultNoBound, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] #[serde(bound(serialize = "", deserialize = ""))] pub struct PassResult { /// [`Op`]s that are considered incomplete by this optimization pass and are to be optimized @@ -29,3 +28,12 @@ pub struct PassResult { /// on these [`Op`]s, and they will be requeued as "ready" in the queue. pub ready: Vec<(Vec, Op)>, } + +impl Default for PassResult { + fn default() -> Self { + Self { + optimize_further: vec![], + ready: vec![], + } + } +} diff --git a/lib/voyager-vm/src/tests.rs b/lib/voyager-vm/src/tests.rs index 0934b6f1f2..202020d9ab 100644 --- a/lib/voyager-vm/src/tests.rs +++ b/lib/voyager-vm/src/tests.rs @@ -8,6 +8,7 @@ use crate::{ pub mod utils; +#[derive(Debug, Clone, PartialEq)] enum UnitMessage {} impl QueueMessage for UnitMessage { diff --git a/lib/voyager-vm/src/tests/utils.rs b/lib/voyager-vm/src/tests/utils.rs index f1833b2de2..82d5230dc0 100644 --- a/lib/voyager-vm/src/tests/utils.rs +++ b/lib/voyager-vm/src/tests/utils.rs @@ -6,6 +6,7 @@ use subset_of::SubsetOf; use crate::{call, data, noop, CallT, CallbackT, Context, Op, QueueError, QueueMessage}; +#[derive(Debug, Clone, PartialEq)] pub enum SimpleMessage {} impl QueueMessage for SimpleMessage { diff --git a/voyager/plugins/transaction/cosmos-sdk/src/main.rs b/voyager/plugins/transaction/cosmos-sdk/src/main.rs index e2b796dafa..ed12a17503 100644 --- a/voyager/plugins/transaction/cosmos-sdk/src/main.rs +++ b/voyager/plugins/transaction/cosmos-sdk/src/main.rs @@ -340,7 +340,7 @@ impl PluginServer for Module { Some(Err(err)) => { // dbg!(&err); - let mut split_msgs = false; + // let mut split_msgs = false; match err { _ if let Some(err) = err.as_json_rpc_error() => { @@ -399,7 +399,7 @@ impl PluginServer for Module { log, } => { if let Some((msg_idx, log)) = parse_msg_idx_from_log(&log) { - let _span = info_span!("cosmos msg failed", msg_idx); + let _span = info_span!("cosmos msg failed", msg_idx).entered(); info!(%log); match self.fatal_errors.get(&(codespace.clone(), error_code)) { @@ -429,17 +429,39 @@ impl PluginServer for Module { } _ => { warn!("ibc-union error ({err}): {log}"); - split_msgs = true; + // split_msgs = true; } }, None => { warn!("error submitting transaction ({codespace}, {error_code}): {log}"); - split_msgs = true; + // split_msgs = true; } }, } - if !split_msgs { + // if !split_msgs { + // msgs.remove(msg_idx); + + // if msgs.is_empty() { + // Ok(noop()) + // } else { + // Ok(call(PluginMessage::new( + // self.plugin_name(), + // ModuleCall::SubmitTransaction(msgs), + // ))) + // } + // } else + if msgs.len() == 1 { + warn!("cosmos msg failed"); + + Ok(noop()) + } else { + // Ok(seq(msgs.into_iter().map(|msg| { + // call(PluginMessage::new( + // self.plugin_name(), + // ModuleCall::SubmitTransaction(vec![msg]), + // )) + // }))) msgs.remove(msg_idx); if msgs.is_empty() { @@ -450,17 +472,6 @@ impl PluginServer for Module { ModuleCall::SubmitTransaction(msgs), ))) } - } else if msgs.len() == 1 { - warn!("cosmos msg failed"); - - Ok(noop()) - } else { - Ok(seq(msgs.into_iter().map(|msg| { - call(PluginMessage::new( - self.plugin_name(), - ModuleCall::SubmitTransaction(vec![msg]), - )) - }))) } } else if log.contains("insufficient funds") { warn!("out of gas"); diff --git a/voyager/src/main.rs b/voyager/src/main.rs index 3995d551c3..da082b0dbf 100644 --- a/voyager/src/main.rs +++ b/voyager/src/main.rs @@ -165,6 +165,7 @@ async fn do_main(args: cli::AppArgs) -> anyhow::Result<()> { idle_timeout: None, max_lifetime: None, optimize_batch_limit: None, + retryable_error_priority_decrease: None, }), optimizer_delay_milliseconds: 100, ipc_client_request_timeout: Duration::new(60, 0),