Skip to content
This repository has been archived by the owner on Oct 19, 2024. It is now read-only.

Prestwich/super pending #566

Merged
merged 15 commits into from
Nov 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
- Use rust types as contract function inputs for human readable abi [#482](https://github.com/gakonst/ethers-rs/pull/482)
- Add EIP-712 `sign_typed_data` signer method; add ethers-core type `Eip712` trait and derive macro in ethers-derive-eip712 [#481](https://github.com/gakonst/ethers-rs/pull/481)
- `LocalWallet::new_keystore` now returns a tuple `(LocalWallet, String)` instead of `LocalWallet`, where the string represents the UUID of the newly created encrypted JSON keystore. The JSON keystore is stored as a file `/dir/uuid`. The issue [#557](https://github.com/gakonst/ethers-rs/issues/557) is addressed [#559](https://github.com/gakonst/ethers-rs/pull/559)
- add the missing constructor for `Timelag` middleware via [#568](https://github.com/gakonst/ethers-rs/pull/568)
- re-export error types for `Http` and `Ws` providers in [#570](https://github.com/gakonst/ethers-rs/pull/570)
- add a method on the `Middleware` to broadcast a tx with a series of escalating gas prices via [#566](https://github.com/gakonst/ethers-rs/pull/566)

### 0.5.3

Expand Down
46 changes: 46 additions & 0 deletions ethers-providers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
//! # }
//! ```
mod transports;
use futures_util::future::join_all;
pub use transports::*;

mod provider;
Expand All @@ -74,6 +75,9 @@ pub mod ens;
mod pending_transaction;
pub use pending_transaction::PendingTransaction;

mod pending_escalator;
pub use pending_escalator::EscalatingPending;

mod stream;
pub use futures_util::StreamExt;
pub use stream::{interval, FilterWatcher, TransactionStream, DEFAULT_POLL_INTERVAL};
Expand All @@ -89,6 +93,9 @@ use std::{error::Error, fmt::Debug, future::Future, pin::Pin, str::FromStr};

pub use provider::{FilterKind, Provider, ProviderError};

/// A simple gas escalation policy
pub type EscalationPolicy = Box<dyn Fn(U256, usize) -> U256 + Send + Sync>;

// Helper type alias
#[cfg(target_arch = "wasm32")]
pub(crate) type PinBoxFut<'a, T> = Pin<Box<dyn Future<Output = Result<T, ProviderError>> + 'a>>;
Expand Down Expand Up @@ -283,6 +290,45 @@ pub trait Middleware: Sync + Send + Debug {
self.inner().send_transaction(tx, block).await.map_err(FromErr::from)
}

/// Send a transaction with a simple escalation policy.
///
/// `policy` should be a boxed function that maps `original_gas_price`
/// and `number_of_previous_escalations` -> `new_gas_price`.
///
/// e.g. `Box::new(|start, escalation_index| start * 1250.pow(escalations) /
/// 1000.pow(escalations))`
async fn send_escalating<'a>(
&'a self,
tx: &TypedTransaction,
escalations: usize,
policy: EscalationPolicy,
) -> Result<EscalatingPending<'a, Self::Provider>, Self::Error> {
let mut original = tx.clone();
self.fill_transaction(&mut original, None).await?;
let gas_price = original.gas_price().expect("filled");
let chain_id = self.get_chainid().await?.low_u64();
let sign_futs: Vec<_> = (0..escalations)
.map(|i| {
let new_price = policy(gas_price, i);
let mut r = original.clone();
r.set_gas_price(new_price);
r
})
.map(|req| async move {
self.sign(req.rlp(chain_id), &self.default_sender().unwrap_or_default())
.await
.map(|sig| req.rlp_signed(chain_id, &sig))
})
.collect();

// we reverse for convenience. Ensuring that we can always just
// `pop()` the next tx off the back later
let mut signed = join_all(sign_futs).await.into_iter().collect::<Result<Vec<_>, _>>()?;
signed.reverse();

Ok(EscalatingPending::new(self.provider(), signed))
}

async fn resolve_name(&self, ens_name: &str) -> Result<Address, Self::Error> {
self.inner().resolve_name(ens_name).await.map_err(FromErr::from)
}
Expand Down
240 changes: 240 additions & 0 deletions ethers-providers/src/pending_escalator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
use ethers_core::types::{Bytes, TransactionReceipt, H256};
use futures_util::{stream::FuturesUnordered, StreamExt};
use pin_project::pin_project;
use std::{
future::Future,
pin::Pin,
task::Poll,
time::{Duration, Instant},
};

#[cfg(not(target_arch = "wasm32"))]
use futures_timer::Delay;
#[cfg(target_arch = "wasm32")]
use wasm_timer::Delay;

use crate::{JsonRpcClient, Middleware, PendingTransaction, PinBoxFut, Provider, ProviderError};

/// States for the EscalatingPending future
enum EscalatorStates<'a, P> {
Initial(PinBoxFut<'a, PendingTransaction<'a, P>>),
Sleeping(Pin<Box<Delay>>),
BroadcastingNew(PinBoxFut<'a, PendingTransaction<'a, P>>),
CheckingReceipts(FuturesUnordered<PinBoxFut<'a, Option<TransactionReceipt>>>),
Completed,
}

/// An EscalatingPending is a pending transaction that increases its own gas
/// price over time, by broadcasting successive versions with higher gas prices.
#[must_use]
#[pin_project(project = PendingProj)]
#[derive(Debug)]
pub struct EscalatingPending<'a, P>
where
P: JsonRpcClient,
{
provider: &'a Provider<P>,
broadcast_interval: Duration,
polling_interval: Duration,
txns: Vec<Bytes>,
last: Instant,
sent: Vec<H256>,
state: EscalatorStates<'a, P>,
}

impl<'a, P> EscalatingPending<'a, P>
where
P: JsonRpcClient,
{
/// Instantiate a new EscalatingPending. This should only be called by the
/// Middleware trait.
///
/// Callers MUST ensure that transactions are in _reverse_ broadcast order
/// (this just makes writing the code easier, as we can use `pop()` a lot).
///
/// TODO: consider deserializing and checking invariants (gas order, etc.)
pub(crate) fn new(provider: &'a Provider<P>, mut txns: Vec<Bytes>) -> Self {
if txns.is_empty() {
panic!("bad args");
}

let first = txns.pop().expect("bad args");
// Sane-feeling default intervals
Self {
provider,
broadcast_interval: Duration::from_millis(150),
polling_interval: Duration::from_millis(10),
txns,
// placeholder value. We set this again after the initial broadcast
// future resolves
last: Instant::now(),
sent: vec![],
state: EscalatorStates::Initial(Box::pin(provider.send_raw_transaction(first))),
}
}

/// Set the broadcast interval. This controls how often the escalator
/// broadcasts a new transaction at a higher gas price
pub fn with_broadcast_interval(mut self, duration: impl Into<Duration>) -> Self {
self.broadcast_interval = duration.into();
self
}

/// Set the polling interval. This controls how often the escalator checks
/// transaction receipts for confirmation.
pub fn with_polling_interval(mut self, duration: impl Into<Duration>) -> Self {
self.polling_interval = duration.into();
self
}

/// Get the current polling interval.
pub fn get_polling_interval(&self) -> Duration {
self.polling_interval
}

/// Get the current broadcast interval.
pub fn get_broadcast_interval(&self) -> Duration {
self.broadcast_interval
}
}

macro_rules! check_all_receipts {
($cx:ident, $this:ident) => {
let futs: futures_util::stream::FuturesUnordered<_> = $this
.sent
.iter()
.map(|tx_hash| $this.provider.get_transaction_receipt(*tx_hash))
.collect();
*$this.state = CheckingReceipts(futs);
$cx.waker().wake_by_ref();
return Poll::Pending
};
}

macro_rules! sleep {
($cx:ident, $this:ident) => {
*$this.state = EscalatorStates::Sleeping(Box::pin(Delay::new(*$this.polling_interval)));
$cx.waker().wake_by_ref();
return Poll::Pending
};
}

macro_rules! completed {
($this:ident, $output:expr) => {
*$this.state = Completed;
return Poll::Ready($output)
};
}

macro_rules! poll_broadcast_fut {
($cx:ident, $this:ident, $fut:ident) => {
match $fut.as_mut().poll($cx) {
Poll::Ready(Ok(pending)) => {
*$this.last = Instant::now();
$this.sent.push(*pending);
tracing::info!(
tx_hash = ?*pending,
escalation = $this.sent.len(),
"Escalation transaction broadcast complete"
);
check_all_receipts!($cx, $this);
}
Poll::Ready(Err(e)) => {
tracing::error!(
error = ?e,
"Error during transaction broadcast"
);
completed!($this, Err(e));
}
Poll::Pending => return Poll::Pending,
}
};
}

impl<'a, P> Future for EscalatingPending<'a, P>
where
P: JsonRpcClient,
{
type Output = Result<TransactionReceipt, ProviderError>;

fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
use EscalatorStates::*;

let this = self.project();

match this.state {
// In the initial state we're simply waiting on the first
// transaction braodcast to complete.
Initial(fut) => {
poll_broadcast_fut!(cx, this, fut);
}
Sleeping(delay) => {
let _ready = futures_util::ready!(delay.as_mut().poll(cx));
// if broadcast timer has elapsed and if we have a TX to
// broadcast, broadcast it
if this.last.elapsed() > *this.broadcast_interval {
if let Some(next_to_broadcast) = this.txns.pop() {
let fut = this.provider.send_raw_transaction(next_to_broadcast);
*this.state = BroadcastingNew(fut);
cx.waker().wake_by_ref();
return Poll::Pending
}
}
check_all_receipts!(cx, this);
}
// This state is functionally equivalent to Initial, but we
// differentiate it for clarity
BroadcastingNew(fut) => {
poll_broadcast_fut!(cx, this, fut);
}
CheckingReceipts(futs) => {
// Poll the set of `get_transaction_receipt` futures to check
// if any previously-broadcast transaction was confirmed.
// Continue doing this until all are resolved
match futs.poll_next_unpin(cx) {
// We have found a receipt. This means that all other
// broadcast txns are now invalid, so we can drop the
// futures and complete
Poll::Ready(Some(Ok(Some(receipt)))) => {
completed!(this, Ok(receipt));
}
// A `get_transaction_receipt` request resolved, but but we
// found no receipt, rewake and check if any other requests
// are resolved
Poll::Ready(Some(Ok(None))) => cx.waker().wake_by_ref(),
// A request errored. We complete the future with the error.
Poll::Ready(Some(Err(e))) => {
completed!(this, Err(e));
}
// We have run out of `get_transaction_receipt` requests.
// Sleep and then check if we should broadcast again (or
// check receipts again)
Poll::Ready(None) => {
sleep!(cx, this);
}
// No request has resolved yet. Try again later
Poll::Pending => return Poll::Pending,
}
}
Completed => panic!("polled after completion"),
}

Poll::Pending
}
}

impl<'a, P> std::fmt::Debug for EscalatorStates<'a, P> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let state = match self {
Self::Initial(_) => "Initial",
Self::Sleeping(_) => "Sleeping",
Self::BroadcastingNew(_) => "BroadcastingNew",
Self::CheckingReceipts(_) => "CheckingReceipts",
Self::Completed => "Completed",
};
f.debug_struct("EscalatorStates").field("state", &state).finish()
}
}
1 change: 1 addition & 0 deletions ethers-providers/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::{
use crate::CeloMiddleware;
use crate::Middleware;
use async_trait::async_trait;

use ethers_core::{
abi::{self, Detokenize, ParamType},
types::{
Expand Down