Skip to content
This repository has been archived by the owner on Oct 19, 2024. It is now read-only.

Prestwich/super pending #566

Merged
merged 15 commits into from
Nov 12, 2021
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
- Use rust types as contract function inputs for human readable abi [#482](https://github.com/gakonst/ethers-rs/pull/482)
- Add EIP-712 `sign_typed_data` signer method; add ethers-core type `Eip712` trait and derive macro in ethers-derive-eip712 [#481](https://github.com/gakonst/ethers-rs/pull/481)
- `LocalWallet::new_keystore` now returns a tuple `(LocalWallet, String)` instead of `LocalWallet`, where the string represents the UUID of the newly created encrypted JSON keystore. The JSON keystore is stored as a file `/dir/uuid`. The issue [#557](https://github.com/gakonst/ethers-rs/issues/557) is addressed [#559](https://github.com/gakonst/ethers-rs/pull/559)
- add the missing constructor for `Timelag` middleware via [#568](https://github.com/gakonst/ethers-rs/pull/568)
- re-export error types for `Http` and `Ws` providers in [#570](https://github.com/gakonst/ethers-rs/pull/570)
- add a method on the `Middleware` to broadcast a tx with a series of escalating gas prices via [#566](https://github.com/gakonst/ethers-rs/pull/566)

### 0.5.3

Expand Down
45 changes: 45 additions & 0 deletions ethers-providers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
//! # }
//! ```
mod transports;
use futures_util::future::join_all;
pub use transports::*;

mod provider;
Expand All @@ -74,6 +75,9 @@ pub mod ens;
mod pending_transaction;
pub use pending_transaction::PendingTransaction;

mod pending_escalator;
pub use pending_escalator::EscalatingPending;

mod stream;
pub use futures_util::StreamExt;
pub use stream::{interval, FilterWatcher, TransactionStream, DEFAULT_POLL_INTERVAL};
Expand All @@ -89,6 +93,9 @@ use std::{error::Error, fmt::Debug, future::Future, pin::Pin, str::FromStr};

pub use provider::{FilterKind, Provider, ProviderError};

/// A simple gas escalation policy
pub type EscalationPolicy = Box<dyn Fn(U256, usize) -> U256 + Send + Sync>;

// Helper type alias
#[cfg(target_arch = "wasm32")]
pub(crate) type PinBoxFut<'a, T> = Pin<Box<dyn Future<Output = Result<T, ProviderError>> + 'a>>;
Expand Down Expand Up @@ -283,6 +290,44 @@ pub trait Middleware: Sync + Send + Debug {
self.inner().send_transaction(tx, block).await.map_err(FromErr::from)
}

/// Send a transaction with a simple escalation policy.
///
/// `escalation` should be a boxed function that maps `original_gas_price`
/// and `number_of_previous_escalations` -> `new_gas_price`.
///
/// e.g. `Box::new(|start, escalations| start * 1250.pow(escalations) / 1000.pow(escalations))`
async fn send_escalating<'a>(
&'a self,
tx: &TypedTransaction,
escalations: usize,
policy: EscalationPolicy,
) -> Result<EscalatingPending<'a, Self::Provider>, Self::Error> {
let mut original = tx.clone();
self.fill_transaction(&mut original, None).await?;
let gas_price = original.gas_price().expect("filled");
let chain_id = self.get_chainid().await?.low_u64();
let sign_futs: Vec<_> = (0..escalations)
.map(|i| {
let new_price = policy(gas_price, i);
let mut r = original.clone();
r.set_gas_price(new_price);
r
})
.map(|req| async move {
self.sign(req.rlp(chain_id), &self.default_sender().unwrap_or_default())
.await
.map(|sig| req.rlp_signed(chain_id, &sig))
})
.collect();

// we reverse for convenience. Ensuring that we can always just
// `pop()` the next tx off the back later
let mut signed = join_all(sign_futs).await.into_iter().collect::<Result<Vec<_>, _>>()?;
signed.reverse();

Ok(EscalatingPending::new(self.provider(), signed))
}

async fn resolve_name(&self, ens_name: &str) -> Result<Address, Self::Error> {
self.inner().resolve_name(ens_name).await.map_err(FromErr::from)
}
Expand Down
226 changes: 226 additions & 0 deletions ethers-providers/src/pending_escalator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
use ethers_core::types::{Bytes, TransactionReceipt, H256};
use futures_util::{stream::FuturesUnordered, StreamExt};
use pin_project::pin_project;
use std::{
future::Future,
pin::Pin,
task::Poll,
time::{Duration, Instant},
};

#[cfg(not(target_arch = "wasm32"))]
use futures_timer::Delay;
#[cfg(target_arch = "wasm32")]
use wasm_timer::Delay;

use crate::{JsonRpcClient, Middleware, PendingTransaction, PinBoxFut, Provider, ProviderError};

/// States for the EscalatingPending future
enum EscalatorStates<'a, P> {
Initial(PinBoxFut<'a, PendingTransaction<'a, P>>),
Sleeping(Pin<Box<Delay>>),
BroadcastingNew(PinBoxFut<'a, PendingTransaction<'a, P>>),
CheckingReceipts(FuturesUnordered<PinBoxFut<'a, Option<TransactionReceipt>>>),
Completed,
}

/// An EscalatingPending is a pending transaction that handles increasing its
/// own gas price over time, by broadcasting successive versions with higher
/// gas prices
#[must_use]
#[pin_project(project = PendingProj)]
#[derive(Debug)]
pub struct EscalatingPending<'a, P>
where
P: JsonRpcClient,
{
provider: &'a Provider<P>,
broadcast_interval: Duration,
polling_interval: Duration,
txns: Vec<Bytes>,
last: Instant,
sent: Vec<H256>,
state: EscalatorStates<'a, P>,
}

impl<'a, P> EscalatingPending<'a, P>
where
P: JsonRpcClient,
{
/// Instantiate a new EscalatingPending. This should only be called by the
/// Middleware trait. Callers MUST ensure that transactions are in _reverse_
/// broadcast order (this just makes writing the code easier, as we
/// can use `pop()` a lot)
///
/// TODO: consider deserializing and checking invariants (gas order, etc.)
pub(crate) fn new(provider: &'a Provider<P>, mut txns: Vec<Bytes>) -> Self {
if txns.is_empty() {
panic!("bad args");
}

let first = txns.pop().expect("bad args");
// Sane-feeling default intervals
Self {
provider,
broadcast_interval: Duration::from_millis(150),
polling_interval: Duration::from_millis(10),
txns,
// placeholder value. We set this again after the initial broadcast
// future resolves
last: Instant::now(),
sent: vec![],
state: EscalatorStates::Initial(Box::pin(provider.send_raw_transaction(first))),
}
}

pub fn with_broadcast_interval(mut self, duration: impl Into<Duration>) -> Self {
self.broadcast_interval = duration.into();
self
}

pub fn with_polling_interval(mut self, duration: impl Into<Duration>) -> Self {
self.polling_interval = duration.into();
self
}

pub fn get_polling_interval(&self) -> Duration {
self.polling_interval
}

pub fn get_broadcast_interval(&self) -> Duration {
self.broadcast_interval
}
}

macro_rules! check_all_receipts {
($cx:ident, $this:ident) => {
let futs: futures_util::stream::FuturesUnordered<_> = $this
.sent
.iter()
.map(|tx_hash| $this.provider.get_transaction_receipt(*tx_hash))
.collect();
*$this.state = CheckingReceipts(futs);
$cx.waker().wake_by_ref();
return Poll::Pending
};
}

macro_rules! sleep {
($cx:ident, $this:ident) => {
*$this.state = EscalatorStates::Sleeping(Box::pin(Delay::new(*$this.polling_interval)));
$cx.waker().wake_by_ref();
return Poll::Pending
};
}

macro_rules! completed {
($this:ident, $output:expr) => {
*$this.state = Completed;
return Poll::Ready($output)
};
}

macro_rules! poll_broadcast_fut {
($cx:ident, $this:ident, $fut:ident) => {
match $fut.as_mut().poll($cx) {
Poll::Ready(Ok(pending)) => {
*$this.last = Instant::now();
$this.sent.push(*pending);
tracing::info!(
tx_hash = ?*pending,
escalation = $this.sent.len(),
"Escalation transaction broadcast complete"
);
check_all_receipts!($cx, $this);
}
Poll::Ready(Err(e)) => {
tracing::error!(
error = ?e,
"Error during transaction broadcast"
);
completed!($this, Err(e));
}
Poll::Pending => return Poll::Pending,
}
};
}

impl<'a, P> Future for EscalatingPending<'a, P>
where
P: JsonRpcClient,
{
type Output = Result<TransactionReceipt, ProviderError>;

fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
use EscalatorStates::*;

let this = self.project();

match this.state {
Initial(fut) => {
poll_broadcast_fut!(cx, this, fut);
}
Sleeping(delay) => {
let _ready = futures_util::ready!(delay.as_mut().poll(cx));
// if broadcast timer has elapsed and if we have a TX to
// broadcast, broadcast it
if this.last.elapsed() > *this.broadcast_interval {
if let Some(next_to_broadcast) = this.txns.pop() {
let fut = this.provider.send_raw_transaction(next_to_broadcast);
*this.state = BroadcastingNew(fut);
cx.waker().wake_by_ref();
return Poll::Pending
}
}
check_all_receipts!(cx, this);
}
BroadcastingNew(fut) => {
poll_broadcast_fut!(cx, this, fut);
}
CheckingReceipts(futs) => {
// otherwise drain one and check if we have a receipt
match futs.poll_next_unpin(cx) {
// we have found a receipt. This means that all other
// broadcast txns are now invalid, so we can drop them
Poll::Ready(Some(Ok(Some(receipt)))) => {
completed!(this, Ok(receipt));
}
// we found no receipt, rewake and check the next future
// until drained
Poll::Ready(Some(Ok(None))) => cx.waker().wake_by_ref(),
// bubble up errors
Poll::Ready(Some(Err(e))) => {
completed!(this, Err(e));
}
Poll::Ready(None) => {
sleep!(cx, this);
}
// check again later
Poll::Pending => {
// stick it pack in the list for polling again later
prestwich marked this conversation as resolved.
Show resolved Hide resolved
return Poll::Pending
}
}
}
Completed => panic!("polled after completion"),
}

Poll::Pending
}
}

impl<'a, P> std::fmt::Debug for EscalatorStates<'a, P> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let state = match self {
Self::Initial(_) => "Initial",
Self::Sleeping(_) => "Sleeping",
Self::BroadcastingNew(_) => "BroadcastingNew",
Self::CheckingReceipts(_) => "CheckingReceipts",
Self::Completed => "Completed",
};
f.debug_struct("EscalatorStates").field("state", &state).finish()
}
}
1 change: 1 addition & 0 deletions ethers-providers/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::{
use crate::CeloMiddleware;
use crate::Middleware;
use async_trait::async_trait;

use ethers_core::{
abi::{self, Detokenize, ParamType},
types::{
Expand Down