Skip to content

Commit

Permalink
Automatically adjust fee rate
Browse files Browse the repository at this point in the history
and automatically reset submission txs when a block cannot be confirmed
for an extended period of time.
  • Loading branch information
blckngm committed Feb 22, 2023
1 parent 4d0e922 commit ab4f80b
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 18 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions crates/block-producer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,7 @@ tracing = { version = "0.1", features = ["attributes"] }
tentacle = "0.4.0"
gw-p2p-network = { path = "../p2p-network" }
bytes = "1.2.0"
pid = "4.0.0"
rand = "0.8.5"

[features]
6 changes: 3 additions & 3 deletions crates/block-producer/src/block_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ pub struct BlockProducer {
chain: Arc<Mutex<Chain>>,
generator: Arc<Generator>,
wallet: Wallet,
config: BlockProducerConfig,
rpc_client: RPCClient,
ckb_genesis_info: CKBGenesisInfo,
tests_control: Option<TestModeControl>,
Expand Down Expand Up @@ -126,7 +125,6 @@ impl BlockProducer {
rpc_client,
wallet,
ckb_genesis_info,
config,
tests_control,
store,
contracts_dep_manager,
Expand Down Expand Up @@ -205,6 +203,7 @@ impl BlockProducer {
since,
withdrawal_extras,
local_cells_manager,
fee_rate,
} = args;

let rollup_cell = query_rollup_cell(local_cells_manager, &self.rpc_client)
Expand Down Expand Up @@ -475,7 +474,7 @@ impl BlockProducer {
&self.rpc_client.indexer,
self.wallet.lock_script().to_owned(),
local_cells_manager,
self.config.fee_rate,
fee_rate,
)
.await?;
debug_assert_eq!(
Expand All @@ -501,6 +500,7 @@ pub struct ComposeSubmitTxArgs<'a> {
pub since: Since,
pub withdrawal_extras: Vec<WithdrawalRequestExtra>,
pub local_cells_manager: &'a LocalCellsManager,
pub fee_rate: u64,
}

#[derive(thiserror::Error, Debug)]
Expand Down
118 changes: 108 additions & 10 deletions crates/block-producer/src/psc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ use gw_utils::{
abort_on_drop::spawn_abort_on_drop, liveness::Liveness, local_cells::LocalCellsManager,
since::Since, RollupContext,
};
use pid::Pid;
use rand::{thread_rng, Rng};
use tokio::{
signal::unix::{signal, SignalKind},
sync::Mutex,
Expand All @@ -44,11 +46,15 @@ pub struct ProduceSubmitConfirm {
context: Arc<PSCContext>,
local_count: u64,
submitted_count: u64,
current_fee_rate: u64,
fee_rate_controller: Option<Pid<f64>>,
}

impl ProduceSubmitConfirm {
fn new(context: Arc<PSCContext>) -> Self {
Self {
current_fee_rate: context.psc_config.min_fee_rate,
fee_rate_controller: context.psc_config.fee_rate_pid,
context,
local_count: 0,
submitted_count: 0,
Expand Down Expand Up @@ -170,6 +176,11 @@ impl ProduceSubmitConfirm {
Ok(()) => return Ok(()),
Err(e) => {
log::warn!("{:#}", e);
if e.is::<ShouldResetTxError>() {
reset_submission_txs(&mut self).await?;
continue;
}

if let Some(should_revert) = e.downcast_ref::<ShouldRevertError>() {
let revert_to = should_revert.0 - 1;
log::info!("revert to block {revert_to}");
Expand Down Expand Up @@ -252,17 +263,29 @@ async fn run(state: &mut ProduceSubmitConfirm) -> Result<()> {
let mut revert_local_signal = signal(SignalKind::user_defined1())?;
let mut revert_submitted_signal = signal(SignalKind::user_defined2())?;

let has_fee_rate_controller = state.fee_rate_controller.is_some();
let mut fee_rate_adjust_interval =
tokio::time::interval(Duration::from_secs(config.fee_rate_pid_interval_secs));
fee_rate_adjust_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

let half_fee_rate_range = (config.max_fee_rate - config.min_fee_rate) / 2;
let mid_fee_rate = config.min_fee_rate + half_fee_rate_range;
if let Some(fee_rate_pid) = state.fee_rate_controller.as_mut() {
fee_rate_pid.output_limit = fee_rate_pid.output_limit.min(1.0);
}

loop {
if !submitting && state.local_count > 0 && state.submitted_count < config.submitted_limit {
submitting = true;
let context = state.context.clone();
let fee_rate = state.current_fee_rate;
submit_handle.replace_with(tokio::spawn(async move {
loop {
submit_pending_l1_upgrade(&context)
.await
.with_context(|| "failed to submit pending l1 upgrade")?;

match submit_next_block(&context).await {
match submit_next_block(&context, fee_rate).await {
Ok(nh) => return Ok(nh),
Err(err) => {
if err.is::<ShouldResyncError>() || err.is::<ShouldRevertError>() {
Expand All @@ -288,7 +311,10 @@ async fn run(state: &mut ProduceSubmitConfirm) -> Result<()> {
match confirm_next_block(&context).await {
Ok(nh) => break Ok(nh),
Err(err) => {
if err.is::<ShouldResyncError>() || err.is::<ShouldRevertError>() {
if err.is::<ShouldResyncError>()
|| err.is::<ShouldRevertError>()
|| err.is::<ShouldResetTxError>()
{
bail!(err);
}
log::warn!("failed to confirm next block: {:#}", err);
Expand Down Expand Up @@ -326,6 +352,13 @@ async fn run(state: &mut ProduceSubmitConfirm) -> Result<()> {
.unpack();
bail!(ShouldRevertError(last_confirmed));
}
_ = fee_rate_adjust_interval.tick(), if has_fee_rate_controller => {
let fee_rate_pid = state.fee_rate_controller.as_mut().unwrap();
let output = fee_rate_pid.next_control_output(state.submitted_count as f64 / config.submitted_limit as f64);
let f = (mid_fee_rate as f64 + half_fee_rate_range as f64 * output.output) as u64;
// Just to be safe from possible rounding errors.
state.current_fee_rate = f.clamp(config.min_fee_rate, config.max_fee_rate);
}
// Block confirmed.
result = &mut confirm_handle, if confirming => {
confirming = false;
Expand All @@ -341,6 +374,7 @@ async fn run(state: &mut ProduceSubmitConfirm) -> Result<()> {
publish_confirmed(&mut sync_server, &state.context.store.get_snapshot(), nh.number().unpack())?;
}
state.set_submitted_count(state.submitted_count - 1);
state.context.liveness.tick();
}
_ => {}
}
Expand All @@ -361,6 +395,7 @@ async fn run(state: &mut ProduceSubmitConfirm) -> Result<()> {
}
state.set_local_count(state.local_count - 1);
state.set_submitted_count(state.submitted_count + 1);
state.context.liveness.tick();
}
_ => {}
}
Expand All @@ -373,14 +408,45 @@ async fn run(state: &mut ProduceSubmitConfirm) -> Result<()> {
log::warn!("failed to produce local block: {:#}", e);
} else {
state.set_local_count(state.local_count + 1);
state.context.liveness.tick();
}
}
}
// We have produced, submitted or confirmed a block. Update liveness tick.
state.context.liveness.tick();
}
}

/// Reset last submitted to last confirmed.
async fn reset_submission_txs(state: &mut ProduceSubmitConfirm) -> Result<()> {
let mut store_tx = state.context.store.begin_transaction();
let last_confirmed = store_tx
.get_last_confirmed_block_number_hash()
.context("get last confirmed")?;
let last_confirmed_block_number = last_confirmed.number().unpack();
let last_submitted = store_tx
.get_last_submitted_block_number_hash()
.context("get last submitted")?
.number()
.unpack();
for i in last_confirmed_block_number + 1..last_submitted {
store_tx.delete_submit_tx(i)?;
}
store_tx.set_last_submitted_block_number_hash(&last_confirmed.as_reader())?;
store_tx.commit()?;

state.set_local_count(state.local_count + state.submitted_count);
state.set_submitted_count(0);
state.context.local_cells_manager.lock().await.reset();

// Use random fee rate.
let c = &state.context.psc_config;
let fee_rate = thread_rng().gen_range(c.min_fee_rate..=c.max_fee_rate);
state.current_fee_rate = fee_rate;

log::info!("reset last submitted to #{last_confirmed_block_number}, fee rate to {fee_rate}");

Ok(())
}

/// Produce and save local block.
#[instrument(skip_all)]
async fn produce_local_block(ctx: &PSCContext) -> Result<()> {
Expand Down Expand Up @@ -490,7 +556,7 @@ async fn produce_local_block(ctx: &PSCContext) -> Result<()> {
Ok(())
}

async fn submit_next_block(ctx: &PSCContext) -> Result<NumberHash> {
async fn submit_next_block(ctx: &PSCContext, fee_rate: u64) -> Result<NumberHash> {
let snap = ctx.store.get_snapshot();
// L2 block number to submit.
let block_number = snap
Expand All @@ -508,7 +574,7 @@ async fn submit_next_block(ctx: &PSCContext) -> Result<NumberHash> {
// it does, it means that previous block is probably not confirmed
// anymore, and we should sync with L1 again.
let is_first = block_number == last_confirmed + 1;
submit_block(ctx, snap, is_first, block_number).await
submit_block(ctx, snap, is_first, block_number, fee_rate).await
}

#[instrument(skip(ctx, snap, is_first))]
Expand All @@ -517,6 +583,7 @@ async fn submit_block(
snap: StoreSnapshot,
is_first: bool,
block_number: u64,
fee_rate: u64,
) -> Result<NumberHash> {
// L2 block hash to submit.
let block_hash = snap
Expand Down Expand Up @@ -563,6 +630,7 @@ async fn submit_block(
since,
withdrawal_extras,
local_cells_manager: &*local_cells_manager,
fee_rate,
};
let tx = ctx
.block_producer
Expand Down Expand Up @@ -652,9 +720,14 @@ async fn submit_block(
.build())
}

async fn poll_tx_confirmed(rpc_client: &RPCClient, tx: &Transaction) -> Result<()> {
async fn poll_tx_confirmed(
rpc_client: &RPCClient,
tx: &Transaction,
timeout: Option<Duration>,
) -> Result<()> {
log::info!("waiting for tx 0x{}", hex::encode(tx.hash()));
let mut last_sent = Instant::now();
let initial_instant = Instant::now();
let mut last_sent = initial_instant;
loop {
let status = rpc_client.ckb.get_transaction_status(tx.hash()).await?;
use gw_jsonrpc_types::ckb_jsonrpc_types::Status;
Expand All @@ -674,6 +747,10 @@ async fn poll_tx_confirmed(rpc_client: &RPCClient, tx: &Transaction) -> Result<(
_ => last_sent.elapsed() > Duration::from_secs(24),
};
if should_resend {
if timeout.is_some() && Some(initial_instant.elapsed()) > timeout {
bail!(ConfirmTimeoutError);
}

log::info!("resend transaction 0x{}", hex::encode(tx.hash()));
send_transaction_or_check_inputs(rpc_client, tx).await?;
last_sent = Instant::now();
Expand Down Expand Up @@ -721,13 +798,21 @@ async fn confirm_block(
.get_block_submit_tx(block_number)
.expect("get submit tx");
drop(snap);
poll_tx_confirmed(&context.rpc_client, &tx)
// Use a timeout if there is room for fee rate bumping.
let timeout = if context.psc_config.max_fee_rate > context.psc_config.min_fee_rate {
Some(Duration::from_secs(context.psc_config.confirm_timeout_secs))
} else {
None
};
poll_tx_confirmed(&context.rpc_client, &tx, timeout)
.await
.map_err(|e| {
if e.is::<UnknownCellError>() {
e.context(ShouldResyncError)
} else if e.is::<DeadCellError>() {
e.context(ShouldRevertError(block_number))
} else if e.is::<ConfirmTimeoutError>() {
e.context(ShouldResetTxError)
} else {
e
}
Expand Down Expand Up @@ -810,7 +895,7 @@ async fn confirm_pending_l1_upgrade(ctx: &PSCContext) -> Result<()> {
let tx: Transaction = l1_upgrade.signed_transaction.clone().into();
// We can't recover from errors when confirming l1 upgrade tx,
// both deadcell error and unknwown cell is unacceptable, so we just throw it
poll_tx_confirmed(&ctx.rpc_client, &tx).await?;
poll_tx_confirmed(&ctx.rpc_client, &tx, None).await?;
log::info!("l1 upgrade tx confirmed");
}
Ok(())
Expand Down Expand Up @@ -917,6 +1002,19 @@ impl Display for ShouldResyncError {
}
}

#[derive(Debug)]
struct ShouldResetTxError;

impl Display for ShouldResetTxError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "should reset txs")
}
}

#[derive(Debug, thiserror::Error)]
#[error("confirm transaction timeout")]
struct ConfirmTimeoutError;

#[derive(thiserror::Error, Debug)]
struct DeadCellError {
consumed_by_tx: Option<H256>,
Expand Down
1 change: 1 addition & 0 deletions crates/config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ serde = { version = "1.0", features = ["derive"] }
hex = "0.4"
lazy_static = "1.4"
toml = "0.5"
pid = { version = "4.0.0", features = ["serde"] }
Loading

0 comments on commit ab4f80b

Please sign in to comment.