Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prestwich/super pending #566

Merged
merged 15 commits into from
Nov 12, 2021
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions ethers-providers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
//! # }
//! ```
mod transports;
use futures_util::future::join_all;
pub use transports::*;

mod provider;
Expand All @@ -74,6 +75,9 @@ pub mod ens;
mod pending_transaction;
pub use pending_transaction::PendingTransaction;

mod pending_escalator;
pub use pending_escalator::EscalatingPending;

mod stream;
pub use futures_util::StreamExt;
pub use stream::{interval, FilterWatcher, TransactionStream, DEFAULT_POLL_INTERVAL};
Expand All @@ -89,6 +93,9 @@ use std::{error::Error, fmt::Debug, future::Future, pin::Pin, str::FromStr};

pub use provider::{FilterKind, Provider, ProviderError};

/// A simple gas escalation policy
pub type EscalationPolicy = Box<dyn Fn(U256, usize) -> U256 + Send + Sync>;

// Helper type alias
#[cfg(target_arch = "wasm32")]
pub(crate) type PinBoxFut<'a, T> = Pin<Box<dyn Future<Output = Result<T, ProviderError>> + 'a>>;
Expand Down Expand Up @@ -283,6 +290,43 @@ pub trait Middleware: Sync + Send + Debug {
self.inner().send_transaction(tx, block).await.map_err(FromErr::from)
}

/// Send a transaction with a simple escalation policy.
///
/// `escalation` should be a boxed function that maps `original_gas_price`
/// and `number_of_previous_escalations` -> `new_gas_price`.
///
/// e.g. `Box::new(|start, escalations| start * 1250.pow(escalations) / 1000.pow(escalations))`
async fn send_escalating<'a>(
&'a self,
tx: &TypedTransaction,
escalation: EscalationPolicy,
prestwich marked this conversation as resolved.
Show resolved Hide resolved
) -> Result<EscalatingPending<'a, Self::Provider>, Self::Error> {
let mut original = tx.clone();
self.fill_transaction(&mut original, None).await?;
let gas_price = original.gas_price().expect("filled");
let chain_id = self.get_chainid().await?.low_u64();
let sign_futs: Vec<_> = (0..5)
prestwich marked this conversation as resolved.
Show resolved Hide resolved
.map(|i| {
let new_price = escalation(gas_price, i);
let mut r = original.clone();
r.set_gas_price(new_price);
r
})
.map(|req| async move {
self.sign(req.rlp(chain_id), &self.default_sender().unwrap_or_default())
.await
.map(|sig| req.rlp_signed(chain_id, &sig))
})
.collect();

// we reverse for convenience. Ensuring that we can always just
// `pop()` the next tx off the back later
let mut signed = join_all(sign_futs).await.into_iter().collect::<Result<Vec<_>, _>>()?;
signed.reverse();

Ok(EscalatingPending::new(self.provider(), signed))
}

async fn resolve_name(&self, ens_name: &str) -> Result<Address, Self::Error> {
self.inner().resolve_name(ens_name).await.map_err(FromErr::from)
}
Expand Down
221 changes: 221 additions & 0 deletions ethers-providers/src/pending_escalator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
use ethers_core::types::{Bytes, TransactionReceipt, H256};
use pin_project::pin_project;
use std::{
future::Future,
pin::Pin,
task::Poll,
time::{Duration, Instant},
};

#[cfg(not(target_arch = "wasm32"))]
use futures_timer::Delay;
#[cfg(target_arch = "wasm32")]
use wasm_timer::Delay;

use crate::{JsonRpcClient, Middleware, PendingTransaction, PinBoxFut, Provider, ProviderError};

/// States for the EscalatingPending future
enum EscalatorStates<'a, P> {
Initial(PinBoxFut<'a, PendingTransaction<'a, P>>),
Sleeping(Pin<Box<Delay>>),
BroadcastingNew(PinBoxFut<'a, PendingTransaction<'a, P>>),
CheckingReceipts(Vec<PinBoxFut<'a, Option<TransactionReceipt>>>),
Completed,
}

/// An EscalatingPending is a pending transaction that handles increasing its
/// own gas price over time, by broadcasting successive versions with higher
/// gas prices
#[must_use]
#[pin_project(project = PendingProj)]
#[derive(Debug)]
pub struct EscalatingPending<'a, P>
where
P: JsonRpcClient,
{
provider: &'a Provider<P>,
broadcast_interval: Duration,
polling_interval: Duration,
txns: Vec<Bytes>,
last: Instant,
sent: Vec<H256>,
state: EscalatorStates<'a, P>,
}

impl<'a, P> EscalatingPending<'a, P>
where
P: JsonRpcClient,
{
/// Instantiate a new EscalatingPending. This should only be called by the
/// Middleware trait. Callers MUST ensure that transactions are in _reverse_
/// broadcast order (this just makes writing the code easier, as we
/// can use `pop()` a lot)
///
/// TODO: consider deserializing and checking invariants (gas order, etc.)
pub(crate) fn new(provider: &'a Provider<P>, mut txns: Vec<Bytes>) -> Self {
if txns.is_empty() {
panic!("bad args");
}

let first = txns.pop().expect("bad args");
// Sane-feeling default intervals
Self {
provider,
broadcast_interval: Duration::from_millis(150),
polling_interval: Duration::from_millis(10),
txns,
// placeholder value. We set this again after the initial broadcast
// future resolves
last: Instant::now(),
sent: vec![],
state: EscalatorStates::Initial(Box::pin(provider.send_raw_transaction(first))),
}
}

pub fn broadcast_interval(mut self, duration: u64) -> Self {
self.broadcast_interval = Duration::from_secs(duration);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use milliseconds here, just for flexbility sake

Suggested change
self.broadcast_interval = Duration::from_secs(duration);
self.broadcast_interval = Duration::from_millis(duration);

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

going to change it to use Into<Duration> to match the behavior of PendingTransaction

self
}

pub fn polling_interval(mut self, duration: u64) -> Self {
self.polling_interval = Duration::from_secs(duration);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
self.polling_interval = Duration::from_secs(duration);
self.polling_interval = Duration::from_millis(duration);

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
self.polling_interval = Duration::from_secs(duration);
self.polling_interval = Duration::from_millis(duration);

self
}
}

macro_rules! check_all_receipts {
($cx:ident, $this:ident) => {
let futs: Vec<_> = $this
.sent
.iter()
.map(|tx_hash| $this.provider.get_transaction_receipt(*tx_hash))
.collect();
*$this.state = CheckingReceipts(futs);
$cx.waker().wake_by_ref();
return Poll::Pending
};
}

macro_rules! sleep {
($cx:ident, $this:ident) => {
*$this.state = EscalatorStates::Sleeping(Box::pin(Delay::new(*$this.polling_interval)));
$cx.waker().wake_by_ref();
return Poll::Pending
};
}

macro_rules! completed {
($this:ident, $output:expr) => {
*$this.state = Completed;
return Poll::Ready($output)
};
}

macro_rules! poll_broadcast_fut {
($cx:ident, $this:ident, $fut:ident) => {
match $fut.as_mut().poll($cx) {
Poll::Ready(Ok(pending)) => {
*$this.last = Instant::now();
$this.sent.push(*pending);
tracing::info!(
tx_hash = ?*pending,
escalation = $this.sent.len(),
"Escalation transaction broadcast complete"
);
check_all_receipts!($cx, $this);
}
Poll::Ready(Err(e)) => {
tracing::error!(
error = ?e,
"Error during transaction broadcast"
);
completed!($this, Err(e));
}
Poll::Pending => return Poll::Pending,
}
};
}

impl<'a, P> Future for EscalatingPending<'a, P>
where
P: JsonRpcClient,
{
type Output = Result<TransactionReceipt, ProviderError>;

fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
use EscalatorStates::*;

let this = self.project();

match this.state {
Initial(fut) => {
poll_broadcast_fut!(cx, this, fut);
}
Sleeping(delay) => {
let _ready = futures_util::ready!(delay.as_mut().poll(cx));
// if broadcast timer has elapsed and if we have a TX to
// broadcast, broadcast it
if this.last.elapsed() > *this.broadcast_interval {
if let Some(next_to_broadcast) = this.txns.pop() {
let fut = this.provider.send_raw_transaction(next_to_broadcast);
*this.state = BroadcastingNew(fut);
cx.waker().wake_by_ref();
return Poll::Pending
}
}
check_all_receipts!(cx, this);
}
BroadcastingNew(fut) => {
poll_broadcast_fut!(cx, this, fut);
}
CheckingReceipts(futs) => {
// if drained, sleep
if futs.is_empty() {
sleep!(cx, this);
}

// otherwise drain one and check if we have a receipt
let mut pollee = futs.pop().expect("checked");
match pollee.as_mut().poll(cx) {
// we have found a receipt. This means that all other
// broadcast txns are now invalid, so we can drop them
Poll::Ready(Ok(Some(receipt))) => {
completed!(this, Ok(receipt));
}
// we found no receipt, rewake and check the next future
// until drained
Poll::Ready(Ok(None)) => cx.waker().wake_by_ref(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this cx.waker().wake_by_ref() necessary? Won't executor automatically put the EscalatingPending future back in queue? Other paths here that return Poll::Pending don't call wake

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the goal is to explicitly notify the executor that this is ready to be polled again immediately. i.e. override the executor's scheduling and jump to the front of the queue as much as we have the ability to, as we know that there's no async operation happening to wait on

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this section looks like it could be wrapped in a loop over all futures? So that Poll::Ready(Ok(None)) => merely results in a continue?
Is the order in which they should be polled here important, if not then Vec::swap_remove could be useful.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean it kinda is right now, it's just ensuring that it allows the executor to schedule each step of the loop and interleave other work

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looping over all futures is an anti-pattern I think, the state machine may be slightly easier to write but as @prestwich says it's better to do it on a per-fut basis

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't the CheckingReceipts state essentially a FuturesOrdered but we only poll one future at a time?

// bubble up errors
Poll::Ready(Err(e)) => {
completed!(this, Err(e));
}
// check again later
Poll::Pending => {
// stick it pack in the list for polling again later
prestwich marked this conversation as resolved.
Show resolved Hide resolved
futs.push(pollee);
return Poll::Pending
}
}
}
Completed => panic!("polled after completion"),
}

Poll::Pending
}
}

impl<'a, P> std::fmt::Debug for EscalatorStates<'a, P> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let state = match self {
Self::Initial(_) => "Initial",
Self::Sleeping(_) => "Sleeping",
Self::BroadcastingNew(_) => "BroadcastingNew",
Self::CheckingReceipts(_) => "CheckingReceipts",
Self::Completed => "Completed",
};
f.debug_struct("EscalatorStates").field("state", &state).finish()
}
}
1 change: 1 addition & 0 deletions ethers-providers/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::{
use crate::CeloMiddleware;
use crate::Middleware;
use async_trait::async_trait;

use ethers_core::{
abi::{self, Detokenize, ParamType},
types::{
Expand Down