Skip to content

Commit

Permalink
Merge pull request #14 from commonprefix/historical
Browse files Browse the repository at this point in the history
Historical
  • Loading branch information
pkakelas authored Dec 4, 2023
2 parents 34b6714 + 8243613 commit 082f990
Show file tree
Hide file tree
Showing 35 changed files with 11,638 additions and 176 deletions.
4 changes: 1 addition & 3 deletions feeder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ serde_json = "1.0.108"
ssz-rs = {git = "https://github.com/commonprefix/ssz-rs", branch = "main"}
sync-committee-rs = {package = "sync-committee-primitives", git = "https://github.com/commonprefix/sync-committee-rs", branch = "cp_changes", default-features = false, features = ["serialize"]}
thiserror = "1.0.50"

# async/futures
async-trait = "0.1.57"
async-trait = "0.1.74"
cosmos-sdk-proto = {version = "0.20.0", features = ["cosmwasm"]}
cosmrs = "0.15.0"
futures = "0.3.23"
Expand Down
119 changes: 98 additions & 21 deletions feeder/src/eth/consensus.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,39 @@
use std::cmp;

use crate::error::RpcError;
use crate::eth::utils::get;
use crate::types::*;
use async_trait::async_trait;
use consensus_types::consensus::{
BeaconBlockAlias, Bootstrap, FinalityUpdate, OptimisticUpdate, Update,
};
use eyre::Result;
use sync_committee_rs::consensus_types::BeaconBlockHeader;
use futures::future;
use ssz_rs::Vector;
use std::cmp;
use sync_committee_rs::{
consensus_types::BeaconBlockHeader,
constants::{Root, SLOTS_PER_HISTORICAL_ROOT},
};

#[async_trait]
pub trait EthBeaconAPI {
async fn get_block_root(&self, slot: u64) -> Result<Root>;
async fn get_bootstrap(&self, block_root: &'_ [u8]) -> Result<Bootstrap>;
async fn get_updates(&self, period: u64, count: u8) -> Result<Vec<Update>>;
async fn get_finality_update(&self) -> Result<FinalityUpdate>;
async fn get_optimistic_update(&self) -> Result<OptimisticUpdate>;
async fn get_beacon_block_header(&self, slot: u64) -> Result<BeaconBlockHeader>;
async fn get_beacon_block(&self, slot: u64) -> Result<BeaconBlockAlias>;
}

#[async_trait]
pub trait CustomConsensusApi {
async fn get_block_roots_tree(
&self,
start_slot: u64,
) -> Result<Vector<Root, SLOTS_PER_HISTORICAL_ROOT>>;
async fn get_latest_beacon_block_header(&self) -> Result<BeaconBlockHeader>;
async fn get_latest_beacon_block(&self) -> Result<BeaconBlockAlias>;
}

#[derive(Debug)]
pub struct ConsensusRPC {
Expand All @@ -21,8 +47,21 @@ impl ConsensusRPC {
rpc: rpc.to_string(),
}
}
}

#[async_trait]
impl EthBeaconAPI for ConsensusRPC {
async fn get_block_root(&self, slot: u64) -> Result<Root> {
let req = format!("{}/eth/v1/beacon/blocks/{}/root", self.rpc, slot);

let res: BlockRootResponse = get(&req)
.await
.map_err(|e| RpcError::new("block_root", e))?;

Ok(res.data.root)
}

pub async fn get_bootstrap(&self, block_root: &'_ [u8]) -> Result<Bootstrap> {
async fn get_bootstrap(&self, block_root: &'_ [u8]) -> Result<Bootstrap> {
let root_hex = hex::encode(block_root);
let req = format!(
"{}/eth/v1/beacon/light_client/bootstrap/0x{}",
Expand All @@ -34,7 +73,7 @@ impl ConsensusRPC {
Ok(res.data)
}

pub async fn get_updates(&self, period: u64, count: u8) -> Result<Vec<Update>> {
async fn get_updates(&self, period: u64, count: u8) -> Result<Vec<Update>> {
let count = cmp::min(count, 10);
let req = format!(
"{}/eth/v1/beacon/light_client/updates?start_period={}&count={}",
Expand All @@ -46,23 +85,23 @@ impl ConsensusRPC {
Ok(res.into_iter().map(|d| d.data).collect())
}

pub async fn get_finality_update(&self) -> Result<FinalityUpdate> {
async fn get_finality_update(&self) -> Result<FinalityUpdate> {
let req = format!("{}/eth/v1/beacon/light_client/finality_update", self.rpc);

let res: FinalityUpdateData = get(&req).await.map_err(|e| RpcError::new("updates", e))?;

Ok(res.data)
}

pub async fn get_optimistic_update(&self) -> Result<OptimisticUpdate> {
async fn get_optimistic_update(&self) -> Result<OptimisticUpdate> {
let req = format!("{}/eth/v1/beacon/light_client/optimistic_update", self.rpc);

let res: OptimisticUpdateData = get(&req).await.map_err(|e| RpcError::new("updates", e))?;

Ok(res.data)
}

pub async fn get_beacon_block_header(&self, slot: u64) -> Result<BeaconBlockHeader> {
async fn get_beacon_block_header(&self, slot: u64) -> Result<BeaconBlockHeader> {
let req = format!("{}/eth/v1/beacon/headers/{}", self.rpc, slot);

let res: BeaconBlockHeaderResponse = get(&req)
Expand All @@ -71,18 +110,7 @@ impl ConsensusRPC {

Ok(res.data.header.message)
}

pub async fn get_latest_beacon_block_header(&self) -> Result<BeaconBlockHeader> {
let req = format!("{}/eth/v1/beacon/headers/head", self.rpc);

let res: BeaconBlockHeaderResponse = get(&req)
.await
.map_err(|e| RpcError::new("latest_beacon_header", e))?;

Ok(res.data.header.message)
}

pub async fn get_beacon_block(&self, slot: u64) -> Result<BeaconBlockAlias> {
async fn get_beacon_block(&self, slot: u64) -> Result<BeaconBlockAlias> {
let req = format!("{}/eth/v2/beacon/blocks/{}", self.rpc, slot);

let res: BeaconBlockResponse = get(&req)
Expand All @@ -91,14 +119,63 @@ impl ConsensusRPC {

Ok(res.data.message)
}
}

pub async fn get_latest_beacon_block(&self) -> Result<BeaconBlockAlias> {
#[async_trait]
impl CustomConsensusApi for ConsensusRPC {
async fn get_latest_beacon_block(&self) -> Result<BeaconBlockAlias> {
let req = format!("{}/eth/v1/beacon/blocks/7834081", self.rpc);

let res: BeaconBlockResponse = get(&req)
.await
.map_err(|e| RpcError::new("latest_beacon_block", e))?;

println!("Got latest beacon block {:#?}", res);

Ok(res.data.message)
}

async fn get_latest_beacon_block_header(&self) -> Result<BeaconBlockHeader> {
let req = format!("{}/eth/v1/beacon/headers/head", self.rpc);

let res: BeaconBlockHeaderResponse = get(&req)
.await
.map_err(|e| RpcError::new("latest_beacon_header", e))?;

Ok(res.data.header.message)
}

async fn get_block_roots_tree(
&self,
start_slot: u64,
) -> Result<Vector<Root, SLOTS_PER_HISTORICAL_ROOT>> {
const BATCH_SIZE: usize = 1000;

let mut block_roots = vec![];

for batch_start in (0..SLOTS_PER_HISTORICAL_ROOT).step_by(BATCH_SIZE) {
let batch_end = std::cmp::min(batch_start + BATCH_SIZE, SLOTS_PER_HISTORICAL_ROOT);
let mut futures = Vec::new();

for i in batch_start..batch_end {
let future = self.get_block_root(start_slot + i as u64);
futures.push(future);
}

let resolved = future::join_all(futures).await;
println!("Resolved batch {}", batch_start / BATCH_SIZE);

// Block root tree includes the last block root if no block was minted in the slot
for block_root in resolved.iter() {
match block_root {
Ok(block_root) => block_roots.push(*block_root),
Err(_) => block_roots.push(*block_roots.last().unwrap()),
}
}
}

let block_roots = Vector::<Root, SLOTS_PER_HISTORICAL_ROOT>::try_from(block_roots).unwrap();

Ok(block_roots)
}
}
51 changes: 32 additions & 19 deletions feeder/src/eth/execution.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,33 @@
use std::str::FromStr;

use async_trait::async_trait;
use ethers::prelude::Http;
use ethers::providers::{FilterKind, HttpRateLimitRetryPolicy, Middleware, Provider, RetryClient};
use ethers::types::{Block, Filter, Log, Transaction, TransactionReceipt, H256, U256, U64};
use eyre::Result;

use crate::error::RpcError;

#[async_trait]
pub trait ExecutionAPI {
async fn get_transaction_receipt(&self, tx_hash: &H256) -> Result<Option<TransactionReceipt>>;
async fn get_block_receipts(&self, block_number: u64) -> Result<Vec<TransactionReceipt>>;
async fn get_block(&self, block_number: u64) -> Result<Option<Block<H256>>>;
async fn get_block_with_txs(&self, block_number: u64) -> Result<Option<Block<Transaction>>>;
async fn get_blocks(&self, block_numbers: &[u64]) -> Result<Vec<Option<Block<H256>>>>;
async fn get_latest_block_number(&self) -> Result<U64>;
async fn get_transaction(&self, tx_hash: &H256) -> Result<Option<Transaction>>;
async fn get_logs(&self, filter: &Filter) -> Result<Vec<Log>>;
async fn get_filter_changes(&self, filter_id: &U256) -> Result<Vec<Log>>;
async fn uninstall_filter(&self, filter_id: &U256) -> Result<bool>;
async fn get_new_filter(&self, filter: &Filter) -> Result<U256>;
async fn chain_id(&self) -> Result<u64>;
}

pub struct ExecutionRPC {
pub provider: Provider<RetryClient<Http>>,
}

#[allow(dead_code)]
impl ExecutionRPC {
pub fn new(rpc: &str) -> Self {
let http = Http::from_str(rpc).expect("Could not initialize HTTP provider");
Expand All @@ -22,11 +38,11 @@ impl ExecutionRPC {

ExecutionRPC { provider }
}
}

pub async fn get_transaction_receipt(
&self,
tx_hash: &H256,
) -> Result<Option<TransactionReceipt>> {
#[async_trait]
impl ExecutionAPI for ExecutionRPC {
async fn get_transaction_receipt(&self, tx_hash: &H256) -> Result<Option<TransactionReceipt>> {
let receipt = self
.provider
.get_transaction_receipt(*tx_hash)
Expand All @@ -36,7 +52,7 @@ impl ExecutionRPC {
Ok(receipt)
}

pub async fn get_block_receipts(&self, block_number: u64) -> Result<Vec<TransactionReceipt>> {
async fn get_block_receipts(&self, block_number: u64) -> Result<Vec<TransactionReceipt>> {
let block_receipts = self
.provider
.get_block_receipts(block_number)
Expand All @@ -46,7 +62,7 @@ impl ExecutionRPC {
Ok(block_receipts)
}

pub async fn get_block(&self, block_number: u64) -> Result<Option<Block<H256>>> {
async fn get_block(&self, block_number: u64) -> Result<Option<Block<H256>>> {
let block = self
.provider
.get_block(block_number)
Expand All @@ -56,10 +72,7 @@ impl ExecutionRPC {
Ok(block)
}

pub async fn get_block_with_txs(
&self,
block_number: u64,
) -> Result<Option<Block<Transaction>>> {
async fn get_block_with_txs(&self, block_number: u64) -> Result<Option<Block<Transaction>>> {
let block = self
.provider
.get_block_with_txs(block_number)
Expand All @@ -69,7 +82,7 @@ impl ExecutionRPC {
Ok(block)
}

pub async fn get_blocks(&self, block_numbers: &[u64]) -> Result<Vec<Option<Block<H256>>>> {
async fn get_blocks(&self, block_numbers: &[u64]) -> Result<Vec<Option<Block<H256>>>> {
let mut futures = vec![];
for &block_number in block_numbers {
futures.push(async move { self.get_block(block_number).await });
Expand All @@ -79,55 +92,55 @@ impl ExecutionRPC {
results
}

pub async fn get_latest_block_number(&self) -> Result<U64> {
async fn get_latest_block_number(&self) -> Result<U64> {
Ok(self
.provider
.get_block_number()
.await
.map_err(|e| RpcError::new("get_latest_block_number", e))?)
}

pub async fn get_transaction(&self, tx_hash: &H256) -> Result<Option<Transaction>> {
async fn get_transaction(&self, tx_hash: &H256) -> Result<Option<Transaction>> {
Ok(self
.provider
.get_transaction(*tx_hash)
.await
.map_err(|e| RpcError::new("get_transaction", e))?)
}

pub async fn get_logs(&self, filter: &Filter) -> Result<Vec<Log>> {
async fn get_logs(&self, filter: &Filter) -> Result<Vec<Log>> {
Ok(self
.provider
.get_logs(filter)
.await
.map_err(|e| RpcError::new("get_logs", e))?)
}

pub async fn get_filter_changes(&self, filter_id: &U256) -> Result<Vec<Log>> {
async fn get_filter_changes(&self, filter_id: &U256) -> Result<Vec<Log>> {
Ok(self
.provider
.get_filter_changes(filter_id)
.await
.map_err(|e| RpcError::new("get_filter_changes", e))?)
}

pub async fn uninstall_filter(&self, filter_id: &U256) -> Result<bool> {
async fn uninstall_filter(&self, filter_id: &U256) -> Result<bool> {
Ok(self
.provider
.uninstall_filter(filter_id)
.await
.map_err(|e| RpcError::new("uninstall_filter", e))?)
}

pub async fn get_new_filter(&self, filter: &Filter) -> Result<U256> {
async fn get_new_filter(&self, filter: &Filter) -> Result<U256> {
Ok(self
.provider
.new_filter(FilterKind::Logs(filter))
.await
.map_err(|e| RpcError::new("get_new_filter", e))?)
}

pub async fn chain_id(&self) -> Result<u64> {
async fn chain_id(&self) -> Result<u64> {
Ok(self
.provider
.get_chainid()
Expand Down
3 changes: 1 addition & 2 deletions feeder/src/eth/gateway.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::eth::constants::EXECUTION_RPC;
use crate::eth::execution::ExecutionRPC;
use crate::eth::execution::{ExecutionAPI, ExecutionRPC};
use crate::eth::utils::calc_slot_from_timestamp;
use crate::types::InternalMessage;
use consensus_types::lightclient::{CrossChainId, Message};
Expand Down Expand Up @@ -54,7 +54,6 @@ impl Gateway {
.zip(events)
.map(|(log, event)| {
let tx_hash = log.transaction_hash.unwrap();
println!("tx_hash: {}", tx_hash);
let log_index = log.log_index.unwrap();
let cc_id = CrossChainId {
chain: "ethereum".parse().unwrap(),
Expand Down
4 changes: 3 additions & 1 deletion feeder/src/eth/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
pub mod consensus;
pub mod execution;
pub mod gateway;
pub mod state_prover;
pub mod utils;

pub mod constants {
pub const CONSENSUS_RPC: &str = "http://lodestar-mainnet.chainsafe.io";
pub const CONSENSUS_RPC: &str =
"https://ethereum-mainnet.core.chainstack.com/beacon/473f6bf8d0a884c77ef1ac103eaa5f1a";
pub const EXECUTION_RPC: &str = "https://eth.meowrpc.com";
pub const STATE_PROVER_RPC: &str = "http://65.21.123.218:3000";
pub const GATEWAY_ADDR: &str = "0x4F4495243837681061C4743b74B3eEdf548D56A5";
Expand Down
Loading

0 comments on commit 082f990

Please sign in to comment.