Skip to content

Commit

Permalink
refactor: use Delay in polling interval to ensure re-waking
Browse files Browse the repository at this point in the history
  • Loading branch information
prestwich committed Nov 9, 2021
1 parent 44fb4ce commit a832b48
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 23 deletions.
5 changes: 3 additions & 2 deletions ethers-providers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,6 @@ pub trait Middleware: Sync + Send + Debug {
async fn send_escalating<'a>(
&'a self,
tx: &TypedTransaction,
from: &Address,
escalation: EscalationPolicy,
) -> Result<EscalatingPending<'a, Self::Provider>, Self::Error> {
let mut original = tx.clone();
Expand All @@ -314,7 +313,9 @@ pub trait Middleware: Sync + Send + Debug {
r
})
.map(|req| async move {
self.sign(req.rlp(chain_id), from).await.map(|sig| req.rlp_signed(chain_id, &sig))
self.sign(req.rlp(chain_id), &self.default_sender().unwrap_or_default())
.await
.map(|sig| req.rlp_signed(chain_id, &sig))
})
.collect();

Expand Down
43 changes: 22 additions & 21 deletions ethers-providers/src/pending_escalator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,22 @@ use ethers_core::types::{Bytes, TransactionReceipt, H256};
use pin_project::pin_project;
use std::{
future::Future,
pin::Pin,
task::Poll,
time::{Duration, Instant},
};

#[cfg(not(target_arch = "wasm32"))]
use futures_timer::Delay;
#[cfg(target_arch = "wasm32")]
use wasm_timer::Delay;

use crate::{JsonRpcClient, Middleware, PendingTransaction, PinBoxFut, Provider, ProviderError};

/// States for the EscalatingPending future
enum PendingStates<'a, P> {
Initial(PinBoxFut<'a, PendingTransaction<'a, P>>),
Sleeping(Instant),
Sleeping(Pin<Box<Delay>>),
BroadcastingNew(PinBoxFut<'a, PendingTransaction<'a, P>>),
CheckingReceipts(Vec<PinBoxFut<'a, Option<TransactionReceipt>>>),
Completed,
Expand Down Expand Up @@ -89,7 +95,7 @@ macro_rules! check_all_receipts {

macro_rules! sleep {
($cx:ident, $this:ident) => {
*$this.state = PendingStates::Sleeping(std::time::Instant::now());
*$this.state = PendingStates::Sleeping(Box::pin(Delay::new(*$this.polling_interval)));
$cx.waker().wake_by_ref();
return Poll::Pending
};
Expand Down Expand Up @@ -135,26 +141,21 @@ where
Initial(fut) => {
broadcast_checks!(cx, this, fut);
}
Sleeping(instant) => {
if instant.elapsed() > *this.polling_interval {
// if timer has elapsed (or this is the first tx)
if this.last.is_none() ||
(*this.last).unwrap().elapsed() > *this.broadcast_interval
{
// then if we have a TX to broadcast, start
// broadcasting it
if let Some(next_to_broadcast) = this.txns.pop() {
let fut = this.provider.send_raw_transaction(next_to_broadcast);
*this.state = BroadcastingNew(fut);
cx.waker().wake_by_ref();
return Poll::Pending
}
Sleeping(delay) => {
let _ready = futures_util::ready!(delay.as_mut().poll(cx));
// if timer has elapsed (or this is the first tx)
if this.last.is_none() || (*this.last).unwrap().elapsed() > *this.broadcast_interval
{
// then if we have a TX to broadcast, start
// broadcasting it
if let Some(next_to_broadcast) = this.txns.pop() {
let fut = this.provider.send_raw_transaction(next_to_broadcast);
*this.state = BroadcastingNew(fut);
cx.waker().wake_by_ref();
return Poll::Pending;
}

check_all_receipts!(cx, this);
}

return Poll::Pending
check_all_receipts!(cx, this);
}
BroadcastingNew(fut) => {
broadcast_checks!(cx, this, fut);
Expand All @@ -181,7 +182,7 @@ where
Poll::Pending => {
// stick it pack in the list for polling again later
futs.push(pollee);
return Poll::Pending
return Poll::Pending;
}
}
}
Expand Down

0 comments on commit a832b48

Please sign in to comment.