fix(pipe): some fix (#24)
- fix `state_root_with_updates` and narrow the critical section of the lock in `BlockViewStorage` (see the sketch after this list)
- implement `state_root_with_updates_v2` for `LatestStateProviderRef`
- skip `try_recv_engine_message` if `PIPE_EXEC_LAYER_EXT_V2` is enabled
- fix Cancun fields
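A minimal sketch of the lock-narrowing pattern the first bullet describes, using hypothetical stand-ins (`BlockStates`, `HashedState`) for the real `BlockViewStorageInner` and `HashedPostState`: hold the mutex only long enough to clone the cheap `Arc` handles, drop the guard, run the expensive computation outside the critical section, then re-lock briefly to cache the result.

```rust
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for the per-block state kept by BlockViewStorage.
type HashedState = Vec<u8>;

struct BlockStates {
    inner: Mutex<BTreeMap<u64, Arc<HashedState>>>,
    derived: Mutex<BTreeMap<u64, usize>>,
}

impl BlockStates {
    /// Fold over the states below `block_number`, holding the locks only
    /// for the cheap Arc clones and the final insert.
    fn expensive_fold(&self, block_number: u64) -> usize {
        // First critical section: clone the Arc handles, then drop the guard.
        let states: Vec<Arc<HashedState>> = {
            let inner = self.inner.lock().unwrap();
            inner.range(..block_number).map(|(_, s)| Arc::clone(s)).collect()
        };

        // The expensive work runs outside any lock, so writers make progress.
        let result: usize = states.iter().map(|s| s.len()).sum();

        // Second, brief critical section: cache the derived value, mirroring
        // how state_root_with_updates re-locks only to store the trie updates.
        self.derived.lock().unwrap().insert(block_number, result);
        result
    }
}
```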
nekomoto911 authored Jan 6, 2025
1 parent 0aedd72 commit fde3089
Showing 5 changed files with 193 additions and 104 deletions.
100 changes: 52 additions & 48 deletions crates/engine/tree/src/tree/mod.rs
@@ -69,7 +69,8 @@ pub use invalid_block_hook::{InvalidBlockHooks, NoopInvalidBlockHook};
pub use reth_engine_primitives::InvalidBlockHook;
use reth_pipe_exec_layer_ext::PIPE_EXEC_LAYER_EXT;
use reth_pipe_exec_layer_ext_v2::{
PipeExecLayerEvent, PIPE_EXEC_LAYER_EXT as PIPE_EXEC_LAYER_EXT_V2,
PipeExecLayerEvent, PipeExecLayerExt as PipeExecLayerExtV2,
PIPE_EXEC_LAYER_EXT as PIPE_EXEC_LAYER_EXT_V2,
};

/// Keeps track of the state of the tree.
@@ -626,34 +627,33 @@ where
(incoming, outgoing)
}

fn try_recv_pipe_exec_event(&self) -> Result<Option<PipeExecLayerEvent>, RecvError> {
if let Some(ext) = PIPE_EXEC_LAYER_EXT_V2.get() {
if self.persistence_state.in_progress() {
let mut waited_time_ms = 0;
loop {
match ext.event_rx.blocking_lock().try_recv() {
Ok(event) => return Ok(Some(event)),
Err(mpsc::error::TryRecvError::Empty) => {
if waited_time_ms > 500 {
// timeout
return Ok(None);
}
std::thread::sleep(std::time::Duration::from_millis(10));
waited_time_ms += 10;
fn try_recv_pipe_exec_event(
&self,
pipe_exec_layer_ext: &PipeExecLayerExtV2,
) -> Result<Option<PipeExecLayerEvent>, RecvError> {
if self.persistence_state.in_progress() {
let mut waited_time_ms = 0;
loop {
match pipe_exec_layer_ext.event_rx.blocking_lock().try_recv() {
Ok(event) => return Ok(Some(event)),
Err(mpsc::error::TryRecvError::Empty) => {
if waited_time_ms > 500 {
// timeout
return Ok(None);
}
Err(mpsc::error::TryRecvError::Disconnected) => return Err(RecvError),
std::thread::sleep(std::time::Duration::from_millis(10));
waited_time_ms += 10;
}
}
} else {
let event = ext.event_rx.blocking_lock().blocking_recv();
if event.is_some() {
Ok(event)
} else {
Err(RecvError)
Err(mpsc::error::TryRecvError::Disconnected) => return Err(RecvError),
}
}
} else {
Ok(None)
let event = pipe_exec_layer_ext.event_rx.blocking_lock().blocking_recv();
if event.is_some() {
Ok(event)
} else {
Err(RecvError)
}
}
}
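Reduced to a self-contained form (channel payload simplified to `u64` and the error type to `()`, both assumptions for illustration), the receive path above is: lock the shared receiver only for a non-blocking `try_recv`, sleep 10 ms between attempts, and give up after roughly 500 ms so the caller can service the persistence task.

```rust
use std::time::Duration;
use tokio::sync::{mpsc, Mutex};

/// Poll a shared tokio receiver from a non-async thread.
/// Ok(None) means timeout; Err(()) means the sender side is gone.
fn recv_with_timeout(rx: &Mutex<mpsc::Receiver<u64>>) -> Result<Option<u64>, ()> {
    let mut waited_ms = 0;
    loop {
        // blocking_lock/try_recv hold the lock only per attempt, so other
        // threads can interleave their own receives.
        match rx.blocking_lock().try_recv() {
            Ok(event) => return Ok(Some(event)),
            Err(mpsc::error::TryRecvError::Empty) => {
                if waited_ms > 500 {
                    return Ok(None); // timeout: let the caller do other work
                }
                std::thread::sleep(Duration::from_millis(10));
                waited_ms += 10;
            }
            Err(mpsc::error::TryRecvError::Disconnected) => return Err(()),
        }
    }
}
```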

@@ -664,11 +664,11 @@ where
self.state.tree_state.insert_executed(block);
tx.send(()).unwrap();
}
PipeExecLayerEvent::MakeCanonical(payload, tx) => {
PipeExecLayerEvent::MakeCanonical(payload, cancun_fields, tx) => {
let block_number = payload.block_number();
let block_hash = payload.block_hash();
debug!(target: "on_pipe_exec_event", block_number = %block_number, block_hash = %block_hash, "Received make canonical event");
self.on_new_payload(payload, None).unwrap_or_else(|err| {
self.on_new_payload(payload, cancun_fields).unwrap_or_else(|err| {
panic!(
"Failed to make canonical, block_number={block_number} block_hash={block_hash}: {err}",
)
@@ -687,31 +687,35 @@ where
///
/// This will block the current thread and process incoming messages.
pub fn run(mut self) {
// Wait for the pipe exec layer to be initialized
std::thread::sleep(std::time::Duration::from_secs(3));
let pipe_exec_layer_ext = PIPE_EXEC_LAYER_EXT_V2.get();
loop {
match self.try_recv_pipe_exec_event() {
Ok(Some(event)) => self.on_pipe_exec_event(event),
Ok(None) => {}
Err(RecvError) => {
error!(target: "engine::tree", "Pipe exec layer channel disconnected");
return
}
}

match self.try_recv_engine_message() {
Ok(Some(msg)) => {
debug!(target: "engine::tree", %msg, "received new engine message");
if let Err(fatal) = self.on_engine_message(msg) {
error!(target: "engine::tree", %fatal, "insert block fatal error");
match pipe_exec_layer_ext {
Some(ext) => match self.try_recv_pipe_exec_event(ext) {
Ok(Some(event)) => self.on_pipe_exec_event(event),
Ok(None) => {}
Err(RecvError) => {
error!(target: "engine::tree", "Pipe exec layer channel disconnected");
return
}
}
Ok(None) => {
debug!(target: "engine::tree", "received no engine message for some time, while waiting for persistence task to complete");
}
Err(_err) => {
error!(target: "engine::tree", "Engine channel disconnected");
return
}
},
None => match self.try_recv_engine_message() {
Ok(Some(msg)) => {
debug!(target: "engine::tree", %msg, "received new engine message");
if let Err(fatal) = self.on_engine_message(msg) {
error!(target: "engine::tree", %fatal, "insert block fatal error");
return
}
}
Ok(None) => {
debug!(target: "engine::tree", "received no engine message for some time, while waiting for persistence task to complete");
}
Err(_err) => {
error!(target: "engine::tree", "Engine channel disconnected");
return
}
},
}

if let Err(err) = self.advance_persistence() {
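The reworked `run` loop resolves the global extension once, before the loop, and thereafter either drains pipe-exec events or falls back to engine messages, never both. A sketch of the resolve-once pattern, using `std::sync::OnceLock` and a hypothetical `PIPE_EXT` as stand-ins for the lazily initialized `PIPE_EXEC_LAYER_EXT_V2` global:

```rust
use std::sync::OnceLock;

// Stand-in for the global extension: set at most once during startup.
static PIPE_EXT: OnceLock<String> = OnceLock::new();

fn run() {
    // Resolve once, outside the loop; the choice of event source is then
    // fixed for the lifetime of the loop.
    let ext = PIPE_EXT.get();
    for _ in 0..3 {
        match ext {
            Some(ext) => println!("draining pipe exec events ({ext})"),
            None => println!("falling back to engine messages"),
        }
    }
}

fn main() {
    PIPE_EXT.set("v2".into()).ok();
    run();
}
```

Note the trade-off this shape implies: as the diff reads, an extension initialized after the single `get()` is never observed, which is why the loop is preceded by the 3-second sleep giving the pipe exec layer time to register.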
131 changes: 88 additions & 43 deletions crates/gravity-storage/src/block_view_storage/mod.rs
@@ -1,10 +1,15 @@
use core::hash;
use reth_payload_builder::database::CachedReads;
use reth_primitives::{revm_primitives::Bytecode, Address, B256, U256};
use reth_revm::database::StateProviderDatabase;
use reth_storage_api::{errors::provider::ProviderError, StateProviderBox, StateProviderFactory};
use reth_trie::{updates::TrieUpdates, HashedPostState};
use revm::{db::BundleState, primitives::AccountInfo, DatabaseRef};
use std::{clone, collections::BTreeMap, sync::{Arc, Mutex}};
use std::{
clone,
collections::BTreeMap,
sync::{Arc, Mutex},
};

use crate::{GravityStorage, GravityStorageError};

@@ -17,7 +22,7 @@ struct BlockViewStorageInner {
state_provider_info: (B256, u64), // (block_hash, block_number),
block_number_to_view: BTreeMap<u64, Arc<CachedReads>>,
block_number_to_state: BTreeMap<u64, Arc<HashedPostState>>,
block_number_to_trip_updates: BTreeMap<u64, Arc<TrieUpdates>>,
block_number_to_trie_updates: BTreeMap<u64, Arc<TrieUpdates>>,
block_number_to_id: BTreeMap<u64, B256>,
}

@@ -34,8 +39,20 @@ fn get_state_provider<Client: StateProviderFactory + 'static>(
}

impl<Client: StateProviderFactory + 'static> BlockViewStorage<Client> {
pub fn new(client: Client, latest_block_number: u64, latest_block_hash: B256, block_number_to_id: BTreeMap<u64, B256>) -> Self {
Self { client, inner: Mutex::new(BlockViewStorageInner::new(latest_block_number, latest_block_hash, block_number_to_id)) }
pub fn new(
client: Client,
latest_block_number: u64,
latest_block_hash: B256,
block_number_to_id: BTreeMap<u64, B256>,
) -> Self {
Self {
client,
inner: Mutex::new(BlockViewStorageInner::new(
latest_block_number,
latest_block_hash,
block_number_to_id,
)),
}
}
}

@@ -45,8 +62,8 @@ impl BlockViewStorageInner {
state_provider_info: (block_hash, block_number),
block_number_to_view: BTreeMap::new(),
block_number_to_state: BTreeMap::new(),
block_number_to_trip_updates: BTreeMap::new(),
block_number_to_id: block_number_to_id,
block_number_to_trie_updates: BTreeMap::new(),
block_number_to_id,
}
}
}
@@ -59,36 +76,39 @@ impl<Client: StateProviderFactory + 'static> GravityStorage for BlockViewStorage
target_block_number: u64,
) -> Result<(B256, Self::StateView), GravityStorageError> {
let storage = self.inner.lock().unwrap();
if target_block_number == storage.state_provider_info.1 {
return Ok((
*storage.block_number_to_id.get(&target_block_number).unwrap(),
BlockViewProvider::new(
vec![],
storage.block_number_to_id.clone(),
get_state_provider(&self.client, storage.state_provider_info.0)?,
),
));
}
if storage.block_number_to_view.get(&target_block_number).is_none() {
let (base_block_hash, base_block_number) = storage.state_provider_info;

let latest_block_number =
storage.block_number_to_view.keys().max().cloned().unwrap_or(base_block_number);
if target_block_number > latest_block_number {
return Err(GravityStorageError::TooNew(target_block_number));
}
let mut block_views = vec![];
storage.block_number_to_view.iter().rev().for_each(|(block_number, block_view)| {
let block_number = *block_number;
if storage.state_provider_info.1 < block_number && block_number <= target_block_number {
block_views.push(block_view.clone());
}
});

let block_id = *storage.block_number_to_id.get(&target_block_number).unwrap();
let block_hash = storage.state_provider_info.0;
let block_number_to_id = storage.block_number_to_id.clone();
let block_views: Vec<_> = storage
.block_number_to_view
.range(base_block_number + 1..=target_block_number)
.rev()
.map(|(_, view)| view.clone())
.collect();
drop(storage);

// Block number should be continuous
assert_eq!(block_views.len() as u64, target_block_number - base_block_number);

Ok((
block_id,
BlockViewProvider::new(block_views, storage.block_number_to_id.clone(), get_state_provider(&self.client, block_hash)?),
BlockViewProvider::new(
block_views,
block_number_to_id,
get_state_provider(&self.client, base_block_hash)?,
),
))
}
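The rewritten `get_state_view` replaces the manual reverse scan with a `BTreeMap::range` over `(base, target]`, and the length assertion then guarantees the cached views cover a contiguous run of block numbers. A compact illustration of that pattern, with toy view values standing in for `Arc<CachedReads>`:

```rust
use std::collections::BTreeMap;
use std::sync::Arc;

fn main() {
    let views: BTreeMap<u64, Arc<&str>> =
        (1..=5).map(|n| (n, Arc::new("view"))).collect();

    let (base, target) = (2u64, 5u64);

    // Views for blocks in (base, target], newest first.
    let picked: Vec<Arc<&str>> = views
        .range(base + 1..=target)
        .rev()
        .map(|(_, v)| Arc::clone(v))
        .collect();

    // Continuity check: a gap in block numbers would be a logic error.
    assert_eq!(picked.len() as u64, target - base);
}
```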

fn insert_block_id(&self, block_number: u64, block_id: B256) {
let mut storage = self.inner.lock().unwrap();
storage.block_number_to_id.insert(block_number, block_id);
}

@@ -103,11 +123,10 @@ impl<Client: StateProviderFactory + 'static> GravityStorage for BlockViewStorage
cached.insert_account(addr, info, storage);
}
}
let hashed_state = Arc::new(HashedPostState::from_bundle_state(&bundle_state.state));
let mut storage = self.inner.lock().unwrap();
storage.block_number_to_view.insert(block_number, Arc::new(cached));
storage
.block_number_to_state
.insert(block_number, Arc::new(HashedPostState::from_bundle_state(&bundle_state.state)));
storage.block_number_to_state.insert(block_number, hashed_state);
}

fn update_canonical(&self, block_number: u64, block_hash: B256) {
@@ -117,25 +136,47 @@ impl<Client: StateProviderFactory + 'static> GravityStorage for BlockViewStorage
storage.state_provider_info = (block_hash, block_number);
storage.block_number_to_view.remove(&gc_block_number);
storage.block_number_to_state.remove(&gc_block_number);
storage.block_number_to_trip_updates.remove(&gc_block_number);
storage.block_number_to_trie_updates.remove(&gc_block_number);
}

fn state_root_with_updates(
&self,
block_number: u64,
) -> Result<(B256, Arc<HashedPostState>, Arc<TrieUpdates>), GravityStorageError> {
let mut storage = self.inner.lock().unwrap();
let state_provider = get_state_provider(&self.client, storage.state_provider_info.0)?;
let mut hashed_state_vec = vec![];
let mut trie_updates_vec = vec![];
for number in storage.state_provider_info.1..block_number {
hashed_state_vec.push(storage.block_number_to_state.get(&number).unwrap().clone());
trie_updates_vec.push(storage.block_number_to_trip_updates.get(&number).unwrap().clone());
}
let storage = self.inner.lock().unwrap();
let (base_block_hash, base_block_number) = storage.state_provider_info;
let hashed_state_vec: Vec<_> = storage
.block_number_to_state
.range(base_block_number + 1..block_number)
.map(|(_, hashed_state)| hashed_state.clone())
.collect();
let trie_updates_vec: Vec<_> = storage
.block_number_to_trie_updates
.range(base_block_number + 1..block_number)
.map(|(_, trie_updates)| trie_updates.clone())
.collect();
let hashed_state = storage.block_number_to_state.get(&block_number).unwrap().clone();
let (state_root, trie_updates) = state_provider.state_root_with_updates_v2(hashed_state.as_ref().clone(), hashed_state_vec, trie_updates_vec).unwrap();
drop(storage);

// Block number should be continuous
assert_eq!(hashed_state_vec.len() as u64, block_number - base_block_number - 1);
assert_eq!(trie_updates_vec.len() as u64, block_number - base_block_number - 1);

let state_provider = get_state_provider(&self.client, base_block_hash)?;
let (state_root, trie_updates) = state_provider
.state_root_with_updates_v2(
hashed_state.as_ref().clone(),
hashed_state_vec,
trie_updates_vec,
)
.unwrap();
let trie_updates = Arc::new(trie_updates);
storage.block_number_to_trip_updates.insert(block_number, trie_updates.clone());

{
let mut storage = self.inner.lock().unwrap();
storage.block_number_to_trie_updates.insert(block_number, trie_updates.clone());
}

Ok((state_root, hashed_state, trie_updates))
}
}
@@ -147,7 +188,11 @@ pub struct BlockViewProvider {
}

impl BlockViewProvider {
fn new(block_views: Vec<Arc<CachedReads>>, block_number_to_id: BTreeMap<u64, B256>, state_provider: StateProviderBox) -> Self {
fn new(
block_views: Vec<Arc<CachedReads>>,
block_number_to_id: BTreeMap<u64, B256>,
state_provider: StateProviderBox,
) -> Self {
Self { block_views, block_number_to_id, db: StateProviderDatabase::new(state_provider) }
}
}
@@ -185,6 +230,6 @@ impl DatabaseRef for BlockViewProvider {
}

fn block_hash_ref(&self, number: u64) -> Result<B256, Self::Error> {
Ok(*self.block_number_to_id.get(&number).unwrap())
}
}
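The provider's read path (mostly elided in this diff) layers the per-block `CachedReads` over the base state provider: probe the cached views newest-first, fall back to the database on a miss. A minimal sketch of that layering under those assumptions, with plain `HashMap`s standing in for `CachedReads` and the backing provider:

```rust
use std::collections::HashMap;

/// Layered lookup: newest cached view first, then the fallback source.
fn layered_get(views: &[HashMap<u64, u64>], db: &HashMap<u64, u64>, key: u64) -> Option<u64> {
    for view in views {
        // Views are ordered newest-first, so the first hit wins.
        if let Some(v) = view.get(&key) {
            return Some(*v);
        }
    }
    db.get(&key).copied()
}

fn main() {
    let older: HashMap<u64, u64> = [(1, 10)].into();
    let newer: HashMap<u64, u64> = [(1, 11)].into();
    let db: HashMap<u64, u64> = [(2, 20)].into();

    assert_eq!(layered_get(&[newer, older], &db, 1), Some(11));
    assert_eq!(layered_get(&[], &db, 2), Some(20));
}
```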
1 change: 0 additions & 1 deletion crates/pipe-exec-layer-ext-v2/Cargo.toml
@@ -19,7 +19,6 @@ reth-chainspec.workspace = true
reth-revm.workspace = true
reth-evm.workspace = true
reth-execution-types.workspace = true
reth-trie.workspace = true
reth-chain-state.workspace = true
reth-rpc-types-compat.workspace = true
gravity-storage.workspace = true
(2 more changed files not shown)
