From 76bdb1199402b652bd2f8e315f745cbc7c562b61 Mon Sep 17 00:00:00 2001 From: Ferran Borreguero Date: Sat, 6 Jul 2024 17:29:25 +0100 Subject: [PATCH 01/10] Initial version for the chain state abstraction --- crates/rbuilder/src/backtest/execute.rs | 5 +- crates/rbuilder/src/building/builders/mod.rs | 24 +-- .../src/building/builders/ordering_builder.rs | 29 +-- crates/rbuilder/src/building/sim.rs | 15 +- crates/rbuilder/src/lib.rs | 1 + .../rbuilder/src/live_builder/base_config.rs | 12 +- .../rbuilder/src/live_builder/building/mod.rs | 25 +-- crates/rbuilder/src/live_builder/cli.rs | 6 +- crates/rbuilder/src/live_builder/config.rs | 5 +- crates/rbuilder/src/live_builder/mod.rs | 22 +- .../order_input/clean_orderpool.rs | 9 +- .../src/live_builder/order_input/mod.rs | 7 +- .../src/live_builder/simulation/mod.rs | 14 +- .../src/live_builder/simulation/sim_worker.rs | 7 +- crates/rbuilder/src/provider/http_provider.rs | 200 ++++++++++++++++++ crates/rbuilder/src/provider/mod.rs | 14 ++ crates/rbuilder/src/roothash/mod.rs | 1 - crates/rbuilder/src/utils/noncer.rs | 11 +- .../src/utils/provider_factory_reopen.rs | 18 +- 19 files changed, 335 insertions(+), 90 deletions(-) create mode 100644 crates/rbuilder/src/provider/http_provider.rs create mode 100644 crates/rbuilder/src/provider/mod.rs diff --git a/crates/rbuilder/src/backtest/execute.rs b/crates/rbuilder/src/backtest/execute.rs index fec8cd34..4a07d9e9 100644 --- a/crates/rbuilder/src/backtest/execute.rs +++ b/crates/rbuilder/src/backtest/execute.rs @@ -6,6 +6,7 @@ use crate::{ }, live_builder::cli::LiveBuilderConfig, primitives::SimulatedOrder, + provider::StateProviderFactory, utils::clean_extradata, }; use ahash::HashSet; @@ -49,9 +50,9 @@ pub struct BacktestBlockInput { pub sim_errors: Vec, } -pub fn backtest_prepare_ctx_for_block( +pub fn backtest_prepare_ctx_for_block( block_data: BlockData, - provider_factory: ProviderFactory, + provider_factory: Provider, chain_spec: Arc, build_block_lag_ms: i64, blocklist: HashSet
, diff --git a/crates/rbuilder/src/building/builders/mod.rs b/crates/rbuilder/src/building/builders/mod.rs index 5f8396cb..980f3b40 100644 --- a/crates/rbuilder/src/building/builders/mod.rs +++ b/crates/rbuilder/src/building/builders/mod.rs @@ -12,16 +12,15 @@ use crate::{ simulation::SimulatedOrderCommand, }, primitives::{AccountNonce, OrderId, SimulatedOrder}, + provider::StateProviderFactory, utils::NonceCache, }; use ahash::HashSet; use alloy_primitives::{Address, B256, U256}; use reth::{ primitives::{BlobTransactionSidecar, SealedBlock}, - providers::ProviderFactory, tasks::pool::BlockingTaskPool, }; -use reth_db::database::Database; use reth_payload_builder::database::CachedReads; use std::{ cmp::max, @@ -80,8 +79,8 @@ impl BestBlockCell { } #[derive(Debug)] -pub struct LiveBuilderInput { - pub provider_factory: ProviderFactory, +pub struct LiveBuilderInput { + pub provider_factory: Provider, pub root_hash_task_pool: BlockingTaskPool, pub ctx: BlockBuildingContext, pub input: broadcast::Receiver, @@ -159,10 +158,10 @@ pub struct OrderIntakeConsumer { order_consumer: OrderConsumer, } -impl OrderIntakeConsumer { +impl OrderIntakeConsumer { /// See [`ShareBundleMerger`] for sbundle_merger_selected_signers pub fn new( - provider_factory: ProviderFactory, + provider_factory: Provider, orders: broadcast::Receiver, parent_block: B256, sorting: Sorting, @@ -264,8 +263,9 @@ pub trait BlockBuildingSink: std::fmt::Debug + Clone + Send + Sync { } #[derive(Debug)] -pub struct BlockBuildingAlgorithmInput { - pub provider_factory: ProviderFactory, +pub struct BlockBuildingAlgorithmInput +{ + pub provider_factory: Provider, pub ctx: BlockBuildingContext, pub input: broadcast::Receiver, /// output for the blocks @@ -278,11 +278,11 @@ pub struct BlockBuildingAlgorithmInput: +pub trait BlockBuildingAlgorithm: std::fmt::Debug + Send + Sync { fn name(&self) -> String; - fn build_blocks(&self, input: BlockBuildingAlgorithmInput); + fn build_blocks(&self, input: BlockBuildingAlgorithmInput); } /// Factory used to create BlockBuildingSink for builders when we are targeting blocks for slots. @@ -299,11 +299,11 @@ pub trait BuilderSinkFactory { } /// Basic configuration to run a single block building with a BlockBuildingAlgorithm -pub struct BacktestSimulateBlockInput<'a, DB> { +pub struct BacktestSimulateBlockInput<'a, Provider> { pub ctx: BlockBuildingContext, pub builder_name: String, pub sbundle_mergeabe_signers: Vec
, pub sim_orders: &'a Vec, - pub provider_factory: ProviderFactory, + pub provider_factory: Provider, pub cached_reads: Option, } diff --git a/crates/rbuilder/src/building/builders/ordering_builder.rs b/crates/rbuilder/src/building/builders/ordering_builder.rs index b65a3662..fdd9ac24 100644 --- a/crates/rbuilder/src/building/builders/ordering_builder.rs +++ b/crates/rbuilder/src/building/builders/ordering_builder.rs @@ -13,13 +13,13 @@ use crate::{ ExecutionError, PartialBlock, Sorting, }, primitives::{AccountNonce, OrderId}, + provider::StateProviderFactory, telemetry, utils::is_provider_factory_health_error, }; use ahash::{HashMap, HashSet}; use alloy_primitives::{utils::format_ether, Address}; -use reth::providers::{BlockNumReader, ProviderFactory}; -use reth_db::database::Database; +use reth::providers::BlockNumReader; use reth_provider::StateProvider; use crate::{ @@ -69,8 +69,11 @@ impl OrderingBuilderConfig { } } -pub fn run_ordering_builder( - input: LiveBuilderInput, +pub fn run_ordering_builder< + Provider: StateProviderFactory + Clone + 'static, + SinkType: BlockBuildingSink, +>( + input: LiveBuilderInput, config: &OrderingBuilderConfig, ) { let block_number = input.ctx.block_env.number.to::(); @@ -152,9 +155,9 @@ pub fn run_ordering_builder( +pub fn backtest_simulate_block( ordering_config: OrderingBuilderConfig, - input: BacktestSimulateBlockInput<'_, DB>, + input: BacktestSimulateBlockInput<'_, Provider>, ) -> eyre::Result<(Block, CachedReads)> { let use_suggested_fee_recipient_as_coinbase = ordering_config.coinbase_payment; let state_provider = input @@ -183,8 +186,8 @@ pub fn backtest_simulate_block( } #[derive(Debug)] -pub struct OrderingBuilderContext { - provider_factory: ProviderFactory, +pub struct OrderingBuilderContext { + provider_factory: Provider, root_hash_task_pool: BlockingTaskPool, builder_name: String, ctx: BlockBuildingContext, @@ -200,9 +203,9 @@ pub struct OrderingBuilderContext { order_attempts: HashMap, } -impl OrderingBuilderContext { +impl OrderingBuilderContext { pub fn new( - provider_factory: ProviderFactory, + provider_factory: Provider, slot_bidder: Arc, root_hash_task_pool: BlockingTaskPool, builder_name: String, @@ -479,14 +482,14 @@ impl OrderingBuildingAlgorithm { } } -impl - BlockBuildingAlgorithm for OrderingBuildingAlgorithm +impl + BlockBuildingAlgorithm for OrderingBuildingAlgorithm { fn name(&self) -> String { self.name.clone() } - fn build_blocks(&self, input: BlockBuildingAlgorithmInput) { + fn build_blocks(&self, input: BlockBuildingAlgorithmInput) { let live_input = LiveBuilderInput { provider_factory: input.provider_factory, root_hash_task_pool: self.root_hash_task_pool.clone(), diff --git a/crates/rbuilder/src/building/sim.rs b/crates/rbuilder/src/building/sim.rs index ea16b125..75a70b3f 100644 --- a/crates/rbuilder/src/building/sim.rs +++ b/crates/rbuilder/src/building/sim.rs @@ -5,13 +5,12 @@ use super::{ use crate::{ building::{BlockBuildingContext, BlockState, CriticalCommitOrderError}, primitives::{Order, OrderId, SimValue, SimulatedOrder}, + provider::StateProviderFactory, utils::{NonceCache, NonceCacheRef}, }; use ahash::{HashMap, HashSet}; use alloy_primitives::{Address, B256}; use rand::seq::SliceRandom; -use reth::providers::ProviderFactory; -use reth_db::database::Database; use reth_interfaces::provider::ProviderError; use reth_payload_builder::database::CachedReads; use std::{ @@ -66,9 +65,9 @@ pub struct SimulatedResult { // @Feat replaceable orders #[derive(Debug)] -pub struct SimTree { +pub struct SimTree { // fields for nonce management - nonce_cache: NonceCache, + nonce_cache: NonceCache, sims: HashMap, sims_that_update_one_nonce: HashMap, @@ -86,8 +85,8 @@ enum OrderNonceState { Ready(Vec), } -impl SimTree { - pub fn new(provider_factory: ProviderFactory, parent_block: B256) -> Self { +impl SimTree { + pub fn new(provider_factory: Provider, parent_block: B256) -> Self { let nonce_cache = NonceCache::new(provider_factory, parent_block); Self { nonce_cache, @@ -304,8 +303,8 @@ impl SimTree { /// Non-interactive usage of sim tree that will simply simulate all orders. /// `randomize_insertion` is used to debug if sim tree works correctly when orders are inserted in a different order /// outputs should be independent of this arg. -pub fn simulate_all_orders_with_sim_tree( - factory: ProviderFactory, +pub fn simulate_all_orders_with_sim_tree( + factory: Provider, ctx: &BlockBuildingContext, orders: &[Order], randomize_insertion: bool, diff --git a/crates/rbuilder/src/lib.rs b/crates/rbuilder/src/lib.rs index fc6ec390..df4b7512 100644 --- a/crates/rbuilder/src/lib.rs +++ b/crates/rbuilder/src/lib.rs @@ -4,6 +4,7 @@ pub mod flashbots; pub mod live_builder; pub mod mev_boost; pub mod primitives; +pub mod provider; pub mod roothash; pub mod telemetry; pub mod utils; diff --git a/crates/rbuilder/src/live_builder/base_config.rs b/crates/rbuilder/src/live_builder/base_config.rs index 398cde00..40f6ccbd 100644 --- a/crates/rbuilder/src/live_builder/base_config.rs +++ b/crates/rbuilder/src/live_builder/base_config.rs @@ -10,6 +10,7 @@ use crate::{ }, mev_boost::BLSBlockSigner, primitives::mev_boost::MevBoostRelay, + provider::StateProviderFactory, telemetry::{setup_reloadable_tracing_subscriber, LoggerConfig}, utils::{http_provider, BoxedProvider, ProviderFactoryReopener, Signer}, validation_api_client::ValidationAPIClient, @@ -179,12 +180,15 @@ impl BaseConfig { } /// WARN: opens reth db - pub async fn create_builder( + pub async fn create_builder( &self, cancellation_token: tokio_util::sync::CancellationToken, ) -> eyre::Result< - super::LiveBuilder, super::building::relay_submit::RelaySubmitSinkFactory>, - > { + super::LiveBuilder, + > + where + Provider: StateProviderFactory, + { let submission_config = self.submission_config()?; info!( "Builder mev boost normal relay pubkey: {:?}", @@ -206,7 +210,7 @@ impl BaseConfig { let relays = self.relays()?; let sink_factory = RelaySubmitSinkFactory::new(self.submission_config()?, relays.clone()); - Ok(LiveBuilder::, RelaySubmitSinkFactory> { + Ok(LiveBuilder { cl_urls: self.cl_node_url.clone(), relays, watchdog_timeout: self.watchdog_timeout(), diff --git a/crates/rbuilder/src/live_builder/building/mod.rs b/crates/rbuilder/src/live_builder/building/mod.rs index 8e0f7d01..d5a85d65 100644 --- a/crates/rbuilder/src/live_builder/building/mod.rs +++ b/crates/rbuilder/src/live_builder/building/mod.rs @@ -10,9 +10,8 @@ use crate::{ BlockBuildingContext, }, live_builder::{payload_events::MevBoostSlotData, simulation::SlotOrderSimResults}, - utils::ProviderFactoryReopener, + provider::StateProviderFactory, }; -use reth_db::database::Database; use tokio::sync::{broadcast, mpsc}; use tokio_util::sync::CancellationToken; use tracing::{debug, error, trace}; @@ -27,27 +26,29 @@ use super::{ }; #[derive(Debug)] -pub struct BlockBuildingPool { - provider_factory: ProviderFactoryReopener, - builders: Vec>>, +pub struct BlockBuildingPool { + provider_factory: Provider, + builders: Vec>>, sink_factory: BuilderSinkFactoryType, bidding_service: Box, orderpool_subscriber: order_input::OrderPoolSubscriber, - order_simulation_pool: OrderSimulationPool, + order_simulation_pool: OrderSimulationPool, } -impl - BlockBuildingPool +impl< + Provider: StateProviderFactory + Clone + 'static, + BuilderSinkFactoryType: BuilderSinkFactory, + > BlockBuildingPool where ::SinkType: 'static, { pub fn new( - provider_factory: ProviderFactoryReopener, - builders: Vec>>, + provider_factory: Provider, + builders: Vec>>, sink_factory: BuilderSinkFactoryType, bidding_service: Box, orderpool_subscriber: order_input::OrderPoolSubscriber, - order_simulation_pool: OrderSimulationPool, + order_simulation_pool: OrderSimulationPool, ) -> Self { BlockBuildingPool { provider_factory, @@ -132,7 +133,7 @@ where for builder in self.builders.iter() { let builder_name = builder.name(); debug!(block = block_number, builder_name, "Spawning builder job"); - let input = BlockBuildingAlgorithmInput:: { + let input = BlockBuildingAlgorithmInput:: { provider_factory: provider_factory.clone(), ctx: ctx.clone(), input: broadcast_input.subscribe(), diff --git a/crates/rbuilder/src/live_builder/cli.rs b/crates/rbuilder/src/live_builder/cli.rs index 219f58b9..499604ad 100644 --- a/crates/rbuilder/src/live_builder/cli.rs +++ b/crates/rbuilder/src/live_builder/cli.rs @@ -10,7 +10,7 @@ use crate::{ building::builders::{BacktestSimulateBlockInput, Block}, live_builder::base_config::load_config_toml_and_env, telemetry::spawn_telemetry_server, - utils::build_info::Version, + utils::{build_info::Version, ProviderFactoryReopener}, }; use super::{base_config::BaseConfig, building::relay_submit::RelaySubmitSinkFactory, LiveBuilder}; @@ -41,7 +41,9 @@ pub trait LiveBuilderConfig: std::fmt::Debug + serde::de::DeserializeOwned { &self, cancellation_token: CancellationToken, ) -> impl std::future::Future< - Output = eyre::Result, RelaySubmitSinkFactory>>, + Output = eyre::Result< + LiveBuilder>, RelaySubmitSinkFactory>, + >, > + Send; /// Patch until we have a unified way of backtesting using the exact algorithms we use on the LiveBuilder. diff --git a/crates/rbuilder/src/live_builder/config.rs b/crates/rbuilder/src/live_builder/config.rs index 879e63b5..1275b44b 100644 --- a/crates/rbuilder/src/live_builder/config.rs +++ b/crates/rbuilder/src/live_builder/config.rs @@ -62,7 +62,10 @@ impl LiveBuilderConfig for Config { &self, cancellation_token: tokio_util::sync::CancellationToken, ) -> eyre::Result< - super::LiveBuilder, super::building::relay_submit::RelaySubmitSinkFactory>, + super::LiveBuilder< + ProviderFactoryReopener>, + super::building::relay_submit::RelaySubmitSinkFactory, + >, > { let live_builder = self.base_config.create_builder(cancellation_token).await?; let root_hash_task_pool = self.base_config.root_hash_task_pool()?; diff --git a/crates/rbuilder/src/live_builder/mod.rs b/crates/rbuilder/src/live_builder/mod.rs index 2c0d01e1..4dca8a0b 100644 --- a/crates/rbuilder/src/live_builder/mod.rs +++ b/crates/rbuilder/src/live_builder/mod.rs @@ -19,8 +19,9 @@ use crate::{ watchdog::spawn_watchdog_thread, }, primitives::mev_boost::MevBoostRelay, + provider::StateProviderFactory, telemetry::inc_active_slots, - utils::{error_storage::spawn_error_storage_writer, ProviderFactoryReopener, Signer}, + utils::{error_storage::spawn_error_storage_writer, Signer}, }; use ahash::HashSet; use alloy_primitives::{Address, B256}; @@ -53,7 +54,7 @@ const GET_BLOCK_HEADER_PERIOD: time::Duration = time::Duration::milliseconds(250 /// # Usage /// Create and run() #[derive(Debug)] -pub struct LiveBuilder { +pub struct LiveBuilder { pub cl_urls: Vec, pub relays: Vec, pub watchdog_timeout: Duration, @@ -62,7 +63,7 @@ pub struct LiveBuilder { pub order_input_config: OrderInputConfig, pub chain_chain_spec: Arc, - pub provider_factory: ProviderFactoryReopener, + pub provider_factory: Provider, pub coinbase_signer: Signer, pub extra_data: Vec, @@ -73,12 +74,14 @@ pub struct LiveBuilder { pub bidding_service: Box, pub sink_factory: BuilderSinkFactoryType, - pub builders: Vec>>, + pub builders: Vec>>, pub extra_rpc: RpcModule<()>, } -impl - LiveBuilder +impl< + Provider: StateProviderFactory + Clone + 'static, + BuilderSinkFactoryType: BuilderSinkFactory, + > LiveBuilder where ::SinkType: 'static, { @@ -95,7 +98,7 @@ where pub fn with_builders( self, - builders: Vec>>, + builders: Vec>>, ) -> Self { Self { builders, ..self } } @@ -185,6 +188,8 @@ where }; let parent_header = { + Header::default() + /* // @Nicer let parent_block = payload.parent_block_hash(); let timestamp = payload.timestamp(); @@ -196,9 +201,11 @@ where continue; } } + */ }; { + /* let provider_factory = self.provider_factory.clone(); let block = payload.block(); match spawn_blocking(move || { @@ -217,6 +224,7 @@ where continue; } } + */ } debug!( diff --git a/crates/rbuilder/src/live_builder/order_input/clean_orderpool.rs b/crates/rbuilder/src/live_builder/order_input/clean_orderpool.rs index 5cff5e8e..0fe93924 100644 --- a/crates/rbuilder/src/live_builder/order_input/clean_orderpool.rs +++ b/crates/rbuilder/src/live_builder/order_input/clean_orderpool.rs @@ -1,15 +1,14 @@ use super::OrderInputConfig; use crate::{ live_builder::order_input::orderpool::OrderPool, + provider::StateProviderFactory, telemetry::{set_current_block, set_ordepool_count}, - utils::ProviderFactoryReopener, }; use ethers::{ middleware::Middleware, providers::{Ipc, Provider}, }; use futures::StreamExt; -use reth_db::database::Database; use std::{ pin::pin, sync::{Arc, Mutex}, @@ -19,9 +18,9 @@ use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info}; -pub async fn spawn_clean_orderpool_job( +pub async fn spawn_clean_orderpool_job( config: OrderInputConfig, - provider_factory: ProviderFactoryReopener, + provider_factory: SProvider, orderpool: Arc>, global_cancellation: CancellationToken, ) -> eyre::Result> { @@ -47,8 +46,6 @@ pub async fn spawn_clean_orderpool_job( let mut new_block_stream = pin!(new_block_stream); while let Some(block) = new_block_stream.next().await { - let provider_factory = provider_factory.provider_factory_unchecked(); - let block_number = block.number.unwrap_or_default().as_u64(); set_current_block(block_number); let state = match provider_factory.latest() { diff --git a/crates/rbuilder/src/live_builder/order_input/mod.rs b/crates/rbuilder/src/live_builder/order_input/mod.rs index 1f8ef223..41b90666 100644 --- a/crates/rbuilder/src/live_builder/order_input/mod.rs +++ b/crates/rbuilder/src/live_builder/order_input/mod.rs @@ -14,10 +14,9 @@ use self::{ }; use crate::{ primitives::{serialize::CancelShareBundle, BundleReplacementKey, Order}, - utils::ProviderFactoryReopener, + provider::StateProviderFactory, }; use jsonrpsee::RpcModule; -use reth_db::database::Database; use std::{ net::Ipv4Addr, path::PathBuf, @@ -152,9 +151,9 @@ impl ReplaceableOrderPoolCommand { } /// @Pending reengineering to modularize rpc, block_subsidy_selector here is a patch -pub async fn start_orderpool_jobs( +pub async fn start_orderpool_jobs( config: OrderInputConfig, - provider_factory: ProviderFactoryReopener, + provider_factory: Provider, extra_rpc: RpcModule<()>, global_cancel: CancellationToken, ) -> eyre::Result<(JoinHandle<()>, OrderPoolSubscriber)> { diff --git a/crates/rbuilder/src/live_builder/simulation/mod.rs b/crates/rbuilder/src/live_builder/simulation/mod.rs index ba718579..521b2270 100644 --- a/crates/rbuilder/src/live_builder/simulation/mod.rs +++ b/crates/rbuilder/src/live_builder/simulation/mod.rs @@ -7,11 +7,11 @@ use crate::{ }, live_builder::order_input::orderpool::OrdersForBlock, primitives::{Order, OrderId, SimulatedOrder}, - utils::{gen_uid, ProviderFactoryReopener}, + provider::StateProviderFactory, + utils::gen_uid, }; use ahash::{HashMap, HashSet}; use alloy_primitives::utils::format_ether; -use reth_db::database::Database; use std::{ fmt, sync::{Arc, Mutex}, @@ -30,8 +30,8 @@ pub struct SlotOrderSimResults { type BlockContextId = u64; #[derive(Debug)] -pub struct OrderSimulationPool { - provider_factory: ProviderFactoryReopener, +pub struct OrderSimulationPool { + provider_factory: Provider, running_tasks: Arc>>>, current_contexts: Arc>, worker_threads: Vec>, @@ -128,7 +128,7 @@ pub enum SimulatedOrderCommand { /// - Always send simulations with increasing `sequence_number` /// - When replacing an order we always send a cancellation for the previous one. /// - When we gat a cancellation we propagate it and never again send an update. -impl SimulationJob { +impl SimulationJob { async fn run(&mut self) { let mut new_commands = Vec::new(); let mut new_sim_results = Vec::new(); @@ -298,9 +298,9 @@ impl SimulationJob { } // input new slot and a -impl OrderSimulationPool { +impl OrderSimulationPool { pub fn new( - provider_factory: ProviderFactoryReopener, + provider_factory: Provider, num_workers: usize, global_cancellation: CancellationToken, ) -> Self { diff --git a/crates/rbuilder/src/live_builder/simulation/sim_worker.rs b/crates/rbuilder/src/live_builder/simulation/sim_worker.rs index 11750296..5f4f2085 100644 --- a/crates/rbuilder/src/live_builder/simulation/sim_worker.rs +++ b/crates/rbuilder/src/live_builder/simulation/sim_worker.rs @@ -4,11 +4,10 @@ use crate::{ simulate_order, BlockState, }, live_builder::simulation::CurrentSimulationContexts, + provider::StateProviderFactory, telemetry, telemetry::add_sim_thread_utilisation_timings, - utils::ProviderFactoryReopener, }; -use reth_db::database::Database; use reth_payload_builder::database::CachedReads; use std::{ sync::{Arc, Mutex}, @@ -18,10 +17,10 @@ use std::{ use tokio_util::sync::CancellationToken; use tracing::error; -pub fn run_sim_worker( +pub fn run_sim_worker( worker_id: usize, ctx: Arc>, - provider_factory: ProviderFactoryReopener, + provider_factory: Provider, global_cancellation: CancellationToken, ) { loop { diff --git a/crates/rbuilder/src/provider/http_provider.rs b/crates/rbuilder/src/provider/http_provider.rs new file mode 100644 index 00000000..73656586 --- /dev/null +++ b/crates/rbuilder/src/provider/http_provider.rs @@ -0,0 +1,200 @@ +use super::StateProviderFactory; +use alloy_provider::{Provider, ProviderBuilder, RootProvider}; +use alloy_rpc_types::{BlockId, BlockNumberOrTag}; +use alloy_transport_http::Http; +use reqwest::Client; +use reth_interfaces::provider::ProviderResult; +use reth_primitives::{ + trie::AccountProof, Account, Address, BlockNumber, Bytecode, StorageKey, StorageValue, B256, +}; +use reth_provider::{ + AccountReader, BlockHashReader, StateProvider, StateProviderBox, StateRootProvider, +}; +use reth_trie::updates::TrieUpdates; +use revm::db::BundleState; +use tokio::runtime::Runtime; + +struct HttpProvider { + provider: RootProvider>, +} + +impl HttpProvider { + pub fn new_with_url(url: &str) -> Self { + let provider = ProviderBuilder::new() + .on_http(url.parse().unwrap()) + .unwrap(); + + Self { provider } + } + + fn get_block_number_by_tag(&self, tag: BlockNumberOrTag) -> ProviderResult { + let rt = Runtime::new() + .unwrap() + .block_on(self.provider.get_block_by_number(tag, false)) + .unwrap() + .unwrap(); + + Ok(rt.header.number.unwrap()) + } +} + +impl StateProviderFactory for HttpProvider { + fn history_by_block_number( + &self, + block_number: BlockNumber, + ) -> ProviderResult { + self.provider + .get_block_by_number(BlockNumberOrTag::Number(block_number), false); + + unimplemented!("TODO") + } + + fn latest(&self) -> ProviderResult { + unimplemented!("TODO") + } +} + +pub struct HttpProviderState { + provider: RootProvider>, + hash: B256, +} + +impl HttpProviderState { + pub fn new(provider: RootProvider>, hash: B256) -> Self { + Self { provider, hash } + } +} + +impl StateProvider for HttpProviderState { + /// Get storage of given account. + fn storage( + &self, + account: Address, + storage_key: StorageKey, + ) -> ProviderResult> { + let res = Runtime::new() + .unwrap() + .block_on(self.provider.get_storage_at( + account, + storage_key.into(), + BlockId::hash(self.hash), + )) + .unwrap(); + + Ok(Some(res)) + } + + /// Get account code by its hash + fn bytecode_by_hash(&self, code_hash: B256) -> ProviderResult> { + // find the specific address + let address = Address::from_word(code_hash); + + let res = Runtime::new() + .unwrap() + .block_on(self.provider.get_code_at(address, BlockId::hash(self.hash))) + .unwrap(); + + Ok(Some(Bytecode::new_raw(res))) + } + + /// Get account and storage proofs. + fn proof(&self, address: Address, keys: &[B256]) -> ProviderResult { + let keys: Vec = keys.to_vec(); + + let res = Runtime::new() + .unwrap() + .block_on( + self.provider + .get_proof(address, keys, BlockId::hash(self.hash)), + ) + .unwrap(); + + unimplemented!("todo"); + } +} + +impl BlockHashReader for HttpProviderState { + /// Get the hash of the block with the given number. Returns `None` if no block with this number + /// exists. + fn block_hash(&self, number: BlockNumber) -> ProviderResult> { + let res = Runtime::new() + .unwrap() + .block_on( + self.provider + .get_block_by_number(BlockNumberOrTag::Number(number), false), + ) + .unwrap() + .unwrap(); + + Ok(res.header.hash) + } + + fn canonical_hashes_range( + &self, + start: BlockNumber, + end: BlockNumber, + ) -> ProviderResult> { + let mut res = vec![]; + + for i in start..end { + let block: alloy_rpc_types::Block = Runtime::new() + .unwrap() + .block_on( + self.provider + .get_block_by_number(BlockNumberOrTag::Number(i), false), + ) + .unwrap() + .unwrap(); + + res.push(block.header.hash.unwrap()); + } + + Ok(res) + } +} + +#[derive(Debug)] +enum AccountError {} + +impl AccountReader for HttpProviderState { + fn basic_account(&self, address: Address) -> ProviderResult> { + let res: Result = Runtime::new().unwrap().block_on(async { + let balance = self + .provider + .get_balance(address, BlockId::hash(self.hash)) + .await + .unwrap(); + + let nonce = self + .provider + .get_transaction_count(address, BlockId::hash(self.hash)) + .await + .unwrap(); + + Ok(Account { + balance, + nonce, + bytecode_hash: Some(address.into_word()), + }) + }); + + let res = res.unwrap(); + + Ok(Some(res)) + } +} + +impl StateRootProvider for HttpProviderState { + fn state_root(&self, _bundle_state: &BundleState) -> ProviderResult { + unimplemented!("todo"); + } + + /// Returns the state root of the BundleState on top of the current state with trie + /// updates to be committed to the database. + fn state_root_with_updates( + &self, + _bundle_state: &BundleState, + ) -> ProviderResult<(B256, TrieUpdates)> { + unimplemented!("todo"); + } +} diff --git a/crates/rbuilder/src/provider/mod.rs b/crates/rbuilder/src/provider/mod.rs new file mode 100644 index 00000000..047e76f4 --- /dev/null +++ b/crates/rbuilder/src/provider/mod.rs @@ -0,0 +1,14 @@ +use reth_interfaces::provider::ProviderResult; +use reth_primitives::BlockNumber; +use reth_provider::StateProviderBox; + +mod http_provider; + +pub trait StateProviderFactory: Send + Sync { + fn history_by_block_number( + &self, + block_number: BlockNumber, + ) -> ProviderResult; + + fn latest(&self) -> ProviderResult; +} diff --git a/crates/rbuilder/src/roothash/mod.rs b/crates/rbuilder/src/roothash/mod.rs index 31ca5986..5fa093a7 100644 --- a/crates/rbuilder/src/roothash/mod.rs +++ b/crates/rbuilder/src/roothash/mod.rs @@ -3,7 +3,6 @@ use reth::{ providers::{providers::ConsistentDbView, BundleStateWithReceipts, ProviderFactory}, tasks::pool::BlockingTaskPool, }; -use reth_db::database::Database; use reth_trie_parallel::async_root::{AsyncStateRoot, AsyncStateRootError}; #[derive(Debug, Clone, Copy)] diff --git a/crates/rbuilder/src/utils/noncer.rs b/crates/rbuilder/src/utils/noncer.rs index a8495d98..f3dc1033 100644 --- a/crates/rbuilder/src/utils/noncer.rs +++ b/crates/rbuilder/src/utils/noncer.rs @@ -1,21 +1,20 @@ use ahash::HashMap; use alloy_primitives::{Address, B256}; -use reth::providers::{ProviderFactory, StateProviderBox}; -use reth_db::database::Database; +use reth::providers::StateProviderBox; use reth_interfaces::provider::ProviderResult; use std::sync::{Arc, Mutex}; #[derive(Debug)] -pub struct NonceCache { - provider_factory: ProviderFactory, +pub struct NonceCache { + provider_factory: Provider, // we have to use Arc>>, block: B256, } -impl NonceCache { - pub fn new(provider_factory: ProviderFactory, block: B256) -> Self { +impl NonceCache { + pub fn new(provider_factory: Provider, block: B256) -> Self { Self { provider_factory, cache: Arc::new(Mutex::new(HashMap::default())), diff --git a/crates/rbuilder/src/utils/provider_factory_reopen.rs b/crates/rbuilder/src/utils/provider_factory_reopen.rs index 24cdf2d9..62878e3b 100644 --- a/crates/rbuilder/src/utils/provider_factory_reopen.rs +++ b/crates/rbuilder/src/utils/provider_factory_reopen.rs @@ -1,9 +1,12 @@ +use crate::provider::StateProviderFactory; use reth::{ primitives::ChainSpec, providers::{BlockHashReader, ChainSpecProvider, ProviderFactory}, }; use reth_db::database::Database; -use reth_interfaces::RethResult; +use reth_interfaces::{provider::ProviderResult, RethResult}; +use reth_primitives::BlockNumber; +use reth_provider::StateProviderBox; use std::{ path::PathBuf, sync::{Arc, Mutex}, @@ -122,3 +125,16 @@ pub fn check_provider_factory_health( Ok(()) } + +impl StateProviderFactory for ProviderFactoryReopener { + fn history_by_block_number( + &self, + block_number: BlockNumber, + ) -> ProviderResult { + unimplemented!("TODO"); + } + + fn latest(&self) -> ProviderResult { + unimplemented!("TODO"); + } +} From a85e59cd5198d131b622a9c48ecc2259e8460be6 Mon Sep 17 00:00:00 2001 From: Ferran Borreguero Date: Thu, 1 Aug 2024 10:30:57 +0100 Subject: [PATCH 02/10] Fix --- crates/rbuilder/src/provider/http_provider.rs | 46 +++++++++---------- 1 file changed, 22 insertions(+), 24 deletions(-) diff --git a/crates/rbuilder/src/provider/http_provider.rs b/crates/rbuilder/src/provider/http_provider.rs index c4b4a8f1..80f3fb24 100644 --- a/crates/rbuilder/src/provider/http_provider.rs +++ b/crates/rbuilder/src/provider/http_provider.rs @@ -4,13 +4,12 @@ use alloy_rpc_types::{BlockId, BlockNumberOrTag}; use alloy_transport_http::Http; use reqwest::Client; use reth_errors::ProviderResult; -use reth_primitives::{ - trie::AccountProof, Account, Address, BlockNumber, Bytecode, StorageKey, StorageValue, B256, -}; +use reth_primitives::{Account, Address, BlockNumber, Bytecode, StorageKey, StorageValue, B256}; use reth_provider::{ - AccountReader, BlockHashReader, StateProvider, StateProviderBox, StateRootProvider, + AccountReader, BlockHashReader, StateProofProvider, StateProvider, StateProviderBox, + StateRootProvider, }; -use reth_trie::updates::TrieUpdates; +use reth_trie::{updates::TrieUpdates, AccountProof, HashedPostState}; use revm::db::BundleState; use tokio::runtime::Runtime; @@ -96,21 +95,6 @@ impl StateProvider for HttpProviderState { Ok(Some(Bytecode::new_raw(res))) } - - /// Get account and storage proofs. - fn proof(&self, address: Address, keys: &[B256]) -> ProviderResult { - let keys: Vec = keys.to_vec(); - - let res = Runtime::new() - .unwrap() - .block_on( - self.provider - .get_proof(address, keys, BlockId::hash(self.hash)), - ) - .unwrap(); - - unimplemented!("todo"); - } } impl BlockHashReader for HttpProviderState { @@ -185,16 +169,30 @@ impl AccountReader for HttpProviderState { } impl StateRootProvider for HttpProviderState { - fn state_root(&self, _bundle_state: &BundleState) -> ProviderResult { + /// Returns the state root of the `HashedPostState` on top of the current state. + fn hashed_state_root(&self, hashed_state: &HashedPostState) -> ProviderResult { unimplemented!("todo"); } - /// Returns the state root of the BundleState on top of the current state with trie + /// Returns the state root of the `HashedPostState` on top of the current state with trie /// updates to be committed to the database. - fn state_root_with_updates( + fn hashed_state_root_with_updates( &self, - _bundle_state: &BundleState, + hashed_state: &HashedPostState, ) -> ProviderResult<(B256, TrieUpdates)> { unimplemented!("todo"); } } + +impl StateProofProvider for HttpProviderState { + /// Get account and storage proofs of target keys in the `HashedPostState` + /// on top of the current state. + fn hashed_proof( + &self, + hashed_state: &HashedPostState, + address: Address, + slots: &[B256], + ) -> ProviderResult { + unimplemented!("todo"); + } +} From 5b693f5cc424108b43e0affc4f3a1a1abf8dc31b Mon Sep 17 00:00:00 2001 From: Ferran Borreguero Date: Thu, 1 Aug 2024 17:39:08 +0100 Subject: [PATCH 03/10] More changes --- crates/rbuilder/src/building/sim.rs | 1 - crates/rbuilder/src/provider/http_provider.rs | 1 - crates/rbuilder/src/utils/noncer.rs | 3 +-- 3 files changed, 1 insertion(+), 4 deletions(-) diff --git a/crates/rbuilder/src/building/sim.rs b/crates/rbuilder/src/building/sim.rs index f253c8d6..a113b0dd 100644 --- a/crates/rbuilder/src/building/sim.rs +++ b/crates/rbuilder/src/building/sim.rs @@ -14,7 +14,6 @@ use rand::seq::SliceRandom; use reth::providers::ProviderFactory; use reth_db::database::Database; use reth_errors::ProviderError; -use reth_interfaces::provider::ProviderError; use reth_payload_builder::database::CachedReads; use std::{ cmp::{max, min, Ordering}, diff --git a/crates/rbuilder/src/provider/http_provider.rs b/crates/rbuilder/src/provider/http_provider.rs index 80f3fb24..2dc1981b 100644 --- a/crates/rbuilder/src/provider/http_provider.rs +++ b/crates/rbuilder/src/provider/http_provider.rs @@ -10,7 +10,6 @@ use reth_provider::{ StateRootProvider, }; use reth_trie::{updates::TrieUpdates, AccountProof, HashedPostState}; -use revm::db::BundleState; use tokio::runtime::Runtime; struct HttpProvider { diff --git a/crates/rbuilder/src/utils/noncer.rs b/crates/rbuilder/src/utils/noncer.rs index 71597e75..a32dc2d7 100644 --- a/crates/rbuilder/src/utils/noncer.rs +++ b/crates/rbuilder/src/utils/noncer.rs @@ -1,7 +1,6 @@ use ahash::HashMap; use alloy_primitives::{Address, B256}; -use reth::providers::{ProviderFactory, StateProviderBox}; -use reth_db::database::Database; +use reth::providers::StateProviderBox; use reth_errors::ProviderResult; use std::sync::{Arc, Mutex}; From 765230d5a293c20708a4e0a7cc9f3745fdc651e2 Mon Sep 17 00:00:00 2001 From: Ferran Borreguero Date: Fri, 23 Aug 2024 11:25:39 +0100 Subject: [PATCH 04/10] Rebase top --- .../src/backtest/backtest_build_block.rs | 15 +++---- crates/rbuilder/src/backtest/execute.rs | 8 ++-- .../resim_landed_block.rs | 24 +++++----- crates/rbuilder/src/building/builders/mod.rs | 14 +++--- .../src/building/builders/ordering_builder.rs | 15 ++++--- crates/rbuilder/src/building/sim.rs | 2 - .../rbuilder/src/live_builder/base_config.rs | 44 ++++++++++--------- .../rbuilder/src/live_builder/building/mod.rs | 14 +++--- crates/rbuilder/src/live_builder/cli.rs | 4 +- crates/rbuilder/src/live_builder/config.rs | 13 +++--- crates/rbuilder/src/live_builder/mod.rs | 15 +++---- .../src/live_builder/simulation/mod.rs | 14 +++--- .../live_builder/simulation/simulation_job.rs | 10 ++--- crates/rbuilder/src/provider/http_provider.rs | 2 + 14 files changed, 99 insertions(+), 95 deletions(-) diff --git a/crates/rbuilder/src/backtest/backtest_build_block.rs b/crates/rbuilder/src/backtest/backtest_build_block.rs index 9d2f21c1..a0ad88f2 100644 --- a/crates/rbuilder/src/backtest/backtest_build_block.rs +++ b/crates/rbuilder/src/backtest/backtest_build_block.rs @@ -8,14 +8,14 @@ use ahash::HashMap; use alloy_primitives::utils::format_ether; -use crate::backtest::restore_landed_orders::{ - restore_landed_orders, sim_historical_block, ExecutedBlockTx, ExecutedTxs, SimplifiedOrder, -}; -use crate::backtest::OrdersWithTimestamp; use crate::{ backtest::{ execute::{backtest_prepare_ctx_for_block, BacktestBlockInput}, - BlockData, HistoricalDataStorage, + restore_landed_orders::{ + restore_landed_orders, sim_historical_block, ExecutedBlockTx, ExecutedTxs, + SimplifiedOrder, + }, + BlockData, HistoricalDataStorage, OrdersWithTimestamp, }, building::builders::BacktestSimulateBlockInput, live_builder::{base_config::load_config_toml_and_env, cli::LiveBuilderConfig}, @@ -85,10 +85,7 @@ pub async fn run_backtest_build_block() -> eyre:: print_order_and_timestamp(&block_data.available_orders, &block_data); } - let provider_factory = config - .base_config() - .provider_factory()? - .provider_factory_unchecked(); + let provider_factory = config.base_config().provider_factory()?; let chain_spec = config.base_config().chain_spec()?; let sbundle_mergeabe_signers = config.base_config().sbundle_mergeabe_signers(); diff --git a/crates/rbuilder/src/backtest/execute.rs b/crates/rbuilder/src/backtest/execute.rs index d0a7adff..970506b4 100644 --- a/crates/rbuilder/src/backtest/execute.rs +++ b/crates/rbuilder/src/backtest/execute.rs @@ -1,5 +1,3 @@ -use crate::primitives::OrderId; -use crate::utils::Signer; use crate::{ backtest::BlockData, building::{ @@ -7,15 +5,15 @@ use crate::{ BlockBuildingContext, BundleErr, OrderErr, TransactionErr, }, live_builder::cli::LiveBuilderConfig, - primitives::SimulatedOrder, + primitives::{OrderId, SimulatedOrder}, provider::StateProviderFactory, - utils::clean_extradata, + utils::{clean_extradata, Signer}, }; use ahash::HashSet; use alloy_primitives::{Address, U256}; use reth::providers::ProviderFactory; use reth_chainspec::ChainSpec; -use reth_db::{database::Database, DatabaseEnv}; +use reth_db::DatabaseEnv; use reth_payload_builder::database::CachedReads; use serde::{Deserialize, Serialize}; use std::sync::Arc; diff --git a/crates/rbuilder/src/backtest/restore_landed_orders/resim_landed_block.rs b/crates/rbuilder/src/backtest/restore_landed_orders/resim_landed_block.rs index c95f6e69..c7ec13f0 100644 --- a/crates/rbuilder/src/backtest/restore_landed_orders/resim_landed_block.rs +++ b/crates/rbuilder/src/backtest/restore_landed_orders/resim_landed_block.rs @@ -1,18 +1,22 @@ -use crate::building::evm_inspector::SlotKey; -use crate::building::tracers::AccumulatorSimulationTracer; -use crate::building::{BlockBuildingContext, BlockState, PartialBlock, PartialBlockFork}; -use crate::primitives::serialize::{RawTx, TxEncoding}; -use crate::primitives::TransactionSignedEcRecoveredWithBlobs; -use crate::utils::signed_uint_delta; +use crate::{ + building::{ + evm_inspector::SlotKey, tracers::AccumulatorSimulationTracer, BlockBuildingContext, + BlockState, PartialBlock, PartialBlockFork, + }, + primitives::{ + serialize::{RawTx, TxEncoding}, + TransactionSignedEcRecoveredWithBlobs, + }, + provider::StateProviderFactory, + utils::signed_uint_delta, +}; use ahash::{HashMap, HashSet}; use alloy_consensus::TxEnvelope; use alloy_eips::eip2718::Encodable2718; use alloy_primitives::{Address, B256, I256}; use eyre::Context; use reth_chainspec::ChainSpec; -use reth_db::DatabaseEnv; use reth_primitives::{Receipt, TransactionSignedEcRecovered}; -use reth_provider::ProviderFactory; use std::sync::Arc; #[derive(Debug)] @@ -23,8 +27,8 @@ pub struct ExecutedTxs { pub conflicting_txs: Vec<(B256, Vec)>, } -pub fn sim_historical_block( - provider_factory: ProviderFactory>, +pub fn sim_historical_block( + provider_factory: Provider, chain_spec: Arc, onchain_block: alloy_rpc_types::Block, ) -> eyre::Result> { diff --git a/crates/rbuilder/src/building/builders/mod.rs b/crates/rbuilder/src/building/builders/mod.rs index 247b37ff..4a2f0857 100644 --- a/crates/rbuilder/src/building/builders/mod.rs +++ b/crates/rbuilder/src/building/builders/mod.rs @@ -33,8 +33,8 @@ pub struct Block { } #[derive(Debug)] -pub struct LiveBuilderInput { - pub provider_factory: ProviderFactory, +pub struct LiveBuilderInput { + pub provider_factory: Provider, pub root_hash_task_pool: BlockingTaskPool, pub ctx: BlockBuildingContext, pub input: broadcast::Receiver, @@ -191,8 +191,8 @@ pub trait UnfinishedBlockBuildingSink: std::fmt::Debug + Send + Sync { } #[derive(Debug)] -pub struct BlockBuildingAlgorithmInput { - pub provider_factory: ProviderFactory, +pub struct BlockBuildingAlgorithmInput { + pub provider_factory: Provider, pub ctx: BlockBuildingContext, pub input: broadcast::Receiver, /// output for the blocks @@ -203,9 +203,11 @@ pub struct BlockBuildingAlgorithmInput { /// Algorithm to build blocks /// build_blocks should send block to input.sink until input.cancel is cancelled. /// slot_bidder should be used to decide how much to bid. -pub trait BlockBuildingAlgorithm: std::fmt::Debug + Send + Sync { +pub trait BlockBuildingAlgorithm: + std::fmt::Debug + Send + Sync +{ fn name(&self) -> String; - fn build_blocks(&self, input: BlockBuildingAlgorithmInput); + fn build_blocks(&self, input: BlockBuildingAlgorithmInput); } /// Factory used to create UnfinishedBlockBuildingSink for builders. diff --git a/crates/rbuilder/src/building/builders/ordering_builder.rs b/crates/rbuilder/src/building/builders/ordering_builder.rs index 5bcd2a21..ea386b71 100644 --- a/crates/rbuilder/src/building/builders/ordering_builder.rs +++ b/crates/rbuilder/src/building/builders/ordering_builder.rs @@ -14,11 +14,10 @@ use crate::{ BlockBuildingContext, BlockOrders, ExecutionError, Sorting, }, primitives::{AccountNonce, OrderId}, + provider::StateProviderFactory, }; use ahash::{HashMap, HashSet}; use alloy_primitives::Address; -use reth::providers::ProviderFactory; -use reth_db::database::Database; use tokio_util::sync::CancellationToken; use crate::{roothash::RootHashMode, utils::check_provider_factory_health}; @@ -61,8 +60,8 @@ impl OrderingBuilderConfig { } } -pub fn run_ordering_builder( - input: LiveBuilderInput, +pub fn run_ordering_builder( + input: LiveBuilderInput, config: &OrderingBuilderConfig, ) { let mut order_intake_consumer = OrderIntakeConsumer::new( @@ -187,7 +186,7 @@ pub struct OrderingBuilderContext { impl OrderingBuilderContext { pub fn new( - provider_factory: ProviderFactory, + provider_factory: Provider, root_hash_task_pool: BlockingTaskPool, builder_name: String, ctx: BlockBuildingContext, @@ -359,12 +358,14 @@ impl OrderingBuildingAlgorithm { } } -impl BlockBuildingAlgorithm for OrderingBuildingAlgorithm { +impl BlockBuildingAlgorithm + for OrderingBuildingAlgorithm +{ fn name(&self) -> String { self.name.clone() } - fn build_blocks(&self, input: BlockBuildingAlgorithmInput) { + fn build_blocks(&self, input: BlockBuildingAlgorithmInput) { let live_input = LiveBuilderInput { provider_factory: input.provider_factory, root_hash_task_pool: self.root_hash_task_pool.clone(), diff --git a/crates/rbuilder/src/building/sim.rs b/crates/rbuilder/src/building/sim.rs index 8a8ed804..07fdfb54 100644 --- a/crates/rbuilder/src/building/sim.rs +++ b/crates/rbuilder/src/building/sim.rs @@ -11,8 +11,6 @@ use crate::{ use ahash::{HashMap, HashSet}; use alloy_primitives::{Address, B256}; use rand::seq::SliceRandom; -use reth::providers::ProviderFactory; -use reth_db::database::Database; use reth_errors::ProviderError; use reth_payload_builder::database::CachedReads; use reth_provider::StateProvider; diff --git a/crates/rbuilder/src/live_builder/base_config.rs b/crates/rbuilder/src/live_builder/base_config.rs index 52711831..b728bf1f 100644 --- a/crates/rbuilder/src/live_builder/base_config.rs +++ b/crates/rbuilder/src/live_builder/base_config.rs @@ -151,7 +151,7 @@ impl BaseConfig { cancellation_token: tokio_util::sync::CancellationToken, sink_factory: Box, slot_source: SlotSourceType, - ) -> eyre::Result, SlotSourceType>> + ) -> eyre::Result>, SlotSourceType>> where SlotSourceType: SlotSource, { @@ -172,29 +172,31 @@ impl BaseConfig { sink_factory: Box, slot_source: SlotSourceType, provider_factory: ProviderFactoryReopener>, - ) -> eyre::Result, SlotSourceType>> + ) -> eyre::Result>, SlotSourceType>> where SlotSourceType: SlotSource, { - Ok(LiveBuilder::, SlotSourceType> { - watchdog_timeout: self.watchdog_timeout(), - error_storage_path: self.error_storage_path.clone(), - simulation_threads: self.simulation_threads, - order_input_config: OrderInputConfig::from_config(self), - blocks_source: slot_source, - chain_chain_spec: self.chain_spec()?, - provider_factory, - - coinbase_signer: self.coinbase_signer()?, - extra_data: self.extra_data()?, - blocklist: self.blocklist()?, - - global_cancellation: cancellation_token, - - extra_rpc: RpcModule::new(()), - sink_factory, - builders: Vec::new(), - }) + Ok( + LiveBuilder::>, SlotSourceType> { + watchdog_timeout: self.watchdog_timeout(), + error_storage_path: self.error_storage_path.clone(), + simulation_threads: self.simulation_threads, + order_input_config: OrderInputConfig::from_config(self), + blocks_source: slot_source, + chain_chain_spec: self.chain_spec()?, + provider_factory, + + coinbase_signer: self.coinbase_signer()?, + extra_data: self.extra_data()?, + blocklist: self.blocklist()?, + + global_cancellation: cancellation_token, + + extra_rpc: RpcModule::new(()), + sink_factory, + builders: Vec::new(), + }, + ) } pub fn jsonrpc_server_ip(&self) -> Ipv4Addr { diff --git a/crates/rbuilder/src/live_builder/building/mod.rs b/crates/rbuilder/src/live_builder/building/mod.rs index a4abf576..cd801908 100644 --- a/crates/rbuilder/src/live_builder/building/mod.rs +++ b/crates/rbuilder/src/live_builder/building/mod.rs @@ -23,18 +23,18 @@ use super::{ }; #[derive(Debug)] -pub struct BlockBuildingPool { - provider_factory: ProviderFactoryReopener, - builders: Vec>>, +pub struct BlockBuildingPool { + provider_factory: Provider, + builders: Vec>>, sink_factory: Box, orderpool_subscriber: order_input::OrderPoolSubscriber, order_simulation_pool: OrderSimulationPool, } -impl BlockBuildingPool { +impl BlockBuildingPool { pub fn new( - provider_factory: ProviderFactoryReopener, - builders: Vec>>, + provider_factory: Provider, + builders: Vec>>, sink_factory: Box, orderpool_subscriber: order_input::OrderPoolSubscriber, order_simulation_pool: OrderSimulationPool, @@ -112,7 +112,7 @@ impl BlockBuildingPool { for builder in self.builders.iter() { let builder_name = builder.name(); debug!(block = block_number, builder_name, "Spawning builder job"); - let input = BlockBuildingAlgorithmInput:: { + let input = BlockBuildingAlgorithmInput:: { provider_factory: provider_factory.clone(), ctx: ctx.clone(), input: broadcast_input.subscribe(), diff --git a/crates/rbuilder/src/live_builder/cli.rs b/crates/rbuilder/src/live_builder/cli.rs index 943e2020..0cdd6e26 100644 --- a/crates/rbuilder/src/live_builder/cli.rs +++ b/crates/rbuilder/src/live_builder/cli.rs @@ -43,7 +43,9 @@ pub trait LiveBuilderConfig: std::fmt::Debug + serde::de::DeserializeOwned { &self, cancellation_token: CancellationToken, ) -> impl std::future::Future< - Output = eyre::Result, MevBoostSlotDataGenerator>>, + Output = eyre::Result< + LiveBuilder>, MevBoostSlotDataGenerator>, + >, > + Send; /// Patch until we have a unified way of backtesting using the exact algorithms we use on the LiveBuilder. diff --git a/crates/rbuilder/src/live_builder/config.rs b/crates/rbuilder/src/live_builder/config.rs index c053c71e..625fcbc4 100644 --- a/crates/rbuilder/src/live_builder/config.rs +++ b/crates/rbuilder/src/live_builder/config.rs @@ -30,6 +30,7 @@ use crate::{ }, mev_boost::BLSBlockSigner, primitives::mev_boost::{MevBoostRelay, RelayConfig}, + provider::StateProviderFactory, utils::{build_info::rbuilder_version, ProviderFactoryReopener, Signer}, validation_api_client::ValidationAPIClient, }; @@ -285,7 +286,9 @@ impl LiveBuilderConfig for Config { async fn create_builder( &self, cancellation_token: tokio_util::sync::CancellationToken, - ) -> eyre::Result, MevBoostSlotDataGenerator>> { + ) -> eyre::Result< + super::LiveBuilder>, MevBoostSlotDataGenerator>, + > { let (sink_factory, relays) = self.l1_config.create_relays_sink_factory( self.base_config.chain_spec()?, Box::new(DummyBiddingService {}), @@ -427,22 +430,22 @@ pub fn coinbase_signer_from_secret_key(secret_key: &str) -> eyre::Result Ok(Signer::try_from_secret(secret_key)?) } -fn create_builders( +fn create_builders( configs: Vec, root_hash_task_pool: BlockingTaskPool, sbundle_mergeabe_signers: Vec
, -) -> Vec>>> { +) -> Vec>> { configs .into_iter() .map(|cfg| create_builder(cfg, &root_hash_task_pool, &sbundle_mergeabe_signers)) .collect() } -fn create_builder( +fn create_builder( cfg: BuilderConfig, root_hash_task_pool: &BlockingTaskPool, sbundle_mergeabe_signers: &[Address], -) -> Arc>> { +) -> Arc> { match cfg.builder { SpecificBuilderConfig::OrderingBuilder(order_cfg) => { Arc::new(OrderingBuildingAlgorithm::new( diff --git a/crates/rbuilder/src/live_builder/mod.rs b/crates/rbuilder/src/live_builder/mod.rs index a3cff0c6..c46cab27 100644 --- a/crates/rbuilder/src/live_builder/mod.rs +++ b/crates/rbuilder/src/live_builder/mod.rs @@ -18,7 +18,6 @@ use crate::{ simulation::OrderSimulationPool, watchdog::spawn_watchdog_thread, }, - primitives::mev_boost::MevBoostRelay, provider::StateProviderFactory, telemetry::inc_active_slots, utils::{error_storage::spawn_error_storage_writer, Signer}, @@ -37,9 +36,9 @@ use reth_chainspec::ChainSpec; use reth_db::database::Database; use std::{cmp::min, path::PathBuf, sync::Arc, time::Duration}; use time::OffsetDateTime; -use tokio::{sync::mpsc, task::spawn_blocking}; +use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; -use tracing::{debug, error, info, warn}; +use tracing::{debug, info, warn}; /// Time the proposer have to propose a block from the beginning of the slot (https://www.paradigm.xyz/2023/04/mev-boost-ethereum-consensus Slot anatomy) const SLOT_PROPOSAL_DURATION: std::time::Duration = Duration::from_secs(4); @@ -59,7 +58,7 @@ pub trait SlotSource { /// # Usage /// Create and run() #[derive(Debug)] -pub struct LiveBuilder { +pub struct LiveBuilder { pub watchdog_timeout: Duration, pub error_storage_path: PathBuf, pub simulation_threads: usize, @@ -76,18 +75,18 @@ pub struct LiveBuilder { pub global_cancellation: CancellationToken, pub sink_factory: Box, - pub builders: Vec>>, + pub builders: Vec>>, pub extra_rpc: RpcModule<()>, } -impl - LiveBuilder +impl + LiveBuilder { pub fn with_extra_rpc(self, extra_rpc: RpcModule<()>) -> Self { Self { extra_rpc, ..self } } - pub fn with_builders(self, builders: Vec>>) -> Self { + pub fn with_builders(self, builders: Vec>>) -> Self { Self { builders, ..self } } diff --git a/crates/rbuilder/src/live_builder/simulation/mod.rs b/crates/rbuilder/src/live_builder/simulation/mod.rs index 26132ead..011e596c 100644 --- a/crates/rbuilder/src/live_builder/simulation/mod.rs +++ b/crates/rbuilder/src/live_builder/simulation/mod.rs @@ -7,18 +7,13 @@ use crate::{ BlockBuildingContext, }, live_builder::order_input::orderpool::OrdersForBlock, - primitives::{Order, OrderId, SimulatedOrder}, + primitives::{OrderId, SimulatedOrder}, provider::StateProviderFactory, utils::gen_uid, }; -use ahash::{HashMap, HashSet}; -use alloy_primitives::utils::format_ether; -use reth_db::database::Database; +use ahash::HashMap; use simulation_job::SimulationJob; -use std::{ - fmt, - sync::{Arc, Mutex}, -}; +use std::sync::{Arc, Mutex}; use tokio::{sync::mpsc, task::JoinHandle}; use tokio_util::sync::CancellationToken; use tracing::{info_span, Instrument}; @@ -70,7 +65,7 @@ pub enum SimulatedOrderCommand { Cancellation(OrderId), } -impl OrderSimulationPool { +impl OrderSimulationPool { pub fn new( provider_factory: Provider, num_workers: usize, @@ -172,6 +167,7 @@ mod tests { building::testing::test_chain_state::{BlockArgs, NamedAddr, TestChainState, TxArgs}, live_builder::order_input::order_sink::OrderPoolCommand, primitives::{MempoolTx, Order, TransactionSignedEcRecoveredWithBlobs}, + utils::ProviderFactoryReopener, }; use reth_primitives::U256; diff --git a/crates/rbuilder/src/live_builder/simulation/simulation_job.rs b/crates/rbuilder/src/live_builder/simulation/simulation_job.rs index c9e6884a..751c6bba 100644 --- a/crates/rbuilder/src/live_builder/simulation/simulation_job.rs +++ b/crates/rbuilder/src/live_builder/simulation/simulation_job.rs @@ -4,10 +4,10 @@ use crate::{ building::sim::{SimTree, SimulatedResult, SimulationRequest}, live_builder::order_input::order_sink::OrderPoolCommand, primitives::{Order, OrderId}, + provider::StateProviderFactory, }; use ahash::HashSet; use alloy_primitives::utils::format_ether; -use reth_db::database::Database; use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, trace, warn}; @@ -25,7 +25,7 @@ use super::SimulatedOrderCommand; /// If we get a cancellation and the order is in in_flight_orders we just remove it from in_flight_orders. /// Only SimulatedOrders still in in_flight_orders are delivered. /// @Pending: implement cancellations in the SimTree. -pub struct SimulationJob { +pub struct SimulationJob { block_cancellation: CancellationToken, /// Input orders to be simulated new_order_sub: mpsc::UnboundedReceiver, @@ -35,7 +35,7 @@ pub struct SimulationJob { sim_results_receiver: mpsc::Receiver, /// Output of the simulations slot_sim_results_sender: mpsc::Sender, - sim_tree: SimTree, + sim_tree: SimTree, orders_received: OrderCounter, orders_simulated_ok: OrderCounter, @@ -45,14 +45,14 @@ pub struct SimulationJob { in_flight_orders: HashSet, } -impl SimulationJob { +impl SimulationJob { pub fn new( block_cancellation: CancellationToken, new_order_sub: mpsc::UnboundedReceiver, sim_req_sender: flume::Sender, sim_results_receiver: mpsc::Receiver, slot_sim_results_sender: mpsc::Sender, - sim_tree: SimTree, + sim_tree: SimTree, ) -> Self { Self { block_cancellation, diff --git a/crates/rbuilder/src/provider/http_provider.rs b/crates/rbuilder/src/provider/http_provider.rs index 2dc1981b..1ecd21b3 100644 --- a/crates/rbuilder/src/provider/http_provider.rs +++ b/crates/rbuilder/src/provider/http_provider.rs @@ -16,6 +16,8 @@ struct HttpProvider { provider: RootProvider>, } +// ---> TODOOOOO!!! <--- + impl HttpProvider { pub fn new_with_url(url: &str) -> Self { let provider = ProviderBuilder::new() From 1f32383e4039bb18acc715187b6973379d7f2491 Mon Sep 17 00:00:00 2001 From: Ferran Borreguero Date: Fri, 23 Aug 2024 11:43:02 +0100 Subject: [PATCH 05/10] More work --- .../building/builders/block_building_helper.rs | 17 +++++++++-------- .../src/building/builders/ordering_builder.rs | 4 ++-- crates/rbuilder/src/building/mod.rs | 7 ++++--- .../rbuilder/src/live_builder/building/mod.rs | 14 ++------------ crates/rbuilder/src/live_builder/cli.rs | 4 ++-- crates/rbuilder/src/live_builder/config.rs | 5 +++-- .../rbuilder/src/live_builder/simulation/mod.rs | 3 ++- .../src/live_builder/simulation/sim_worker.rs | 10 ---------- crates/rbuilder/src/provider/mod.rs | 6 +++++- crates/rbuilder/src/roothash/mod.rs | 1 + crates/rbuilder/src/utils/noncer.rs | 4 +++- .../src/utils/provider_factory_reopen.rs | 13 ++++++++++++- 12 files changed, 45 insertions(+), 43 deletions(-) diff --git a/crates/rbuilder/src/building/builders/block_building_helper.rs b/crates/rbuilder/src/building/builders/block_building_helper.rs index e8fa537b..8610ac0d 100644 --- a/crates/rbuilder/src/building/builders/block_building_helper.rs +++ b/crates/rbuilder/src/building/builders/block_building_helper.rs @@ -5,10 +5,8 @@ use std::{ use alloy_primitives::U256; use reth::tasks::pool::BlockingTaskPool; -use reth_db::database::Database; use reth_payload_builder::database::CachedReads; use reth_primitives::format_ether; -use reth_provider::{BlockNumReader, ProviderFactory}; use time::OffsetDateTime; use tokio_util::sync::CancellationToken; use tracing::{debug, error, trace}; @@ -21,6 +19,7 @@ use crate::{ Sorting, }, primitives::SimulatedOrder, + provider::StateProviderFactory, roothash::RootHashMode, telemetry, }; @@ -74,9 +73,9 @@ pub trait BlockBuildingHelper { fn building_context(&self) -> &BlockBuildingContext; } -/// Implementation of BlockBuildingHelper based on a ProviderFactory +/// Implementation of BlockBuildingHelper based on a Provider #[derive(Clone)] -pub struct BlockBuildingHelperFromDB { +pub struct BlockBuildingHelperFromDB { /// Balance of fee recipient before we stared building. _fee_recipient_balance_start: U256, /// Accumulated changes for the block (due to commit_order calls). @@ -91,7 +90,7 @@ pub struct BlockBuildingHelperFromDB { building_ctx: BlockBuildingContext, built_block_trace: BuiltBlockTrace, /// Needed to get the initial state and the final root hash calculation. - provider_factory: ProviderFactory, + provider_factory: Provider, root_hash_task_pool: BlockingTaskPool, root_hash_mode: RootHashMode, /// Token to cancel in case of fatal error (if we believe that it's impossible to build for this block). @@ -122,7 +121,7 @@ pub struct FinalizeBlockResult { pub cached_reads: CachedReads, } -impl BlockBuildingHelperFromDB { +impl BlockBuildingHelperFromDB { /// allow_tx_skip: see [`PartialBlockFork`] /// Performs initialization: /// - Query fee_recipient_balance_start. @@ -130,7 +129,7 @@ impl BlockBuildingHelperFromDB { /// - Estimate payout tx cost. #[allow(clippy::too_many_arguments)] pub fn new( - provider_factory: ProviderFactory, + provider_factory: Provider, root_hash_task_pool: BlockingTaskPool, root_hash_mode: RootHashMode, building_ctx: BlockBuildingContext, @@ -256,7 +255,9 @@ impl BlockBuildingHelperFromDB { } } -impl BlockBuildingHelper for BlockBuildingHelperFromDB { +impl BlockBuildingHelper + for BlockBuildingHelperFromDB +{ /// Forwards to partial_block and updates trace. fn commit_order( &mut self, diff --git a/crates/rbuilder/src/building/builders/ordering_builder.rs b/crates/rbuilder/src/building/builders/ordering_builder.rs index ea386b71..15aa445d 100644 --- a/crates/rbuilder/src/building/builders/ordering_builder.rs +++ b/crates/rbuilder/src/building/builders/ordering_builder.rs @@ -20,7 +20,7 @@ use ahash::{HashMap, HashSet}; use alloy_primitives::Address; use tokio_util::sync::CancellationToken; -use crate::{roothash::RootHashMode, utils::check_provider_factory_health}; +use crate::roothash::RootHashMode; use reth::tasks::pool::BlockingTaskPool; use reth_payload_builder::database::CachedReads; use serde::Deserialize; @@ -237,7 +237,7 @@ impl OrderingBuilderContext PartialBlock { } #[allow(clippy::too_many_arguments)] - pub fn finalize( + pub fn finalize( self, state: &mut BlockState, ctx: &BlockBuildingContext, - provider_factory: ProviderFactory, + provider_factory: Provider, root_hash_mode: RootHashMode, root_hash_task_pool: BlockingTaskPool, ) -> eyre::Result { diff --git a/crates/rbuilder/src/live_builder/building/mod.rs b/crates/rbuilder/src/live_builder/building/mod.rs index cd801908..b6725c0e 100644 --- a/crates/rbuilder/src/live_builder/building/mod.rs +++ b/crates/rbuilder/src/live_builder/building/mod.rs @@ -12,7 +12,7 @@ use crate::{ }; use tokio::sync::{broadcast, mpsc}; use tokio_util::sync::CancellationToken; -use tracing::{debug, error, trace}; +use tracing::{debug, trace}; use super::{ order_input::{ @@ -98,22 +98,12 @@ impl BlockBuildingPool(); - let provider_factory = match self - .provider_factory - .check_consistency_and_reopen_if_needed(block_number) - { - Ok(provider_factory) => provider_factory, - Err(err) => { - error!(?err, "Error while reopening provider factory"); - return; - } - }; for builder in self.builders.iter() { let builder_name = builder.name(); debug!(block = block_number, builder_name, "Spawning builder job"); let input = BlockBuildingAlgorithmInput:: { - provider_factory: provider_factory.clone(), + provider_factory: self.provider_factory.clone(), ctx: ctx.clone(), input: broadcast_input.subscribe(), sink: builder_sink.clone(), diff --git a/crates/rbuilder/src/live_builder/cli.rs b/crates/rbuilder/src/live_builder/cli.rs index 0cdd6e26..f0f755b2 100644 --- a/crates/rbuilder/src/live_builder/cli.rs +++ b/crates/rbuilder/src/live_builder/cli.rs @@ -50,10 +50,10 @@ pub trait LiveBuilderConfig: std::fmt::Debug + serde::de::DeserializeOwned { /// Patch until we have a unified way of backtesting using the exact algorithms we use on the LiveBuilder. /// building_algorithm_name will come from the specific configuration. - fn build_backtest_block( + fn build_backtest_block( &self, building_algorithm_name: &str, - input: BacktestSimulateBlockInput<'_, Arc>, + input: BacktestSimulateBlockInput<'_, Provider>, ) -> eyre::Result<(Block, CachedReads)>; } diff --git a/crates/rbuilder/src/live_builder/config.rs b/crates/rbuilder/src/live_builder/config.rs index 625fcbc4..64d2206e 100644 --- a/crates/rbuilder/src/live_builder/config.rs +++ b/crates/rbuilder/src/live_builder/config.rs @@ -316,10 +316,11 @@ impl LiveBuilderConfig for Config { rbuilder_version() } - fn build_backtest_block( + fn build_backtest_block( + // This part I am unsure how to fix... &self, building_algorithm_name: &str, - input: BacktestSimulateBlockInput<'_, Arc>, + input: BacktestSimulateBlockInput<'_, Provider>, ) -> eyre::Result<(Block, CachedReads)> { let builder_cfg = self.builder(building_algorithm_name)?; match builder_cfg.builder { diff --git a/crates/rbuilder/src/live_builder/simulation/mod.rs b/crates/rbuilder/src/live_builder/simulation/mod.rs index 011e596c..490f02c4 100644 --- a/crates/rbuilder/src/live_builder/simulation/mod.rs +++ b/crates/rbuilder/src/live_builder/simulation/mod.rs @@ -107,7 +107,8 @@ impl OrderSimulationPoo ) -> SlotOrderSimResults { let (slot_sim_results_sender, slot_sim_results_receiver) = mpsc::channel(10_000); - let provider = self.provider_factory.provider_factory_unchecked(); + // let provider = self.provider_factory.provider_factory_unchecked(); + let provider = self.provider_factory.clone(); let current_contexts = Arc::clone(&self.current_contexts); let block_context: BlockContextId = gen_uid(); diff --git a/crates/rbuilder/src/live_builder/simulation/sim_worker.rs b/crates/rbuilder/src/live_builder/simulation/sim_worker.rs index 01865b5b..927db4eb 100644 --- a/crates/rbuilder/src/live_builder/simulation/sim_worker.rs +++ b/crates/rbuilder/src/live_builder/simulation/sim_worker.rs @@ -44,16 +44,6 @@ pub fn run_sim_worker( } }; - let provider_factory = match provider_factory.check_consistency_and_reopen_if_needed( - current_sim_context.block_ctx.block_env.number.to(), - ) { - Ok(provider_factory) => provider_factory, - Err(err) => { - error!(?err, "Error while reopening provider factory"); - continue; - } - }; - let mut cached_reads = CachedReads::default(); let mut last_sim_finished = Instant::now(); while let Ok(task) = current_sim_context.requests.recv() { diff --git a/crates/rbuilder/src/provider/mod.rs b/crates/rbuilder/src/provider/mod.rs index 0967b285..6fda341f 100644 --- a/crates/rbuilder/src/provider/mod.rs +++ b/crates/rbuilder/src/provider/mod.rs @@ -1,5 +1,5 @@ use reth_errors::ProviderResult; -use reth_primitives::BlockNumber; +use reth_primitives::{BlockNumber, B256}; use reth_provider::StateProviderBox; mod http_provider; @@ -10,5 +10,9 @@ pub trait StateProviderFactory: Send + Sync { block_number: BlockNumber, ) -> ProviderResult; + fn history_by_block_hash(&self, block_hash: B256) -> ProviderResult; + + fn last_block_number(&self) -> ProviderResult; + fn latest(&self) -> ProviderResult; } diff --git a/crates/rbuilder/src/roothash/mod.rs b/crates/rbuilder/src/roothash/mod.rs index 8320a1db..35a77ba4 100644 --- a/crates/rbuilder/src/roothash/mod.rs +++ b/crates/rbuilder/src/roothash/mod.rs @@ -3,6 +3,7 @@ use reth::{ providers::{providers::ConsistentDbView, ExecutionOutcome, ProviderFactory}, tasks::pool::BlockingTaskPool, }; +use reth_db::database::Database; use reth_trie_parallel::async_root::{AsyncStateRoot, AsyncStateRootError}; #[derive(Debug, Clone, Copy)] diff --git a/crates/rbuilder/src/utils/noncer.rs b/crates/rbuilder/src/utils/noncer.rs index a32dc2d7..5c24c0d5 100644 --- a/crates/rbuilder/src/utils/noncer.rs +++ b/crates/rbuilder/src/utils/noncer.rs @@ -4,6 +4,8 @@ use reth::providers::StateProviderBox; use reth_errors::ProviderResult; use std::sync::{Arc, Mutex}; +use crate::provider::StateProviderFactory; + /// Struct to get nonces for Addresses, caching the results. /// NonceCache contains the data (but doesn't allow you to query it) and NonceCacheRef is a reference that allows you to query it. /// Usage: @@ -19,7 +21,7 @@ pub struct NonceCache { block: B256, } -impl NonceCache { +impl NonceCache { pub fn new(provider_factory: Provider, block: B256) -> Self { Self { provider_factory, diff --git a/crates/rbuilder/src/utils/provider_factory_reopen.rs b/crates/rbuilder/src/utils/provider_factory_reopen.rs index ea9b3eb8..6ec7f0ce 100644 --- a/crates/rbuilder/src/utils/provider_factory_reopen.rs +++ b/crates/rbuilder/src/utils/provider_factory_reopen.rs @@ -137,11 +137,22 @@ pub fn check_provider_factory_health( impl StateProviderFactory for ProviderFactoryReopener { fn history_by_block_number( &self, - block_number: BlockNumber, + _block_number: BlockNumber, ) -> ProviderResult { unimplemented!("TODO"); } + fn history_by_block_hash( + &self, + _block_hash: revm_primitives::B256, + ) -> ProviderResult { + unimplemented!("TODO"); + } + + fn last_block_number(&self) -> ProviderResult { + unimplemented!("TODO"); + } + fn latest(&self) -> ProviderResult { unimplemented!("TODO"); } From e5ecc4f14a600873b081a8a7a21ced40042b0235 Mon Sep 17 00:00:00 2001 From: Ferran Borreguero Date: Fri, 23 Aug 2024 18:53:02 +0100 Subject: [PATCH 06/10] More changes --- .../src/backtest/backtest_build_range.rs | 5 +- crates/rbuilder/src/backtest/execute.rs | 9 +- .../src/backtest/redistribute/cli/mod.rs | 32 +-- .../rbuilder/src/backtest/redistribute/mod.rs | 78 ++++--- .../rbuilder/src/bin/debug-bench-machine.rs | 11 +- crates/rbuilder/src/bin/dummy-builder.rs | 78 +++---- crates/rbuilder/src/building/mod.rs | 21 +- crates/rbuilder/src/live_builder/cli.rs | 3 +- crates/rbuilder/src/live_builder/config.rs | 6 +- crates/rbuilder/src/live_builder/mod.rs | 16 +- crates/rbuilder/src/provider/http_provider.rs | 199 ------------------ crates/rbuilder/src/provider/mod.rs | 14 +- .../src/utils/provider_factory_reopen.rs | 39 +++- 13 files changed, 177 insertions(+), 334 deletions(-) delete mode 100644 crates/rbuilder/src/provider/http_provider.rs diff --git a/crates/rbuilder/src/backtest/backtest_build_range.rs b/crates/rbuilder/src/backtest/backtest_build_range.rs index b54b5f6a..bb020966 100644 --- a/crates/rbuilder/src/backtest/backtest_build_range.rs +++ b/crates/rbuilder/src/backtest/backtest_build_range.rs @@ -110,10 +110,7 @@ pub async fn run_backtest_build_range( } #[allow(clippy::too_many_arguments)] -pub fn backtest_simulate_block( +pub fn backtest_simulate_block< + ConfigType: LiveBuilderConfig, + Provider: StateProviderFactory + Clone + 'static, +>( block_data: BlockData, - provider_factory: ProviderFactory>, + provider_factory: Provider, chain_spec: Arc, build_block_lag_ms: i64, builders_names: Vec, diff --git a/crates/rbuilder/src/backtest/redistribute/cli/mod.rs b/crates/rbuilder/src/backtest/redistribute/cli/mod.rs index 15ccd3c6..437ef46d 100644 --- a/crates/rbuilder/src/backtest/redistribute/cli/mod.rs +++ b/crates/rbuilder/src/backtest/redistribute/cli/mod.rs @@ -1,18 +1,17 @@ mod csv_output; -use crate::backtest::redistribute::{calc_redistributions, RedistributionBlockOutput}; -use crate::backtest::BlockData; -use crate::live_builder::base_config::load_config_toml_and_env; -use crate::live_builder::cli::LiveBuilderConfig; -use crate::{backtest::HistoricalDataStorage, live_builder::config::Config}; +use crate::{ + backtest::{ + redistribute::{calc_redistributions, RedistributionBlockOutput}, + BlockData, HistoricalDataStorage, + }, + live_builder::{base_config::load_config_toml_and_env, cli::LiveBuilderConfig, config::Config}, + provider::StateProviderFactory, +}; use alloy_primitives::utils::format_ether; use clap::Parser; use csv_output::{CSVOutputRow, CSVResultWriter}; -use reth_db::DatabaseEnv; -use reth_provider::ProviderFactory; -use std::io; -use std::path::PathBuf; -use std::sync::Arc; +use std::{io, path::PathBuf}; use tracing::info; #[derive(Parser, Debug)] @@ -54,10 +53,8 @@ pub async fn run_backtest_redistribute() -> eyre: let mut historical_data_storage = HistoricalDataStorage::new_from_path(&config.base_config.backtest_fetch_output_file) .await?; - let provider_factory = config - .base_config - .provider_factory()? - .provider_factory_unchecked(); + let provider_factory = config.base_config.provider_factory()?; + let mut csv_writer = cli .csv .map(|path| -> io::Result<_> { CSVResultWriter::new(path) }) @@ -107,11 +104,14 @@ pub async fn run_backtest_redistribute() -> eyre: Ok(()) } -fn process_redisribution( +fn process_redisribution< + ConfigType: LiveBuilderConfig + Send + Sync, + Provider: StateProviderFactory + Clone + 'static, +>( block_data: BlockData, csv_writer: Option<&mut CSVResultWriter>, json_accum: Option<&mut Vec>, - provider_factory: ProviderFactory>, + provider_factory: Provider, config: &ConfigType, distribute_to_mempool_txs: bool, ) -> eyre::Result<()> { diff --git a/crates/rbuilder/src/backtest/redistribute/mod.rs b/crates/rbuilder/src/backtest/redistribute/mod.rs index 82f0034f..419bb14c 100644 --- a/crates/rbuilder/src/backtest/redistribute/mod.rs +++ b/crates/rbuilder/src/backtest/redistribute/mod.rs @@ -1,29 +1,34 @@ mod cli; mod redistribution_algo; -use crate::backtest::execute::backtest_simulate_block; -use crate::backtest::redistribute::redistribution_algo::{ - IncludedOrderData, RedistributionCalculator, RedistributionIdentityData, RedistributionResult, -}; -use crate::backtest::restore_landed_orders::{ - restore_landed_orders, sim_historical_block, ExecutedBlockTx, LandedOrderData, SimplifiedOrder, +use crate::{ + backtest::{ + execute::backtest_simulate_block, + redistribute::redistribution_algo::{ + IncludedOrderData, RedistributionCalculator, RedistributionIdentityData, + RedistributionResult, + }, + restore_landed_orders::{ + restore_landed_orders, sim_historical_block, ExecutedBlockTx, LandedOrderData, + SimplifiedOrder, + }, + BlockData, BuiltBlockData, OrdersWithTimestamp, + }, + live_builder::cli::LiveBuilderConfig, + primitives::{Order, OrderId}, + provider::StateProviderFactory, + utils::{signed_uint_delta, u256decimal_serde_helper}, }; -use crate::backtest::{BlockData, BuiltBlockData, OrdersWithTimestamp}; -use crate::live_builder::cli::LiveBuilderConfig; -use crate::primitives::{Order, OrderId}; -use crate::utils::signed_uint_delta; -use crate::utils::u256decimal_serde_helper; use ahash::{HashMap, HashSet}; -use alloy_primitives::utils::format_ether; -use alloy_primitives::{Address, B256, I256, U256}; +use alloy_primitives::{utils::format_ether, Address, B256, I256, U256}; pub use cli::run_backtest_redistribute; use jsonrpsee::core::Serialize; use rayon::prelude::*; use reth_chainspec::ChainSpec; -use reth_db::DatabaseEnv; -use reth_provider::ProviderFactory; -use std::cmp::{max, min}; -use std::sync::Arc; +use std::{ + cmp::{max, min}, + sync::Arc, +}; use tracing::{info, info_span, trace, warn}; #[derive(Debug, Clone, Serialize)] @@ -87,8 +92,11 @@ pub struct RedistributionBlockOutput { pub joint_contribution: Vec, } -pub fn calc_redistributions( - provider_factory: ProviderFactory>, +pub fn calc_redistributions< + ConfigType: LiveBuilderConfig + Send + Sync, + Provider: StateProviderFactory + Clone + 'static, +>( + provider_factory: Provider, config: &ConfigType, block_data: BlockData, distribute_to_mempool_txs: bool, @@ -233,8 +241,8 @@ fn get_available_orders( included_orders_available } -fn restore_available_landed_orders( - provider_factory: ProviderFactory>, +fn restore_available_landed_orders( + provider_factory: Provider, chain_spec: Arc, block_data: &BlockData, included_orders_available: &[OrdersWithTimestamp], @@ -361,8 +369,11 @@ impl ResultsWithoutExclusion { } } -fn calculate_backtest_without_exclusion( - provider_factory: ProviderFactory>, +fn calculate_backtest_without_exclusion< + ConfigType: LiveBuilderConfig, + Provider: StateProviderFactory + Clone + 'static, +>( + provider_factory: Provider, config: &ConfigType, block_data: BlockData, ) -> eyre::Result { @@ -420,8 +431,11 @@ impl ExclusionResults { } } -fn calculate_backtest_identity_and_order_exclusion( - provider_factory: ProviderFactory>, +fn calculate_backtest_identity_and_order_exclusion< + ConfigType: LiveBuilderConfig + Sync, + Provider: StateProviderFactory + Clone + 'static, +>( + provider_factory: Provider, config: &ConfigType, block_data: BlockData, available_orders: &AvailableOrders, @@ -471,8 +485,11 @@ fn calculate_backtest_identity_and_order_exclusion( - provider_factory: ProviderFactory>, +fn calc_joint_exclusion_results< + ConfigType: LiveBuilderConfig + Sync, + Provider: StateProviderFactory + Clone + 'static, +>( + provider_factory: Provider, config: &ConfigType, block_data: BlockData, available_orders: &AvailableOrders, @@ -758,8 +775,11 @@ struct ExclusionResult { } /// calculate block profit excluding some orders -fn calc_profit_after_exclusion( - provider_factory: ProviderFactory>, +fn calc_profit_after_exclusion< + ConfigType: LiveBuilderConfig, + Provider: StateProviderFactory + Clone + 'static, +>( + provider_factory: Provider, config: &ConfigType, block_data: &BlockData, exclusion_input: ExclusionInput, diff --git a/crates/rbuilder/src/bin/debug-bench-machine.rs b/crates/rbuilder/src/bin/debug-bench-machine.rs index 109aa8a0..aa84cbef 100644 --- a/crates/rbuilder/src/bin/debug-bench-machine.rs +++ b/crates/rbuilder/src/bin/debug-bench-machine.rs @@ -9,13 +9,11 @@ use rbuilder::{ }, live_builder::{base_config::load_config_toml_and_env, cli::LiveBuilderConfig, config::Config}, primitives::{MempoolTx, Order, TransactionSignedEcRecoveredWithBlobs}, + provider::StateProviderFactory, roothash::RootHashMode, utils::{default_cfg_env, Signer}, }; -use reth::{ - payload::PayloadId, - providers::{BlockNumReader, BlockReader}, -}; +use reth::payload::PayloadId; use reth_payload_builder::{database::CachedReads, EthPayloadBuilderAttributes}; use reth_provider::StateProvider; use revm_primitives::{BlobExcessGasAndPrice, BlockEnv, SpecId}; @@ -38,10 +36,7 @@ async fn main() -> eyre::Result<()> { let chain = config.base_config().chain_spec()?; - let factory = config - .base_config() - .provider_factory()? - .provider_factory_unchecked(); + let factory = config.base_config().provider_factory()?; let last_block = factory.last_block_number()?; let block_data = factory diff --git a/crates/rbuilder/src/bin/dummy-builder.rs b/crates/rbuilder/src/bin/dummy-builder.rs index 71cd5f6c..afab5dbb 100644 --- a/crates/rbuilder/src/bin/dummy-builder.rs +++ b/crates/rbuilder/src/bin/dummy-builder.rs @@ -34,12 +34,13 @@ use rbuilder::{ mev_boost::{MevBoostRelay, RelayConfig}, SimulatedOrder, }, + provider::StateProviderFactory, roothash::RootHashMode, - utils::Signer, + utils::{ProviderFactoryReopener, Signer}, }; -use reth::{providers::ProviderFactory, tasks::pool::BlockingTaskPool}; +use reth::tasks::pool::BlockingTaskPool; use reth_chainspec::MAINNET; -use reth_db::{database::Database, DatabaseEnv}; +use reth_db::DatabaseEnv; use tokio::{signal::ctrl_c, sync::broadcast}; use tokio_util::sync::CancellationToken; use tracing::{info, level_filters::LevelFilter}; @@ -70,36 +71,37 @@ async fn main() -> eyre::Result<()> { cancel.clone(), ); - let builder = LiveBuilder::, MevBoostSlotDataGenerator> { - watchdog_timeout: Duration::from_secs(10000), - error_storage_path: DEFAULT_ERROR_STORAGE_PATH.parse().unwrap(), - simulation_threads: 1, - blocks_source: payload_event, - order_input_config: OrderInputConfig::new( - false, - true, - DEFAULT_EL_NODE_IPC_PATH.parse().unwrap(), - DEFAULT_INCOMING_BUNDLES_PORT, - *DEFAULT_IP, - DEFAULT_SERVE_MAX_CONNECTIONS, - DEFAULT_RESULTS_CHANNEL_TIMEOUT, - DEFAULT_INPUT_CHANNEL_BUFFER_SIZE, - ), - chain_chain_spec: chain_spec.clone(), - provider_factory: create_provider_factory( - Some(&RETH_DB_PATH.parse::().unwrap()), - None, - None, - chain_spec.clone(), - )?, - coinbase_signer: Signer::random(), - extra_data: Vec::new(), - blocklist: Default::default(), - global_cancellation: cancel.clone(), - extra_rpc: RpcModule::new(()), - sink_factory: Box::new(TraceBlockSinkFactory {}), - builders: vec![Arc::new(DummyBuildingAlgorithm::new(10))], - }; + let builder = + LiveBuilder::>, MevBoostSlotDataGenerator> { + watchdog_timeout: Duration::from_secs(10000), + error_storage_path: DEFAULT_ERROR_STORAGE_PATH.parse().unwrap(), + simulation_threads: 1, + blocks_source: payload_event, + order_input_config: OrderInputConfig::new( + false, + true, + DEFAULT_EL_NODE_IPC_PATH.parse().unwrap(), + DEFAULT_INCOMING_BUNDLES_PORT, + *DEFAULT_IP, + DEFAULT_SERVE_MAX_CONNECTIONS, + DEFAULT_RESULTS_CHANNEL_TIMEOUT, + DEFAULT_INPUT_CHANNEL_BUFFER_SIZE, + ), + chain_chain_spec: chain_spec.clone(), + provider_factory: create_provider_factory( + Some(&RETH_DB_PATH.parse::().unwrap()), + None, + None, + chain_spec.clone(), + )?, + coinbase_signer: Signer::random(), + extra_data: Vec::new(), + blocklist: Default::default(), + global_cancellation: cancel.clone(), + extra_rpc: RpcModule::new(()), + sink_factory: Box::new(TraceBlockSinkFactory {}), + builders: vec![Arc::new(DummyBuildingAlgorithm::new(10))], + }; let ctrlc = tokio::spawn(async move { ctrl_c().await.unwrap_or_default(); @@ -190,10 +192,10 @@ impl DummyBuildingAlgorithm { } } - fn build_block( + fn build_block( &self, orders: Vec, - provider_factory: ProviderFactory, + provider_factory: Provider, ctx: &BlockBuildingContext, ) -> eyre::Result> { let mut block_building_helper = BlockBuildingHelperFromDB::new( @@ -216,12 +218,14 @@ impl DummyBuildingAlgorithm { } } -impl BlockBuildingAlgorithm for DummyBuildingAlgorithm { +impl BlockBuildingAlgorithm + for DummyBuildingAlgorithm +{ fn name(&self) -> String { BUILDER_NAME.to_string() } - fn build_blocks(&self, input: BlockBuildingAlgorithmInput) { + fn build_blocks(&self, input: BlockBuildingAlgorithmInput) { if let Some(orders) = self.wait_for_orders(&input.cancel, input.input) { let block = self .build_block(orders, input.provider_factory, &input.ctx) diff --git a/crates/rbuilder/src/building/mod.rs b/crates/rbuilder/src/building/mod.rs index 7edd1cdd..aefb10a0 100644 --- a/crates/rbuilder/src/building/mod.rs +++ b/crates/rbuilder/src/building/mod.rs @@ -16,7 +16,6 @@ use reth_primitives::proofs::calculate_requests_root; use crate::{ primitives::{Order, OrderId, SimValue, SimulatedOrder, TransactionSignedEcRecoveredWithBlobs}, - roothash::calculate_state_root, utils::{a2r_withdrawal, calc_gas_limit, timestamp_as_u64, Signer}, }; use ahash::HashSet; @@ -542,8 +541,8 @@ impl PartialBlock { state: &mut BlockState, ctx: &BlockBuildingContext, provider_factory: Provider, - root_hash_mode: RootHashMode, - root_hash_task_pool: BlockingTaskPool, + _root_hash_mode: RootHashMode, + _root_hash_task_pool: BlockingTaskPool, ) -> eyre::Result { let (withdrawals_root, withdrawals) = { let mut db = state.new_db_ref(); @@ -602,13 +601,17 @@ impl PartialBlock { .block_logs_bloom(block_number) .expect("Number is in range"); + let state_root = provider_factory.state_root(ctx.attributes.parent, &execution_outcome)?; + + /* let state_root = calculate_state_root( - provider_factory, - ctx.attributes.parent, - &execution_outcome, - root_hash_mode, - root_hash_task_pool, - )?; + provider_factory, + ctx.attributes.parent, + &execution_outcome, + root_hash_mode, + root_hash_task_pool, + )?; + */ // create the block header let transactions_root = proofs::calculate_transaction_root(&self.executed_tx); diff --git a/crates/rbuilder/src/live_builder/cli.rs b/crates/rbuilder/src/live_builder/cli.rs index f0f755b2..42f6b6dd 100644 --- a/crates/rbuilder/src/live_builder/cli.rs +++ b/crates/rbuilder/src/live_builder/cli.rs @@ -11,6 +11,7 @@ use crate::{ live_builder::{ base_config::load_config_toml_and_env, payload_events::MevBoostSlotDataGenerator, }, + provider::StateProviderFactory, telemetry::spawn_telemetry_server, utils::{build_info::Version, ProviderFactoryReopener}, }; @@ -50,7 +51,7 @@ pub trait LiveBuilderConfig: std::fmt::Debug + serde::de::DeserializeOwned { /// Patch until we have a unified way of backtesting using the exact algorithms we use on the LiveBuilder. /// building_algorithm_name will come from the specific configuration. - fn build_backtest_block( + fn build_backtest_block( &self, building_algorithm_name: &str, input: BacktestSimulateBlockInput<'_, Provider>, diff --git a/crates/rbuilder/src/live_builder/config.rs b/crates/rbuilder/src/live_builder/config.rs index 64d2206e..ba35faef 100644 --- a/crates/rbuilder/src/live_builder/config.rs +++ b/crates/rbuilder/src/live_builder/config.rs @@ -316,7 +316,7 @@ impl LiveBuilderConfig for Config { rbuilder_version() } - fn build_backtest_block( + fn build_backtest_block( // This part I am unsure how to fix... &self, building_algorithm_name: &str, @@ -431,7 +431,7 @@ pub fn coinbase_signer_from_secret_key(secret_key: &str) -> eyre::Result Ok(Signer::try_from_secret(secret_key)?) } -fn create_builders( +fn create_builders( configs: Vec, root_hash_task_pool: BlockingTaskPool, sbundle_mergeabe_signers: Vec
, @@ -442,7 +442,7 @@ fn create_builders( .collect() } -fn create_builder( +fn create_builder( cfg: BuilderConfig, root_hash_task_pool: &BlockingTaskPool, sbundle_mergeabe_signers: &[Address], diff --git a/crates/rbuilder/src/live_builder/mod.rs b/crates/rbuilder/src/live_builder/mod.rs index c46cab27..d05d7591 100644 --- a/crates/rbuilder/src/live_builder/mod.rs +++ b/crates/rbuilder/src/live_builder/mod.rs @@ -28,12 +28,8 @@ use building::BlockBuildingPool; use eyre::Context; use jsonrpsee::RpcModule; use payload_events::MevBoostSlotData; -use reth::{ - primitives::Header, - providers::{HeaderProvider, ProviderFactory}, -}; +use reth::primitives::Header; use reth_chainspec::ChainSpec; -use reth_db::database::Database; use std::{cmp::min, path::PathBuf, sync::Arc, time::Duration}; use time::OffsetDateTime; use tokio::sync::mpsc; @@ -163,20 +159,16 @@ impl header, Err(err) => { warn!("Failed to get parent header for new slot: {:?}", err); continue; } } - */ }; { @@ -244,10 +236,10 @@ impl( +async fn wait_for_block_header( block: B256, slot_time: OffsetDateTime, - provider_factory: &ProviderFactory, + provider_factory: &Provider, ) -> eyre::Result
{ let dead_line = slot_time + BLOCK_HEADER_DEAD_LINE_DELTA; while OffsetDateTime::now_utc() < dead_line { diff --git a/crates/rbuilder/src/provider/http_provider.rs b/crates/rbuilder/src/provider/http_provider.rs deleted file mode 100644 index 1ecd21b3..00000000 --- a/crates/rbuilder/src/provider/http_provider.rs +++ /dev/null @@ -1,199 +0,0 @@ -use super::StateProviderFactory; -use alloy_provider::{Provider, ProviderBuilder, RootProvider}; -use alloy_rpc_types::{BlockId, BlockNumberOrTag}; -use alloy_transport_http::Http; -use reqwest::Client; -use reth_errors::ProviderResult; -use reth_primitives::{Account, Address, BlockNumber, Bytecode, StorageKey, StorageValue, B256}; -use reth_provider::{ - AccountReader, BlockHashReader, StateProofProvider, StateProvider, StateProviderBox, - StateRootProvider, -}; -use reth_trie::{updates::TrieUpdates, AccountProof, HashedPostState}; -use tokio::runtime::Runtime; - -struct HttpProvider { - provider: RootProvider>, -} - -// ---> TODOOOOO!!! <--- - -impl HttpProvider { - pub fn new_with_url(url: &str) -> Self { - let provider = ProviderBuilder::new() - .on_http(url.parse().unwrap()) - .unwrap(); - - Self { provider } - } - - fn get_block_number_by_tag(&self, tag: BlockNumberOrTag) -> ProviderResult { - let rt = Runtime::new() - .unwrap() - .block_on(self.provider.get_block_by_number(tag, false)) - .unwrap() - .unwrap(); - - Ok(rt.header.number.unwrap()) - } -} - -impl StateProviderFactory for HttpProvider { - fn history_by_block_number( - &self, - block_number: BlockNumber, - ) -> ProviderResult { - self.provider - .get_block_by_number(BlockNumberOrTag::Number(block_number), false); - - unimplemented!("TODO") - } - - fn latest(&self) -> ProviderResult { - unimplemented!("TODO") - } -} - -pub struct HttpProviderState { - provider: RootProvider>, - hash: B256, -} - -impl HttpProviderState { - pub fn new(provider: RootProvider>, hash: B256) -> Self { - Self { provider, hash } - } -} - -impl StateProvider for HttpProviderState { - /// Get storage of given account. - fn storage( - &self, - account: Address, - storage_key: StorageKey, - ) -> ProviderResult> { - let res = Runtime::new() - .unwrap() - .block_on(self.provider.get_storage_at( - account, - storage_key.into(), - BlockId::hash(self.hash), - )) - .unwrap(); - - Ok(Some(res)) - } - - /// Get account code by its hash - fn bytecode_by_hash(&self, code_hash: B256) -> ProviderResult> { - // find the specific address - let address = Address::from_word(code_hash); - - let res = Runtime::new() - .unwrap() - .block_on(self.provider.get_code_at(address, BlockId::hash(self.hash))) - .unwrap(); - - Ok(Some(Bytecode::new_raw(res))) - } -} - -impl BlockHashReader for HttpProviderState { - /// Get the hash of the block with the given number. Returns `None` if no block with this number - /// exists. - fn block_hash(&self, number: BlockNumber) -> ProviderResult> { - let res = Runtime::new() - .unwrap() - .block_on( - self.provider - .get_block_by_number(BlockNumberOrTag::Number(number), false), - ) - .unwrap() - .unwrap(); - - Ok(res.header.hash) - } - - fn canonical_hashes_range( - &self, - start: BlockNumber, - end: BlockNumber, - ) -> ProviderResult> { - let mut res = vec![]; - - for i in start..end { - let block: alloy_rpc_types::Block = Runtime::new() - .unwrap() - .block_on( - self.provider - .get_block_by_number(BlockNumberOrTag::Number(i), false), - ) - .unwrap() - .unwrap(); - - res.push(block.header.hash.unwrap()); - } - - Ok(res) - } -} - -#[derive(Debug)] -enum AccountError {} - -impl AccountReader for HttpProviderState { - fn basic_account(&self, address: Address) -> ProviderResult> { - let res: Result = Runtime::new().unwrap().block_on(async { - let balance = self - .provider - .get_balance(address, BlockId::hash(self.hash)) - .await - .unwrap(); - - let nonce = self - .provider - .get_transaction_count(address, BlockId::hash(self.hash)) - .await - .unwrap(); - - Ok(Account { - balance, - nonce, - bytecode_hash: Some(address.into_word()), - }) - }); - - let res = res.unwrap(); - - Ok(Some(res)) - } -} - -impl StateRootProvider for HttpProviderState { - /// Returns the state root of the `HashedPostState` on top of the current state. - fn hashed_state_root(&self, hashed_state: &HashedPostState) -> ProviderResult { - unimplemented!("todo"); - } - - /// Returns the state root of the `HashedPostState` on top of the current state with trie - /// updates to be committed to the database. - fn hashed_state_root_with_updates( - &self, - hashed_state: &HashedPostState, - ) -> ProviderResult<(B256, TrieUpdates)> { - unimplemented!("todo"); - } -} - -impl StateProofProvider for HttpProviderState { - /// Get account and storage proofs of target keys in the `HashedPostState` - /// on top of the current state. - fn hashed_proof( - &self, - hashed_state: &HashedPostState, - address: Address, - slots: &[B256], - ) -> ProviderResult { - unimplemented!("todo"); - } -} diff --git a/crates/rbuilder/src/provider/mod.rs b/crates/rbuilder/src/provider/mod.rs index 6fda341f..19c47209 100644 --- a/crates/rbuilder/src/provider/mod.rs +++ b/crates/rbuilder/src/provider/mod.rs @@ -1,8 +1,6 @@ use reth_errors::ProviderResult; -use reth_primitives::{BlockNumber, B256}; -use reth_provider::StateProviderBox; - -mod http_provider; +use reth_primitives::{Block, BlockHash, BlockNumber, Header, B256}; +use reth_provider::{ExecutionOutcome, StateProviderBox}; pub trait StateProviderFactory: Send + Sync { fn history_by_block_number( @@ -10,9 +8,17 @@ pub trait StateProviderFactory: Send + Sync { block_number: BlockNumber, ) -> ProviderResult; + /// Get header by block hash + fn header(&self, block_hash: &BlockHash) -> ProviderResult>; + fn history_by_block_hash(&self, block_hash: B256) -> ProviderResult; fn last_block_number(&self) -> ProviderResult; fn latest(&self) -> ProviderResult; + + fn block_by_number(&self, num: u64) -> ProviderResult>; + + fn state_root(&self, parent_hash: B256, output: &ExecutionOutcome) + -> Result; // TODO: Custom error } diff --git a/crates/rbuilder/src/utils/provider_factory_reopen.rs b/crates/rbuilder/src/utils/provider_factory_reopen.rs index 6ec7f0ce..10031f97 100644 --- a/crates/rbuilder/src/utils/provider_factory_reopen.rs +++ b/crates/rbuilder/src/utils/provider_factory_reopen.rs @@ -2,12 +2,15 @@ use crate::{ provider::StateProviderFactory, telemetry::{inc_provider_bad_reopen_counter, inc_provider_reopen_counter}, }; -use reth::providers::{BlockHashReader, ChainSpecProvider, ProviderFactory}; +use reth::providers::{BlockHashReader, ChainSpecProvider, ExecutionOutcome, ProviderFactory}; use reth_chainspec::ChainSpec; use reth_db::database::Database; use reth_errors::{ProviderResult, RethResult}; -use reth_primitives::BlockNumber; -use reth_provider::{providers::StaticFileProvider, StateProviderBox, StaticFileProviderFactory}; +use reth_primitives::{Block, BlockHash, BlockNumber, Header, B256}; +use reth_provider::{ + providers::StaticFileProvider, BlockNumReader, BlockReader, HeaderProvider, StateProviderBox, + StaticFileProviderFactory, +}; use std::{ path::PathBuf, sync::{Arc, Mutex}, @@ -137,23 +140,43 @@ pub fn check_provider_factory_health( impl StateProviderFactory for ProviderFactoryReopener { fn history_by_block_number( &self, - _block_number: BlockNumber, + block_number: BlockNumber, ) -> ProviderResult { - unimplemented!("TODO"); + self.provider_factory_unchecked() + .history_by_block_number(block_number) } fn history_by_block_hash( &self, - _block_hash: revm_primitives::B256, + block_hash: revm_primitives::B256, ) -> ProviderResult { - unimplemented!("TODO"); + self.provider_factory_unchecked() + .history_by_block_hash(block_hash) } fn last_block_number(&self) -> ProviderResult { - unimplemented!("TODO"); + self.provider_factory_unchecked().last_block_number() } fn latest(&self) -> ProviderResult { + self.provider_factory_unchecked().latest() + } + + fn block_by_number(&self, num: u64) -> ProviderResult> { + self.provider_factory_unchecked().block_by_number(num) + } + + /// Get header by block hash + fn header(&self, block_hash: &BlockHash) -> ProviderResult> { + self.provider_factory_unchecked().header(block_hash) + } + + fn state_root( + &self, + _parent_hash: B256, + _output: &ExecutionOutcome, + ) -> Result { + println!("F"); unimplemented!("TODO"); } } From 525881a519dca964dbd5bcb32ffcec5be14b1914 Mon Sep 17 00:00:00 2001 From: Ferran Borreguero Date: Fri, 23 Aug 2024 19:40:30 +0100 Subject: [PATCH 07/10] It works --- .../block_output/block_finisher.rs | 4 ++-- .../src/utils/provider_factory_reopen.rs | 23 +++++++++++++++---- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/crates/rbuilder/src/live_builder/block_output/block_finisher.rs b/crates/rbuilder/src/live_builder/block_output/block_finisher.rs index a5976b1b..a9ea0c0d 100644 --- a/crates/rbuilder/src/live_builder/block_output/block_finisher.rs +++ b/crates/rbuilder/src/live_builder/block_output/block_finisher.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use tracing::{trace, warn}; +use tracing::trace; use crate::building::builders::{ block_building_helper::{BlockBuildingHelper, BlockBuildingHelperError}, @@ -57,7 +57,7 @@ impl BlockFinisher { impl UnfinishedBlockBuildingSink for BlockFinisher { fn new_block(&self, block: Box) { if let Err(err) = self.finish_and_submit(block) { - warn!(?err, "Error finishing block"); + trace!(?err, "Error finishing block"); } } diff --git a/crates/rbuilder/src/utils/provider_factory_reopen.rs b/crates/rbuilder/src/utils/provider_factory_reopen.rs index 10031f97..9ae7ec6d 100644 --- a/crates/rbuilder/src/utils/provider_factory_reopen.rs +++ b/crates/rbuilder/src/utils/provider_factory_reopen.rs @@ -1,8 +1,12 @@ use crate::{ provider::StateProviderFactory, + roothash::{calculate_state_root, RootHashMode}, telemetry::{inc_provider_bad_reopen_counter, inc_provider_reopen_counter}, }; -use reth::providers::{BlockHashReader, ChainSpecProvider, ExecutionOutcome, ProviderFactory}; +use reth::{ + providers::{BlockHashReader, ChainSpecProvider, ExecutionOutcome, ProviderFactory}, + tasks::pool::BlockingTaskPool, +}; use reth_chainspec::ChainSpec; use reth_db::database::Database; use reth_errors::{ProviderResult, RethResult}; @@ -173,10 +177,19 @@ impl StateProviderFactory for ProviderFactoryReo fn state_root( &self, - _parent_hash: B256, - _output: &ExecutionOutcome, + parent_hash: B256, + output: &ExecutionOutcome, ) -> Result { - println!("F"); - unimplemented!("TODO"); + let pool = + BlockingTaskPool::new(BlockingTaskPool::builder().num_threads(10).build().unwrap()); + + calculate_state_root( + self.provider_factory_unchecked(), + parent_hash, + output, + RootHashMode::CorrectRoot, + pool, + ) + .map_err(|_e| eyre::eyre!("Failed to calculate state root")) } } From 61eead0d69586d04d5adb1a06a25d455a65211cc Mon Sep 17 00:00:00 2001 From: Ferran Borreguero Date: Sat, 24 Aug 2024 18:40:35 +0100 Subject: [PATCH 08/10] Remove root_pool from everywhere --- .../rbuilder/src/bin/debug-bench-machine.rs | 1 - crates/rbuilder/src/bin/dummy-builder.rs | 17 ++--- .../builders/block_building_helper.rs | 5 -- crates/rbuilder/src/building/builders/mod.rs | 6 +- .../src/building/builders/ordering_builder.rs | 11 --- crates/rbuilder/src/building/mod.rs | 12 --- .../rbuilder/src/live_builder/base_config.rs | 5 +- crates/rbuilder/src/live_builder/config.rs | 73 ++----------------- .../src/utils/provider_factory_reopen.rs | 26 +++++-- 9 files changed, 37 insertions(+), 119 deletions(-) diff --git a/crates/rbuilder/src/bin/debug-bench-machine.rs b/crates/rbuilder/src/bin/debug-bench-machine.rs index aa84cbef..0b0710a0 100644 --- a/crates/rbuilder/src/bin/debug-bench-machine.rs +++ b/crates/rbuilder/src/bin/debug-bench-machine.rs @@ -124,7 +124,6 @@ async fn main() -> eyre::Result<()> { &ctx, factory.clone(), RootHashMode::IgnoreParentHash, - config.base_config().root_hash_task_pool()?, )?; let finalize_time = finalize_time.elapsed(); diff --git a/crates/rbuilder/src/bin/dummy-builder.rs b/crates/rbuilder/src/bin/dummy-builder.rs index afab5dbb..4bcb32ea 100644 --- a/crates/rbuilder/src/bin/dummy-builder.rs +++ b/crates/rbuilder/src/bin/dummy-builder.rs @@ -18,10 +18,9 @@ use rbuilder::{ }, live_builder::{ base_config::{ - DEFAULT_EL_NODE_IPC_PATH, DEFAULT_ERROR_STORAGE_PATH, DEFAULT_INCOMING_BUNDLES_PORT, - DEFAULT_IP, DEFAULT_RETH_DB_PATH, + create_provider_factory, DEFAULT_EL_NODE_IPC_PATH, DEFAULT_ERROR_STORAGE_PATH, + DEFAULT_INCOMING_BUNDLES_PORT, DEFAULT_IP, DEFAULT_RETH_DB_PATH, }, - config::create_provider_factory, order_input::{ OrderInputConfig, DEFAULT_INPUT_CHANNEL_BUFFER_SIZE, DEFAULT_RESULTS_CHANNEL_TIMEOUT, DEFAULT_SERVE_MAX_CONNECTIONS, @@ -38,7 +37,6 @@ use rbuilder::{ roothash::RootHashMode, utils::{ProviderFactoryReopener, Signer}, }; -use reth::tasks::pool::BlockingTaskPool; use reth_chainspec::MAINNET; use reth_db::DatabaseEnv; use tokio::{signal::ctrl_c, sync::broadcast}; @@ -93,6 +91,8 @@ async fn main() -> eyre::Result<()> { None, None, chain_spec.clone(), + true, + 1, )?, coinbase_signer: Signer::random(), extra_data: Vec::new(), @@ -156,19 +156,13 @@ impl UnfinishedBlockBuildingSink for TracingBlockSink { struct DummyBuildingAlgorithm { /// Amnount of used orders to build a block orders_to_use: usize, - root_hash_task_pool: BlockingTaskPool, } const ORDER_POLLING_PERIOD: Duration = Duration::from_millis(10); const BUILDER_NAME: &str = "DUMMY"; impl DummyBuildingAlgorithm { pub fn new(orders_to_use: usize) -> Self { - Self { - orders_to_use, - root_hash_task_pool: BlockingTaskPool::new( - BlockingTaskPool::builder().num_threads(1).build().unwrap(), - ), - } + Self { orders_to_use } } fn wait_for_orders( @@ -200,7 +194,6 @@ impl DummyBuildingAlgorithm { ) -> eyre::Result> { let mut block_building_helper = BlockBuildingHelperFromDB::new( provider_factory.clone(), - self.root_hash_task_pool.clone(), RootHashMode::CorrectRoot, ctx.clone(), None, diff --git a/crates/rbuilder/src/building/builders/block_building_helper.rs b/crates/rbuilder/src/building/builders/block_building_helper.rs index 8610ac0d..1d44f831 100644 --- a/crates/rbuilder/src/building/builders/block_building_helper.rs +++ b/crates/rbuilder/src/building/builders/block_building_helper.rs @@ -4,7 +4,6 @@ use std::{ }; use alloy_primitives::U256; -use reth::tasks::pool::BlockingTaskPool; use reth_payload_builder::database::CachedReads; use reth_primitives::format_ether; use time::OffsetDateTime; @@ -91,7 +90,6 @@ pub struct BlockBuildingHelperFromDB { built_block_trace: BuiltBlockTrace, /// Needed to get the initial state and the final root hash calculation. provider_factory: Provider, - root_hash_task_pool: BlockingTaskPool, root_hash_mode: RootHashMode, /// Token to cancel in case of fatal error (if we believe that it's impossible to build for this block). cancel_on_fatal_error: CancellationToken, @@ -130,7 +128,6 @@ impl BlockBuildingHelperFromDB #[allow(clippy::too_many_arguments)] pub fn new( provider_factory: Provider, - root_hash_task_pool: BlockingTaskPool, root_hash_mode: RootHashMode, building_ctx: BlockBuildingContext, cached_reads: Option, @@ -173,7 +170,6 @@ impl BlockBuildingHelperFromDB building_ctx, built_block_trace: BuiltBlockTrace::new(), provider_factory, - root_hash_task_pool, root_hash_mode, cancel_on_fatal_error, }) @@ -325,7 +321,6 @@ impl BlockBuildingHelper &self.building_ctx, self.provider_factory.clone(), self.root_hash_mode, - self.root_hash_task_pool, ) { Ok(finalized_block) => finalized_block, Err(err) => { diff --git a/crates/rbuilder/src/building/builders/mod.rs b/crates/rbuilder/src/building/builders/mod.rs index 4a2f0857..f0c31c1c 100644 --- a/crates/rbuilder/src/building/builders/mod.rs +++ b/crates/rbuilder/src/building/builders/mod.rs @@ -12,10 +12,7 @@ use crate::{ use ahash::HashSet; use alloy_primitives::{Address, B256}; use block_building_helper::BlockBuildingHelper; -use reth::{ - primitives::{BlobTransactionSidecar, SealedBlock}, - tasks::pool::BlockingTaskPool, -}; +use reth::primitives::{BlobTransactionSidecar, SealedBlock}; use reth_payload_builder::database::CachedReads; use std::sync::Arc; use tokio::sync::{broadcast, broadcast::error::TryRecvError}; @@ -35,7 +32,6 @@ pub struct Block { #[derive(Debug)] pub struct LiveBuilderInput { pub provider_factory: Provider, - pub root_hash_task_pool: BlockingTaskPool, pub ctx: BlockBuildingContext, pub input: broadcast::Receiver, pub sink: Arc, diff --git a/crates/rbuilder/src/building/builders/ordering_builder.rs b/crates/rbuilder/src/building/builders/ordering_builder.rs index 15aa445d..9ab770f0 100644 --- a/crates/rbuilder/src/building/builders/ordering_builder.rs +++ b/crates/rbuilder/src/building/builders/ordering_builder.rs @@ -21,7 +21,6 @@ use alloy_primitives::Address; use tokio_util::sync::CancellationToken; use crate::roothash::RootHashMode; -use reth::tasks::pool::BlockingTaskPool; use reth_payload_builder::database::CachedReads; use serde::Deserialize; use std::time::{Duration, Instant}; @@ -74,7 +73,6 @@ pub fn run_ordering_builder( let mut builder = OrderingBuilderContext::new( input.provider_factory.clone(), - input.root_hash_task_pool, input.builder_name, input.ctx, config.clone(), @@ -142,7 +140,6 @@ pub fn backtest_simulate_block )?; let mut builder = OrderingBuilderContext::new( input.provider_factory.clone(), - BlockingTaskPool::build()?, input.builder_name, input.ctx.clone(), ordering_config, @@ -170,7 +167,6 @@ pub fn backtest_simulate_block #[derive(Debug)] pub struct OrderingBuilderContext { provider_factory: Provider, - root_hash_task_pool: BlockingTaskPool, builder_name: String, ctx: BlockBuildingContext, config: OrderingBuilderConfig, @@ -187,14 +183,12 @@ pub struct OrderingBuilderContext { impl OrderingBuilderContext { pub fn new( provider_factory: Provider, - root_hash_task_pool: BlockingTaskPool, builder_name: String, ctx: BlockBuildingContext, config: OrderingBuilderConfig, ) -> Self { Self { provider_factory, - root_hash_task_pool, builder_name, ctx, config, @@ -251,7 +245,6 @@ impl OrderingBuilderContext OrderingBuilderContext, config: OrderingBuilderConfig, name: String, @@ -344,13 +336,11 @@ pub struct OrderingBuildingAlgorithm { impl OrderingBuildingAlgorithm { pub fn new( - root_hash_task_pool: BlockingTaskPool, sbundle_mergeabe_signers: Vec
, config: OrderingBuilderConfig, name: String, ) -> Self { Self { - root_hash_task_pool, sbundle_mergeabe_signers, config, name, @@ -368,7 +358,6 @@ impl BlockBuildingAlgorithm) { let live_input = LiveBuilderInput { provider_factory: input.provider_factory, - root_hash_task_pool: self.root_hash_task_pool.clone(), ctx: input.ctx.clone(), input: input.input, sink: input.sink, diff --git a/crates/rbuilder/src/building/mod.rs b/crates/rbuilder/src/building/mod.rs index aefb10a0..0873b2b5 100644 --- a/crates/rbuilder/src/building/mod.rs +++ b/crates/rbuilder/src/building/mod.rs @@ -29,7 +29,6 @@ use reth::{ }, providers::ExecutionOutcome, rpc::types::beacon::events::PayloadAttributesEvent, - tasks::pool::BlockingTaskPool, }; use reth_basic_payload_builder::{commit_withdrawals, WithdrawalsOutcome}; use reth_chainspec::{ChainSpec, EthereumHardforks}; @@ -542,7 +541,6 @@ impl PartialBlock { ctx: &BlockBuildingContext, provider_factory: Provider, _root_hash_mode: RootHashMode, - _root_hash_task_pool: BlockingTaskPool, ) -> eyre::Result { let (withdrawals_root, withdrawals) = { let mut db = state.new_db_ref(); @@ -603,16 +601,6 @@ impl PartialBlock { let state_root = provider_factory.state_root(ctx.attributes.parent, &execution_outcome)?; - /* - let state_root = calculate_state_root( - provider_factory, - ctx.attributes.parent, - &execution_outcome, - root_hash_mode, - root_hash_task_pool, - )?; - */ - // create the block header let transactions_root = proofs::calculate_transaction_root(&self.executed_tx); diff --git a/crates/rbuilder/src/live_builder/base_config.rs b/crates/rbuilder/src/live_builder/base_config.rs index b728bf1f..35dd70ba 100644 --- a/crates/rbuilder/src/live_builder/base_config.rs +++ b/crates/rbuilder/src/live_builder/base_config.rs @@ -227,6 +227,7 @@ impl BaseConfig { self.reth_static_files_path.as_deref(), self.chain_spec()?, false, + self.root_hash_task_pool_threads, ) } @@ -411,6 +412,7 @@ pub fn create_provider_factory( reth_static_files_path: Option<&Path>, chain_spec: Arc, rw: bool, + task_pool_threads: usize, ) -> eyre::Result>> { // shellexpand the reth datadir let reth_datadir = if let Some(reth_datadir) = reth_datadir { @@ -444,7 +446,7 @@ pub fn create_provider_factory( }; let provider_factory_reopener = - ProviderFactoryReopener::new(db, chain_spec, reth_static_files_path)?; + ProviderFactoryReopener::new(db, chain_spec, reth_static_files_path, task_pool_threads)?; if provider_factory_reopener .provider_factory_unchecked() @@ -547,6 +549,7 @@ mod test { reth_static_files_path.as_deref(), Default::default(), true, + 1, ); if *should_succeed { diff --git a/crates/rbuilder/src/live_builder/config.rs b/crates/rbuilder/src/live_builder/config.rs index ba35faef..03ca49e4 100644 --- a/crates/rbuilder/src/live_builder/config.rs +++ b/crates/rbuilder/src/live_builder/config.rs @@ -43,20 +43,12 @@ use ethereum_consensus::{ builder::compute_builder_domain, crypto::SecretKey, primitives::Version, state_transition::Context as ContextEth, }; -use eyre::Context; -use reth::tasks::pool::BlockingTaskPool; use reth_chainspec::{Chain, ChainSpec, NamedChain}; use reth_db::DatabaseEnv; use reth_payload_builder::database::CachedReads; -use reth_primitives::StaticFileSegment; -use reth_provider::StaticFileProviderFactory; use serde::Deserialize; use serde_with::{serde_as, OneOrMany}; -use std::{ - path::{Path, PathBuf}, - str::FromStr, - sync::Arc, -}; +use std::{str::FromStr, sync::Arc}; use tracing::info; use url::Url; @@ -303,10 +295,8 @@ impl LiveBuilderConfig for Config { .base_config .create_builder(cancellation_token, sink_factory, payload_event) .await?; - let root_hash_task_pool = self.base_config.root_hash_task_pool()?; let builders = create_builders( self.live_builders()?, - root_hash_task_pool, self.base_config.sbundle_mergeabe_signers(), ); Ok(live_builder.with_builders(builders)) @@ -382,50 +372,6 @@ impl Default for Config { } } -/// Open reth db and DB should be opened once per process but it can be cloned and moved to different threads. -pub fn create_provider_factory( - reth_datadir: Option<&Path>, - reth_db_path: Option<&Path>, - reth_static_files_path: Option<&Path>, - chain_spec: Arc, -) -> eyre::Result>> { - let reth_db_path = match (reth_db_path, reth_datadir) { - (Some(reth_db_path), _) => PathBuf::from(reth_db_path), - (None, Some(reth_datadir)) => reth_datadir.join("db"), - (None, None) => eyre::bail!("Either reth_db_path or reth_datadir must be provided"), - }; - - let db = open_reth_db(&reth_db_path)?; - - let reth_static_files_path = match (reth_static_files_path, reth_datadir) { - (Some(reth_static_files_path), _) => PathBuf::from(reth_static_files_path), - (None, Some(reth_datadir)) => reth_datadir.join("static_files"), - (None, None) => { - eyre::bail!("Either reth_static_files_path or reth_datadir must be provided") - } - }; - - let provider_factory_reopener = - ProviderFactoryReopener::new(db, chain_spec, reth_static_files_path)?; - - if provider_factory_reopener - .provider_factory_unchecked() - .static_file_provider() - .get_highest_static_file_block(StaticFileSegment::Headers) - .is_none() - { - eyre::bail!("No headers in static files. Check your static files path configuration."); - } - - Ok(provider_factory_reopener) -} - -fn open_reth_db(reth_db_path: &Path) -> eyre::Result> { - Ok(Arc::new( - reth_db::open_db_read_only(reth_db_path, Default::default()).context("DB open error")?, - )) -} - pub fn coinbase_signer_from_secret_key(secret_key: &str) -> eyre::Result { let secret_key = B256::from_str(secret_key)?; Ok(Signer::try_from_secret(secret_key)?) @@ -433,29 +379,22 @@ pub fn coinbase_signer_from_secret_key(secret_key: &str) -> eyre::Result fn create_builders( configs: Vec, - root_hash_task_pool: BlockingTaskPool, sbundle_mergeabe_signers: Vec
, ) -> Vec>> { configs .into_iter() - .map(|cfg| create_builder(cfg, &root_hash_task_pool, &sbundle_mergeabe_signers)) + .map(|cfg| create_builder(cfg, &sbundle_mergeabe_signers)) .collect() } fn create_builder( cfg: BuilderConfig, - root_hash_task_pool: &BlockingTaskPool, sbundle_mergeabe_signers: &[Address], ) -> Arc> { match cfg.builder { - SpecificBuilderConfig::OrderingBuilder(order_cfg) => { - Arc::new(OrderingBuildingAlgorithm::new( - root_hash_task_pool.clone(), - sbundle_mergeabe_signers.to_vec(), - order_cfg, - cfg.name, - )) - } + SpecificBuilderConfig::OrderingBuilder(order_cfg) => Arc::new( + OrderingBuildingAlgorithm::new(sbundle_mergeabe_signers.to_vec(), order_cfg, cfg.name), + ), } } @@ -501,7 +440,7 @@ mod test { use super::*; use crate::live_builder::base_config::load_config_toml_and_env; use alloy_primitives::{address, fixed_bytes}; - use std::env; + use std::{env, path::PathBuf}; use url::Url; #[test] diff --git a/crates/rbuilder/src/utils/provider_factory_reopen.rs b/crates/rbuilder/src/utils/provider_factory_reopen.rs index 9ae7ec6d..3e4ca237 100644 --- a/crates/rbuilder/src/utils/provider_factory_reopen.rs +++ b/crates/rbuilder/src/utils/provider_factory_reopen.rs @@ -32,21 +32,35 @@ pub struct ProviderFactoryReopener { static_files_path: PathBuf, /// Patch to disable checking on test mode. Is ugly but ProviderFactoryReopener should die shortly (5/24/2024). testing_mode: bool, + root_hash_task_pool: BlockingTaskPool, } impl ProviderFactoryReopener { - pub fn new(db: DB, chain_spec: Arc, static_files_path: PathBuf) -> RethResult { + pub fn new( + db: DB, + chain_spec: Arc, + static_files_path: PathBuf, + task_pool_threads: usize, + ) -> RethResult { let provider_factory = ProviderFactory::new( db, chain_spec.clone(), StaticFileProvider::read_only(static_files_path.as_path()).unwrap(), ); + let pool = BlockingTaskPool::new( + BlockingTaskPool::builder() + .num_threads(task_pool_threads) + .build() + .unwrap(), + ); + Ok(Self { provider_factory: Arc::new(Mutex::new(provider_factory)), chain_spec, static_files_path, testing_mode: false, + root_hash_task_pool: pool, }) } @@ -55,11 +69,16 @@ impl ProviderFactoryReopener { ) -> RethResult { let chain_spec = provider_factory.chain_spec(); let static_files_path = provider_factory.static_file_provider().path().to_path_buf(); + + let pool = + BlockingTaskPool::new(BlockingTaskPool::builder().num_threads(10).build().unwrap()); + Ok(Self { provider_factory: Arc::new(Mutex::new(provider_factory)), chain_spec, static_files_path, testing_mode: true, + root_hash_task_pool: pool, }) } @@ -180,15 +199,12 @@ impl StateProviderFactory for ProviderFactoryReo parent_hash: B256, output: &ExecutionOutcome, ) -> Result { - let pool = - BlockingTaskPool::new(BlockingTaskPool::builder().num_threads(10).build().unwrap()); - calculate_state_root( self.provider_factory_unchecked(), parent_hash, output, RootHashMode::CorrectRoot, - pool, + self.root_hash_task_pool.clone(), ) .map_err(|_e| eyre::eyre!("Failed to calculate state root")) } From 6105901d9c4e614dca0c66ab9638cf34199431db Mon Sep 17 00:00:00 2001 From: Ferran Borreguero Date: Tue, 27 Aug 2024 08:35:47 +0100 Subject: [PATCH 09/10] Remove stuff --- crates/rbuilder/src/backtest/redistribute/mod.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/crates/rbuilder/src/backtest/redistribute/mod.rs b/crates/rbuilder/src/backtest/redistribute/mod.rs index 7a740d06..7df92c50 100644 --- a/crates/rbuilder/src/backtest/redistribute/mod.rs +++ b/crates/rbuilder/src/backtest/redistribute/mod.rs @@ -25,8 +25,6 @@ pub use cli::run_backtest_redistribute; use jsonrpsee::core::Serialize; use rayon::prelude::*; use reth_chainspec::ChainSpec; -use reth_db::DatabaseEnv; -use reth_provider::ProviderFactory; use std::{ cmp::{max, min}, sync::Arc, From b7b09e1644594b2f2a96e3d101d9f60c92f84609 Mon Sep 17 00:00:00 2001 From: Ferran Borreguero Date: Wed, 28 Aug 2024 16:45:38 +0100 Subject: [PATCH 10/10] Cleanup --- .../src/live_builder/order_input/clean_orderpool.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/rbuilder/src/live_builder/order_input/clean_orderpool.rs b/crates/rbuilder/src/live_builder/order_input/clean_orderpool.rs index a311dded..d14eba06 100644 --- a/crates/rbuilder/src/live_builder/order_input/clean_orderpool.rs +++ b/crates/rbuilder/src/live_builder/order_input/clean_orderpool.rs @@ -4,7 +4,7 @@ use crate::{ provider::StateProviderFactory, telemetry::{set_current_block, set_ordepool_count}, }; -use alloy_provider::{IpcConnect, Provider, ProviderBuilder}; +use alloy_provider::{IpcConnect, Provider as JsonRPCProvider, ProviderBuilder}; use futures::StreamExt; use std::{ pin::pin, @@ -17,9 +17,9 @@ use tracing::{debug, error, info}; /// Performs maintenance operations on every new header by calling OrderPool::head_updated. /// Also calls some functions to generate metrics. -pub async fn spawn_clean_orderpool_job( +pub async fn spawn_clean_orderpool_job( config: OrderInputConfig, - provider_factory: SProvider, + provider_factory: Provider, orderpool: Arc>, global_cancellation: CancellationToken, ) -> eyre::Result> {