Skip to content

Commit

Permalink
feat(providers): load previous logs before subscribing
Browse files Browse the repository at this point in the history
Load previous logs and stream it back to the user before establishing
a new stream for streaming future logs.

Closes gakonst#988
  • Loading branch information
meetmangukiya committed May 14, 2022
1 parent eb94e53 commit 87f4251
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 5 deletions.
28 changes: 24 additions & 4 deletions ethers-providers/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ use ethers_core::{
types::{
transaction::{eip2718::TypedTransaction, eip2930::AccessListWithGasUsed},
Address, Block, BlockId, BlockNumber, BlockTrace, Bytes, EIP1186ProofResponse, FeeHistory,
Filter, Log, NameOrAddress, Selector, Signature, Trace, TraceFilter, TraceType,
Transaction, TransactionReceipt, TransactionRequest, TxHash, TxpoolContent, TxpoolInspect,
TxpoolStatus, H256, U256, U64,
Filter, FilterBlockOption, Log, NameOrAddress, Selector, Signature, Trace, TraceFilter,
TraceType, Transaction, TransactionReceipt, TransactionRequest, TxHash, TxpoolContent,
TxpoolInspect, TxpoolStatus, H256, U256, U64,
},
utils,
};
Expand Down Expand Up @@ -1102,9 +1102,29 @@ impl<P: JsonRpcClient> Middleware for Provider<P> {
where
P: PubsubClient,
{
let logs = match filter.block_option {
FilterBlockOption::Range { from_block, to_block: _ } => {
if from_block.is_none() {
Ok(vec![])
} else {
self.get_logs(filter).await
}
}
FilterBlockOption::AtBlockHash(_block_hash) => self.get_logs(filter).await,
};

if logs.is_err() {
return Err(logs.err().unwrap())
}

let loaded_logs = logs.unwrap();

let logs = utils::serialize(&"logs"); // TODO: Make this a static
let filter = utils::serialize(filter);
self.subscribe([logs, filter]).await
self.subscribe([logs, filter]).await.map(|mut stream| {
stream.set_loaded_elements(loaded_logs);
stream
})
}

async fn fee_history<T: Into<U256> + Send + Sync>(
Expand Down
13 changes: 12 additions & 1 deletion ethers-providers/src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ pub struct SubscriptionStream<'a, P: PubsubClient, R: DeserializeOwned> {
/// The subscription's installed id on the ethereum node
pub id: U256,

loaded_elements: Vec<R>,

provider: &'a Provider<P>,

#[pin]
Expand All @@ -54,13 +56,17 @@ where
pub fn new(id: U256, provider: &'a Provider<P>) -> Result<Self, P::Error> {
// Call the underlying PubsubClient's subscribe
let rx = provider.as_ref().subscribe(id)?;
Ok(Self { id, provider, rx, ret: PhantomData })
Ok(Self { id, provider, rx, ret: PhantomData, loaded_elements: vec![] })
}

/// Unsubscribes from the subscription.
pub async fn unsubscribe(&self) -> Result<bool, crate::ProviderError> {
self.provider.unsubscribe(self.id).await
}

pub fn set_loaded_elements(&mut self, loaded_elements: Vec<R>) {
self.loaded_elements = loaded_elements;
}
}

// Each subscription item is a serde_json::Value which must be decoded to the
Expand All @@ -74,6 +80,11 @@ where
type Item = R;

fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
if self.loaded_elements.len() > 0 {
let next_element = self.get_mut().loaded_elements.remove(0);
return Poll::Ready(Some(next_element))
}

let this = self.project();
match futures_util::ready!(this.rx.poll_next(ctx)) {
Some(item) => match serde_json::from_str(item.get()) {
Expand Down

0 comments on commit 87f4251

Please sign in to comment.