Skip to content
This repository has been archived by the owner on Oct 19, 2024. It is now read-only.

Commit

Permalink
style: better notification handling
Browse files Browse the repository at this point in the history
  • Loading branch information
mattsse committed Aug 22, 2021
1 parent 1ad9368 commit ef8660e
Showing 1 changed file with 22 additions and 19 deletions.
41 changes: 22 additions & 19 deletions ethers-providers/src/transports/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use futures_util::{
stream::{Fuse, Stream, StreamExt},
};
use serde::{de::DeserializeOwned, Serialize};
use std::collections::btree_map::Entry;
use std::{
collections::BTreeMap,
fmt::{self, Debug},
Expand Down Expand Up @@ -198,13 +199,25 @@ where
}
}

/// Returns whether the all work has been completed.
///
/// If this method returns `true`, then all the `instructions` channel has been closed and all
/// pending requests and subscriptions have been completed.
fn is_done(&self) -> bool {
self.instructions.is_done() && self.pending.is_empty() && self.subscriptions.is_empty()
}

/// Spawns the event loop
fn spawn(mut self)
where
S: 'static,
{
let f = async move {
loop {
if self.is_done() {
tracing::info!("work complete");
break;
}
match self.tick().await {
Err(ClientError::UnexpectedClose) => {
tracing::error!("{}", ClientError::UnexpectedClose);
Expand All @@ -213,9 +226,6 @@ where
Err(e) => {
panic!("WS Server panic: {}", e);
}
Ok(None) => {
tracing::info!("work complete");
}
_ => {}
}
}
Expand Down Expand Up @@ -291,15 +301,14 @@ where
}
Ok(Incoming::Notification(notification)) => {
let id = notification.params.subscription;
if let Some(stream) = self.subscriptions.remove(&id) {
if let Err(err) = stream.unbounded_send(notification.params.result) {
if !err.is_disconnected() {
// channel is still open
self.subscriptions.insert(id, stream);
if let Entry::Occupied(stream) = self.subscriptions.entry(id) {
if let Err(err) = stream.get().unbounded_send(notification.params.result) {
if err.is_disconnected() {
// subscription channel was closed on the receiver end
stream.remove();
}
return Err(to_client_error(err));
}
self.subscriptions.insert(id, stream);
}
}
}
Expand All @@ -319,17 +328,11 @@ where

/// Processes 1 instruction or 1 incoming websocket message
#[allow(clippy::single_match)]
async fn tick(&mut self) -> Result<Option<()>, ClientError> {
async fn tick(&mut self) -> Result<(), ClientError> {
futures_util::select! {
// Handle requests
instruction = self.instructions.next() => match instruction {
Some(instruction) => self.service(instruction).await?,
None => {
// can't receive any more instructions, close if work completed
if self.pending.is_empty() && self.subscriptions.is_empty() {
return Ok(None);
}
},
instruction = self.instructions.select_next_some() => {
self.service(instruction).await?;
},
// Handle ws messages
resp = self.ws.next() => match resp {
Expand All @@ -342,7 +345,7 @@ where
}
};

Ok(Some(()))
Ok(())
}
}

Expand Down

0 comments on commit ef8660e

Please sign in to comment.