Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel sealing #187

Merged
merged 4 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl Bid {
}
}

/// Makes the actual bid (send it to the relay)
/// Makes the actual bid (seal + send it to the relay).
pub trait BidMaker: std::fmt::Debug {
liamaharon marked this conversation as resolved.
Show resolved Hide resolved
fn send_bid(&self, bid: Bid);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod interfaces;
pub mod parallel_sealer_bid_maker;
pub mod sequential_sealer_bid_maker;
pub mod true_block_value_bidder;
pub mod wallet_balance_watcher;
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
use std::sync::{Arc, Mutex};
use tokio::sync::Notify;
use tokio_util::sync::CancellationToken;
use tracing::error;

use crate::live_builder::block_output::relay_submit::BlockBuildingSink;

use super::interfaces::{Bid, BidMaker};

/// BidMaker with a background task sealing multiple parallel bids concurrently.
/// If several bids arrive while we hit the max of concurrent sealings we keep only the last one since we assume new is better.
#[derive(Debug)]
pub struct ParallelSealerBidMaker {
pending_bid: Arc<PendingBid>,
}

impl BidMaker for ParallelSealerBidMaker {
fn send_bid(&self, bid: Bid) {
self.pending_bid.update(bid);
}
}

/// Object used to send new bids to the [ParallelSealerBidMakerProcess].
#[derive(Debug)]
struct PendingBid {
/// Next bid to send.
bid: Mutex<Option<Bid>>,
/// Signaled when we set a new bid.
bid_notify: Arc<Notify>,
}

impl PendingBid {
fn new(bid_notify: Arc<Notify>) -> Self {
Self {
bid: Default::default(),
bid_notify,
}
}
/// Updates bid, replacing on current (we assume they are always increasing but we don't check it).
fn update(&self, bid: Bid) {
let mut current_bid = self.bid.lock().unwrap();
*current_bid = Some(bid);
self.bid_notify.notify_one();
}

fn consume_bid(&self) -> Option<Bid> {
let mut current_bid = self.bid.lock().unwrap();
current_bid.take()
}
}

impl ParallelSealerBidMaker {
pub fn new(
max_concurrent_seals: usize,
sink: Arc<dyn BlockBuildingSink>,
cancel: CancellationToken,
) -> Self {
let notify = Arc::new(Notify::new());
let pending_bid = Arc::new(PendingBid::new(notify.clone()));
let mut sealing_process = ParallelSealerBidMakerProcess {
sink,
cancel,
pending_bid: pending_bid.clone(),
notify: notify.clone(),
seal_control: Arc::new(SealsInProgress {
notify,
seals_in_progress: Default::default(),
}),
max_concurrent_seals,
};

tokio::task::spawn(async move {
sealing_process.run().await;
});
Self { pending_bid }
}
}

struct SealsInProgress {
/// Signaled when a sealing finishes.
notify: Arc<Notify>,
/// Number of current sealings in progress.
seals_in_progress: Mutex<usize>,
}

/// Background task waiting for new bids to seal.
struct ParallelSealerBidMakerProcess {
/// Destination of the finished blocks.
sink: Arc<dyn BlockBuildingSink>,
cancel: CancellationToken,
pending_bid: Arc<PendingBid>,
/// Signaled when we set a new bid or a sealing finishes.
notify: Arc<Notify>,
/// Shared between the sealing tasks and the main loop.
seal_control: Arc<SealsInProgress>,
/// Maximum number of concurrent sealings.
max_concurrent_seals: usize,
}

impl ParallelSealerBidMakerProcess {
async fn run(&mut self) {
loop {
tokio::select! {
_ = self.wait_for_change() => self.check_for_new_bid().await,
_ = self.cancel.cancelled() => return
}
}
}

async fn wait_for_change(&self) {
self.notify.notified().await
}

/// block.finalize_block + self.sink.new_block inside spawn_blocking.
async fn check_for_new_bid(&mut self) {
if *self.seal_control.seals_in_progress.lock().unwrap() >= self.max_concurrent_seals {
return;
}
if let Some(bid) = self.pending_bid.consume_bid() {
let payout_tx_val = bid.payout_tx_value();
let block = bid.block();
let block_number = block.building_context().block();
// Take sealing "slot"
*self.seal_control.seals_in_progress.lock().unwrap() += 1;
let seal_control = self.seal_control.clone();
let sink = self.sink.clone();
tokio::task::spawn_blocking(move || {
match block.finalize_block(payout_tx_val) {
Ok(res) => sink.new_block(res.block),
Err(error) => error!(
block_number,
?error,
"Error on finalize_block on ParallelSealerBidMaker"
),
};
// release sealing "slot"
*seal_control.seals_in_progress.lock().unwrap() -= 1;
seal_control.notify.notify_one();
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use super::{
bid_value_source::interfaces::{BidValueObs, BidValueSource},
bidding::{
interfaces::{BiddingService, SlotBidder},
sequential_sealer_bid_maker::SequentialSealerBidMaker,
parallel_sealer_bid_maker::ParallelSealerBidMaker,
wallet_balance_watcher::WalletBalanceWatcher,
},
relay_submit::BuilderSinkFactory,
Expand All @@ -31,6 +31,8 @@ pub struct BlockSealingBidderFactory {
/// SlotBidder are subscribed to the proper block in the bid_value_source.
competition_bid_value_source: Arc<dyn BidValueSource + Send + Sync>,
wallet_balance_watcher: WalletBalanceWatcher,
/// See [ParallelSealerBidMaker]
max_concurrent_seals: usize,
}

impl BlockSealingBidderFactory {
Expand All @@ -39,12 +41,14 @@ impl BlockSealingBidderFactory {
block_sink_factory: Box<dyn BuilderSinkFactory>,
competition_bid_value_source: Arc<dyn BidValueSource + Send + Sync>,
wallet_balance_watcher: WalletBalanceWatcher,
max_concurrent_seals: usize,
) -> Self {
Self {
bidding_service,
block_sink_factory,
competition_bid_value_source,
wallet_balance_watcher,
max_concurrent_seals,
}
}
}
Expand Down Expand Up @@ -86,7 +90,11 @@ impl UnfinishedBlockBuildingSinkFactory for BlockSealingBidderFactory {
self.competition_bid_value_source.clone(),
cancel.clone(),
);
let sealer = SequentialSealerBidMaker::new(Arc::from(finished_block_sink), cancel.clone());
let sealer = ParallelSealerBidMaker::new(
self.max_concurrent_seals,
Arc::from(finished_block_sink),
cancel.clone(),
);

let slot_bidder: Arc<dyn SlotBidder> = self.bidding_service.create_slot_bidder(
slot_data.block(),
Expand Down
3 changes: 3 additions & 0 deletions crates/rbuilder/src/live_builder/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ const DEFAULT_SLOT_DELTA_TO_START_SUBMITS: time::Duration = time::Duration::mill
/// We initialize the wallet with the last full day. This should be enough for any bidder.
/// On debug I measured this to be < 300ms so it's not big deal.
pub const WALLET_INIT_HISTORY_SIZE: Duration = Duration::from_secs(60 * 60 * 24);
/// Number of sealing processes to run in parallel for each builder algorithm.
pub const SEALING_PROCESSES_PER_BUILDER_ALGORITHM: usize = 2;

/// This example has a single building algorithm cfg but the idea of this enum is to have several builders
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
Expand Down Expand Up @@ -300,6 +302,7 @@ impl LiveBuilderConfig for Config {
sink_sealed_factory,
Arc::new(NullBidValueSource {}),
wallet_balance_watcher,
SEALING_PROCESSES_PER_BUILDER_ALGORITHM * self.builders.len(),
));

let payload_event = MevBoostSlotDataGenerator::new(
Expand Down
Loading