diff --git a/crates/rbuilder/src/bin/debug-order-sim.rs b/crates/rbuilder/src/bin/debug-order-sim.rs index 812e6bf5..ecfe6405 100644 --- a/crates/rbuilder/src/bin/debug-order-sim.rs +++ b/crates/rbuilder/src/bin/debug-order-sim.rs @@ -133,8 +133,9 @@ pub async fn main() -> eyre::Result<()> { ); let block_cancel = CancellationToken::new(); - let mut sim_results = - sim_pool.spawn_simulation_job(block_ctx, orders_for_block, block_cancel.clone()); + let mut sim_results = sim_pool + .spawn_simulation_job(block_ctx, orders_for_block, block_cancel.clone()) + .subscribe(); loop { tokio::select! { new_slot = slots.recv() => { @@ -147,8 +148,8 @@ pub async fn main() -> eyre::Result<()> { break 'slots; } }, - sim_command = sim_results.orders.recv() => { - if let Some(sim_command) = sim_command { + sim_command = sim_results.recv() => { + if let Ok(sim_command) = sim_command { match sim_command{ SimulatedOrderCommand::Simulation(sim) => { orders_last_slot += 1; diff --git a/crates/rbuilder/src/bin/dummy-builder.rs b/crates/rbuilder/src/bin/dummy-builder.rs index 71cd5f6c..0964b466 100644 --- a/crates/rbuilder/src/bin/dummy-builder.rs +++ b/crates/rbuilder/src/bin/dummy-builder.rs @@ -98,7 +98,7 @@ async fn main() -> eyre::Result<()> { global_cancellation: cancel.clone(), extra_rpc: RpcModule::new(()), sink_factory: Box::new(TraceBlockSinkFactory {}), - builders: vec![Arc::new(DummyBuildingAlgorithm::new(10))], + builder: Arc::new(DummyBuildingAlgorithm::new(10)), }; let ctrlc = tokio::spawn(async move { diff --git a/crates/rbuilder/src/live_builder/base_config.rs b/crates/rbuilder/src/live_builder/base_config.rs index 52711831..7437fe44 100644 --- a/crates/rbuilder/src/live_builder/base_config.rs +++ b/crates/rbuilder/src/live_builder/base_config.rs @@ -2,7 +2,7 @@ //! use crate::{ building::builders::UnfinishedBlockBuildingSinkFactory, - live_builder::{order_input::OrderInputConfig, LiveBuilder}, + live_builder::{order_input::OrderInputConfig, LiveBuilder, NullBlockBuildingAlgorithm}, telemetry::{setup_reloadable_tracing_subscriber, LoggerConfig}, utils::{http_provider, BoxedProvider, ProviderFactoryReopener, Signer}, }; @@ -193,7 +193,7 @@ impl BaseConfig { extra_rpc: RpcModule::new(()), sink_factory, - builders: Vec::new(), + builder: Arc::new(NullBlockBuildingAlgorithm {}), }) } diff --git a/crates/rbuilder/src/live_builder/building/mod.rs b/crates/rbuilder/src/live_builder/building/mod.rs index 222e589e..7ead0138 100644 --- a/crates/rbuilder/src/live_builder/building/mod.rs +++ b/crates/rbuilder/src/live_builder/building/mod.rs @@ -1,139 +1,57 @@ -use std::{sync::Arc, time::Duration}; - -use crate::{ - building::{ - builders::{ - BlockBuildingAlgorithm, BlockBuildingAlgorithmInput, UnfinishedBlockBuildingSinkFactory, - }, - BlockBuildingContext, - }, - live_builder::{payload_events::MevBoostSlotData, simulation::SlotOrderSimResults}, - utils::ProviderFactoryReopener, -}; +use crate::building::builders::{BlockBuildingAlgorithm, BlockBuildingAlgorithmInput}; use reth_db::database::Database; -use tokio::sync::{broadcast, mpsc}; -use tokio_util::sync::CancellationToken; -use tracing::{debug, error, trace}; - -use super::{ - order_input::{ - self, order_replacement_manager::OrderReplacementManager, orderpool::OrdersForBlock, - }, - payload_events, - simulation::OrderSimulationPool, -}; +use std::sync::Arc; +use tokio::sync::broadcast; +use tracing::{debug, trace}; #[derive(Debug)] pub struct BlockBuildingPool { - provider_factory: ProviderFactoryReopener, builders: Vec>>, - sink_factory: Box, - orderpool_subscriber: order_input::OrderPoolSubscriber, - order_simulation_pool: OrderSimulationPool, } impl BlockBuildingPool { - pub fn new( - provider_factory: ProviderFactoryReopener, - builders: Vec>>, - sink_factory: Box, - orderpool_subscriber: order_input::OrderPoolSubscriber, - order_simulation_pool: OrderSimulationPool, - ) -> Self { - BlockBuildingPool { - provider_factory, - builders, - sink_factory, - orderpool_subscriber, - order_simulation_pool, - } + pub fn new(builders: Vec>>) -> Self { + BlockBuildingPool { builders } } +} - /// Connects OrdersForBlock->OrderReplacementManager->Simulations and calls start_building_job - pub fn start_block_building( - &mut self, - payload: payload_events::MevBoostSlotData, - block_ctx: BlockBuildingContext, - global_cancellation: CancellationToken, - max_time_to_build: Duration, - ) { - let block_cancellation = global_cancellation.child_token(); - - let cancel = block_cancellation.clone(); - tokio::spawn(async move { - tokio::time::sleep(max_time_to_build).await; - cancel.cancel(); - }); - - let (orders_for_block, sink) = OrdersForBlock::new_with_sink(); - // add OrderReplacementManager to manage replacements and cancellations - let order_replacement_manager = OrderReplacementManager::new(Box::new(sink)); - // sink removal is automatic via OrderSink::is_alive false - let _block_sub = self.orderpool_subscriber.add_sink( - block_ctx.block_env.number.to(), - Box::new(order_replacement_manager), - ); - - let simulations_for_block = self.order_simulation_pool.spawn_simulation_job( - block_ctx.clone(), - orders_for_block, - block_cancellation.clone(), - ); - self.start_building_job( - block_ctx, - payload, - simulations_for_block, - block_cancellation, - ); +impl BlockBuildingAlgorithm + for BlockBuildingPool +{ + fn name(&self) -> String { + "BlockBuildingPool".to_string() } - /// Per each BlockBuildingAlgorithm creates BlockBuildingAlgorithmInput and Sinks and spawn a task to run it - fn start_building_job( - &mut self, - ctx: BlockBuildingContext, - slot_data: MevBoostSlotData, - input: SlotOrderSimResults, - cancel: CancellationToken, - ) { - let builder_sink = self.sink_factory.create_sink(slot_data, cancel.clone()); + fn build_blocks(&self, input: BlockBuildingAlgorithmInput) { let (broadcast_input, _) = broadcast::channel(10_000); - - let block_number = ctx.block_env.number.to::(); - let provider_factory = match self - .provider_factory - .check_consistency_and_reopen_if_needed(block_number) - { - Ok(provider_factory) => provider_factory, - Err(err) => { - error!(?err, "Error while reopening provider factory"); - return; - } - }; + let block_number = input.ctx.block_env.number.to::(); for builder in self.builders.iter() { let builder_name = builder.name(); debug!(block = block_number, builder_name, "Spawning builder job"); - let input = BlockBuildingAlgorithmInput:: { - provider_factory: provider_factory.clone(), - ctx: ctx.clone(), + + let builder = builder.clone(); + let input = BlockBuildingAlgorithmInput { + provider_factory: input.provider_factory.clone(), + ctx: input.ctx.clone(), input: broadcast_input.subscribe(), - sink: builder_sink.clone(), - cancel: cancel.clone(), + sink: input.sink.clone(), + cancel: input.cancel.clone(), }; - let builder = builder.clone(); + tokio::task::spawn_blocking(move || { builder.build_blocks(input); debug!(block = block_number, builder_name, "Stopped builder job"); }); } - tokio::spawn(multiplex_job(input.orders, broadcast_input)); + tokio::spawn(multiplex_job(input.input, broadcast_input)); } } -async fn multiplex_job(mut input: mpsc::Receiver, sender: broadcast::Sender) { +async fn multiplex_job(mut input: broadcast::Receiver, sender: broadcast::Sender) { // we don't worry about waiting for input forever because it will be closed by producer job - while let Some(input) = input.recv().await { + while let Ok(input) = input.recv().await { // we don't create new subscribers to the broadcast so here we can be sure that err means end of receivers if sender.send(input).is_err() { return; diff --git a/crates/rbuilder/src/live_builder/config.rs b/crates/rbuilder/src/live_builder/config.rs index c053c71e..0c9b985d 100644 --- a/crates/rbuilder/src/live_builder/config.rs +++ b/crates/rbuilder/src/live_builder/config.rs @@ -27,6 +27,7 @@ use crate::{ }, cli::LiveBuilderConfig, payload_events::MevBoostSlotDataGenerator, + BlockBuildingPool, }, mev_boost::BLSBlockSigner, primitives::mev_boost::{MevBoostRelay, RelayConfig}, @@ -306,7 +307,8 @@ impl LiveBuilderConfig for Config { root_hash_task_pool, self.base_config.sbundle_mergeabe_signers(), ); - Ok(live_builder.with_builders(builders)) + let builder = BlockBuildingPool::new(builders); + Ok(live_builder.with_builder(Arc::new(builder))) } fn version_for_telemetry(&self) -> crate::utils::build_info::Version { diff --git a/crates/rbuilder/src/live_builder/mod.rs b/crates/rbuilder/src/live_builder/mod.rs index 2f3fae4c..aabfc3f1 100644 --- a/crates/rbuilder/src/live_builder/mod.rs +++ b/crates/rbuilder/src/live_builder/mod.rs @@ -10,11 +10,17 @@ mod watchdog; use crate::{ building::{ - builders::{BlockBuildingAlgorithm, UnfinishedBlockBuildingSinkFactory}, + builders::{ + BlockBuildingAlgorithm, BlockBuildingAlgorithmInput, UnfinishedBlockBuildingSinkFactory, + }, BlockBuildingContext, }, live_builder::{ - order_input::{start_orderpool_jobs, OrderInputConfig}, + building::BlockBuildingPool, + order_input::{ + order_replacement_manager::OrderReplacementManager, orderpool::OrdersForBlock, + start_orderpool_jobs, OrderInputConfig, OrderPoolSubscriber, + }, simulation::OrderSimulationPool, watchdog::spawn_watchdog_thread, }, @@ -23,7 +29,6 @@ use crate::{ }; use ahash::HashSet; use alloy_primitives::{Address, B256}; -use building::BlockBuildingPool; use eyre::Context; use jsonrpsee::RpcModule; use payload_events::MevBoostSlotData; @@ -49,7 +54,7 @@ const GET_BLOCK_HEADER_PERIOD: time::Duration = time::Duration::milliseconds(250 /// Trait used to trigger a new block building process in the slot. pub trait SlotSource { - fn recv_slot_channel(self) -> mpsc::UnboundedReceiver; + fn recv_slot_channel(&self) -> mpsc::UnboundedReceiver; } /// Main builder struct. @@ -74,7 +79,7 @@ pub struct LiveBuilder { pub global_cancellation: CancellationToken, pub sink_factory: Box, - pub builders: Vec>>, + pub builder: Arc>, // doing the Option because there is a fuunction that creates the live_builder without the builder. pub extra_rpc: RpcModule<()>, } @@ -85,29 +90,32 @@ impl Self { extra_rpc, ..self } } - pub fn with_builders(self, builders: Vec>>) -> Self { - Self { builders, ..self } + pub fn with_builder(self, builder: Arc>) -> Self { + Self { builder, ..self } } - pub async fn run(self) -> eyre::Result<()> { + pub async fn run(mut self) -> eyre::Result<()> { info!("Builder block list size: {}", self.blocklist.len(),); info!( "Builder coinbase address: {:?}", self.coinbase_signer.address ); - spawn_error_storage_writer(self.error_storage_path, self.global_cancellation.clone()) - .await - .with_context(|| "Error spawning error storage writer")?; + spawn_error_storage_writer( + self.error_storage_path.clone(), + self.global_cancellation.clone(), + ) + .await + .with_context(|| "Error spawning error storage writer")?; let mut inner_jobs_handles = Vec::new(); let mut payload_events_channel = self.blocks_source.recv_slot_channel(); let orderpool_subscriber = { let (handle, sub) = start_orderpool_jobs( - self.order_input_config, + self.order_input_config.clone(), self.provider_factory.clone(), - self.extra_rpc, + self.extra_rpc.clone(), self.global_cancellation.clone(), ) .await?; @@ -123,15 +131,8 @@ impl ) }; - let mut builder_pool = BlockBuildingPool::new( - self.provider_factory.clone(), - self.builders, - self.sink_factory, - orderpool_subscriber, - order_simulation_pool, - ); - let watchdog_sender = spawn_watchdog_thread(self.watchdog_timeout)?; + // let mut sink_factory = self.sink_factory; while let Some(payload) = payload_events_channel.recv().await { if self.blocklist.contains(&payload.fee_recipient()) { @@ -215,11 +216,14 @@ impl None, ); - builder_pool.start_block_building( - payload, + let max_time_to_build = time_until_slot_end.try_into().unwrap_or_default(); + + self.build_block( + max_time_to_build, block_ctx, - self.global_cancellation.clone(), - time_until_slot_end.try_into().unwrap_or_default(), + payload, + &orderpool_subscriber, + &order_simulation_pool, ); watchdog_sender.try_send(()).unwrap_or_default(); @@ -235,6 +239,53 @@ impl } Ok(()) } + + fn build_block( + &mut self, + max_time_to_build: Duration, + block_ctx: BlockBuildingContext, + payload: MevBoostSlotData, + orderpool_subscriber: &OrderPoolSubscriber, + order_simulation_pool: &OrderSimulationPool, + ) { + // let max_time_to_build = time_until_slot_end.try_into().unwrap_or_default(); + let block_cancellation = self.global_cancellation.clone().child_token(); + + let cancel = block_cancellation.clone(); + tokio::spawn(async move { + tokio::time::sleep(max_time_to_build).await; + cancel.cancel(); + }); + + let (orders_for_block, sink) = OrdersForBlock::new_with_sink(); + // add OrderReplacementManager to manage replacements and cancellations + let order_replacement_manager = OrderReplacementManager::new(Box::new(sink)); + // sink removal is automatic via OrderSink::is_alive false + let _block_sub = orderpool_subscriber.add_sink( + block_ctx.block_env.number.to(), + Box::new(order_replacement_manager), + ); + + let simulations_for_block = order_simulation_pool.spawn_simulation_job( + block_ctx.clone(), + orders_for_block, + block_cancellation.clone(), + ); + + let builder_sink = self + .sink_factory + .create_sink(payload, block_cancellation.clone()); + + let input = BlockBuildingAlgorithmInput:: { + provider_factory: self.provider_factory.provider_factory_unchecked(), + ctx: block_ctx, + sink: builder_sink, + input: simulations_for_block.subscribe(), + cancel: block_cancellation, + }; + + self.builder.build_blocks(input); + } } /// May fail if we wait too much (see [BLOCK_HEADER_DEAD_LINE_DELTA]) @@ -260,3 +311,16 @@ async fn wait_for_block_header( } Err(eyre::eyre!("Block header not found")) } + +#[derive(Debug)] +pub struct NullBlockBuildingAlgorithm {} + +impl BlockBuildingAlgorithm + for NullBlockBuildingAlgorithm +{ + fn name(&self) -> String { + "NullBlockBuildingAlgorithm".to_string() + } + + fn build_blocks(&self, _input: BlockBuildingAlgorithmInput) {} +} diff --git a/crates/rbuilder/src/live_builder/payload_events/mod.rs b/crates/rbuilder/src/live_builder/payload_events/mod.rs index 2369be45..0701a9fb 100644 --- a/crates/rbuilder/src/live_builder/payload_events/mod.rs +++ b/crates/rbuilder/src/live_builder/payload_events/mod.rs @@ -110,16 +110,20 @@ impl MevBoostSlotDataGenerator { /// When MEV-boost is used, we tell the CL “--always-build-payload” (we are building blocks for ANY validator now!). The CL does /// it, but even with the event being created for every slot, the fee_recipient we get from MEV-Boost might be different so we should always replace it. /// Note that with MEV-boost the validator may change the fee_recipient when registering to the Relays. - pub fn spawn(self) -> (JoinHandle<()>, mpsc::UnboundedReceiver) { + pub fn spawn(&self) -> (JoinHandle<()>, mpsc::UnboundedReceiver) { let relays = RelaysForSlotData::new(&self.relays); let (send, receive) = mpsc::unbounded_channel(); + + let cls = self.cls.clone(); + let global_cancellation = self.global_cancellation.clone(); + let blocklist = self.blocklist.clone(); let handle = tokio::spawn(async move { let mut source = PayloadSourceMuxer::new( - &self.cls, + &cls, NEW_PAYLOAD_RECV_TIMEOUT, CONSENSUS_CLIENT_RECONNECT_WAIT, - self.global_cancellation.clone(), + global_cancellation.clone(), ); info!("MevBoostSlotDataGenerator: started"); @@ -127,7 +131,7 @@ impl MevBoostSlotDataGenerator { let mut recently_sent_data = VecDeque::with_capacity(RECENTLY_SENT_EVENTS_BUFF); while let Some(event) = source.recv().await { - if self.global_cancellation.is_cancelled() { + if global_cancellation.is_cancelled() { return; } @@ -151,9 +155,7 @@ impl MevBoostSlotDataGenerator { slot_data, }; - if let Err(err) = - check_slot_data_for_blocklist(&mev_boost_slot_data, &self.blocklist) - { + if let Err(err) = check_slot_data_for_blocklist(&mev_boost_slot_data, &blocklist) { warn!("Slot data failed blocklist check: {:?}", err); continue; } @@ -174,7 +176,7 @@ impl MevBoostSlotDataGenerator { } } // cancelling here because its a critical job - self.global_cancellation.cancel(); + global_cancellation.cancel(); source.join().await; info!("MevBoostSlotDataGenerator: finished"); @@ -185,7 +187,7 @@ impl MevBoostSlotDataGenerator { } impl SlotSource for MevBoostSlotDataGenerator { - fn recv_slot_channel(self) -> mpsc::UnboundedReceiver { + fn recv_slot_channel(&self) -> mpsc::UnboundedReceiver { let (_handle, chan) = self.spawn(); chan } diff --git a/crates/rbuilder/src/live_builder/simulation/mod.rs b/crates/rbuilder/src/live_builder/simulation/mod.rs index 70a22904..823cc012 100644 --- a/crates/rbuilder/src/live_builder/simulation/mod.rs +++ b/crates/rbuilder/src/live_builder/simulation/mod.rs @@ -14,13 +14,22 @@ use ahash::HashMap; use reth_db::database::Database; use simulation_job::SimulationJob; use std::sync::{Arc, Mutex}; -use tokio::{sync::mpsc, task::JoinHandle}; +use tokio::{ + sync::{broadcast, mpsc}, + task::JoinHandle, +}; use tokio_util::sync::CancellationToken; use tracing::{info_span, Instrument}; #[derive(Debug)] pub struct SlotOrderSimResults { - pub orders: mpsc::Receiver, + orders: broadcast::Sender, +} + +impl SlotOrderSimResults { + pub fn subscribe(&self) -> broadcast::Receiver { + self.orders.subscribe() + } } type BlockContextId = u64; @@ -105,7 +114,7 @@ impl OrderSimulationPool { input: OrdersForBlock, block_cancellation: CancellationToken, ) -> SlotOrderSimResults { - let (slot_sim_results_sender, slot_sim_results_receiver) = mpsc::channel(10_000); + let (slot_sim_results_sender, _) = broadcast::channel(10_000); let provider = self.provider_factory.provider_factory_unchecked(); @@ -113,6 +122,7 @@ impl OrderSimulationPool { let block_context: BlockContextId = gen_uid(); let span = info_span!("sim_ctx", block = ctx.block_env.number.to::(), parent = ?ctx.attributes.parent); + let task_slot_sim_sender = slot_sim_results_sender.clone(); let handle = tokio::spawn( async move { let sim_tree = SimTree::new(provider, ctx.attributes.parent); @@ -133,7 +143,7 @@ impl OrderSimulationPool { new_order_sub, sim_req_sender, sim_results_receiver, - slot_sim_results_sender, + task_slot_sim_sender, sim_tree, ); @@ -155,7 +165,7 @@ impl OrderSimulationPool { } SlotOrderSimResults { - orders: slot_sim_results_receiver, + orders: slot_sim_results_sender, } } } @@ -187,7 +197,7 @@ mod tests { new_order_sub: order_receiver, }; - let mut sim_results = sim_pool.spawn_simulation_job( + let sim_results = sim_pool.spawn_simulation_job( test_context.block_building_context().clone(), orders_for_block, cancel.clone(), @@ -205,7 +215,7 @@ mod tests { // We expect to receive the simulation giving a profit of coinbase_profit since that's what we sent directly to coinbase. // and we are not paying any priority fee - if let Some(command) = sim_results.orders.recv().await { + if let Ok(command) = sim_results.subscribe().recv().await { match command { SimulatedOrderCommand::Simulation(sim_order) => { assert_eq!( diff --git a/crates/rbuilder/src/live_builder/simulation/simulation_job.rs b/crates/rbuilder/src/live_builder/simulation/simulation_job.rs index c9e6884a..81643933 100644 --- a/crates/rbuilder/src/live_builder/simulation/simulation_job.rs +++ b/crates/rbuilder/src/live_builder/simulation/simulation_job.rs @@ -8,7 +8,7 @@ use crate::{ use ahash::HashSet; use alloy_primitives::utils::format_ether; use reth_db::database::Database; -use tokio::sync::mpsc; +use tokio::sync::{broadcast, mpsc}; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, trace, warn}; @@ -34,7 +34,7 @@ pub struct SimulationJob { /// Here we receive the results we asked to sim_req_sender sim_results_receiver: mpsc::Receiver, /// Output of the simulations - slot_sim_results_sender: mpsc::Sender, + slot_sim_results_sender: broadcast::Sender, sim_tree: SimTree, orders_received: OrderCounter, @@ -51,7 +51,7 @@ impl SimulationJob { new_order_sub: mpsc::UnboundedReceiver, sim_req_sender: flume::Sender, sim_results_receiver: mpsc::Receiver, - slot_sim_results_sender: mpsc::Sender, + slot_sim_results_sender: broadcast::Sender, sim_tree: SimTree, ) -> Self { Self { @@ -176,7 +176,6 @@ impl SimulationJob { .send(SimulatedOrderCommand::Simulation( sim_result.simulated_order.clone(), )) - .await .is_err() { return false; //receiver closed :( @@ -199,7 +198,6 @@ impl SimulationJob { async fn send_cancel(&mut self, id: &OrderId) -> bool { self.slot_sim_results_sender .send(SimulatedOrderCommand::Cancellation(*id)) - .await .is_ok() }