metrics: add metrics to analyze bottleneck #37

Open · wants to merge 5 commits into main
3 changes: 3 additions & 0 deletions .github/workflows/grevm-fmt.yml
@@ -28,3 +28,6 @@ jobs:

- name: Run cargo fmt
run: cargo +nightly fmt --check

- name: Run cargo check
run: RUSTFLAGS="-D warnings" cargo check
30 changes: 22 additions & 8 deletions src/hint.rs
@@ -1,10 +1,12 @@
use crate::{fork_join_util, LocationAndType, SharedTxStates, TxState};
use revm::primitives::{
alloy_primitives::U160, keccak256, ruint::UintTryFrom, Address, Bytes, TxEnv, TxKind, B256,
U256,
};
use std::sync::Arc;

use crate::{fork_join_util, LocationAndType, SharedTxStates, TxState};
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};

/// This module provides functionality for parsing and handling execution hints
/// for parallel transaction execution in the context of Ethereum-like blockchains.
@@ -80,11 +82,13 @@ pub(crate) struct ParallelExecutionHints {
/// Shared transaction states that will be updated with read/write sets
/// based on the contract interactions.
tx_states: SharedTxStates,
pub unknown_contract_tx_cnt: u64,
pub raw_tx_cnt: u64,
}

impl ParallelExecutionHints {
pub(crate) fn new(tx_states: SharedTxStates) -> Self {
Self { tx_states }
Self { tx_states, unknown_contract_tx_cnt: 0, raw_tx_cnt: 0 }
}

/// Obtain a mutable reference to shared transaction states, and parse execution hints for each
@@ -98,7 +102,9 @@ impl ParallelExecutionHints {
/// no conflicts between transactions, making the `Mutex` approach unnecessarily verbose and
/// cumbersome.
#[fastrace::trace]
pub(crate) fn parse_hints(&self, txs: Arc<Vec<TxEnv>>) {
pub(crate) fn parse_hints(&mut self, txs: Arc<Vec<TxEnv>>) {
let num_unknown_contract_tx = AtomicU64::new(0);
let num_raw_tx = AtomicU64::new(0);
// Utilize fork-join utility to process transactions in parallel
fork_join_util(txs.len(), None, |start_tx, end_tx, _| {
#[allow(invalid_reference_casting)]
@@ -122,18 +128,26 @@ impl ParallelExecutionHints {
&tx_env.data,
rw_set,
) {
num_unknown_contract_tx.fetch_add(1, Ordering::Relaxed);
rw_set.insert_location(
LocationAndType::Basic(to_address),
RWType::WriteOnly,
);
}
} else if to_address != tx_env.caller {
rw_set
.insert_location(LocationAndType::Basic(to_address), RWType::ReadWrite);
} else {
num_raw_tx.fetch_add(1, Ordering::Relaxed);
if to_address != tx_env.caller {
rw_set.insert_location(
LocationAndType::Basic(to_address),
RWType::ReadWrite,
);
}
}
}
}
});
self.unknown_contract_tx_cnt = num_unknown_contract_tx.load(Ordering::Acquire);
self.raw_tx_cnt = num_raw_tx.load(Ordering::Acquire);
}

/// This function computes the storage slot using the provided slot number and a vector of
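The counters added above are accumulated inside the parallel fork-join loop with AtomicU64 and Ordering::Relaxed, then folded back into plain u64 fields once every worker has finished. Below is a minimal stand-alone sketch of that pattern; HintCounters, classify_parallel, the boolean input, and the scoped threads (standing in for fork_join_util) are illustrative, not Grevm APIs.

use std::sync::atomic::{AtomicU64, Ordering};

struct HintCounters {
    unknown_contract_tx_cnt: u64,
    raw_tx_cnt: u64,
}

// Workers bump shared atomics with Relaxed ordering; the caller reads the
// totals back only after every worker has joined, mirroring how parse_hints
// copies the counts into plain u64 fields on the hints struct.
fn classify_parallel(is_unparsed_contract_call: &[bool]) -> HintCounters {
    let unknown = AtomicU64::new(0);
    let raw = AtomicU64::new(0);

    // Scoped threads stand in for fork_join_util in this sketch.
    std::thread::scope(|s| {
        for chunk in is_unparsed_contract_call.chunks(64) {
            let (unknown, raw) = (&unknown, &raw);
            s.spawn(move || {
                for &unparsed in chunk {
                    if unparsed {
                        // Treated as an unparseable contract call in this simplified split.
                        unknown.fetch_add(1, Ordering::Relaxed);
                    } else {
                        // Treated as a raw value transfer.
                        raw.fetch_add(1, Ordering::Relaxed);
                    }
                }
            });
        }
    });

    // All writers joined at the end of the scope, so these loads see the final values.
    HintCounters {
        unknown_contract_tx_cnt: unknown.load(Ordering::Relaxed),
        raw_tx_cnt: raw.load(Ordering::Relaxed),
    }
}

fn main() {
    let txs = vec![true, false, false, true, false];
    let counters = classify_parallel(&txs);
    assert_eq!(counters.unknown_contract_tx_cnt, 2);
    assert_eq!(counters.raw_tx_cnt, 3);
}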
5 changes: 5 additions & 0 deletions src/lib.rs
@@ -30,6 +30,11 @@ pub mod storage;
mod tx_dependency;
use rayon::prelude::{IntoParallelIterator, ParallelIterator};

lazy_static! {
static ref DEBUG_BOTTLENECK: bool =
std::env::var("DEBUG_BOTTLENECK").map_or(false, |v| v == "on");
}

lazy_static! {
static ref CPU_CORES: usize = thread::available_parallelism().map(|n| n.get()).unwrap_or(8);
}
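The DEBUG_BOTTLENECK switch is read lazily from the environment exactly once; only the exact value "on" enables the debug path, so it can be toggled per run by prefixing the command with DEBUG_BOTTLENECK=on. A minimal sketch of the same semantics (debug_bottleneck_enabled is an illustrative helper, not part of the crate):

fn debug_bottleneck_enabled() -> bool {
    // True only when the variable is set to exactly "on"; unset or any other
    // value leaves the bottleneck diagnostics disabled.
    std::env::var("DEBUG_BOTTLENECK").map_or(false, |v| v == "on")
}

fn main() {
    println!(
        "DEBUG_BOTTLENECK is {}",
        if debug_bottleneck_enabled() { "enabled" } else { "disabled" }
    );
}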
7 changes: 2 additions & 5 deletions src/partition.rs
@@ -122,13 +122,10 @@ where
*evm.tx_mut() = tx.clone();
evm.db_mut().current_txid = txid;
evm.db_mut().raw_transfer = true; // no need to wait for miner rewards
let mut raw_transfer = true;
if let Ok(Some(info)) = evm.db_mut().basic(tx.caller) {
raw_transfer = info.is_empty_code_hash();
}
let mut raw_transfer = false;
if let TxKind::Call(to) = tx.transact_to {
if let Ok(Some(info)) = evm.db_mut().basic(to) {
raw_transfer &= info.is_empty_code_hash();
raw_transfer = info.is_empty_code_hash();
}
}
evm.db_mut().raw_transfer = raw_transfer;
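This partition.rs change tightens the raw-transfer check: instead of assuming a transaction is a plain transfer and checking the caller's code hash, it now defaults to false and becomes true only for a Call to an account whose code hash is empty. A simplified sketch of the revised decision, with stand-in types for revm's TxKind and AccountInfo:

#[derive(Clone, Copy)]
enum TxKind {
    Create,
    Call(u64), // callee address, simplified to an integer for this sketch
}

struct AccountInfo {
    has_code: bool,
}

impl AccountInfo {
    fn is_empty_code_hash(&self) -> bool {
        !self.has_code
    }
}

// Mirrors the revised logic: default to false, and only a Call to an account
// with no deployed code counts as a raw transfer.
fn is_raw_transfer(kind: TxKind, lookup: impl Fn(u64) -> Option<AccountInfo>) -> bool {
    let mut raw_transfer = false;
    if let TxKind::Call(to) = kind {
        if let Some(info) = lookup(to) {
            raw_transfer = info.is_empty_code_hash();
        }
    }
    raw_transfer
}

fn main() {
    let eoa = |_| Some(AccountInfo { has_code: false });
    let contract = |_| Some(AccountInfo { has_code: true });
    assert!(is_raw_transfer(TxKind::Call(1), eoa));
    assert!(!is_raw_transfer(TxKind::Call(1), contract));
    assert!(!is_raw_transfer(TxKind::Create, eoa));
}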
74 changes: 54 additions & 20 deletions src/scheduler.rs
@@ -5,14 +5,13 @@ use crate::{
storage::{SchedulerDB, State},
tx_dependency::{DependentTxsVec, TxDependency},
GrevmError, LocationAndType, ResultAndTransition, TransactionStatus, TxId, CPU_CORES,
GREVM_RUNTIME, MAX_NUM_ROUND,
DEBUG_BOTTLENECK, GREVM_RUNTIME, MAX_NUM_ROUND,
};
use ahash::{AHashMap as HashMap, AHashSet as HashSet};
use atomic::Atomic;
use dashmap::DashSet;
use fastrace::Span;
use lazy_static::lazy_static;
use metrics::{gauge, histogram};
use metrics::{counter, gauge, histogram};
use revm::{
primitives::{
AccountInfo, Address, Bytecode, EVMError, Env, ExecutionResult, SpecId, TxEnv, B256, U256,
@@ -32,6 +31,7 @@ use tokio::sync::Notify;
use tracing::info;

struct ExecuteMetrics {
block_height: metrics::Counter,
/// Number of times parallel execution is called.
parallel_execute_calls: metrics::Gauge,
/// Number of times sequential execution is called.
@@ -75,11 +75,16 @@ struct ExecuteMetrics {
commit_transition_time: metrics::Histogram,
/// Time taken to execute transactions sequentially (in nanoseconds).
sequential_execute_time: metrics::Histogram,
/// Number of transactions whose contract interactions could not be parsed
unknown_contract_tx_cnt: metrics::Histogram,
/// Number of raw transactions
raw_tx_cnt: metrics::Histogram,
}

impl Default for ExecuteMetrics {
fn default() -> Self {
Self {
block_height: counter!("grevm.block_height"),
parallel_execute_calls: gauge!("grevm.parallel_round_calls"),
sequential_execute_calls: gauge!("grevm.sequential_execute_calls"),
total_tx_cnt: histogram!("grevm.total_tx_cnt"),
@@ -100,13 +105,16 @@ impl Default for ExecuteMetrics {
merge_write_set_time: histogram!("grevm.merge_write_set_time"),
commit_transition_time: histogram!("grevm.commit_transition_time"),
sequential_execute_time: histogram!("grevm.sequential_execute_time"),
unknown_contract_tx_cnt: histogram!("grevm.unknown_contract_tx_cnt"),
raw_tx_cnt: histogram!("grevm.raw_tx_cnt"),
}
}
}

/// Collect metrics and report
#[derive(Default)]
struct ExecuteMetricsCollector {
block_height: u64,
parallel_execute_calls: u64,
sequential_execute_calls: u64,
total_tx_cnt: u64,
@@ -127,11 +135,14 @@ struct ExecuteMetricsCollector {
merge_write_set_time: u64,
commit_transition_time: u64,
sequential_execute_time: u64,
unknown_contract_tx_cnt: u64,
raw_tx_cnt: u64,
}

impl ExecuteMetricsCollector {
fn report(&self) {
let execute_metrics = ExecuteMetrics::default();
execute_metrics.block_height.absolute(self.block_height);
execute_metrics.parallel_execute_calls.set(self.parallel_execute_calls as f64);
execute_metrics.sequential_execute_calls.set(self.sequential_execute_calls as f64);
execute_metrics.total_tx_cnt.record(self.total_tx_cnt as f64);
@@ -152,6 +163,8 @@ impl ExecuteMetricsCollector {
execute_metrics.merge_write_set_time.record(self.merge_write_set_time as f64);
execute_metrics.commit_transition_time.record(self.commit_transition_time as f64);
execute_metrics.sequential_execute_time.record(self.sequential_execute_time as f64);
execute_metrics.unknown_contract_tx_cnt.record(self.unknown_contract_tx_cnt as f64);
execute_metrics.raw_tx_cnt.record(self.raw_tx_cnt as f64);
}
}

@@ -327,7 +340,6 @@ where
let coinbase = env.block.coinbase;
let num_partitions = *CPU_CORES * 2 + 1; // 2 * cpu + 1 for initial partition number
let num_txs = txs.len();
info!("Parallel execute {} txs of SpecId {:?}", num_txs, spec_id);
Self {
spec_id,
env,
@@ -348,9 +360,10 @@ where

/// Get the partitioned transactions by dependencies.
#[fastrace::trace]
pub(crate) fn partition_transactions(&mut self) {
pub(crate) fn partition_transactions(&mut self, round: usize) {
// compute and assign partitioned_txs
let start = Instant::now();
self.tx_dependencies.round = Some(round);
self.partitioned_txs = self.tx_dependencies.fetch_best_partitions(self.num_partitions);
self.num_partitions = self.partitioned_txs.len();
let mut max = 0;
@@ -412,13 +425,15 @@ where
let start = Instant::now();
let mut merged_write_set: HashMap<LocationAndType, BTreeSet<TxId>> = HashMap::new();
let mut end_skip_id = self.num_finality_txs;
for txid in self.num_finality_txs..self.tx_states.len() {
if self.tx_states[txid].tx_status == TransactionStatus::SkipValidation &&
end_skip_id == txid
{
end_skip_id += 1;
} else {
break;
if !(*DEBUG_BOTTLENECK) {
for txid in self.num_finality_txs..self.tx_states.len() {
if self.tx_states[txid].tx_status == TransactionStatus::SkipValidation &&
end_skip_id == txid
{
end_skip_id += 1;
} else {
break;
}
}
}
if end_skip_id != self.tx_states.len() {
@@ -438,8 +453,7 @@ where
/// and there is no need to record the dependency and dependent relationships of these
/// transactions. Thus achieving the purpose of pruning.
#[fastrace::trace]
fn update_and_pruning_dependency(&mut self) {
let num_finality_txs = self.num_finality_txs;
fn update_and_pruning_dependency(&mut self, num_finality_txs: usize) {
if num_finality_txs == self.txs.len() {
return;
}
@@ -524,19 +538,26 @@ where
}
}
});
if *DEBUG_BOTTLENECK && self.num_finality_txs == 0 {
// Use the read-write set to build accurate dependencies,
// and try to find the bottleneck
self.update_and_pruning_dependency(0);
self.tx_dependencies.round = None;
self.tx_dependencies.fetch_best_partitions(self.num_partitions);
}
miner_involved_txs.into_iter().collect()
}

/// Find the continuous minimum TxID, which can be marked as finality transactions.
/// If the smallest TxID is a conflict transaction, return an error.
#[fastrace::trace]
fn find_continuous_min_txid(&mut self) -> Result<usize, GrevmError<DB::Error>> {
let mut min_execute_time = Duration::from_secs(u64::MAX);
let mut sum_execute_time = Duration::from_secs(0);
let mut max_execute_time = Duration::from_secs(0);
for executor in &self.partition_executors {
let mut executor = executor.write().unwrap();
self.metrics.reusable_tx_cnt += executor.metrics.reusable_tx_cnt;
min_execute_time = min_execute_time.min(executor.metrics.execute_time);
sum_execute_time += executor.metrics.execute_time;
max_execute_time = max_execute_time.max(executor.metrics.execute_time);
if executor.assigned_txs[0] == self.num_finality_txs &&
self.tx_states[self.num_finality_txs].tx_status == TransactionStatus::Conflict
@@ -549,7 +570,9 @@ where
let mut conflict_tx_cnt = 0;
let mut unconfirmed_tx_cnt = 0;
let mut finality_tx_cnt = 0;
self.metrics.partition_et_diff += (max_execute_time - min_execute_time).as_nanos() as u64;
let avg_execution_time =
sum_execute_time.as_nanos() / self.partition_executors.len() as u128;
self.metrics.partition_et_diff += (max_execute_time.as_nanos() - avg_execution_time) as u64;
#[allow(invalid_reference_casting)]
let tx_states =
unsafe { &mut *(&(*self.tx_states) as *const Vec<TxState> as *mut Vec<TxState>) };
@@ -651,7 +674,7 @@ where
let miner_involved_txs = self.generate_unconfirmed_txs();
let finality_tx_cnt = self.find_continuous_min_txid()?;
// update and pruning tx dependencies
self.update_and_pruning_dependency();
self.update_and_pruning_dependency(self.num_finality_txs);
self.commit_transition(finality_tx_cnt)?;
let mut rewards_accumulators = RewardsAccumulators::new();
for txid in miner_involved_txs {
@@ -721,10 +744,12 @@ where
#[fastrace::trace]
fn parse_hints(&mut self) {
let start = Instant::now();
let hints = ParallelExecutionHints::new(self.tx_states.clone());
let mut hints = ParallelExecutionHints::new(self.tx_states.clone());
hints.parse_hints(self.txs.clone());
self.tx_dependencies.init_tx_dependency(self.tx_states.clone());
self.metrics.parse_hints_time += start.elapsed().as_nanos() as u64;
self.metrics.unknown_contract_tx_cnt += hints.unknown_contract_tx_cnt;
self.metrics.raw_tx_cnt += hints.raw_tx_cnt;
}

#[fastrace::trace]
@@ -734,6 +759,15 @@ where
with_hints: bool,
num_partitions: Option<usize>,
) -> Result<ExecuteOutput, GrevmError<DB::Error>> {
let block_height: u64 = self.env.block.number.try_into().unwrap_or(0);
info!(
"Parallel execute {} txs: block={}, SpecId={:?}",
self.txs.len(),
block_height,
self.spec_id
);
self.metrics.block_height = block_height;
self.tx_dependencies.block_height = block_height;
if with_hints {
self.parse_hints();
}
@@ -754,7 +788,7 @@ where
let mut round = 0;
while round < MAX_NUM_ROUND {
if self.num_finality_txs < self.txs.len() {
self.partition_transactions();
self.partition_transactions(round);
if self.num_partitions == 1 && !force_parallel {
break;
}
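One behavioral change worth noting in this file: partition_et_diff now records the gap between the slowest partition and the average partition execution time, rather than slowest minus fastest, which tracks more directly how long a round is stalled by its straggler. A small self-contained sketch of that computation (the helper name and sample durations are illustrative):

use std::time::Duration;

// Revised imbalance metric: the straggler cost of a round is the slowest
// partition's execution time minus the average across all partitions
// (previously it was slowest minus fastest).
fn partition_et_diff_nanos(execute_times: &[Duration]) -> u64 {
    if execute_times.is_empty() {
        return 0;
    }
    let sum: u128 = execute_times.iter().map(|d| d.as_nanos()).sum();
    let max = execute_times.iter().map(|d| d.as_nanos()).max().unwrap_or(0);
    let avg = sum / execute_times.len() as u128;
    (max - avg) as u64
}

fn main() {
    // Four partitions: 8 ms, 10 ms, 12 ms, 30 ms -> average 15 ms, straggler gap 15 ms.
    let times = [8, 10, 12, 30].map(Duration::from_millis);
    assert_eq!(
        partition_et_diff_nanos(&times),
        Duration::from_millis(15).as_nanos() as u64
    );
}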