Skip to content
This repository has been archived by the owner on Aug 3, 2023. It is now read-only.

Commit

Permalink
wip reconnect socket after bad protocol message
Browse files Browse the repository at this point in the history
  • Loading branch information
EverlastingBugstopper committed May 13, 2020
1 parent a57cf29 commit 37686a4
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 61 deletions.
48 changes: 30 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,15 @@ tempfile = "3.1.0"
term_size = "0.3"
text_io = "0.1.7"
tokio = { version = "0.2", default-features = false, features = ["io-std", "time", "macros", "process", "signal", "sync"] }
tokio-tls = "0.3.1"
tokio-tungstenite = { version = "0.10.1", features = ["tls"] }
toml = "0.5.5"
twox-hash = "1.5.0"
url = "2.1.0"
uuid = { version = "0.8", features = ["v4"] }
which = "3.1.1"
ws = "0.9.0"
futures = "0.3.5"

[dev-dependencies]
assert_cmd = "0.11.1"
Expand Down
119 changes: 76 additions & 43 deletions src/commands/dev/socket.rs
Original file line number Diff line number Diff line change
@@ -1,63 +1,96 @@
use std::sync::{Arc, Mutex};
use std::time::Duration;

use chrome_devtools as protocol;

use futures::future::{BoxFuture, FutureExt};
use futures_util::future::TryFutureExt;
use futures_util::sink::SinkExt;
use futures_util::stream::StreamExt;
use futures_util::stream::{SplitStream, StreamExt};

use tokio::net::TcpStream;
use tokio::sync::mpsc;
use tokio::time::delay_for;
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
use tokio_tls::TlsStream;
use tokio_tungstenite::stream::Stream;
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message, WebSocketStream};

use url::Url;

const KEEP_ALIVE_INTERVAL: u64 = 10;

/// connect to a Workers runtime WebSocket emitting the Chrome Devtools Protocol
/// parse all console messages, and print them to stdout
pub async fn listen(session_id: String) -> Result<(), failure::Error> {
let socket_url = format!("wss://rawhttp.cloudflareworkers.com/inspect/{}", session_id);
let socket_url = Url::parse(&socket_url)?;

let (ws_stream, _) = connect_async(socket_url)
.await
.expect("Failed to connect to devtools instance");

let (mut write, read) = ws_stream.split();

// console.log messages are in the Runtime domain
// we must signal that we want to receive messages from the Runtime domain
// before they will be sent
let enable_runtime = protocol::runtime::SendMethod::Enable(1.into());
let enable_runtime = serde_json::to_string(&enable_runtime)?;
let enable_runtime = Message::Text(enable_runtime);
write.send(enable_runtime).await?;

// if left unattended, the preview service will kill the socket
// that emits console messages
// send a keep alive message every so often in the background
let (keep_alive_tx, keep_alive_rx) = mpsc::unbounded_channel();
tokio::spawn(keep_alive(keep_alive_tx));
let keep_alive_to_ws = keep_alive_rx.map(Ok).forward(write);

// parse every incoming message and print them
let print_ws_messages = {
read.for_each(|message| async {
let message = message.unwrap().into_text().unwrap();
log::info!("{}", message);
let message: Result<protocol::Runtime, failure::Error> = serde_json::from_str(&message)
.map_err(|e| failure::format_err!("this event could not be parsed:\n{}", e));
if let Ok(protocol::Runtime::Event(event)) = message {
println!("{}", event);
}
})
};

// run the heartbeat and message printer in parallel
tokio::select! {
_ = keep_alive_to_ws => { Ok(()) }
_ = print_ws_messages => { Ok(()) }
// the reason this needs to return a `BoxFuture` is so that we can call it recursively
// if something goes wrong with the websocket connection
pub fn listen(session_id: String) -> BoxFuture<'static, Result<(), failure::Error>> {
async move {
let socket_url = format!("wss://rawhttp.cloudflareworkers.com/inspect/{}", session_id);
let socket_url = Url::parse(&socket_url)?;

let (ws_stream, _) = connect_async(socket_url)
.await
.expect("Failed to connect to devtools instance");

let (mut write, read) = ws_stream.split();

let passable_read = Arc::new(Mutex::new(read));

// console.log messages are in the Runtime domain
// we must signal that we want to receive messages from the Runtime domain
// before they will be sent
let enable_runtime = protocol::runtime::SendMethod::Enable(1.into());
let enable_runtime = serde_json::to_string(&enable_runtime)?;
let enable_runtime = Message::Text(enable_runtime);
write.send(enable_runtime).await?;

// if left unattended, the preview service will kill the socket
// that emits console messages
// send a keep alive message every so often in the background
let (keep_alive_tx, keep_alive_rx) = mpsc::unbounded_channel();
tokio::spawn(keep_alive(keep_alive_tx));
let keep_alive_to_ws = keep_alive_rx
.map(Ok)
.forward(write)
.map_err(|e| failure::format_err!("{:?}", e));

// parse all incoming messages and print them to stdout
let printer = tokio::spawn(print_ws_messages(Arc::clone(&passable_read)))
.map_err(|e| failure::format_err!("{:?}", e));

// run the heartbeat and message printer in parallel
let res = tokio::try_join!(printer, keep_alive_to_ws);

match res {
Ok(_) => Ok(()),
Err(_) => listen(session_id).await,
}
}
.boxed()
}

async fn print_ws_messages(
read: Arc<Mutex<SplitStream<WebSocketStream<Stream<TcpStream, TlsStream<TcpStream>>>>>>,
) -> Result<(), failure::Error> {
let mut read = read.lock().unwrap();
while let Some(message) = read.next().await {
match message {
Ok(message) => {
let message_text = message.into_text().unwrap();
log::info!("{}", message_text);
let parsed_message: Result<protocol::Runtime, failure::Error> =
serde_json::from_str(&message_text).map_err(|e| {
failure::format_err!("this event could not be parsed:\n{}", e)
});
if let Ok(protocol::Runtime::Event(event)) = parsed_message {
println!("{}", event);
}
}
Err(error) => return Err(error.into()),
}
}
Ok(())
}

async fn keep_alive(tx: mpsc::UnboundedSender<Message>) -> ! {
Expand Down

0 comments on commit 37686a4

Please sign in to comment.