Skip to content
This repository has been archived by the owner on Aug 3, 2023. It is now read-only.

Commit

Permalink
reconnect socket if an error occurs
Browse files Browse the repository at this point in the history
  • Loading branch information
EverlastingBugstopper committed May 13, 2020
1 parent a57cf29 commit 05ed1a7
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 60 deletions.
48 changes: 30 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,15 @@ tempfile = "3.1.0"
term_size = "0.3"
text_io = "0.1.7"
tokio = { version = "0.2", default-features = false, features = ["io-std", "time", "macros", "process", "signal", "sync"] }
tokio-tls = "0.3.1"
tokio-tungstenite = { version = "0.10.1", features = ["tls"] }
toml = "0.5.5"
twox-hash = "1.5.0"
url = "2.1.0"
uuid = { version = "0.8", features = ["v4"] }
which = "3.1.1"
ws = "0.9.0"
futures = "0.3.5"

[dev-dependencies]
assert_cmd = "0.11.1"
Expand Down
116 changes: 74 additions & 42 deletions src/commands/dev/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,62 +2,94 @@ use std::time::Duration;

use chrome_devtools as protocol;

use futures::future::{BoxFuture, FutureExt};
use futures_util::future::TryFutureExt;
use futures_util::sink::SinkExt;
use futures_util::stream::StreamExt;
use futures_util::stream::{SplitStream, StreamExt};

use tokio::net::TcpStream;
use tokio::sync::mpsc;
use tokio::time::delay_for;
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
use tokio_tls::TlsStream;
use tokio_tungstenite::stream::Stream;
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message, WebSocketStream};

use url::Url;

const KEEP_ALIVE_INTERVAL: u64 = 10;

/// connect to a Workers runtime WebSocket emitting the Chrome Devtools Protocol
/// parse all console messages, and print them to stdout
pub async fn listen(session_id: String) -> Result<(), failure::Error> {
let socket_url = format!("wss://rawhttp.cloudflareworkers.com/inspect/{}", session_id);
let socket_url = Url::parse(&socket_url)?;

let (ws_stream, _) = connect_async(socket_url)
.await
.expect("Failed to connect to devtools instance");

let (mut write, read) = ws_stream.split();

// console.log messages are in the Runtime domain
// we must signal that we want to receive messages from the Runtime domain
// before they will be sent
let enable_runtime = protocol::runtime::SendMethod::Enable(1.into());
let enable_runtime = serde_json::to_string(&enable_runtime)?;
let enable_runtime = Message::Text(enable_runtime);
write.send(enable_runtime).await?;

// if left unattended, the preview service will kill the socket
// that emits console messages
// send a keep alive message every so often in the background
let (keep_alive_tx, keep_alive_rx) = mpsc::unbounded_channel();
tokio::spawn(keep_alive(keep_alive_tx));
let keep_alive_to_ws = keep_alive_rx.map(Ok).forward(write);

// parse every incoming message and print them
let print_ws_messages = {
read.for_each(|message| async {
let message = message.unwrap().into_text().unwrap();
log::info!("{}", message);
let message: Result<protocol::Runtime, failure::Error> = serde_json::from_str(&message)
.map_err(|e| failure::format_err!("this event could not be parsed:\n{}", e));
if let Ok(protocol::Runtime::Event(event)) = message {
println!("{}", event);

// the reason this needs to return a `BoxFuture` is so that we can call it recursively
// if something goes wrong with the websocket connection
pub fn listen(session_id: String) -> BoxFuture<'static, Result<(), failure::Error>> {
async move {
let socket_url = format!("wss://rawhttp.cloudflareworkers.com/inspect/{}", session_id);
let socket_url = Url::parse(&socket_url)?;

let (ws_stream, _) = connect_async(socket_url)
.await
.expect("Failed to connect to devtools instance");

let (mut write, read) = ws_stream.split();

// console.log messages are in the Runtime domain
// we must signal that we want to receive messages from the Runtime domain
// before they will be sent
let enable_runtime = protocol::runtime::SendMethod::Enable(1.into());
let enable_runtime = serde_json::to_string(&enable_runtime)?;
let enable_runtime = Message::Text(enable_runtime);
write.send(enable_runtime).await?;

// if left unattended, the preview service will kill the socket
// that emits console messages
// send a keep alive message every so often in the background
let (keep_alive_tx, keep_alive_rx) = mpsc::unbounded_channel();
tokio::spawn(keep_alive(keep_alive_tx));
let keep_alive_to_ws = keep_alive_rx
.map(Ok)
.forward(write)
.map_err(|e| failure::format_err!("{:?}", e));

// parse all incoming messages and print them to stdout
let printer =
tokio::spawn(print_ws_messages(read)).map_err(|e| failure::format_err!("{:?}", e));

// run the heartbeat and message printer in parallel
let res = tokio::try_join!(printer, keep_alive_to_ws);

match res {
Ok(_) => Ok(()),
Err(_) => {
// start a new websocket connection since the current one is closed
listen(session_id).await
}
})
};
}
}
.boxed()
}

// run the heartbeat and message printer in parallel
tokio::select! {
_ = keep_alive_to_ws => { Ok(()) }
_ = print_ws_messages => { Ok(()) }
async fn print_ws_messages(
mut read: SplitStream<WebSocketStream<Stream<TcpStream, TlsStream<TcpStream>>>>,
) -> Result<(), failure::Error> {
while let Some(message) = read.next().await {
match message {
Ok(message) => {
let message_text = message.into_text().unwrap();
log::info!("{}", message_text);
let parsed_message: Result<protocol::Runtime, failure::Error> =
serde_json::from_str(&message_text).map_err(|e| {
failure::format_err!("this event could not be parsed:\n{}", e)
});
if let Ok(protocol::Runtime::Event(event)) = parsed_message {
println!("{}", event);
}
}
Err(error) => return Err(error.into()),
}
}
Ok(())
}

async fn keep_alive(tx: mpsc::UnboundedSender<Message>) -> ! {
Expand Down

0 comments on commit 05ed1a7

Please sign in to comment.