diff --git a/CHANGELOG.md b/CHANGELOG.md index 2abe6bfe4..5f244878d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,9 @@ - Use rust types as contract function inputs for human readable abi [#482](https://github.com/gakonst/ethers-rs/pull/482) - Add EIP-712 `sign_typed_data` signer method; add ethers-core type `Eip712` trait and derive macro in ethers-derive-eip712 [#481](https://github.com/gakonst/ethers-rs/pull/481) - `LocalWallet::new_keystore` now returns a tuple `(LocalWallet, String)` instead of `LocalWallet`, where the string represents the UUID of the newly created encrypted JSON keystore. The JSON keystore is stored as a file `/dir/uuid`. The issue [#557](https://github.com/gakonst/ethers-rs/issues/557) is addressed [#559](https://github.com/gakonst/ethers-rs/pull/559) +- add the missing constructor for `Timelag` middleware via [#568](https://github.com/gakonst/ethers-rs/pull/568) +- re-export error types for `Http` and `Ws` providers in [#570](https://github.com/gakonst/ethers-rs/pull/570) +- add a method on the `Middleware` to broadcast a tx with a series of escalating gas prices via [#566](https://github.com/gakonst/ethers-rs/pull/566) ### 0.5.3 diff --git a/ethers-providers/src/lib.rs b/ethers-providers/src/lib.rs index 009f74fbc..215619d11 100644 --- a/ethers-providers/src/lib.rs +++ b/ethers-providers/src/lib.rs @@ -64,6 +64,7 @@ //! # } //! ``` mod transports; +use futures_util::future::join_all; pub use transports::*; mod provider; @@ -74,6 +75,9 @@ pub mod ens; mod pending_transaction; pub use pending_transaction::PendingTransaction; +mod pending_escalator; +pub use pending_escalator::EscalatingPending; + mod stream; pub use futures_util::StreamExt; pub use stream::{interval, FilterWatcher, TransactionStream, DEFAULT_POLL_INTERVAL}; @@ -89,6 +93,9 @@ use std::{error::Error, fmt::Debug, future::Future, pin::Pin, str::FromStr}; pub use provider::{FilterKind, Provider, ProviderError}; +/// A simple gas escalation policy +pub type EscalationPolicy = Box U256 + Send + Sync>; + // Helper type alias #[cfg(target_arch = "wasm32")] pub(crate) type PinBoxFut<'a, T> = Pin> + 'a>>; @@ -283,6 +290,45 @@ pub trait Middleware: Sync + Send + Debug { self.inner().send_transaction(tx, block).await.map_err(FromErr::from) } + /// Send a transaction with a simple escalation policy. + /// + /// `policy` should be a boxed function that maps `original_gas_price` + /// and `number_of_previous_escalations` -> `new_gas_price`. + /// + /// e.g. `Box::new(|start, escalation_index| start * 1250.pow(escalations) / + /// 1000.pow(escalations))` + async fn send_escalating<'a>( + &'a self, + tx: &TypedTransaction, + escalations: usize, + policy: EscalationPolicy, + ) -> Result, Self::Error> { + let mut original = tx.clone(); + self.fill_transaction(&mut original, None).await?; + let gas_price = original.gas_price().expect("filled"); + let chain_id = self.get_chainid().await?.low_u64(); + let sign_futs: Vec<_> = (0..escalations) + .map(|i| { + let new_price = policy(gas_price, i); + let mut r = original.clone(); + r.set_gas_price(new_price); + r + }) + .map(|req| async move { + self.sign(req.rlp(chain_id), &self.default_sender().unwrap_or_default()) + .await + .map(|sig| req.rlp_signed(chain_id, &sig)) + }) + .collect(); + + // we reverse for convenience. Ensuring that we can always just + // `pop()` the next tx off the back later + let mut signed = join_all(sign_futs).await.into_iter().collect::, _>>()?; + signed.reverse(); + + Ok(EscalatingPending::new(self.provider(), signed)) + } + async fn resolve_name(&self, ens_name: &str) -> Result { self.inner().resolve_name(ens_name).await.map_err(FromErr::from) } diff --git a/ethers-providers/src/pending_escalator.rs b/ethers-providers/src/pending_escalator.rs new file mode 100644 index 000000000..a09fa53ed --- /dev/null +++ b/ethers-providers/src/pending_escalator.rs @@ -0,0 +1,240 @@ +use ethers_core::types::{Bytes, TransactionReceipt, H256}; +use futures_util::{stream::FuturesUnordered, StreamExt}; +use pin_project::pin_project; +use std::{ + future::Future, + pin::Pin, + task::Poll, + time::{Duration, Instant}, +}; + +#[cfg(not(target_arch = "wasm32"))] +use futures_timer::Delay; +#[cfg(target_arch = "wasm32")] +use wasm_timer::Delay; + +use crate::{JsonRpcClient, Middleware, PendingTransaction, PinBoxFut, Provider, ProviderError}; + +/// States for the EscalatingPending future +enum EscalatorStates<'a, P> { + Initial(PinBoxFut<'a, PendingTransaction<'a, P>>), + Sleeping(Pin>), + BroadcastingNew(PinBoxFut<'a, PendingTransaction<'a, P>>), + CheckingReceipts(FuturesUnordered>>), + Completed, +} + +/// An EscalatingPending is a pending transaction that increases its own gas +/// price over time, by broadcasting successive versions with higher gas prices. +#[must_use] +#[pin_project(project = PendingProj)] +#[derive(Debug)] +pub struct EscalatingPending<'a, P> +where + P: JsonRpcClient, +{ + provider: &'a Provider

, + broadcast_interval: Duration, + polling_interval: Duration, + txns: Vec, + last: Instant, + sent: Vec, + state: EscalatorStates<'a, P>, +} + +impl<'a, P> EscalatingPending<'a, P> +where + P: JsonRpcClient, +{ + /// Instantiate a new EscalatingPending. This should only be called by the + /// Middleware trait. + /// + /// Callers MUST ensure that transactions are in _reverse_ broadcast order + /// (this just makes writing the code easier, as we can use `pop()` a lot). + /// + /// TODO: consider deserializing and checking invariants (gas order, etc.) + pub(crate) fn new(provider: &'a Provider

, mut txns: Vec) -> Self { + if txns.is_empty() { + panic!("bad args"); + } + + let first = txns.pop().expect("bad args"); + // Sane-feeling default intervals + Self { + provider, + broadcast_interval: Duration::from_millis(150), + polling_interval: Duration::from_millis(10), + txns, + // placeholder value. We set this again after the initial broadcast + // future resolves + last: Instant::now(), + sent: vec![], + state: EscalatorStates::Initial(Box::pin(provider.send_raw_transaction(first))), + } + } + + /// Set the broadcast interval. This controls how often the escalator + /// broadcasts a new transaction at a higher gas price + pub fn with_broadcast_interval(mut self, duration: impl Into) -> Self { + self.broadcast_interval = duration.into(); + self + } + + /// Set the polling interval. This controls how often the escalator checks + /// transaction receipts for confirmation. + pub fn with_polling_interval(mut self, duration: impl Into) -> Self { + self.polling_interval = duration.into(); + self + } + + /// Get the current polling interval. + pub fn get_polling_interval(&self) -> Duration { + self.polling_interval + } + + /// Get the current broadcast interval. + pub fn get_broadcast_interval(&self) -> Duration { + self.broadcast_interval + } +} + +macro_rules! check_all_receipts { + ($cx:ident, $this:ident) => { + let futs: futures_util::stream::FuturesUnordered<_> = $this + .sent + .iter() + .map(|tx_hash| $this.provider.get_transaction_receipt(*tx_hash)) + .collect(); + *$this.state = CheckingReceipts(futs); + $cx.waker().wake_by_ref(); + return Poll::Pending + }; +} + +macro_rules! sleep { + ($cx:ident, $this:ident) => { + *$this.state = EscalatorStates::Sleeping(Box::pin(Delay::new(*$this.polling_interval))); + $cx.waker().wake_by_ref(); + return Poll::Pending + }; +} + +macro_rules! completed { + ($this:ident, $output:expr) => { + *$this.state = Completed; + return Poll::Ready($output) + }; +} + +macro_rules! poll_broadcast_fut { + ($cx:ident, $this:ident, $fut:ident) => { + match $fut.as_mut().poll($cx) { + Poll::Ready(Ok(pending)) => { + *$this.last = Instant::now(); + $this.sent.push(*pending); + tracing::info!( + tx_hash = ?*pending, + escalation = $this.sent.len(), + "Escalation transaction broadcast complete" + ); + check_all_receipts!($cx, $this); + } + Poll::Ready(Err(e)) => { + tracing::error!( + error = ?e, + "Error during transaction broadcast" + ); + completed!($this, Err(e)); + } + Poll::Pending => return Poll::Pending, + } + }; +} + +impl<'a, P> Future for EscalatingPending<'a, P> +where + P: JsonRpcClient, +{ + type Output = Result; + + fn poll( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + use EscalatorStates::*; + + let this = self.project(); + + match this.state { + // In the initial state we're simply waiting on the first + // transaction braodcast to complete. + Initial(fut) => { + poll_broadcast_fut!(cx, this, fut); + } + Sleeping(delay) => { + let _ready = futures_util::ready!(delay.as_mut().poll(cx)); + // if broadcast timer has elapsed and if we have a TX to + // broadcast, broadcast it + if this.last.elapsed() > *this.broadcast_interval { + if let Some(next_to_broadcast) = this.txns.pop() { + let fut = this.provider.send_raw_transaction(next_to_broadcast); + *this.state = BroadcastingNew(fut); + cx.waker().wake_by_ref(); + return Poll::Pending + } + } + check_all_receipts!(cx, this); + } + // This state is functionally equivalent to Initial, but we + // differentiate it for clarity + BroadcastingNew(fut) => { + poll_broadcast_fut!(cx, this, fut); + } + CheckingReceipts(futs) => { + // Poll the set of `get_transaction_receipt` futures to check + // if any previously-broadcast transaction was confirmed. + // Continue doing this until all are resolved + match futs.poll_next_unpin(cx) { + // We have found a receipt. This means that all other + // broadcast txns are now invalid, so we can drop the + // futures and complete + Poll::Ready(Some(Ok(Some(receipt)))) => { + completed!(this, Ok(receipt)); + } + // A `get_transaction_receipt` request resolved, but but we + // found no receipt, rewake and check if any other requests + // are resolved + Poll::Ready(Some(Ok(None))) => cx.waker().wake_by_ref(), + // A request errored. We complete the future with the error. + Poll::Ready(Some(Err(e))) => { + completed!(this, Err(e)); + } + // We have run out of `get_transaction_receipt` requests. + // Sleep and then check if we should broadcast again (or + // check receipts again) + Poll::Ready(None) => { + sleep!(cx, this); + } + // No request has resolved yet. Try again later + Poll::Pending => return Poll::Pending, + } + } + Completed => panic!("polled after completion"), + } + + Poll::Pending + } +} + +impl<'a, P> std::fmt::Debug for EscalatorStates<'a, P> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let state = match self { + Self::Initial(_) => "Initial", + Self::Sleeping(_) => "Sleeping", + Self::BroadcastingNew(_) => "BroadcastingNew", + Self::CheckingReceipts(_) => "CheckingReceipts", + Self::Completed => "Completed", + }; + f.debug_struct("EscalatorStates").field("state", &state).finish() + } +} diff --git a/ethers-providers/src/provider.rs b/ethers-providers/src/provider.rs index 1d60845e9..28da37975 100644 --- a/ethers-providers/src/provider.rs +++ b/ethers-providers/src/provider.rs @@ -10,6 +10,7 @@ use crate::{ use crate::CeloMiddleware; use crate::Middleware; use async_trait::async_trait; + use ethers_core::{ abi::{self, Detokenize, ParamType}, types::{