Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Historical #14

Merged
merged 12 commits into from
Dec 4, 2023
4 changes: 1 addition & 3 deletions feeder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ serde_json = "1.0.108"
ssz-rs = {git = "https://github.com/commonprefix/ssz-rs", branch = "main"}
sync-committee-rs = {package = "sync-committee-primitives", git = "https://github.com/commonprefix/sync-committee-rs", branch = "cp_changes", default-features = false, features = ["serialize"]}
thiserror = "1.0.50"

# async/futures
async-trait = "0.1.57"
async-trait = "0.1.74"
cosmos-sdk-proto = {version = "0.20.0", features = ["cosmwasm"]}
cosmrs = "0.15.0"
futures = "0.3.23"
Expand Down
119 changes: 98 additions & 21 deletions feeder/src/eth/consensus.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,39 @@
use std::cmp;

use crate::error::RpcError;
use crate::eth::utils::get;
use crate::types::*;
use async_trait::async_trait;
use consensus_types::consensus::{
BeaconBlockAlias, Bootstrap, FinalityUpdate, OptimisticUpdate, Update,
};
use eyre::Result;
use sync_committee_rs::consensus_types::BeaconBlockHeader;
use futures::future;
use ssz_rs::Vector;
use std::cmp;
use sync_committee_rs::{
consensus_types::BeaconBlockHeader,
constants::{Root, SLOTS_PER_HISTORICAL_ROOT},
};

#[async_trait]
pub trait EthBeaconAPI {
async fn get_block_root(&self, slot: u64) -> Result<Root>;
async fn get_bootstrap(&self, block_root: &'_ [u8]) -> Result<Bootstrap>;
async fn get_updates(&self, period: u64, count: u8) -> Result<Vec<Update>>;
async fn get_finality_update(&self) -> Result<FinalityUpdate>;
async fn get_optimistic_update(&self) -> Result<OptimisticUpdate>;
async fn get_beacon_block_header(&self, slot: u64) -> Result<BeaconBlockHeader>;
async fn get_beacon_block(&self, slot: u64) -> Result<BeaconBlockAlias>;
}

#[async_trait]
pub trait CustomConsensusApi {
async fn get_block_roots_tree(
&self,
start_slot: u64,
) -> Result<Vector<Root, SLOTS_PER_HISTORICAL_ROOT>>;
async fn get_latest_beacon_block_header(&self) -> Result<BeaconBlockHeader>;
async fn get_latest_beacon_block(&self) -> Result<BeaconBlockAlias>;
}

#[derive(Debug)]
pub struct ConsensusRPC {
Expand All @@ -21,8 +47,21 @@ impl ConsensusRPC {
rpc: rpc.to_string(),
}
}
}

#[async_trait]
impl EthBeaconAPI for ConsensusRPC {
async fn get_block_root(&self, slot: u64) -> Result<Root> {
let req = format!("{}/eth/v1/beacon/blocks/{}/root", self.rpc, slot);

let res: BlockRootResponse = get(&req)
.await
.map_err(|e| RpcError::new("block_root", e))?;

Ok(res.data.root)
}

pub async fn get_bootstrap(&self, block_root: &'_ [u8]) -> Result<Bootstrap> {
async fn get_bootstrap(&self, block_root: &'_ [u8]) -> Result<Bootstrap> {
let root_hex = hex::encode(block_root);
let req = format!(
"{}/eth/v1/beacon/light_client/bootstrap/0x{}",
Expand All @@ -34,7 +73,7 @@ impl ConsensusRPC {
Ok(res.data)
}

pub async fn get_updates(&self, period: u64, count: u8) -> Result<Vec<Update>> {
async fn get_updates(&self, period: u64, count: u8) -> Result<Vec<Update>> {
let count = cmp::min(count, 10);
let req = format!(
"{}/eth/v1/beacon/light_client/updates?start_period={}&count={}",
Expand All @@ -46,23 +85,23 @@ impl ConsensusRPC {
Ok(res.into_iter().map(|d| d.data).collect())
}

pub async fn get_finality_update(&self) -> Result<FinalityUpdate> {
async fn get_finality_update(&self) -> Result<FinalityUpdate> {
let req = format!("{}/eth/v1/beacon/light_client/finality_update", self.rpc);

let res: FinalityUpdateData = get(&req).await.map_err(|e| RpcError::new("updates", e))?;

Ok(res.data)
}

pub async fn get_optimistic_update(&self) -> Result<OptimisticUpdate> {
async fn get_optimistic_update(&self) -> Result<OptimisticUpdate> {
let req = format!("{}/eth/v1/beacon/light_client/optimistic_update", self.rpc);

let res: OptimisticUpdateData = get(&req).await.map_err(|e| RpcError::new("updates", e))?;

Ok(res.data)
}

pub async fn get_beacon_block_header(&self, slot: u64) -> Result<BeaconBlockHeader> {
async fn get_beacon_block_header(&self, slot: u64) -> Result<BeaconBlockHeader> {
let req = format!("{}/eth/v1/beacon/headers/{}", self.rpc, slot);

let res: BeaconBlockHeaderResponse = get(&req)
Expand All @@ -71,18 +110,7 @@ impl ConsensusRPC {

Ok(res.data.header.message)
}

pub async fn get_latest_beacon_block_header(&self) -> Result<BeaconBlockHeader> {
let req = format!("{}/eth/v1/beacon/headers/head", self.rpc);

let res: BeaconBlockHeaderResponse = get(&req)
.await
.map_err(|e| RpcError::new("latest_beacon_header", e))?;

Ok(res.data.header.message)
}

pub async fn get_beacon_block(&self, slot: u64) -> Result<BeaconBlockAlias> {
async fn get_beacon_block(&self, slot: u64) -> Result<BeaconBlockAlias> {
let req = format!("{}/eth/v2/beacon/blocks/{}", self.rpc, slot);

let res: BeaconBlockResponse = get(&req)
Expand All @@ -91,14 +119,63 @@ impl ConsensusRPC {

Ok(res.data.message)
}
}

pub async fn get_latest_beacon_block(&self) -> Result<BeaconBlockAlias> {
#[async_trait]
impl CustomConsensusApi for ConsensusRPC {
async fn get_latest_beacon_block(&self) -> Result<BeaconBlockAlias> {
let req = format!("{}/eth/v1/beacon/blocks/7834081", self.rpc);

let res: BeaconBlockResponse = get(&req)
.await
.map_err(|e| RpcError::new("latest_beacon_block", e))?;

println!("Got latest beacon block {:#?}", res);

Ok(res.data.message)
}

async fn get_latest_beacon_block_header(&self) -> Result<BeaconBlockHeader> {
let req = format!("{}/eth/v1/beacon/headers/head", self.rpc);

let res: BeaconBlockHeaderResponse = get(&req)
.await
.map_err(|e| RpcError::new("latest_beacon_header", e))?;

Ok(res.data.header.message)
}

async fn get_block_roots_tree(
&self,
start_slot: u64,
) -> Result<Vector<Root, SLOTS_PER_HISTORICAL_ROOT>> {
const BATCH_SIZE: usize = 1000;

let mut block_roots = vec![];

for batch_start in (0..SLOTS_PER_HISTORICAL_ROOT).step_by(BATCH_SIZE) {
let batch_end = std::cmp::min(batch_start + BATCH_SIZE, SLOTS_PER_HISTORICAL_ROOT);
let mut futures = Vec::new();

for i in batch_start..batch_end {
let future = self.get_block_root(start_slot + i as u64);
futures.push(future);
}

let resolved = future::join_all(futures).await;
println!("Resolved batch {}", batch_start / BATCH_SIZE);

// Block root tree includes the last block root if no block was minted in the slot
for block_root in resolved.iter() {
match block_root {
Ok(block_root) => block_roots.push(*block_root),
Err(_) => block_roots.push(*block_roots.last().unwrap()),
}
}
}

let block_roots = Vector::<Root, SLOTS_PER_HISTORICAL_ROOT>::try_from(block_roots).unwrap();

Ok(block_roots)
}
}
51 changes: 32 additions & 19 deletions feeder/src/eth/execution.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,33 @@
use std::str::FromStr;

use async_trait::async_trait;
use ethers::prelude::Http;
use ethers::providers::{FilterKind, HttpRateLimitRetryPolicy, Middleware, Provider, RetryClient};
use ethers::types::{Block, Filter, Log, Transaction, TransactionReceipt, H256, U256, U64};
use eyre::Result;

use crate::error::RpcError;

#[async_trait]
pub trait ExecutionAPI {
async fn get_transaction_receipt(&self, tx_hash: &H256) -> Result<Option<TransactionReceipt>>;
async fn get_block_receipts(&self, block_number: u64) -> Result<Vec<TransactionReceipt>>;
async fn get_block(&self, block_number: u64) -> Result<Option<Block<H256>>>;
async fn get_block_with_txs(&self, block_number: u64) -> Result<Option<Block<Transaction>>>;
async fn get_blocks(&self, block_numbers: &[u64]) -> Result<Vec<Option<Block<H256>>>>;
async fn get_latest_block_number(&self) -> Result<U64>;
async fn get_transaction(&self, tx_hash: &H256) -> Result<Option<Transaction>>;
async fn get_logs(&self, filter: &Filter) -> Result<Vec<Log>>;
async fn get_filter_changes(&self, filter_id: &U256) -> Result<Vec<Log>>;
async fn uninstall_filter(&self, filter_id: &U256) -> Result<bool>;
async fn get_new_filter(&self, filter: &Filter) -> Result<U256>;
async fn chain_id(&self) -> Result<u64>;
}

pub struct ExecutionRPC {
pub provider: Provider<RetryClient<Http>>,
}

#[allow(dead_code)]
impl ExecutionRPC {
pub fn new(rpc: &str) -> Self {
let http = Http::from_str(rpc).expect("Could not initialize HTTP provider");
Expand All @@ -22,11 +38,11 @@ impl ExecutionRPC {

ExecutionRPC { provider }
}
}

pub async fn get_transaction_receipt(
&self,
tx_hash: &H256,
) -> Result<Option<TransactionReceipt>> {
#[async_trait]
impl ExecutionAPI for ExecutionRPC {
async fn get_transaction_receipt(&self, tx_hash: &H256) -> Result<Option<TransactionReceipt>> {
let receipt = self
.provider
.get_transaction_receipt(*tx_hash)
Expand All @@ -36,7 +52,7 @@ impl ExecutionRPC {
Ok(receipt)
}

pub async fn get_block_receipts(&self, block_number: u64) -> Result<Vec<TransactionReceipt>> {
async fn get_block_receipts(&self, block_number: u64) -> Result<Vec<TransactionReceipt>> {
let block_receipts = self
.provider
.get_block_receipts(block_number)
Expand All @@ -46,7 +62,7 @@ impl ExecutionRPC {
Ok(block_receipts)
}

pub async fn get_block(&self, block_number: u64) -> Result<Option<Block<H256>>> {
async fn get_block(&self, block_number: u64) -> Result<Option<Block<H256>>> {
let block = self
.provider
.get_block(block_number)
Expand All @@ -56,10 +72,7 @@ impl ExecutionRPC {
Ok(block)
}

pub async fn get_block_with_txs(
&self,
block_number: u64,
) -> Result<Option<Block<Transaction>>> {
async fn get_block_with_txs(&self, block_number: u64) -> Result<Option<Block<Transaction>>> {
let block = self
.provider
.get_block_with_txs(block_number)
Expand All @@ -69,7 +82,7 @@ impl ExecutionRPC {
Ok(block)
}

pub async fn get_blocks(&self, block_numbers: &[u64]) -> Result<Vec<Option<Block<H256>>>> {
async fn get_blocks(&self, block_numbers: &[u64]) -> Result<Vec<Option<Block<H256>>>> {
let mut futures = vec![];
for &block_number in block_numbers {
futures.push(async move { self.get_block(block_number).await });
Expand All @@ -79,55 +92,55 @@ impl ExecutionRPC {
results
}

pub async fn get_latest_block_number(&self) -> Result<U64> {
async fn get_latest_block_number(&self) -> Result<U64> {
Ok(self
.provider
.get_block_number()
.await
.map_err(|e| RpcError::new("get_latest_block_number", e))?)
}

pub async fn get_transaction(&self, tx_hash: &H256) -> Result<Option<Transaction>> {
async fn get_transaction(&self, tx_hash: &H256) -> Result<Option<Transaction>> {
Ok(self
.provider
.get_transaction(*tx_hash)
.await
.map_err(|e| RpcError::new("get_transaction", e))?)
}

pub async fn get_logs(&self, filter: &Filter) -> Result<Vec<Log>> {
async fn get_logs(&self, filter: &Filter) -> Result<Vec<Log>> {
Ok(self
.provider
.get_logs(filter)
.await
.map_err(|e| RpcError::new("get_logs", e))?)
}

pub async fn get_filter_changes(&self, filter_id: &U256) -> Result<Vec<Log>> {
async fn get_filter_changes(&self, filter_id: &U256) -> Result<Vec<Log>> {
Ok(self
.provider
.get_filter_changes(filter_id)
.await
.map_err(|e| RpcError::new("get_filter_changes", e))?)
}

pub async fn uninstall_filter(&self, filter_id: &U256) -> Result<bool> {
async fn uninstall_filter(&self, filter_id: &U256) -> Result<bool> {
Ok(self
.provider
.uninstall_filter(filter_id)
.await
.map_err(|e| RpcError::new("uninstall_filter", e))?)
}

pub async fn get_new_filter(&self, filter: &Filter) -> Result<U256> {
async fn get_new_filter(&self, filter: &Filter) -> Result<U256> {
Ok(self
.provider
.new_filter(FilterKind::Logs(filter))
.await
.map_err(|e| RpcError::new("get_new_filter", e))?)
}

pub async fn chain_id(&self) -> Result<u64> {
async fn chain_id(&self) -> Result<u64> {
Ok(self
.provider
.get_chainid()
Expand Down
3 changes: 1 addition & 2 deletions feeder/src/eth/gateway.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::eth::constants::EXECUTION_RPC;
use crate::eth::execution::ExecutionRPC;
use crate::eth::execution::{ExecutionAPI, ExecutionRPC};
use crate::eth::utils::calc_slot_from_timestamp;
use crate::types::InternalMessage;
use consensus_types::lightclient::{CrossChainId, Message};
Expand Down Expand Up @@ -54,7 +54,6 @@ impl Gateway {
.zip(events)
.map(|(log, event)| {
let tx_hash = log.transaction_hash.unwrap();
println!("tx_hash: {}", tx_hash);
let log_index = log.log_index.unwrap();
let cc_id = CrossChainId {
chain: "ethereum".parse().unwrap(),
Expand Down
4 changes: 3 additions & 1 deletion feeder/src/eth/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
pub mod consensus;
pub mod execution;
pub mod gateway;
pub mod state_prover;
pub mod utils;

pub mod constants {
pub const CONSENSUS_RPC: &str = "http://lodestar-mainnet.chainsafe.io";
pub const CONSENSUS_RPC: &str =
"https://ethereum-mainnet.core.chainstack.com/beacon/473f6bf8d0a884c77ef1ac103eaa5f1a";
pub const EXECUTION_RPC: &str = "https://eth.meowrpc.com";
pub const STATE_PROVER_RPC: &str = "http://65.21.123.218:3000";
pub const GATEWAY_ADDR: &str = "0x4F4495243837681061C4743b74B3eEdf548D56A5";
Expand Down
Loading
Loading