Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the block builder pool a block building algorithm #141

Open
wants to merge 7 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions crates/rbuilder/src/bin/debug-order-sim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,9 @@ pub async fn main() -> eyre::Result<()> {
);

let block_cancel = CancellationToken::new();
let mut sim_results =
sim_pool.spawn_simulation_job(block_ctx, orders_for_block, block_cancel.clone());
let mut sim_results = sim_pool
.spawn_simulation_job(block_ctx, orders_for_block, block_cancel.clone())
.subscribe();
loop {
tokio::select! {
new_slot = slots.recv() => {
Expand All @@ -147,8 +148,8 @@ pub async fn main() -> eyre::Result<()> {
break 'slots;
}
},
sim_command = sim_results.orders.recv() => {
if let Some(sim_command) = sim_command {
sim_command = sim_results.recv() => {
if let Ok(sim_command) = sim_command {
match sim_command{
SimulatedOrderCommand::Simulation(sim) => {
orders_last_slot += 1;
Expand Down
2 changes: 1 addition & 1 deletion crates/rbuilder/src/bin/dummy-builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ async fn main() -> eyre::Result<()> {
global_cancellation: cancel.clone(),
extra_rpc: RpcModule::new(()),
sink_factory: Box::new(TraceBlockSinkFactory {}),
builders: vec![Arc::new(DummyBuildingAlgorithm::new(10))],
builder: Arc::new(DummyBuildingAlgorithm::new(10)),
};

let ctrlc = tokio::spawn(async move {
Expand Down
4 changes: 2 additions & 2 deletions crates/rbuilder/src/live_builder/base_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//!
use crate::{
building::builders::UnfinishedBlockBuildingSinkFactory,
live_builder::{order_input::OrderInputConfig, LiveBuilder},
live_builder::{order_input::OrderInputConfig, LiveBuilder, NullBlockBuildingAlgorithm},
telemetry::{setup_reloadable_tracing_subscriber, LoggerConfig},
utils::{http_provider, BoxedProvider, ProviderFactoryReopener, Signer},
};
Expand Down Expand Up @@ -193,7 +193,7 @@ impl BaseConfig {

extra_rpc: RpcModule::new(()),
sink_factory,
builders: Vec::new(),
builder: Arc::new(NullBlockBuildingAlgorithm {}),
})
}

Expand Down
132 changes: 25 additions & 107 deletions crates/rbuilder/src/live_builder/building/mod.rs
Original file line number Diff line number Diff line change
@@ -1,139 +1,57 @@
use std::{sync::Arc, time::Duration};

use crate::{
building::{
builders::{
BlockBuildingAlgorithm, BlockBuildingAlgorithmInput, UnfinishedBlockBuildingSinkFactory,
},
BlockBuildingContext,
},
live_builder::{payload_events::MevBoostSlotData, simulation::SlotOrderSimResults},
utils::ProviderFactoryReopener,
};
use crate::building::builders::{BlockBuildingAlgorithm, BlockBuildingAlgorithmInput};
use reth_db::database::Database;
use tokio::sync::{broadcast, mpsc};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, trace};

use super::{
order_input::{
self, order_replacement_manager::OrderReplacementManager, orderpool::OrdersForBlock,
},
payload_events,
simulation::OrderSimulationPool,
};
use std::sync::Arc;
use tokio::sync::broadcast;
use tracing::{debug, trace};

#[derive(Debug)]
pub struct BlockBuildingPool<DB> {
provider_factory: ProviderFactoryReopener<DB>,
builders: Vec<Arc<dyn BlockBuildingAlgorithm<DB>>>,
sink_factory: Box<dyn UnfinishedBlockBuildingSinkFactory>,
orderpool_subscriber: order_input::OrderPoolSubscriber,
order_simulation_pool: OrderSimulationPool<DB>,
}

impl<DB: Database + Clone + 'static> BlockBuildingPool<DB> {
pub fn new(
provider_factory: ProviderFactoryReopener<DB>,
builders: Vec<Arc<dyn BlockBuildingAlgorithm<DB>>>,
sink_factory: Box<dyn UnfinishedBlockBuildingSinkFactory>,
orderpool_subscriber: order_input::OrderPoolSubscriber,
order_simulation_pool: OrderSimulationPool<DB>,
) -> Self {
BlockBuildingPool {
provider_factory,
builders,
sink_factory,
orderpool_subscriber,
order_simulation_pool,
}
pub fn new(builders: Vec<Arc<dyn BlockBuildingAlgorithm<DB>>>) -> Self {
BlockBuildingPool { builders }
}
}

/// Connects OrdersForBlock->OrderReplacementManager->Simulations and calls start_building_job
pub fn start_block_building(
&mut self,
payload: payload_events::MevBoostSlotData,
block_ctx: BlockBuildingContext,
global_cancellation: CancellationToken,
max_time_to_build: Duration,
) {
let block_cancellation = global_cancellation.child_token();

let cancel = block_cancellation.clone();
tokio::spawn(async move {
tokio::time::sleep(max_time_to_build).await;
cancel.cancel();
});

let (orders_for_block, sink) = OrdersForBlock::new_with_sink();
// add OrderReplacementManager to manage replacements and cancellations
let order_replacement_manager = OrderReplacementManager::new(Box::new(sink));
// sink removal is automatic via OrderSink::is_alive false
let _block_sub = self.orderpool_subscriber.add_sink(
block_ctx.block_env.number.to(),
Box::new(order_replacement_manager),
);

let simulations_for_block = self.order_simulation_pool.spawn_simulation_job(
block_ctx.clone(),
orders_for_block,
block_cancellation.clone(),
);
self.start_building_job(
block_ctx,
payload,
simulations_for_block,
block_cancellation,
);
impl<DB: Database + std::fmt::Debug + Clone + 'static> BlockBuildingAlgorithm<DB>
for BlockBuildingPool<DB>
{
fn name(&self) -> String {
"BlockBuildingPool".to_string()
}

/// Per each BlockBuildingAlgorithm creates BlockBuildingAlgorithmInput and Sinks and spawn a task to run it
fn start_building_job(
&mut self,
ctx: BlockBuildingContext,
slot_data: MevBoostSlotData,
input: SlotOrderSimResults,
cancel: CancellationToken,
) {
let builder_sink = self.sink_factory.create_sink(slot_data, cancel.clone());
fn build_blocks(&self, input: BlockBuildingAlgorithmInput<DB>) {
let (broadcast_input, _) = broadcast::channel(10_000);

let block_number = ctx.block_env.number.to::<u64>();
let provider_factory = match self
.provider_factory
.check_consistency_and_reopen_if_needed(block_number)
{
Ok(provider_factory) => provider_factory,
Err(err) => {
error!(?err, "Error while reopening provider factory");
return;
}
};
let block_number = input.ctx.block_env.number.to::<u64>();

for builder in self.builders.iter() {
let builder_name = builder.name();
debug!(block = block_number, builder_name, "Spawning builder job");
let input = BlockBuildingAlgorithmInput::<DB> {
provider_factory: provider_factory.clone(),
ctx: ctx.clone(),

let builder = builder.clone();
let input = BlockBuildingAlgorithmInput {
provider_factory: input.provider_factory.clone(),
ctx: input.ctx.clone(),
input: broadcast_input.subscribe(),
sink: builder_sink.clone(),
cancel: cancel.clone(),
sink: input.sink.clone(),
cancel: input.cancel.clone(),
};
let builder = builder.clone();

tokio::task::spawn_blocking(move || {
builder.build_blocks(input);
debug!(block = block_number, builder_name, "Stopped builder job");
});
}

tokio::spawn(multiplex_job(input.orders, broadcast_input));
tokio::spawn(multiplex_job(input.input, broadcast_input));
}
}

async fn multiplex_job<T>(mut input: mpsc::Receiver<T>, sender: broadcast::Sender<T>) {
async fn multiplex_job<T: Clone>(mut input: broadcast::Receiver<T>, sender: broadcast::Sender<T>) {
// we don't worry about waiting for input forever because it will be closed by producer job
while let Some(input) = input.recv().await {
while let Ok(input) = input.recv().await {
// we don't create new subscribers to the broadcast so here we can be sure that err means end of receivers
if sender.send(input).is_err() {
return;
Expand Down
4 changes: 3 additions & 1 deletion crates/rbuilder/src/live_builder/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::{
},
cli::LiveBuilderConfig,
payload_events::MevBoostSlotDataGenerator,
BlockBuildingPool,
},
mev_boost::BLSBlockSigner,
primitives::mev_boost::{MevBoostRelay, RelayConfig},
Expand Down Expand Up @@ -306,7 +307,8 @@ impl LiveBuilderConfig for Config {
root_hash_task_pool,
self.base_config.sbundle_mergeabe_signers(),
);
Ok(live_builder.with_builders(builders))
let builder = BlockBuildingPool::new(builders);
Ok(live_builder.with_builder(Arc::new(builder)))
}

fn version_for_telemetry(&self) -> crate::utils::build_info::Version {
Expand Down
Loading
Loading