Skip to content

Commit

Permalink
Parallel sealing (#187)
Browse files Browse the repository at this point in the history
## 📝 Summary

This PR replaces the SequentialSealerBidMaker for the
ParallelSealerBidMaker to allow concurrent block sealing.

## 💡 Motivation and Context

Inclution rate went down when we deployed the version using
SequentialSealerBidMaker.

## ✅ I have completed the following steps:

* [ X] Run `make lint`
* [ X] Run `make test`
* [ ] Added tests (if applicable)
  • Loading branch information
ZanCorDX authored Sep 24, 2024
1 parent ab0529f commit 04ae782
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl Bid {
}
}

/// Makes the actual bid (send it to the relay)
/// Makes the actual bid (seal + send it to the relay).
pub trait BidMaker: std::fmt::Debug {
fn send_bid(&self, bid: Bid);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod interfaces;
pub mod parallel_sealer_bid_maker;
pub mod sequential_sealer_bid_maker;
pub mod true_block_value_bidder;
pub mod wallet_balance_watcher;
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
use std::sync::{Arc, Mutex};
use tokio::sync::Notify;
use tokio_util::sync::CancellationToken;
use tracing::error;

use crate::live_builder::block_output::relay_submit::BlockBuildingSink;

use super::interfaces::{Bid, BidMaker};

/// BidMaker with a background task sealing multiple parallel bids concurrently.
/// If several bids arrive while we hit the max of concurrent sealings we keep only the last one since we assume new is better.
#[derive(Debug)]
pub struct ParallelSealerBidMaker {
pending_bid: Arc<PendingBid>,
}

impl BidMaker for ParallelSealerBidMaker {
fn send_bid(&self, bid: Bid) {
self.pending_bid.update(bid);
}
}

/// Object used to send new bids to the [ParallelSealerBidMakerProcess].
#[derive(Debug)]
struct PendingBid {
/// Next bid to send.
bid: Mutex<Option<Bid>>,
/// Signaled when we set a new bid.
bid_notify: Arc<Notify>,
}

impl PendingBid {
fn new(bid_notify: Arc<Notify>) -> Self {
Self {
bid: Default::default(),
bid_notify,
}
}
/// Updates bid, replacing on current (we assume they are always increasing but we don't check it).
fn update(&self, bid: Bid) {
let mut current_bid = self.bid.lock().unwrap();
*current_bid = Some(bid);
self.bid_notify.notify_one();
}

fn consume_bid(&self) -> Option<Bid> {
let mut current_bid = self.bid.lock().unwrap();
current_bid.take()
}
}

impl ParallelSealerBidMaker {
pub fn new(
max_concurrent_seals: usize,
sink: Arc<dyn BlockBuildingSink>,
cancel: CancellationToken,
) -> Self {
let notify = Arc::new(Notify::new());
let pending_bid = Arc::new(PendingBid::new(notify.clone()));
let mut sealing_process = ParallelSealerBidMakerProcess {
sink,
cancel,
pending_bid: pending_bid.clone(),
notify: notify.clone(),
seal_control: Arc::new(SealsInProgress {
notify,
seals_in_progress: Default::default(),
}),
max_concurrent_seals,
};

tokio::task::spawn(async move {
sealing_process.run().await;
});
Self { pending_bid }
}
}

struct SealsInProgress {
/// Signaled when a sealing finishes.
notify: Arc<Notify>,
/// Number of current sealings in progress.
seals_in_progress: Mutex<usize>,
}

/// Background task waiting for new bids to seal.
struct ParallelSealerBidMakerProcess {
/// Destination of the finished blocks.
sink: Arc<dyn BlockBuildingSink>,
cancel: CancellationToken,
pending_bid: Arc<PendingBid>,
/// Signaled when we set a new bid or a sealing finishes.
notify: Arc<Notify>,
/// Shared between the sealing tasks and the main loop.
seal_control: Arc<SealsInProgress>,
/// Maximum number of concurrent sealings.
max_concurrent_seals: usize,
}

impl ParallelSealerBidMakerProcess {
async fn run(&mut self) {
loop {
tokio::select! {
_ = self.wait_for_change() => self.check_for_new_bid().await,
_ = self.cancel.cancelled() => return
}
}
}

async fn wait_for_change(&self) {
self.notify.notified().await
}

/// block.finalize_block + self.sink.new_block inside spawn_blocking.
async fn check_for_new_bid(&mut self) {
if *self.seal_control.seals_in_progress.lock().unwrap() >= self.max_concurrent_seals {
return;
}
if let Some(bid) = self.pending_bid.consume_bid() {
let payout_tx_val = bid.payout_tx_value();
let block = bid.block();
let block_number = block.building_context().block();
// Take sealing "slot"
*self.seal_control.seals_in_progress.lock().unwrap() += 1;
let seal_control = self.seal_control.clone();
let sink = self.sink.clone();
tokio::task::spawn_blocking(move || {
match block.finalize_block(payout_tx_val) {
Ok(res) => sink.new_block(res.block),
Err(error) => error!(
block_number,
?error,
"Error on finalize_block on ParallelSealerBidMaker"
),
};
// release sealing "slot"
*seal_control.seals_in_progress.lock().unwrap() -= 1;
seal_control.notify.notify_one();
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use super::{
bid_value_source::interfaces::{BidValueObs, BidValueSource},
bidding::{
interfaces::{BiddingService, SlotBidder},
sequential_sealer_bid_maker::SequentialSealerBidMaker,
parallel_sealer_bid_maker::ParallelSealerBidMaker,
wallet_balance_watcher::WalletBalanceWatcher,
},
relay_submit::BuilderSinkFactory,
Expand All @@ -31,6 +31,8 @@ pub struct BlockSealingBidderFactory {
/// SlotBidder are subscribed to the proper block in the bid_value_source.
competition_bid_value_source: Arc<dyn BidValueSource + Send + Sync>,
wallet_balance_watcher: WalletBalanceWatcher,
/// See [ParallelSealerBidMaker]
max_concurrent_seals: usize,
}

impl BlockSealingBidderFactory {
Expand All @@ -39,12 +41,14 @@ impl BlockSealingBidderFactory {
block_sink_factory: Box<dyn BuilderSinkFactory>,
competition_bid_value_source: Arc<dyn BidValueSource + Send + Sync>,
wallet_balance_watcher: WalletBalanceWatcher,
max_concurrent_seals: usize,
) -> Self {
Self {
bidding_service,
block_sink_factory,
competition_bid_value_source,
wallet_balance_watcher,
max_concurrent_seals,
}
}
}
Expand Down Expand Up @@ -86,7 +90,11 @@ impl UnfinishedBlockBuildingSinkFactory for BlockSealingBidderFactory {
self.competition_bid_value_source.clone(),
cancel.clone(),
);
let sealer = SequentialSealerBidMaker::new(Arc::from(finished_block_sink), cancel.clone());
let sealer = ParallelSealerBidMaker::new(
self.max_concurrent_seals,
Arc::from(finished_block_sink),
cancel.clone(),
);

let slot_bidder: Arc<dyn SlotBidder> = self.bidding_service.create_slot_bidder(
slot_data.block(),
Expand Down
3 changes: 3 additions & 0 deletions crates/rbuilder/src/live_builder/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ const DEFAULT_SLOT_DELTA_TO_START_SUBMITS: time::Duration = time::Duration::mill
/// We initialize the wallet with the last full day. This should be enough for any bidder.
/// On debug I measured this to be < 300ms so it's not big deal.
pub const WALLET_INIT_HISTORY_SIZE: Duration = Duration::from_secs(60 * 60 * 24);
/// Number of sealing processes to run in parallel for each builder algorithm.
pub const SEALING_PROCESSES_PER_BUILDER_ALGORITHM: usize = 2;

/// This example has a single building algorithm cfg but the idea of this enum is to have several builders
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
Expand Down Expand Up @@ -300,6 +302,7 @@ impl LiveBuilderConfig for Config {
sink_sealed_factory,
Arc::new(NullBidValueSource {}),
wallet_balance_watcher,
SEALING_PROCESSES_PER_BUILDER_ALGORITHM * self.builders.len(),
));

let payload_event = MevBoostSlotDataGenerator::new(
Expand Down

0 comments on commit 04ae782

Please sign in to comment.