Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the block builder pool a block building algorithm #141

Open
wants to merge 7 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/rbuilder/src/bin/dummy-builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ async fn main() -> eyre::Result<()> {
global_cancellation: cancel.clone(),
extra_rpc: RpcModule::new(()),
sink_factory: Box::new(TraceBlockSinkFactory {}),
builders: vec![Arc::new(DummyBuildingAlgorithm::new(10))],
builder: Some(Arc::new(DummyBuildingAlgorithm::new(10))),
};

let ctrlc = tokio::spawn(async move {
Expand Down
2 changes: 1 addition & 1 deletion crates/rbuilder/src/live_builder/base_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ impl BaseConfig {

extra_rpc: RpcModule::new(()),
sink_factory,
builders: Vec::new(),
builder: None,
})
}

Expand Down
132 changes: 25 additions & 107 deletions crates/rbuilder/src/live_builder/building/mod.rs
Original file line number Diff line number Diff line change
@@ -1,139 +1,57 @@
use std::{sync::Arc, time::Duration};

use crate::{
building::{
builders::{
BlockBuildingAlgorithm, BlockBuildingAlgorithmInput, UnfinishedBlockBuildingSinkFactory,
},
BlockBuildingContext,
},
live_builder::{payload_events::MevBoostSlotData, simulation::SlotOrderSimResults},
utils::ProviderFactoryReopener,
};
use crate::building::builders::{BlockBuildingAlgorithm, BlockBuildingAlgorithmInput};
use reth_db::database::Database;
use tokio::sync::{broadcast, mpsc};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, trace};

use super::{
order_input::{
self, order_replacement_manager::OrderReplacementManager, orderpool::OrdersForBlock,
},
payload_events,
simulation::OrderSimulationPool,
};
use std::sync::Arc;
use tokio::sync::broadcast;
use tracing::{debug, trace};

#[derive(Debug)]
pub struct BlockBuildingPool<DB> {
provider_factory: ProviderFactoryReopener<DB>,
builders: Vec<Arc<dyn BlockBuildingAlgorithm<DB>>>,
sink_factory: Box<dyn UnfinishedBlockBuildingSinkFactory>,
orderpool_subscriber: order_input::OrderPoolSubscriber,
order_simulation_pool: OrderSimulationPool<DB>,
}

impl<DB: Database + Clone + 'static> BlockBuildingPool<DB> {
pub fn new(
provider_factory: ProviderFactoryReopener<DB>,
builders: Vec<Arc<dyn BlockBuildingAlgorithm<DB>>>,
sink_factory: Box<dyn UnfinishedBlockBuildingSinkFactory>,
orderpool_subscriber: order_input::OrderPoolSubscriber,
order_simulation_pool: OrderSimulationPool<DB>,
) -> Self {
BlockBuildingPool {
provider_factory,
builders,
sink_factory,
orderpool_subscriber,
order_simulation_pool,
}
pub fn new(builders: Vec<Arc<dyn BlockBuildingAlgorithm<DB>>>) -> Self {
BlockBuildingPool { builders }
}
}

/// Connects OrdersForBlock->OrderReplacementManager->Simulations and calls start_building_job
pub fn start_block_building(
&mut self,
payload: payload_events::MevBoostSlotData,
block_ctx: BlockBuildingContext,
global_cancellation: CancellationToken,
max_time_to_build: Duration,
) {
let block_cancellation = global_cancellation.child_token();

let cancel = block_cancellation.clone();
tokio::spawn(async move {
tokio::time::sleep(max_time_to_build).await;
cancel.cancel();
});

let (orders_for_block, sink) = OrdersForBlock::new_with_sink();
// add OrderReplacementManager to manage replacements and cancellations
let order_replacement_manager = OrderReplacementManager::new(Box::new(sink));
// sink removal is automatic via OrderSink::is_alive false
let _block_sub = self.orderpool_subscriber.add_sink(
block_ctx.block_env.number.to(),
Box::new(order_replacement_manager),
);

let simulations_for_block = self.order_simulation_pool.spawn_simulation_job(
block_ctx.clone(),
orders_for_block,
block_cancellation.clone(),
);
self.start_building_job(
block_ctx,
payload,
simulations_for_block,
block_cancellation,
);
impl<DB: Database + std::fmt::Debug + Clone + 'static> BlockBuildingAlgorithm<DB>
for BlockBuildingPool<DB>
{
fn name(&self) -> String {
"BlockBuildingPool".to_string()
}

/// Per each BlockBuildingAlgorithm creates BlockBuildingAlgorithmInput and Sinks and spawn a task to run it
fn start_building_job(
&mut self,
ctx: BlockBuildingContext,
slot_data: MevBoostSlotData,
input: SlotOrderSimResults,
cancel: CancellationToken,
) {
let builder_sink = self.sink_factory.create_sink(slot_data, cancel.clone());
fn build_blocks(&self, input: BlockBuildingAlgorithmInput<DB>) {
let (broadcast_input, _) = broadcast::channel(10_000);

let block_number = ctx.block_env.number.to::<u64>();
let provider_factory = match self
.provider_factory
.check_consistency_and_reopen_if_needed(block_number)
{
Ok(provider_factory) => provider_factory,
Err(err) => {
error!(?err, "Error while reopening provider factory");
return;
}
};
let block_number = input.ctx.block_env.number.to::<u64>();

for builder in self.builders.iter() {
let builder_name = builder.name();
debug!(block = block_number, builder_name, "Spawning builder job");
let input = BlockBuildingAlgorithmInput::<DB> {
provider_factory: provider_factory.clone(),
ctx: ctx.clone(),

let builder = builder.clone();
let input = BlockBuildingAlgorithmInput {
provider_factory: input.provider_factory.clone(),
ctx: input.ctx.clone(),
input: broadcast_input.subscribe(),
sink: builder_sink.clone(),
cancel: cancel.clone(),
sink: input.sink.clone(),
cancel: input.cancel.clone(),
};
let builder = builder.clone();

tokio::task::spawn_blocking(move || {
builder.build_blocks(input);
debug!(block = block_number, builder_name, "Stopped builder job");
});
}

tokio::spawn(multiplex_job(input.orders, broadcast_input));
tokio::spawn(multiplex_job(input.input, broadcast_input));
}
}

async fn multiplex_job<T>(mut input: mpsc::Receiver<T>, sender: broadcast::Sender<T>) {
async fn multiplex_job<T: Clone>(mut input: broadcast::Receiver<T>, sender: broadcast::Sender<T>) {
// we don't worry about waiting for input forever because it will be closed by producer job
while let Some(input) = input.recv().await {
while let Ok(input) = input.recv().await {
// we don't create new subscribers to the broadcast so here we can be sure that err means end of receivers
if sender.send(input).is_err() {
return;
Expand Down
4 changes: 3 additions & 1 deletion crates/rbuilder/src/live_builder/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::{
},
cli::LiveBuilderConfig,
payload_events::MevBoostSlotDataGenerator,
BlockBuildingPool,
},
mev_boost::BLSBlockSigner,
primitives::mev_boost::{MevBoostRelay, RelayConfig},
Expand Down Expand Up @@ -306,7 +307,8 @@ impl LiveBuilderConfig for Config {
root_hash_task_pool,
self.base_config.sbundle_mergeabe_signers(),
);
Ok(live_builder.with_builders(builders))
let builder = BlockBuildingPool::new(builders);
Ok(live_builder.with_builder(Arc::new(builder)))
}

fn version_for_telemetry(&self) -> crate::utils::build_info::Version {
Expand Down
90 changes: 76 additions & 14 deletions crates/rbuilder/src/live_builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,17 @@ mod watchdog;

use crate::{
building::{
builders::{BlockBuildingAlgorithm, UnfinishedBlockBuildingSinkFactory},
builders::{
BlockBuildingAlgorithm, BlockBuildingAlgorithmInput, UnfinishedBlockBuildingSinkFactory,
},
BlockBuildingContext,
},
live_builder::{
order_input::{start_orderpool_jobs, OrderInputConfig},
building::BlockBuildingPool,
order_input::{
order_replacement_manager::OrderReplacementManager, orderpool::OrdersForBlock,
start_orderpool_jobs, OrderInputConfig,
},
simulation::OrderSimulationPool,
watchdog::spawn_watchdog_thread,
},
Expand All @@ -23,7 +29,6 @@ use crate::{
};
use ahash::HashSet;
use alloy_primitives::{Address, B256};
use building::BlockBuildingPool;
use eyre::Context;
use jsonrpsee::RpcModule;
use payload_events::MevBoostSlotData;
Expand All @@ -35,9 +40,12 @@ use reth_chainspec::ChainSpec;
use reth_db::database::Database;
use std::{cmp::min, path::PathBuf, sync::Arc, time::Duration};
use time::OffsetDateTime;
use tokio::{sync::mpsc, task::spawn_blocking};
use tokio::{
sync::{broadcast, mpsc},
task::spawn_blocking,
};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
use tracing::{debug, error, info, trace, warn};

/// Time the proposer have to propose a block from the beginning of the slot (https://www.paradigm.xyz/2023/04/mev-boost-ethereum-consensus Slot anatomy)
const SLOT_PROPOSAL_DURATION: std::time::Duration = Duration::from_secs(4);
Expand Down Expand Up @@ -74,7 +82,7 @@ pub struct LiveBuilder<DB, BlocksSourceType: SlotSource> {
pub global_cancellation: CancellationToken,

pub sink_factory: Box<dyn UnfinishedBlockBuildingSinkFactory>,
pub builders: Vec<Arc<dyn BlockBuildingAlgorithm<DB>>>,
pub builder: Option<Arc<dyn BlockBuildingAlgorithm<DB>>>, // doing the Option because there is a fuunction that creates the live_builder without the builder.
ferranbt marked this conversation as resolved.
Show resolved Hide resolved
pub extra_rpc: RpcModule<()>,
}

Expand All @@ -85,8 +93,11 @@ impl<DB: Database + Clone + 'static, BuilderSourceType: SlotSource>
Self { extra_rpc, ..self }
}

pub fn with_builders(self, builders: Vec<Arc<dyn BlockBuildingAlgorithm<DB>>>) -> Self {
Self { builders, ..self }
pub fn with_builder(self, builder: Arc<dyn BlockBuildingAlgorithm<DB>>) -> Self {
Self {
builder: Some(builder),
..self
}
}

pub async fn run(self) -> eyre::Result<()> {
Expand Down Expand Up @@ -123,15 +134,18 @@ impl<DB: Database + Clone + 'static, BuilderSourceType: SlotSource>
)
};

/*
let mut builder_pool = BlockBuildingPool::new(
self.provider_factory.clone(),
self.builders,
self.sink_factory,
orderpool_subscriber,
order_simulation_pool,
);
*/
ferranbt marked this conversation as resolved.
Show resolved Hide resolved

let watchdog_sender = spawn_watchdog_thread(self.watchdog_timeout)?;
let mut sink_factory = self.sink_factory;

while let Some(payload) = payload_events_channel.recv().await {
if self.blocklist.contains(&payload.fee_recipient()) {
Expand Down Expand Up @@ -215,12 +229,49 @@ impl<DB: Database + Clone + 'static, BuilderSourceType: SlotSource>
None,
);

builder_pool.start_block_building(
payload,
block_ctx,
self.global_cancellation.clone(),
time_until_slot_end.try_into().unwrap_or_default(),
);
// This was done before in block building pool
{
let max_time_to_build = time_until_slot_end.try_into().unwrap_or_default();
let block_cancellation = self.global_cancellation.clone().child_token();

let cancel = block_cancellation.clone();
tokio::spawn(async move {
tokio::time::sleep(max_time_to_build).await;
cancel.cancel();
});

let (orders_for_block, sink) = OrdersForBlock::new_with_sink();
// add OrderReplacementManager to manage replacements and cancellations
let order_replacement_manager = OrderReplacementManager::new(Box::new(sink));
// sink removal is automatic via OrderSink::is_alive false
let _block_sub = orderpool_subscriber.add_sink(
block_ctx.block_env.number.to(),
Box::new(order_replacement_manager),
);

let simulations_for_block = order_simulation_pool.spawn_simulation_job(
block_ctx.clone(),
orders_for_block,
block_cancellation.clone(),
);

let (broadcast_input, _) = broadcast::channel(10_000);
let builder_sink = sink_factory.create_sink(payload, block_cancellation.clone());

let input = BlockBuildingAlgorithmInput::<DB> {
provider_factory: self.provider_factory.provider_factory_unchecked(),
ctx: block_ctx,
sink: builder_sink,
input: broadcast_input.subscribe(),
cancel: block_cancellation,
};

tokio::spawn(multiplex_job(simulations_for_block.orders, broadcast_input));

if let Some(builder) = self.builder.as_ref() {
builder.build_blocks(input);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even though you know that BlockBuildingPool is not blocking, builders are expected to be blocking. We should tokio::task::spawn_blocking this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which one do you want as spawn_blocking?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

builder.build_blocks(input);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check again, I think this is resolved now.

}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you put this into a func so the code is more readable like before?


watchdog_sender.try_send(()).unwrap_or_default();
}
Expand All @@ -237,6 +288,17 @@ impl<DB: Database + Clone + 'static, BuilderSourceType: SlotSource>
}
}

async fn multiplex_job<T>(mut input: mpsc::Receiver<T>, sender: broadcast::Sender<T>) {
// we don't worry about waiting for input forever because it will be closed by producer job
while let Some(input) = input.recv().await {
// we don't create new subscribers to the broadcast so here we can be sure that err means end of receivers
if sender.send(input).is_err() {
return;
}
}
trace!("Cancelling multiplex job");
}

/// May fail if we wait too much (see [BLOCK_HEADER_DEAD_LINE_DELTA])
async fn wait_for_block_header<DB: Database>(
block: B256,
Expand Down
Loading