From a832b48bcae5fd9d027d8d84bb13601e0f44b4c4 Mon Sep 17 00:00:00 2001 From: James Date: Tue, 9 Nov 2021 09:07:55 -0800 Subject: [PATCH] refactor: use Delay in polling interval to ensure re-waking --- ethers-providers/src/lib.rs | 5 +-- ethers-providers/src/pending_escalator.rs | 43 ++++++++++++----------- 2 files changed, 25 insertions(+), 23 deletions(-) diff --git a/ethers-providers/src/lib.rs b/ethers-providers/src/lib.rs index f1b01bfa7..34e97aa96 100644 --- a/ethers-providers/src/lib.rs +++ b/ethers-providers/src/lib.rs @@ -299,7 +299,6 @@ pub trait Middleware: Sync + Send + Debug { async fn send_escalating<'a>( &'a self, tx: &TypedTransaction, - from: &Address, escalation: EscalationPolicy, ) -> Result, Self::Error> { let mut original = tx.clone(); @@ -314,7 +313,9 @@ pub trait Middleware: Sync + Send + Debug { r }) .map(|req| async move { - self.sign(req.rlp(chain_id), from).await.map(|sig| req.rlp_signed(chain_id, &sig)) + self.sign(req.rlp(chain_id), &self.default_sender().unwrap_or_default()) + .await + .map(|sig| req.rlp_signed(chain_id, &sig)) }) .collect(); diff --git a/ethers-providers/src/pending_escalator.rs b/ethers-providers/src/pending_escalator.rs index a9fcf7bbd..2e5f5657c 100644 --- a/ethers-providers/src/pending_escalator.rs +++ b/ethers-providers/src/pending_escalator.rs @@ -2,16 +2,22 @@ use ethers_core::types::{Bytes, TransactionReceipt, H256}; use pin_project::pin_project; use std::{ future::Future, + pin::Pin, task::Poll, time::{Duration, Instant}, }; +#[cfg(not(target_arch = "wasm32"))] +use futures_timer::Delay; +#[cfg(target_arch = "wasm32")] +use wasm_timer::Delay; + use crate::{JsonRpcClient, Middleware, PendingTransaction, PinBoxFut, Provider, ProviderError}; /// States for the EscalatingPending future enum PendingStates<'a, P> { Initial(PinBoxFut<'a, PendingTransaction<'a, P>>), - Sleeping(Instant), + Sleeping(Pin>), BroadcastingNew(PinBoxFut<'a, PendingTransaction<'a, P>>), CheckingReceipts(Vec>>), Completed, @@ -89,7 +95,7 @@ macro_rules! check_all_receipts { macro_rules! sleep { ($cx:ident, $this:ident) => { - *$this.state = PendingStates::Sleeping(std::time::Instant::now()); + *$this.state = PendingStates::Sleeping(Box::pin(Delay::new(*$this.polling_interval))); $cx.waker().wake_by_ref(); return Poll::Pending }; @@ -135,26 +141,21 @@ where Initial(fut) => { broadcast_checks!(cx, this, fut); } - Sleeping(instant) => { - if instant.elapsed() > *this.polling_interval { - // if timer has elapsed (or this is the first tx) - if this.last.is_none() || - (*this.last).unwrap().elapsed() > *this.broadcast_interval - { - // then if we have a TX to broadcast, start - // broadcasting it - if let Some(next_to_broadcast) = this.txns.pop() { - let fut = this.provider.send_raw_transaction(next_to_broadcast); - *this.state = BroadcastingNew(fut); - cx.waker().wake_by_ref(); - return Poll::Pending - } + Sleeping(delay) => { + let _ready = futures_util::ready!(delay.as_mut().poll(cx)); + // if timer has elapsed (or this is the first tx) + if this.last.is_none() || (*this.last).unwrap().elapsed() > *this.broadcast_interval + { + // then if we have a TX to broadcast, start + // broadcasting it + if let Some(next_to_broadcast) = this.txns.pop() { + let fut = this.provider.send_raw_transaction(next_to_broadcast); + *this.state = BroadcastingNew(fut); + cx.waker().wake_by_ref(); + return Poll::Pending; } - - check_all_receipts!(cx, this); } - - return Poll::Pending + check_all_receipts!(cx, this); } BroadcastingNew(fut) => { broadcast_checks!(cx, this, fut); @@ -181,7 +182,7 @@ where Poll::Pending => { // stick it pack in the list for polling again later futs.push(pollee); - return Poll::Pending + return Poll::Pending; } } }