Skip to content
This repository has been archived by the owner on Aug 3, 2023. It is now read-only.

Commit

Permalink
wip reconnect socket after bad protocol message
Browse files Browse the repository at this point in the history
  • Loading branch information
EverlastingBugstopper committed May 12, 2020
1 parent a57cf29 commit 81d3106
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 17 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ tempfile = "3.1.0"
term_size = "0.3"
text_io = "0.1.7"
tokio = { version = "0.2", default-features = false, features = ["io-std", "time", "macros", "process", "signal", "sync"] }
tokio-tls = "0.3.1"
tokio-tungstenite = { version = "0.10.1", features = ["tls"] }
toml = "0.5.5"
twox-hash = "1.5.0"
Expand Down
52 changes: 35 additions & 17 deletions src/commands/dev/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@ use std::time::Duration;
use chrome_devtools as protocol;

use futures_util::sink::SinkExt;
use futures_util::stream::StreamExt;
use futures_util::stream::{SplitStream, StreamExt};

use tokio::net::TcpStream;
use tokio::sync::mpsc;
use tokio::time::delay_for;
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
use tokio_tls::TlsStream;
use tokio_tungstenite::stream::Stream;
use tokio_tungstenite::tungstenite::Error as TungsteniteError;
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message, WebSocketStream};

use url::Url;

Expand Down Expand Up @@ -40,24 +44,38 @@ pub async fn listen(session_id: String) -> Result<(), failure::Error> {
tokio::spawn(keep_alive(keep_alive_tx));
let keep_alive_to_ws = keep_alive_rx.map(Ok).forward(write);

// parse every incoming message and print them
let print_ws_messages = {
read.for_each(|message| async {
let message = message.unwrap().into_text().unwrap();
log::info!("{}", message);
let message: Result<protocol::Runtime, failure::Error> = serde_json::from_str(&message)
.map_err(|e| failure::format_err!("this event could not be parsed:\n{}", e));
if let Ok(protocol::Runtime::Event(event)) = message {
println!("{}", event);
}
})
};
// parse all incoming messages and print them to stdout
let printer = tokio::spawn(print_ws_messages(&mut read));

// run the heartbeat and message printer in parallel
tokio::select! {
_ = keep_alive_to_ws => { Ok(()) }
_ = print_ws_messages => { Ok(()) }
let res = tokio::try_join!(async {printer.await}, keep_alive_to_ws);

match res {
Ok(_) => Ok(()),
Err(_) => listen(session_id),
}
}

async fn print_ws_messages(
read: &mut SplitStream<WebSocketStream<Stream<TcpStream, TlsStream<TcpStream>>>>,
) -> Result<(), TungsteniteError> {
while let Some(message) = read.next().await {
match message {
Ok(message) => {
let message_text = message.into_text().unwrap();
log::info!("{}", message_text);
let parsed_message: Result<protocol::Runtime, failure::Error> =
serde_json::from_str(&message_text).map_err(|e| {
failure::format_err!("this event could not be parsed:\n{}", e)
});
if let Ok(protocol::Runtime::Event(event)) = parsed_message {
println!("{}", event);
}
}
Err(error) => return Err(error),
}
}
Ok(())
}

async fn keep_alive(tx: mpsc::UnboundedSender<Message>) -> ! {
Expand Down

0 comments on commit 81d3106

Please sign in to comment.