Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: automatically adjust fee rate #999

Merged
merged 1 commit into from
Mar 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions crates/block-producer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,7 @@ tracing = { version = "0.1", features = ["attributes"] }
tentacle = "0.4.0"
gw-p2p-network = { path = "../p2p-network" }
bytes = "1.2.0"
pid = "4.0.0"
rand = "0.8.5"

[features]
6 changes: 3 additions & 3 deletions crates/block-producer/src/block_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ pub struct BlockProducer {
chain: Arc<Mutex<Chain>>,
generator: Arc<Generator>,
wallet: Wallet,
config: BlockProducerConfig,
rpc_client: RPCClient,
ckb_genesis_info: CKBGenesisInfo,
tests_control: Option<TestModeControl>,
Expand Down Expand Up @@ -127,7 +126,6 @@ impl BlockProducer {
rpc_client,
wallet,
ckb_genesis_info,
config,
tests_control,
store,
contracts_dep_manager,
Expand Down Expand Up @@ -206,6 +204,7 @@ impl BlockProducer {
since,
withdrawal_extras,
local_cells_manager,
fee_rate,
} = args;

let rollup_cell = query_rollup_cell(local_cells_manager, &self.rpc_client)
Expand Down Expand Up @@ -503,7 +502,7 @@ impl BlockProducer {
&self.rpc_client.indexer,
self.wallet.lock_script().to_owned(),
local_cells_manager,
self.config.fee_rate,
fee_rate,
)
.await?;
debug_assert_eq!(
Expand Down Expand Up @@ -563,6 +562,7 @@ pub struct ComposeSubmitTxArgs<'a> {
pub since: Since,
pub withdrawal_extras: Vec<WithdrawalRequestExtra>,
pub local_cells_manager: &'a LocalCellsManager,
pub fee_rate: u64,
}

#[derive(thiserror::Error, Debug)]
Expand Down
118 changes: 108 additions & 10 deletions crates/block-producer/src/psc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ use gw_utils::{
abort_on_drop::spawn_abort_on_drop, liveness::Liveness, local_cells::LocalCellsManager,
since::Since, RollupContext,
};
use pid::Pid;
use rand::{thread_rng, Rng};
use tokio::{
signal::unix::{signal, SignalKind},
sync::Mutex,
Expand All @@ -44,11 +46,15 @@ pub struct ProduceSubmitConfirm {
context: Arc<PSCContext>,
local_count: u64,
submitted_count: u64,
current_fee_rate: u64,
fee_rate_controller: Option<Pid<f64>>,
}

impl ProduceSubmitConfirm {
fn new(context: Arc<PSCContext>) -> Self {
Self {
current_fee_rate: context.psc_config.min_fee_rate,
fee_rate_controller: context.psc_config.fee_rate_pid,
context,
local_count: 0,
submitted_count: 0,
Expand Down Expand Up @@ -170,6 +176,11 @@ impl ProduceSubmitConfirm {
Ok(()) => return Ok(()),
Err(e) => {
log::warn!("{:#}", e);
if e.is::<ShouldResetTxError>() {
reset_submission_txs(&mut self).await?;
continue;
}

if let Some(should_revert) = e.downcast_ref::<ShouldRevertError>() {
let revert_to = should_revert.0 - 1;
log::info!("revert to block {revert_to}");
Expand Down Expand Up @@ -252,17 +263,29 @@ async fn run(state: &mut ProduceSubmitConfirm) -> Result<()> {
let mut revert_local_signal = signal(SignalKind::user_defined1())?;
let mut revert_submitted_signal = signal(SignalKind::user_defined2())?;

let has_fee_rate_controller = state.fee_rate_controller.is_some();
let mut fee_rate_adjust_interval =
tokio::time::interval(Duration::from_secs(config.fee_rate_pid_interval_secs));
fee_rate_adjust_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

let half_fee_rate_range = (config.max_fee_rate - config.min_fee_rate) / 2;
let mid_fee_rate = config.min_fee_rate + half_fee_rate_range;
if let Some(fee_rate_pid) = state.fee_rate_controller.as_mut() {
fee_rate_pid.output_limit = fee_rate_pid.output_limit.min(1.0);
}

loop {
if !submitting && state.local_count > 0 && state.submitted_count < config.submitted_limit {
submitting = true;
let context = state.context.clone();
let fee_rate = state.current_fee_rate;
submit_handle.replace_with(tokio::spawn(async move {
loop {
submit_pending_l1_upgrade(&context)
.await
.with_context(|| "failed to submit pending l1 upgrade")?;

match submit_next_block(&context).await {
match submit_next_block(&context, fee_rate).await {
Ok(nh) => return Ok(nh),
Err(err) => {
if err.is::<ShouldResyncError>() || err.is::<ShouldRevertError>() {
Expand All @@ -288,7 +311,10 @@ async fn run(state: &mut ProduceSubmitConfirm) -> Result<()> {
match confirm_next_block(&context).await {
Ok(nh) => break Ok(nh),
Err(err) => {
if err.is::<ShouldResyncError>() || err.is::<ShouldRevertError>() {
if err.is::<ShouldResyncError>()
|| err.is::<ShouldRevertError>()
|| err.is::<ShouldResetTxError>()
{
bail!(err);
}
log::warn!("failed to confirm next block: {:#}", err);
Expand Down Expand Up @@ -326,6 +352,13 @@ async fn run(state: &mut ProduceSubmitConfirm) -> Result<()> {
.unpack();
bail!(ShouldRevertError(last_confirmed));
}
_ = fee_rate_adjust_interval.tick(), if has_fee_rate_controller => {
let fee_rate_pid = state.fee_rate_controller.as_mut().unwrap();
let output = fee_rate_pid.next_control_output(state.submitted_count as f64 / config.submitted_limit as f64);
let f = (mid_fee_rate as f64 + half_fee_rate_range as f64 * output.output) as u64;
// Just to be safe from possible rounding errors.
state.current_fee_rate = f.clamp(config.min_fee_rate, config.max_fee_rate);
}
// Block confirmed.
result = &mut confirm_handle, if confirming => {
confirming = false;
Expand All @@ -341,6 +374,7 @@ async fn run(state: &mut ProduceSubmitConfirm) -> Result<()> {
publish_confirmed(&mut sync_server, &state.context.store.get_snapshot(), nh.number().unpack())?;
}
state.set_submitted_count(state.submitted_count - 1);
state.context.liveness.tick();
}
_ => {}
}
Expand All @@ -361,6 +395,7 @@ async fn run(state: &mut ProduceSubmitConfirm) -> Result<()> {
}
state.set_local_count(state.local_count - 1);
state.set_submitted_count(state.submitted_count + 1);
state.context.liveness.tick();
}
_ => {}
}
Expand All @@ -373,14 +408,45 @@ async fn run(state: &mut ProduceSubmitConfirm) -> Result<()> {
log::warn!("failed to produce local block: {:#}", e);
} else {
state.set_local_count(state.local_count + 1);
state.context.liveness.tick();
}
}
}
// We have produced, submitted or confirmed a block. Update liveness tick.
state.context.liveness.tick();
}
}

/// Reset last submitted to last confirmed.
async fn reset_submission_txs(state: &mut ProduceSubmitConfirm) -> Result<()> {
let mut store_tx = state.context.store.begin_transaction();
let last_confirmed = store_tx
.get_last_confirmed_block_number_hash()
.context("get last confirmed")?;
let last_confirmed_block_number = last_confirmed.number().unpack();
let last_submitted = store_tx
.get_last_submitted_block_number_hash()
.context("get last submitted")?
.number()
.unpack();
for i in last_confirmed_block_number + 1..last_submitted {
store_tx.delete_submit_tx(i)?;
}
store_tx.set_last_submitted_block_number_hash(&last_confirmed.as_reader())?;
store_tx.commit()?;

state.set_local_count(state.local_count + state.submitted_count);
state.set_submitted_count(0);
state.context.local_cells_manager.lock().await.reset();

// Use random fee rate.
let c = &state.context.psc_config;
let fee_rate = thread_rng().gen_range(c.min_fee_rate..=c.max_fee_rate);
state.current_fee_rate = fee_rate;

log::info!("reset last submitted to #{last_confirmed_block_number}, fee rate to {fee_rate}");

Ok(())
}

/// Produce and save local block.
#[instrument(skip_all)]
async fn produce_local_block(ctx: &PSCContext) -> Result<()> {
Expand Down Expand Up @@ -490,7 +556,7 @@ async fn produce_local_block(ctx: &PSCContext) -> Result<()> {
Ok(())
}

async fn submit_next_block(ctx: &PSCContext) -> Result<NumberHash> {
async fn submit_next_block(ctx: &PSCContext, fee_rate: u64) -> Result<NumberHash> {
let snap = ctx.store.get_snapshot();
// L2 block number to submit.
let block_number = snap
Expand All @@ -508,7 +574,7 @@ async fn submit_next_block(ctx: &PSCContext) -> Result<NumberHash> {
// it does, it means that previous block is probably not confirmed
// anymore, and we should sync with L1 again.
let is_first = block_number == last_confirmed + 1;
submit_block(ctx, snap, is_first, block_number).await
submit_block(ctx, snap, is_first, block_number, fee_rate).await
}

#[instrument(skip(ctx, snap, is_first))]
Expand All @@ -517,6 +583,7 @@ async fn submit_block(
snap: StoreSnapshot,
is_first: bool,
block_number: u64,
fee_rate: u64,
) -> Result<NumberHash> {
// L2 block hash to submit.
let block_hash = snap
Expand Down Expand Up @@ -563,6 +630,7 @@ async fn submit_block(
since,
withdrawal_extras,
local_cells_manager: &*local_cells_manager,
fee_rate,
};
let tx = ctx
.block_producer
Expand Down Expand Up @@ -652,9 +720,14 @@ async fn submit_block(
.build())
}

async fn poll_tx_confirmed(rpc_client: &RPCClient, tx: &Transaction) -> Result<()> {
async fn poll_tx_confirmed(
rpc_client: &RPCClient,
tx: &Transaction,
timeout: Option<Duration>,
) -> Result<()> {
log::info!("waiting for tx 0x{}", hex::encode(tx.hash()));
let mut last_sent = Instant::now();
let initial_instant = Instant::now();
let mut last_sent = initial_instant;
loop {
let status = rpc_client.ckb.get_transaction_status(tx.hash()).await?;
use gw_jsonrpc_types::ckb_jsonrpc_types::Status;
Expand All @@ -674,6 +747,10 @@ async fn poll_tx_confirmed(rpc_client: &RPCClient, tx: &Transaction) -> Result<(
_ => last_sent.elapsed() > Duration::from_secs(24),
};
if should_resend {
if timeout.is_some() && Some(initial_instant.elapsed()) > timeout {
bail!(ConfirmTimeoutError);
}

log::info!("resend transaction 0x{}", hex::encode(tx.hash()));
send_transaction_or_check_inputs(rpc_client, tx).await?;
last_sent = Instant::now();
Expand Down Expand Up @@ -721,13 +798,21 @@ async fn confirm_block(
.get_block_submit_tx(block_number)
.expect("get submit tx");
drop(snap);
poll_tx_confirmed(&context.rpc_client, &tx)
// Use a timeout if there is room for fee rate bumping.
let timeout = if context.psc_config.max_fee_rate > context.psc_config.min_fee_rate {
Some(Duration::from_secs(context.psc_config.confirm_timeout_secs))
} else {
None
};
poll_tx_confirmed(&context.rpc_client, &tx, timeout)
.await
.map_err(|e| {
if e.is::<UnknownCellError>() {
e.context(ShouldResyncError)
} else if e.is::<DeadCellError>() {
e.context(ShouldRevertError(block_number))
} else if e.is::<ConfirmTimeoutError>() {
e.context(ShouldResetTxError)
} else {
e
}
Expand Down Expand Up @@ -810,7 +895,7 @@ async fn confirm_pending_l1_upgrade(ctx: &PSCContext) -> Result<()> {
let tx: Transaction = l1_upgrade.signed_transaction.clone().into();
// We can't recover from errors when confirming l1 upgrade tx,
// both deadcell error and unknwown cell is unacceptable, so we just throw it
poll_tx_confirmed(&ctx.rpc_client, &tx).await?;
poll_tx_confirmed(&ctx.rpc_client, &tx, None).await?;
log::info!("l1 upgrade tx confirmed");
}
Ok(())
Expand Down Expand Up @@ -917,6 +1002,19 @@ impl Display for ShouldResyncError {
}
}

#[derive(Debug)]
struct ShouldResetTxError;

impl Display for ShouldResetTxError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "should reset txs")
}
}

#[derive(Debug, thiserror::Error)]
#[error("confirm transaction timeout")]
struct ConfirmTimeoutError;

#[derive(thiserror::Error, Debug)]
struct DeadCellError {
consumed_by_tx: Option<H256>,
Expand Down
1 change: 1 addition & 0 deletions crates/config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ serde = { version = "1.0", features = ["derive"] }
hex = "0.4"
lazy_static = "1.4"
toml = "0.5"
pid = { version = "4.0.0", features = ["serde"] }
Loading