From e3a3713747cf852117d9b7910a67ae11ef2ccf4f Mon Sep 17 00:00:00 2001 From: James Date: Mon, 8 Nov 2021 08:31:17 -0800 Subject: [PATCH 01/15] feature: pending_escalator --- ethers-providers/src/pending_escalator.rs | 181 ++++++++++++++++++++++ 1 file changed, 181 insertions(+) create mode 100644 ethers-providers/src/pending_escalator.rs diff --git a/ethers-providers/src/pending_escalator.rs b/ethers-providers/src/pending_escalator.rs new file mode 100644 index 000000000..3b69a7a00 --- /dev/null +++ b/ethers-providers/src/pending_escalator.rs @@ -0,0 +1,181 @@ +use ethers_core::types::{Bytes, TransactionReceipt, H256}; +use pin_project::pin_project; +use std::{ + collections::HashMap, + future::{self, Future}, + pin::Pin, + task::Poll, + time::{Duration, Instant}, +}; +use tokio::{sync::RwLock, time::Sleep}; + +use crate::{JsonRpcClient, Middleware, PendingTransaction, Provider, ProviderError}; + +type LockedMap = RwLock>>; + +/// tmp +#[pin_project(project = PendingProj)] +pub struct SuperPending<'a, P> +where + P: JsonRpcClient, +{ + provider: &'a Provider

, + broadcast_interval: Duration, + polling_interval: Duration, + txns: Vec, + last: Option, + sent: Vec, + state: PendingStates<'a, P>, +} + +impl<'a, P> SuperPending<'a, P> +where + P: JsonRpcClient, +{ + /// Instantiate a new SuperPending + pub(crate) fn new( + provider: &'a Provider

, + broadcast_interval: u64, + polling_interval: u64, + mut txns: Vec, + ) -> Self { + if txns.is_empty() { + panic!("bad args"); + } + + let first = txns.pop().expect("bad args"); + Self { + provider, + broadcast_interval: Duration::from_millis(broadcast_interval), + polling_interval: Duration::from_millis(polling_interval), + txns, + last: None, + sent: vec![], + state: PendingStates::Initial(provider.send_raw_transaction(first)), + } + } +} + +type PinBoxFut<'a, T> = Pin + 'a + Send>>; + +enum PendingStates<'a, P> { + Initial(PinBoxFut<'a, Result, ProviderError>>), + Sleeping(Pin>), + BroadcastingNew(PinBoxFut<'a, Result, ProviderError>>), + CheckingReceipts(Vec, ProviderError>>>), + Completed, +} + +macro_rules! check_all_receipts { + ($cx:ident, $this:ident) => { + let futs: Vec<_> = $this + .sent + .iter() + .map(|tx_hash| $this.provider.get_transaction_receipt(*tx_hash)) + .collect(); + *$this.state = CheckingReceipts(futs); + $cx.waker().wake_by_ref(); + return Poll::Pending; + }; +} + +macro_rules! sleep { + ($cx:ident, $this:ident) => { + *$this.state = + PendingStates::Sleeping(Box::pin(tokio::time::sleep(*$this.polling_interval))); + $cx.waker().wake_by_ref(); + return Poll::Pending; + }; +} + +macro_rules! completed { + ($this:ident, $output:expr) => { + *$this.state = Completed; + return Poll::Ready($output); + }; +} + +macro_rules! broadcast_checks { + ($cx:ident, $this:ident, $fut:ident) => { + match $fut.as_mut().poll($cx) { + Poll::Ready(Ok(pending)) => { + $this.sent.push(*pending); + check_all_receipts!($cx, $this); + } + Poll::Ready(Err(e)) => { + completed!($this, Err(e)); + } + Poll::Pending => return Poll::Pending, + } + }; +} + +impl<'a, P> Future for SuperPending<'a, P> +where + P: JsonRpcClient, +{ + type Output = Result; + + fn poll( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + use PendingStates::*; + + let this = self.project(); + + match this.state { + Initial(fut) => { + broadcast_checks!(cx, this, fut); + } + Sleeping(fut) => { + if fut.as_mut().poll(cx).is_ready() { + // if timer has elapsed (or this is the first tx) + if this.last.is_none() + || this.last.clone().unwrap().elapsed() > *this.broadcast_interval + { + // then if we have a TX to broadcast, start + // broadcasting it + if let Some(next_to_broadcast) = this.txns.pop() { + let fut = this.provider.send_raw_transaction(next_to_broadcast); + *this.state = BroadcastingNew(fut); + cx.waker().wake_by_ref(); + return Poll::Pending; + } + } + + check_all_receipts!(cx, this); + } + + return Poll::Pending; + } + BroadcastingNew(fut) => { + broadcast_checks!(cx, this, fut); + } + CheckingReceipts(futs) => { + // if drained, sleep + if futs.is_empty() { + sleep!(cx, this); + } + + // otherwise drain one and check if we have a receipt + match futs.pop().expect("checked").as_mut().poll(cx) { + // + Poll::Ready(Ok(Some(receipt))) => { + completed!(this, Ok(receipt)); + } + // rewake until drained + Poll::Ready(Ok(None)) => cx.waker().wake_by_ref(), + // bubble up errors + Poll::Ready(Err(e)) => { + completed!(this, Err(e)); + } + Poll::Pending => return Poll::Pending, + } + } + Completed => panic!("polled after completion"), + } + + Poll::Pending + } +} From 8dd1a18bcf683acd3aa9d2524e3710277c70f26d Mon Sep 17 00:00:00 2001 From: James Date: Mon, 8 Nov 2021 09:36:03 -0800 Subject: [PATCH 02/15] feature: send_escalating in Middleware --- ethers-providers/src/lib.rs | 44 +++++++++++++++++ ethers-providers/src/pending_escalator.rs | 59 ++++++++++++++--------- ethers-providers/src/provider.rs | 3 +- 3 files changed, 81 insertions(+), 25 deletions(-) diff --git a/ethers-providers/src/lib.rs b/ethers-providers/src/lib.rs index 009f74fbc..cc6fa3f8a 100644 --- a/ethers-providers/src/lib.rs +++ b/ethers-providers/src/lib.rs @@ -64,6 +64,7 @@ //! # } //! ``` mod transports; +use futures_util::future::join_all; pub use transports::*; mod provider; @@ -74,6 +75,9 @@ pub mod ens; mod pending_transaction; pub use pending_transaction::PendingTransaction; +mod pending_escalator; +pub use pending_escalator::EscalatingPending; + mod stream; pub use futures_util::StreamExt; pub use stream::{interval, FilterWatcher, TransactionStream, DEFAULT_POLL_INTERVAL}; @@ -89,6 +93,8 @@ use std::{error::Error, fmt::Debug, future::Future, pin::Pin, str::FromStr}; pub use provider::{FilterKind, Provider, ProviderError}; +pub type EscalationPolicy = Box U256 + Send + Sync>; + // Helper type alias #[cfg(target_arch = "wasm32")] pub(crate) type PinBoxFut<'a, T> = Pin> + 'a>>; @@ -283,6 +289,44 @@ pub trait Middleware: Sync + Send + Debug { self.inner().send_transaction(tx, block).await.map_err(FromErr::from) } + /// Send a transaction with a simple escalation policy. + /// + /// `escalation` should be a boxed function that maps `original_gas_price` + /// and `number_of_previous_escalations` -> `new_gas_price` + /// + /// e.g. ```Box::new(|start, escalations| start * 1250.pow(escalations) / 1000.pow(escalations))``` + /// + async fn send_escalating<'a>( + &'a self, + tx: &TypedTransaction, + from: &Address, + escalation: EscalationPolicy, + ) -> Result, Self::Error> { + let mut original = tx.clone(); + self.fill_transaction(&mut original, None).await?; + let gas_price = original.gas_price().expect("filled"); + let chain_id = self.get_chainid().await?.low_u64(); + let reqs: Vec<_> = (0..5) + .map(|i| { + let new_price = escalation(gas_price, i); + let mut r = original.clone(); + r.set_gas_price(new_price); + r + }) + .collect(); + + let sign_futs = reqs.into_iter().map(|req| async move { + self.sign(req.rlp(chain_id), from).await.map(|sig| req.rlp_signed(chain_id, &sig)) + }); + + // we reverse for convenience. Ensuring that we can always just + // `pop()` the next tx off the back later + let mut signed = join_all(sign_futs).await.into_iter().collect::, _>>()?; + signed.reverse(); + + Ok(EscalatingPending::new(self.provider(), signed)) + } + async fn resolve_name(&self, ens_name: &str) -> Result { self.inner().resolve_name(ens_name).await.map_err(FromErr::from) } diff --git a/ethers-providers/src/pending_escalator.rs b/ethers-providers/src/pending_escalator.rs index 3b69a7a00..82b8b2140 100644 --- a/ethers-providers/src/pending_escalator.rs +++ b/ethers-providers/src/pending_escalator.rs @@ -1,21 +1,31 @@ use ethers_core::types::{Bytes, TransactionReceipt, H256}; use pin_project::pin_project; use std::{ - collections::HashMap, future::{self, Future}, pin::Pin, task::Poll, time::{Duration, Instant}, }; -use tokio::{sync::RwLock, time::Sleep}; +use tokio::time::Sleep; use crate::{JsonRpcClient, Middleware, PendingTransaction, Provider, ProviderError}; -type LockedMap = RwLock>>; +type PinBoxFut<'a, T> = Pin + 'a + Send>>; + +/// States for the EscalatingPending future +enum PendingStates<'a, P> { + Initial(PinBoxFut<'a, Result, ProviderError>>), + Sleeping(Pin>), + BroadcastingNew(PinBoxFut<'a, Result, ProviderError>>), + CheckingReceipts(Vec, ProviderError>>>), + Completed, +} -/// tmp +/// An EscalatingPending is a pending transaction that handles increasing its +/// own gas price over time, by broadcasting successive versions with higher +/// gas prices #[pin_project(project = PendingProj)] -pub struct SuperPending<'a, P> +pub struct EscalatingPending<'a, P> where P: JsonRpcClient, { @@ -28,42 +38,43 @@ where state: PendingStates<'a, P>, } -impl<'a, P> SuperPending<'a, P> +impl<'a, P> EscalatingPending<'a, P> where P: JsonRpcClient, { - /// Instantiate a new SuperPending - pub(crate) fn new( - provider: &'a Provider

, - broadcast_interval: u64, - polling_interval: u64, - mut txns: Vec, - ) -> Self { + /// Instantiate a new EscalatingPending. This should only be called by the + /// Middleware trait. Callers MUST ensure that transactions are in _reverse_ + /// broadcast order (this just makes writing the code easier, as we + /// can use `pop()` a lot) + /// + /// TODO: consider deserializing and checking invariants (gas order, etc.) + pub(crate) fn new(provider: &'a Provider

, mut txns: Vec) -> Self { if txns.is_empty() { panic!("bad args"); } let first = txns.pop().expect("bad args"); + // Sane-feeling default intervals Self { provider, - broadcast_interval: Duration::from_millis(broadcast_interval), - polling_interval: Duration::from_millis(polling_interval), + broadcast_interval: Duration::from_millis(150), + polling_interval: Duration::from_millis(10), txns, last: None, sent: vec![], state: PendingStates::Initial(provider.send_raw_transaction(first)), } } -} -type PinBoxFut<'a, T> = Pin + 'a + Send>>; + pub fn broadcast_interval(mut self, duration: u64) -> Self { + self.broadcast_interval = Duration::from_secs(duration); + self + } -enum PendingStates<'a, P> { - Initial(PinBoxFut<'a, Result, ProviderError>>), - Sleeping(Pin>), - BroadcastingNew(PinBoxFut<'a, Result, ProviderError>>), - CheckingReceipts(Vec, ProviderError>>>), - Completed, + pub fn polling_interval(mut self, duration: u64) -> Self { + self.polling_interval = Duration::from_secs(duration); + self + } } macro_rules! check_all_receipts { @@ -110,7 +121,7 @@ macro_rules! broadcast_checks { }; } -impl<'a, P> Future for SuperPending<'a, P> +impl<'a, P> Future for EscalatingPending<'a, P> where P: JsonRpcClient, { diff --git a/ethers-providers/src/provider.rs b/ethers-providers/src/provider.rs index 1d60845e9..218f287a8 100644 --- a/ethers-providers/src/provider.rs +++ b/ethers-providers/src/provider.rs @@ -10,6 +10,7 @@ use crate::{ use crate::CeloMiddleware; use crate::Middleware; use async_trait::async_trait; + use ethers_core::{ abi::{self, Detokenize, ParamType}, types::{ @@ -870,7 +871,7 @@ impl Provider

{ let resolver_address: Address = decode_bytes(ParamType::Address, data); if resolver_address == Address::zero() { - return Err(ProviderError::EnsError(ens_name.to_owned())) + return Err(ProviderError::EnsError(ens_name.to_owned())); } // resolve From 533b59d24cee0394d18eeb4fbe5cca073a9e7ad2 Mon Sep 17 00:00:00 2001 From: James Date: Mon, 8 Nov 2021 09:38:04 -0800 Subject: [PATCH 03/15] bug: don't drop unready futures in escalator.poll --- ethers-providers/src/pending_escalator.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/ethers-providers/src/pending_escalator.rs b/ethers-providers/src/pending_escalator.rs index 82b8b2140..2487c3104 100644 --- a/ethers-providers/src/pending_escalator.rs +++ b/ethers-providers/src/pending_escalator.rs @@ -170,7 +170,8 @@ where } // otherwise drain one and check if we have a receipt - match futs.pop().expect("checked").as_mut().poll(cx) { + let mut pollee = futs.pop().expect("checked"); + match pollee.as_mut().poll(cx) { // Poll::Ready(Ok(Some(receipt))) => { completed!(this, Ok(receipt)); @@ -181,7 +182,11 @@ where Poll::Ready(Err(e)) => { completed!(this, Err(e)); } - Poll::Pending => return Poll::Pending, + Poll::Pending => { + // stick it pack in the list for polling again later + futs.push(pollee); + return Poll::Pending; + } } } Completed => panic!("polled after completion"), From 0e3318abb826e743bd13a813e053a73c0bfbe712 Mon Sep 17 00:00:00 2001 From: James Date: Mon, 8 Nov 2021 09:56:17 -0800 Subject: [PATCH 04/15] chore: docs and must_use --- ethers-providers/src/lib.rs | 5 +++-- ethers-providers/src/pending_escalator.rs | 1 + 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/ethers-providers/src/lib.rs b/ethers-providers/src/lib.rs index cc6fa3f8a..b3f78b7ad 100644 --- a/ethers-providers/src/lib.rs +++ b/ethers-providers/src/lib.rs @@ -93,6 +93,7 @@ use std::{error::Error, fmt::Debug, future::Future, pin::Pin, str::FromStr}; pub use provider::{FilterKind, Provider, ProviderError}; +/// A simple gas escalation policy pub type EscalationPolicy = Box U256 + Send + Sync>; // Helper type alias @@ -292,9 +293,9 @@ pub trait Middleware: Sync + Send + Debug { /// Send a transaction with a simple escalation policy. /// /// `escalation` should be a boxed function that maps `original_gas_price` - /// and `number_of_previous_escalations` -> `new_gas_price` + /// and `number_of_previous_escalations` -> `new_gas_price`. /// - /// e.g. ```Box::new(|start, escalations| start * 1250.pow(escalations) / 1000.pow(escalations))``` + /// e.g. `Box::new(|start, escalations| start * 1250.pow(escalations) / 1000.pow(escalations))` /// async fn send_escalating<'a>( &'a self, diff --git a/ethers-providers/src/pending_escalator.rs b/ethers-providers/src/pending_escalator.rs index 2487c3104..93c1c0459 100644 --- a/ethers-providers/src/pending_escalator.rs +++ b/ethers-providers/src/pending_escalator.rs @@ -24,6 +24,7 @@ enum PendingStates<'a, P> { /// An EscalatingPending is a pending transaction that handles increasing its /// own gas price over time, by broadcasting successive versions with higher /// gas prices +#[must_use] #[pin_project(project = PendingProj)] pub struct EscalatingPending<'a, P> where From 29eb74c186346869c6d0416332dac0d36fe9653f Mon Sep 17 00:00:00 2001 From: James Date: Mon, 8 Nov 2021 11:41:04 -0800 Subject: [PATCH 05/15] chores: lints, clippies, wasm fixes, dedup pinboxfut --- ethers-providers/src/lib.rs | 10 +++---- ethers-providers/src/pending_escalator.rs | 35 ++++++++++------------- ethers-providers/src/provider.rs | 2 +- 3 files changed, 20 insertions(+), 27 deletions(-) diff --git a/ethers-providers/src/lib.rs b/ethers-providers/src/lib.rs index b3f78b7ad..f1b01bfa7 100644 --- a/ethers-providers/src/lib.rs +++ b/ethers-providers/src/lib.rs @@ -296,7 +296,6 @@ pub trait Middleware: Sync + Send + Debug { /// and `number_of_previous_escalations` -> `new_gas_price`. /// /// e.g. `Box::new(|start, escalations| start * 1250.pow(escalations) / 1000.pow(escalations))` - /// async fn send_escalating<'a>( &'a self, tx: &TypedTransaction, @@ -307,19 +306,18 @@ pub trait Middleware: Sync + Send + Debug { self.fill_transaction(&mut original, None).await?; let gas_price = original.gas_price().expect("filled"); let chain_id = self.get_chainid().await?.low_u64(); - let reqs: Vec<_> = (0..5) + let sign_futs: Vec<_> = (0..5) .map(|i| { let new_price = escalation(gas_price, i); let mut r = original.clone(); r.set_gas_price(new_price); r }) + .map(|req| async move { + self.sign(req.rlp(chain_id), from).await.map(|sig| req.rlp_signed(chain_id, &sig)) + }) .collect(); - let sign_futs = reqs.into_iter().map(|req| async move { - self.sign(req.rlp(chain_id), from).await.map(|sig| req.rlp_signed(chain_id, &sig)) - }); - // we reverse for convenience. Ensuring that we can always just // `pop()` the next tx off the back later let mut signed = join_all(sign_futs).await.into_iter().collect::, _>>()?; diff --git a/ethers-providers/src/pending_escalator.rs b/ethers-providers/src/pending_escalator.rs index 93c1c0459..8d7e913f6 100644 --- a/ethers-providers/src/pending_escalator.rs +++ b/ethers-providers/src/pending_escalator.rs @@ -1,23 +1,19 @@ use ethers_core::types::{Bytes, TransactionReceipt, H256}; use pin_project::pin_project; use std::{ - future::{self, Future}, - pin::Pin, + future::Future, task::Poll, time::{Duration, Instant}, }; -use tokio::time::Sleep; -use crate::{JsonRpcClient, Middleware, PendingTransaction, Provider, ProviderError}; - -type PinBoxFut<'a, T> = Pin + 'a + Send>>; +use crate::{JsonRpcClient, Middleware, PendingTransaction, PinBoxFut, Provider, ProviderError}; /// States for the EscalatingPending future enum PendingStates<'a, P> { - Initial(PinBoxFut<'a, Result, ProviderError>>), - Sleeping(Pin>), - BroadcastingNew(PinBoxFut<'a, Result, ProviderError>>), - CheckingReceipts(Vec, ProviderError>>>), + Initial(PinBoxFut<'a, PendingTransaction<'a, P>>), + Sleeping(Instant), + BroadcastingNew(PinBoxFut<'a, PendingTransaction<'a, P>>), + CheckingReceipts(Vec>>), Completed, } @@ -63,7 +59,7 @@ where txns, last: None, sent: vec![], - state: PendingStates::Initial(provider.send_raw_transaction(first)), + state: PendingStates::Initial(Box::pin(provider.send_raw_transaction(first))), } } @@ -87,23 +83,22 @@ macro_rules! check_all_receipts { .collect(); *$this.state = CheckingReceipts(futs); $cx.waker().wake_by_ref(); - return Poll::Pending; + return Poll::Pending }; } macro_rules! sleep { ($cx:ident, $this:ident) => { - *$this.state = - PendingStates::Sleeping(Box::pin(tokio::time::sleep(*$this.polling_interval))); + *$this.state = PendingStates::Sleeping(std::time::Instant::now()); $cx.waker().wake_by_ref(); - return Poll::Pending; + return Poll::Pending }; } macro_rules! completed { ($this:ident, $output:expr) => { *$this.state = Completed; - return Poll::Ready($output); + return Poll::Ready($output) }; } @@ -140,11 +135,11 @@ where Initial(fut) => { broadcast_checks!(cx, this, fut); } - Sleeping(fut) => { - if fut.as_mut().poll(cx).is_ready() { + Sleeping(instant) => { + if instant.elapsed() > *this.polling_interval { // if timer has elapsed (or this is the first tx) if this.last.is_none() - || this.last.clone().unwrap().elapsed() > *this.broadcast_interval + || (*this.last).unwrap().elapsed() > *this.broadcast_interval { // then if we have a TX to broadcast, start // broadcasting it @@ -179,7 +174,7 @@ where } // rewake until drained Poll::Ready(Ok(None)) => cx.waker().wake_by_ref(), - // bubble up errors + // bubble up errorsc Poll::Ready(Err(e)) => { completed!(this, Err(e)); } diff --git a/ethers-providers/src/provider.rs b/ethers-providers/src/provider.rs index 218f287a8..28da37975 100644 --- a/ethers-providers/src/provider.rs +++ b/ethers-providers/src/provider.rs @@ -871,7 +871,7 @@ impl Provider

{ let resolver_address: Address = decode_bytes(ParamType::Address, data); if resolver_address == Address::zero() { - return Err(ProviderError::EnsError(ens_name.to_owned())); + return Err(ProviderError::EnsError(ens_name.to_owned())) } // resolve From 44fb4ce8b3a39a3dee2f55627407302c09d0ae10 Mon Sep 17 00:00:00 2001 From: James Date: Mon, 8 Nov 2021 11:49:54 -0800 Subject: [PATCH 06/15] chore: more lints --- ethers-providers/src/pending_escalator.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/ethers-providers/src/pending_escalator.rs b/ethers-providers/src/pending_escalator.rs index 8d7e913f6..a9fcf7bbd 100644 --- a/ethers-providers/src/pending_escalator.rs +++ b/ethers-providers/src/pending_escalator.rs @@ -138,8 +138,8 @@ where Sleeping(instant) => { if instant.elapsed() > *this.polling_interval { // if timer has elapsed (or this is the first tx) - if this.last.is_none() - || (*this.last).unwrap().elapsed() > *this.broadcast_interval + if this.last.is_none() || + (*this.last).unwrap().elapsed() > *this.broadcast_interval { // then if we have a TX to broadcast, start // broadcasting it @@ -147,14 +147,14 @@ where let fut = this.provider.send_raw_transaction(next_to_broadcast); *this.state = BroadcastingNew(fut); cx.waker().wake_by_ref(); - return Poll::Pending; + return Poll::Pending } } check_all_receipts!(cx, this); } - return Poll::Pending; + return Poll::Pending } BroadcastingNew(fut) => { broadcast_checks!(cx, this, fut); @@ -181,7 +181,7 @@ where Poll::Pending => { // stick it pack in the list for polling again later futs.push(pollee); - return Poll::Pending; + return Poll::Pending } } } From a832b48bcae5fd9d027d8d84bb13601e0f44b4c4 Mon Sep 17 00:00:00 2001 From: James Date: Tue, 9 Nov 2021 09:07:55 -0800 Subject: [PATCH 07/15] refactor: use Delay in polling interval to ensure re-waking --- ethers-providers/src/lib.rs | 5 +-- ethers-providers/src/pending_escalator.rs | 43 ++++++++++++----------- 2 files changed, 25 insertions(+), 23 deletions(-) diff --git a/ethers-providers/src/lib.rs b/ethers-providers/src/lib.rs index f1b01bfa7..34e97aa96 100644 --- a/ethers-providers/src/lib.rs +++ b/ethers-providers/src/lib.rs @@ -299,7 +299,6 @@ pub trait Middleware: Sync + Send + Debug { async fn send_escalating<'a>( &'a self, tx: &TypedTransaction, - from: &Address, escalation: EscalationPolicy, ) -> Result, Self::Error> { let mut original = tx.clone(); @@ -314,7 +313,9 @@ pub trait Middleware: Sync + Send + Debug { r }) .map(|req| async move { - self.sign(req.rlp(chain_id), from).await.map(|sig| req.rlp_signed(chain_id, &sig)) + self.sign(req.rlp(chain_id), &self.default_sender().unwrap_or_default()) + .await + .map(|sig| req.rlp_signed(chain_id, &sig)) }) .collect(); diff --git a/ethers-providers/src/pending_escalator.rs b/ethers-providers/src/pending_escalator.rs index a9fcf7bbd..2e5f5657c 100644 --- a/ethers-providers/src/pending_escalator.rs +++ b/ethers-providers/src/pending_escalator.rs @@ -2,16 +2,22 @@ use ethers_core::types::{Bytes, TransactionReceipt, H256}; use pin_project::pin_project; use std::{ future::Future, + pin::Pin, task::Poll, time::{Duration, Instant}, }; +#[cfg(not(target_arch = "wasm32"))] +use futures_timer::Delay; +#[cfg(target_arch = "wasm32")] +use wasm_timer::Delay; + use crate::{JsonRpcClient, Middleware, PendingTransaction, PinBoxFut, Provider, ProviderError}; /// States for the EscalatingPending future enum PendingStates<'a, P> { Initial(PinBoxFut<'a, PendingTransaction<'a, P>>), - Sleeping(Instant), + Sleeping(Pin>), BroadcastingNew(PinBoxFut<'a, PendingTransaction<'a, P>>), CheckingReceipts(Vec>>), Completed, @@ -89,7 +95,7 @@ macro_rules! check_all_receipts { macro_rules! sleep { ($cx:ident, $this:ident) => { - *$this.state = PendingStates::Sleeping(std::time::Instant::now()); + *$this.state = PendingStates::Sleeping(Box::pin(Delay::new(*$this.polling_interval))); $cx.waker().wake_by_ref(); return Poll::Pending }; @@ -135,26 +141,21 @@ where Initial(fut) => { broadcast_checks!(cx, this, fut); } - Sleeping(instant) => { - if instant.elapsed() > *this.polling_interval { - // if timer has elapsed (or this is the first tx) - if this.last.is_none() || - (*this.last).unwrap().elapsed() > *this.broadcast_interval - { - // then if we have a TX to broadcast, start - // broadcasting it - if let Some(next_to_broadcast) = this.txns.pop() { - let fut = this.provider.send_raw_transaction(next_to_broadcast); - *this.state = BroadcastingNew(fut); - cx.waker().wake_by_ref(); - return Poll::Pending - } + Sleeping(delay) => { + let _ready = futures_util::ready!(delay.as_mut().poll(cx)); + // if timer has elapsed (or this is the first tx) + if this.last.is_none() || (*this.last).unwrap().elapsed() > *this.broadcast_interval + { + // then if we have a TX to broadcast, start + // broadcasting it + if let Some(next_to_broadcast) = this.txns.pop() { + let fut = this.provider.send_raw_transaction(next_to_broadcast); + *this.state = BroadcastingNew(fut); + cx.waker().wake_by_ref(); + return Poll::Pending; } - - check_all_receipts!(cx, this); } - - return Poll::Pending + check_all_receipts!(cx, this); } BroadcastingNew(fut) => { broadcast_checks!(cx, this, fut); @@ -181,7 +182,7 @@ where Poll::Pending => { // stick it pack in the list for polling again later futs.push(pollee); - return Poll::Pending + return Poll::Pending; } } } From 9d6be1784f0ad28ccff30593ebb200f73e41a707 Mon Sep 17 00:00:00 2001 From: James Date: Tue, 9 Nov 2021 09:38:23 -0800 Subject: [PATCH 08/15] refactor: simplify Sleeping state transition as last will never be None again --- ethers-providers/src/pending_escalator.rs | 36 +++++++++++++++-------- 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/ethers-providers/src/pending_escalator.rs b/ethers-providers/src/pending_escalator.rs index 2e5f5657c..c919cd7e3 100644 --- a/ethers-providers/src/pending_escalator.rs +++ b/ethers-providers/src/pending_escalator.rs @@ -36,7 +36,7 @@ where broadcast_interval: Duration, polling_interval: Duration, txns: Vec, - last: Option, + last: Instant, sent: Vec, state: PendingStates<'a, P>, } @@ -63,7 +63,7 @@ where broadcast_interval: Duration::from_millis(150), polling_interval: Duration::from_millis(10), txns, - last: None, + last: Instant::now(), sent: vec![], state: PendingStates::Initial(Box::pin(provider.send_raw_transaction(first))), } @@ -108,14 +108,23 @@ macro_rules! completed { }; } -macro_rules! broadcast_checks { +macro_rules! poll_broadcast_fut { ($cx:ident, $this:ident, $fut:ident) => { match $fut.as_mut().poll($cx) { Poll::Ready(Ok(pending)) => { $this.sent.push(*pending); + tracing::info!( + tx_hash = ?*pending, + escalation = $this.sent.len(), + "Escalation transaction broadcast complete" + ); check_all_receipts!($cx, $this); } Poll::Ready(Err(e)) => { + tracing::error!( + error = ?e, + "Error during transaction broadcast" + ); completed!($this, Err(e)); } Poll::Pending => return Poll::Pending, @@ -139,15 +148,13 @@ where match this.state { Initial(fut) => { - broadcast_checks!(cx, this, fut); + poll_broadcast_fut!(cx, this, fut); } Sleeping(delay) => { let _ready = futures_util::ready!(delay.as_mut().poll(cx)); - // if timer has elapsed (or this is the first tx) - if this.last.is_none() || (*this.last).unwrap().elapsed() > *this.broadcast_interval - { - // then if we have a TX to broadcast, start - // broadcasting it + // if broadcast timer has elapsed and if we have a TX to + // broadcast, broadcast it + if this.last.elapsed() > *this.broadcast_interval { if let Some(next_to_broadcast) = this.txns.pop() { let fut = this.provider.send_raw_transaction(next_to_broadcast); *this.state = BroadcastingNew(fut); @@ -158,7 +165,7 @@ where check_all_receipts!(cx, this); } BroadcastingNew(fut) => { - broadcast_checks!(cx, this, fut); + poll_broadcast_fut!(cx, this, fut); } CheckingReceipts(futs) => { // if drained, sleep @@ -169,16 +176,19 @@ where // otherwise drain one and check if we have a receipt let mut pollee = futs.pop().expect("checked"); match pollee.as_mut().poll(cx) { - // + // we have found a receipt. This means that all other + // broadcast txns are now invalid, so we can drop them Poll::Ready(Ok(Some(receipt))) => { completed!(this, Ok(receipt)); } - // rewake until drained + // we found no receipt, rewake and check the next future + // until drained Poll::Ready(Ok(None)) => cx.waker().wake_by_ref(), - // bubble up errorsc + // bubble up errors Poll::Ready(Err(e)) => { completed!(this, Err(e)); } + // check again later Poll::Pending => { // stick it pack in the list for polling again later futs.push(pollee); From d97047113833e9f14b2a70bf4f57d4f8a472de27 Mon Sep 17 00:00:00 2001 From: James Date: Tue, 9 Nov 2021 09:45:38 -0800 Subject: [PATCH 09/15] bug: properly set last when broadcasts resolve --- ethers-providers/src/pending_escalator.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ethers-providers/src/pending_escalator.rs b/ethers-providers/src/pending_escalator.rs index c919cd7e3..5f42b12a8 100644 --- a/ethers-providers/src/pending_escalator.rs +++ b/ethers-providers/src/pending_escalator.rs @@ -63,6 +63,8 @@ where broadcast_interval: Duration::from_millis(150), polling_interval: Duration::from_millis(10), txns, + // placeholder value. We set this again after the initial broadcast + // future resolves last: Instant::now(), sent: vec![], state: PendingStates::Initial(Box::pin(provider.send_raw_transaction(first))), @@ -112,6 +114,7 @@ macro_rules! poll_broadcast_fut { ($cx:ident, $this:ident, $fut:ident) => { match $fut.as_mut().poll($cx) { Poll::Ready(Ok(pending)) => { + *$this.last = Instant::now(); $this.sent.push(*pending); tracing::info!( tx_hash = ?*pending, From 1afd9c92bfceb2b046935285becb373776451261 Mon Sep 17 00:00:00 2001 From: James Date: Tue, 9 Nov 2021 09:51:43 -0800 Subject: [PATCH 10/15] feature: debug implementation for EscalatingPending --- ethers-providers/src/pending_escalator.rs | 28 +++++++++++++++++------ 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/ethers-providers/src/pending_escalator.rs b/ethers-providers/src/pending_escalator.rs index 5f42b12a8..1a173b5e6 100644 --- a/ethers-providers/src/pending_escalator.rs +++ b/ethers-providers/src/pending_escalator.rs @@ -15,7 +15,7 @@ use wasm_timer::Delay; use crate::{JsonRpcClient, Middleware, PendingTransaction, PinBoxFut, Provider, ProviderError}; /// States for the EscalatingPending future -enum PendingStates<'a, P> { +enum EscalatorStates<'a, P> { Initial(PinBoxFut<'a, PendingTransaction<'a, P>>), Sleeping(Pin>), BroadcastingNew(PinBoxFut<'a, PendingTransaction<'a, P>>), @@ -28,6 +28,7 @@ enum PendingStates<'a, P> { /// gas prices #[must_use] #[pin_project(project = PendingProj)] +#[derive(Debug)] pub struct EscalatingPending<'a, P> where P: JsonRpcClient, @@ -38,7 +39,7 @@ where txns: Vec, last: Instant, sent: Vec, - state: PendingStates<'a, P>, + state: EscalatorStates<'a, P>, } impl<'a, P> EscalatingPending<'a, P> @@ -67,7 +68,7 @@ where // future resolves last: Instant::now(), sent: vec![], - state: PendingStates::Initial(Box::pin(provider.send_raw_transaction(first))), + state: EscalatorStates::Initial(Box::pin(provider.send_raw_transaction(first))), } } @@ -97,7 +98,7 @@ macro_rules! check_all_receipts { macro_rules! sleep { ($cx:ident, $this:ident) => { - *$this.state = PendingStates::Sleeping(Box::pin(Delay::new(*$this.polling_interval))); + *$this.state = EscalatorStates::Sleeping(Box::pin(Delay::new(*$this.polling_interval))); $cx.waker().wake_by_ref(); return Poll::Pending }; @@ -145,7 +146,7 @@ where self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll { - use PendingStates::*; + use EscalatorStates::*; let this = self.project(); @@ -162,7 +163,7 @@ where let fut = this.provider.send_raw_transaction(next_to_broadcast); *this.state = BroadcastingNew(fut); cx.waker().wake_by_ref(); - return Poll::Pending; + return Poll::Pending } } check_all_receipts!(cx, this); @@ -195,7 +196,7 @@ where Poll::Pending => { // stick it pack in the list for polling again later futs.push(pollee); - return Poll::Pending; + return Poll::Pending } } } @@ -205,3 +206,16 @@ where Poll::Pending } } + +impl<'a, P> std::fmt::Debug for EscalatorStates<'a, P> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let state = match self { + Self::Initial(_) => "Initial", + Self::Sleeping(_) => "Sleeping", + Self::BroadcastingNew(_) => "BroadcastingNew", + Self::CheckingReceipts(_) => "CheckingReceipts", + Self::Completed => "Completed", + }; + f.debug_struct("EscalatorStates").field("state", &state).finish() + } +} From ed257217bfb8c348dbee44149b25c9c0d5ca3585 Mon Sep 17 00:00:00 2001 From: James Date: Wed, 10 Nov 2021 13:45:19 -0800 Subject: [PATCH 11/15] refactor: with_ setters and escalations argument --- ethers-providers/src/lib.rs | 7 ++++--- ethers-providers/src/pending_escalator.rs | 16 ++++++++++++---- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/ethers-providers/src/lib.rs b/ethers-providers/src/lib.rs index 34e97aa96..a8eed4ed9 100644 --- a/ethers-providers/src/lib.rs +++ b/ethers-providers/src/lib.rs @@ -299,15 +299,16 @@ pub trait Middleware: Sync + Send + Debug { async fn send_escalating<'a>( &'a self, tx: &TypedTransaction, - escalation: EscalationPolicy, + escalations: usize, + policy: EscalationPolicy, ) -> Result, Self::Error> { let mut original = tx.clone(); self.fill_transaction(&mut original, None).await?; let gas_price = original.gas_price().expect("filled"); let chain_id = self.get_chainid().await?.low_u64(); - let sign_futs: Vec<_> = (0..5) + let sign_futs: Vec<_> = (0..escalations) .map(|i| { - let new_price = escalation(gas_price, i); + let new_price = policy(gas_price, i); let mut r = original.clone(); r.set_gas_price(new_price); r diff --git a/ethers-providers/src/pending_escalator.rs b/ethers-providers/src/pending_escalator.rs index 1a173b5e6..a5f05fb5d 100644 --- a/ethers-providers/src/pending_escalator.rs +++ b/ethers-providers/src/pending_escalator.rs @@ -72,15 +72,23 @@ where } } - pub fn broadcast_interval(mut self, duration: u64) -> Self { - self.broadcast_interval = Duration::from_secs(duration); + pub fn with_broadcast_interval(mut self, duration: impl Into) -> Self { + self.broadcast_interval = duration.into(); self } - pub fn polling_interval(mut self, duration: u64) -> Self { - self.polling_interval = Duration::from_secs(duration); + pub fn with_polling_interval(mut self, duration: impl Into) -> Self { + self.polling_interval = duration.into(); self } + + pub fn get_polling_interval(&self) -> Duration { + self.polling_interval + } + + pub fn get_broadcast_interval(&self) -> Duration { + self.broadcast_interval + } } macro_rules! check_all_receipts { From 7b91c26853d4b161203937f5adba84413e9e4ff9 Mon Sep 17 00:00:00 2001 From: James Date: Wed, 10 Nov 2021 14:13:06 -0800 Subject: [PATCH 12/15] refactor: use FuturesUnOrdered instead of a vec of futures --- ethers-providers/src/pending_escalator.rs | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/ethers-providers/src/pending_escalator.rs b/ethers-providers/src/pending_escalator.rs index a5f05fb5d..696a24aed 100644 --- a/ethers-providers/src/pending_escalator.rs +++ b/ethers-providers/src/pending_escalator.rs @@ -1,4 +1,5 @@ use ethers_core::types::{Bytes, TransactionReceipt, H256}; +use futures_util::{stream::FuturesUnordered, StreamExt}; use pin_project::pin_project; use std::{ future::Future, @@ -19,7 +20,7 @@ enum EscalatorStates<'a, P> { Initial(PinBoxFut<'a, PendingTransaction<'a, P>>), Sleeping(Pin>), BroadcastingNew(PinBoxFut<'a, PendingTransaction<'a, P>>), - CheckingReceipts(Vec>>), + CheckingReceipts(FuturesUnordered>>), Completed, } @@ -93,7 +94,7 @@ where macro_rules! check_all_receipts { ($cx:ident, $this:ident) => { - let futs: Vec<_> = $this + let futs: futures_util::stream::FuturesUnordered<_> = $this .sent .iter() .map(|tx_hash| $this.provider.get_transaction_receipt(*tx_hash)) @@ -180,30 +181,26 @@ where poll_broadcast_fut!(cx, this, fut); } CheckingReceipts(futs) => { - // if drained, sleep - if futs.is_empty() { - sleep!(cx, this); - } - // otherwise drain one and check if we have a receipt - let mut pollee = futs.pop().expect("checked"); - match pollee.as_mut().poll(cx) { + match futs.poll_next_unpin(cx) { // we have found a receipt. This means that all other // broadcast txns are now invalid, so we can drop them - Poll::Ready(Ok(Some(receipt))) => { + Poll::Ready(Some(Ok(Some(receipt)))) => { completed!(this, Ok(receipt)); } // we found no receipt, rewake and check the next future // until drained - Poll::Ready(Ok(None)) => cx.waker().wake_by_ref(), + Poll::Ready(Some(Ok(None))) => cx.waker().wake_by_ref(), // bubble up errors - Poll::Ready(Err(e)) => { + Poll::Ready(Some(Err(e))) => { completed!(this, Err(e)); } + Poll::Ready(None) => { + sleep!(cx, this); + } // check again later Poll::Pending => { // stick it pack in the list for polling again later - futs.push(pollee); return Poll::Pending } } From 096ca41e5fd6c837ddcf4711a079001dbbc802f8 Mon Sep 17 00:00:00 2001 From: James Date: Wed, 10 Nov 2021 14:18:36 -0800 Subject: [PATCH 13/15] chore: update CHANGELOG with recent PR info --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2abe6bfe4..5f244878d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,9 @@ - Use rust types as contract function inputs for human readable abi [#482](https://github.com/gakonst/ethers-rs/pull/482) - Add EIP-712 `sign_typed_data` signer method; add ethers-core type `Eip712` trait and derive macro in ethers-derive-eip712 [#481](https://github.com/gakonst/ethers-rs/pull/481) - `LocalWallet::new_keystore` now returns a tuple `(LocalWallet, String)` instead of `LocalWallet`, where the string represents the UUID of the newly created encrypted JSON keystore. The JSON keystore is stored as a file `/dir/uuid`. The issue [#557](https://github.com/gakonst/ethers-rs/issues/557) is addressed [#559](https://github.com/gakonst/ethers-rs/pull/559) +- add the missing constructor for `Timelag` middleware via [#568](https://github.com/gakonst/ethers-rs/pull/568) +- re-export error types for `Http` and `Ws` providers in [#570](https://github.com/gakonst/ethers-rs/pull/570) +- add a method on the `Middleware` to broadcast a tx with a series of escalating gas prices via [#566](https://github.com/gakonst/ethers-rs/pull/566) ### 0.5.3 From a00155191f958cce4ca92ca811989d4d43462994 Mon Sep 17 00:00:00 2001 From: James Date: Thu, 11 Nov 2021 11:34:41 -0800 Subject: [PATCH 14/15] chore: update all comments on pending escalator --- ethers-providers/src/lib.rs | 4 +- ethers-providers/src/pending_escalator.rs | 48 +++++++++++++++-------- 2 files changed, 33 insertions(+), 19 deletions(-) diff --git a/ethers-providers/src/lib.rs b/ethers-providers/src/lib.rs index a8eed4ed9..2b90f1794 100644 --- a/ethers-providers/src/lib.rs +++ b/ethers-providers/src/lib.rs @@ -292,10 +292,10 @@ pub trait Middleware: Sync + Send + Debug { /// Send a transaction with a simple escalation policy. /// - /// `escalation` should be a boxed function that maps `original_gas_price` + /// `policy` should be a boxed function that maps `original_gas_price` /// and `number_of_previous_escalations` -> `new_gas_price`. /// - /// e.g. `Box::new(|start, escalations| start * 1250.pow(escalations) / 1000.pow(escalations))` + /// e.g. `Box::new(|start, escalation_index| start * 1250.pow(escalations) / 1000.pow(escalations))` async fn send_escalating<'a>( &'a self, tx: &TypedTransaction, diff --git a/ethers-providers/src/pending_escalator.rs b/ethers-providers/src/pending_escalator.rs index 696a24aed..a09fa53ed 100644 --- a/ethers-providers/src/pending_escalator.rs +++ b/ethers-providers/src/pending_escalator.rs @@ -24,9 +24,8 @@ enum EscalatorStates<'a, P> { Completed, } -/// An EscalatingPending is a pending transaction that handles increasing its -/// own gas price over time, by broadcasting successive versions with higher -/// gas prices +/// An EscalatingPending is a pending transaction that increases its own gas +/// price over time, by broadcasting successive versions with higher gas prices. #[must_use] #[pin_project(project = PendingProj)] #[derive(Debug)] @@ -48,9 +47,10 @@ where P: JsonRpcClient, { /// Instantiate a new EscalatingPending. This should only be called by the - /// Middleware trait. Callers MUST ensure that transactions are in _reverse_ - /// broadcast order (this just makes writing the code easier, as we - /// can use `pop()` a lot) + /// Middleware trait. + /// + /// Callers MUST ensure that transactions are in _reverse_ broadcast order + /// (this just makes writing the code easier, as we can use `pop()` a lot). /// /// TODO: consider deserializing and checking invariants (gas order, etc.) pub(crate) fn new(provider: &'a Provider

, mut txns: Vec) -> Self { @@ -73,20 +73,26 @@ where } } + /// Set the broadcast interval. This controls how often the escalator + /// broadcasts a new transaction at a higher gas price pub fn with_broadcast_interval(mut self, duration: impl Into) -> Self { self.broadcast_interval = duration.into(); self } + /// Set the polling interval. This controls how often the escalator checks + /// transaction receipts for confirmation. pub fn with_polling_interval(mut self, duration: impl Into) -> Self { self.polling_interval = duration.into(); self } + /// Get the current polling interval. pub fn get_polling_interval(&self) -> Duration { self.polling_interval } + /// Get the current broadcast interval. pub fn get_broadcast_interval(&self) -> Duration { self.broadcast_interval } @@ -160,6 +166,8 @@ where let this = self.project(); match this.state { + // In the initial state we're simply waiting on the first + // transaction braodcast to complete. Initial(fut) => { poll_broadcast_fut!(cx, this, fut); } @@ -177,32 +185,38 @@ where } check_all_receipts!(cx, this); } + // This state is functionally equivalent to Initial, but we + // differentiate it for clarity BroadcastingNew(fut) => { poll_broadcast_fut!(cx, this, fut); } CheckingReceipts(futs) => { - // otherwise drain one and check if we have a receipt + // Poll the set of `get_transaction_receipt` futures to check + // if any previously-broadcast transaction was confirmed. + // Continue doing this until all are resolved match futs.poll_next_unpin(cx) { - // we have found a receipt. This means that all other - // broadcast txns are now invalid, so we can drop them + // We have found a receipt. This means that all other + // broadcast txns are now invalid, so we can drop the + // futures and complete Poll::Ready(Some(Ok(Some(receipt)))) => { completed!(this, Ok(receipt)); } - // we found no receipt, rewake and check the next future - // until drained + // A `get_transaction_receipt` request resolved, but but we + // found no receipt, rewake and check if any other requests + // are resolved Poll::Ready(Some(Ok(None))) => cx.waker().wake_by_ref(), - // bubble up errors + // A request errored. We complete the future with the error. Poll::Ready(Some(Err(e))) => { completed!(this, Err(e)); } + // We have run out of `get_transaction_receipt` requests. + // Sleep and then check if we should broadcast again (or + // check receipts again) Poll::Ready(None) => { sleep!(cx, this); } - // check again later - Poll::Pending => { - // stick it pack in the list for polling again later - return Poll::Pending - } + // No request has resolved yet. Try again later + Poll::Pending => return Poll::Pending, } } Completed => panic!("polled after completion"), From 67a9e15484d22cbd64c314629571ec973e588d2f Mon Sep 17 00:00:00 2001 From: James Date: Thu, 11 Nov 2021 11:37:39 -0800 Subject: [PATCH 15/15] chore: run rustfmt --- ethers-providers/src/lib.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ethers-providers/src/lib.rs b/ethers-providers/src/lib.rs index 2b90f1794..215619d11 100644 --- a/ethers-providers/src/lib.rs +++ b/ethers-providers/src/lib.rs @@ -295,7 +295,8 @@ pub trait Middleware: Sync + Send + Debug { /// `policy` should be a boxed function that maps `original_gas_price` /// and `number_of_previous_escalations` -> `new_gas_price`. /// - /// e.g. `Box::new(|start, escalation_index| start * 1250.pow(escalations) / 1000.pow(escalations))` + /// e.g. `Box::new(|start, escalation_index| start * 1250.pow(escalations) / + /// 1000.pow(escalations))` async fn send_escalating<'a>( &'a self, tx: &TypedTransaction,