Skip to content
This repository has been archived by the owner on Aug 3, 2023. It is now read-only.

[WIP] Update tokio ecosystem #1880

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
458 changes: 269 additions & 189 deletions Cargo.lock

Large diffs are not rendered by default.

21 changes: 11 additions & 10 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ binary-install = "0.0.3-alpha.1"
chrome-devtools-rs = { version = "=0.0.0-alpha.1", features = ["color"] }
chrono = "0.4.19"
clap = "2.33.3"
cloudflare = "0.6.6"
cloudflare = "0.8.1"
config = "0.10.1"
console = "0.13.0"
dirs = "3.0.1"
Expand All @@ -31,8 +31,8 @@ fs2 = "0.4.3"
futures-util = "0.3"
globset = "0.4.6"
http = "0.2.1"
hyper = "0.13.9"
hyper-rustls = "0.21.0"
hyper = { version = "0.14.7", features = ["http2", "server", "runtime"] }
hyper-rustls = "0.22.1"
ignore = "0.4.17"
indicatif = "0.15.0"
lazy_static = "1.4.0"
Expand All @@ -47,18 +47,19 @@ prettytable-rs = "0.8.0"
rand = "0.7.3"
regex = "1.4.1"
reqwest = { version = "0.10.9", features = ["blocking", "json"] }
rustls = "0.18.1"
rustls = "0.19.1"
semver = "0.11.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.60"
serde_with = "1.5.1"
tempfile = "3.1.0"
term_size = "0.3"
text_io = "0.1.8"
tokio = { version = "0.2", default-features = false, features = ["io-std", "time", "macros", "process", "signal", "sync"] }
tokio-native-tls = "0.1.0"
tokio-rustls = "0.14.1"
tokio-tungstenite = { version = "0.11.0", features = ["tls"] }
tokio = { version = "1.5.0", default-features = false, features = ["io-std", "time", "macros", "process", "signal", "sync"] }
tokio-native-tls = "0.3.0"
tokio-rustls = "0.22.0"
tokio-stream = "0.1.5"
tokio-tungstenite = "0.14.0"
toml = "0.5.8"
toml_edit = "0.2.0"
twox-hash = "1.6.0"
Expand All @@ -74,12 +75,12 @@ predicates = "1.0.5"

[features]
# OpenSSL is vendored by default, can use system OpenSSL through feature flag.
default = ['openssl/vendored']
default = ['openssl/vendored', 'tokio-tungstenite/native-tls-vendored']

# Treat compiler warnings as a build error.
# This only runs in CI by default
strict = ['openssl/vendored']
sys-openssl = ['openssl']
sys-openssl = ['openssl', 'tokio-tungstenite/native-tls']
# Keeping feature for users already using this feature flag
vendored-openssl = ['openssl/vendored']

Expand Down
2 changes: 1 addition & 1 deletion src/commands/dev/edge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub fn dev(
});
}

let mut runtime = TokioRuntime::new()?;
let runtime = TokioRuntime::new()?;
runtime.block_on(async {
let devtools_listener = tokio::spawn(socket::listen(session.websocket_url));
let server = match local_protocol {
Expand Down
2 changes: 1 addition & 1 deletion src/commands/dev/edge/server/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub async fn http(
upstream_protocol: Protocol,
) -> Result<(), failure::Error> {
// set up https client to connect to the preview service
let https = HttpsConnector::new();
let https = HttpsConnector::with_native_roots();
let client = HyperClient::builder().build::<_, Body>(https);

let listening_address = server_config.listening_address;
Expand Down
42 changes: 23 additions & 19 deletions src/commands/dev/edge/server/https.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::terminal::message::{Message, StdOut};
use std::sync::{Arc, Mutex};

use chrono::prelude::*;
use futures_util::stream::StreamExt;
use futures_util::{stream::StreamExt, FutureExt};

use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Client as HyperClient, Request, Server};
Expand All @@ -21,7 +21,7 @@ pub async fn https(
tls::generate_cert()?;

// set up https client to connect to the preview service
let https = HttpsConnector::new();
let https = HttpsConnector::with_native_roots();
let client = HyperClient::builder().build::<_, Body>(https);

let listening_address = server_config.listening_address;
Expand Down Expand Up @@ -75,28 +75,32 @@ pub async fn https(
}
});

let mut tcp = TcpListener::bind(&listening_address).await?;
let tcp = TcpListener::bind(&listening_address).await?;
let tls_acceptor = &tls::get_tls_acceptor()?;
let incoming_tls_stream = tcp
.incoming()
.filter_map(move |s| async move {
let client = match s {
Ok(x) => x,
Err(e) => {
eprintln!("Failed to accept client {}", e);
return None;
}
};
match tls_acceptor.accept(client).await {
Ok(x) => Some(Ok(x)),
let incoming_tls_stream = async {
let tcp_stream = match tcp.accept().await {
Ok((tcp_stream, _addr)) => Ok(tcp_stream),
Err(e) => {
eprintln!("Failed to accept client {}", e);
Err(e)
}
};

match tcp_stream {
Ok(stream) => match tls_acceptor.accept(stream).await {
Ok(tls_stream) => Ok(tls_stream),
Err(e) => {
eprintln!("Client connection error {}", e);
StdOut::info("Make sure to use https and `--insecure` with curl");
None
Err(e)
}
}
})
.boxed();
},
Err(e) => Err(e),
}
}
.into_stream()
.boxed();

let server = Server::builder(tls::HyperAcceptor {
acceptor: incoming_tls_stream,
})
Expand Down
2 changes: 1 addition & 1 deletion src/commands/dev/gcs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub fn dev(
let socket_url = get_socket_url(&session_id)?;

// in order to spawn futures we must create a tokio runtime
let mut runtime = TokioRuntime::new()?;
let runtime = TokioRuntime::new()?;

// and we must block the main thread on the completion of
// said futures
Expand Down
2 changes: 1 addition & 1 deletion src/commands/dev/gcs/server/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub async fn http(
preview_id: Arc<Mutex<String>>,
) -> Result<(), failure::Error> {
// set up https client to connect to the preview service
let https = HttpsConnector::new();
let https = HttpsConnector::with_native_roots();
let client = HyperClient::builder().build::<_, Body>(https);

let listening_address = server_config.listening_address;
Expand Down
41 changes: 22 additions & 19 deletions src/commands/dev/gcs/server/https.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::terminal::message::{Message, StdOut};
use std::sync::{Arc, Mutex};

use chrono::prelude::*;
use futures_util::stream::StreamExt;
use futures_util::{FutureExt, StreamExt};
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Client as HyperClient, Request, Response, Server};
use hyper_rustls::HttpsConnector;
Expand All @@ -23,7 +23,7 @@ pub async fn https(
tls::generate_cert()?;

// set up https client to connect to the preview service
let https = HttpsConnector::new();
let https = HttpsConnector::with_native_roots();
let client = HyperClient::builder().build::<_, Body>(https);

let listening_address = server_config.listening_address;
Expand Down Expand Up @@ -98,28 +98,31 @@ pub async fn https(
});

// Create a TCP listener via tokio.
let mut tcp = TcpListener::bind(&listening_address).await?;
let tcp = TcpListener::bind(&listening_address).await?;
let tls_acceptor = &tls::get_tls_acceptor()?;
let incoming_tls_stream = tcp
.incoming()
.filter_map(move |s| async move {
let client = match s {
Ok(x) => x,
Err(e) => {
eprintln!("Failed to accept client {}", e);
return None;
}
};
match tls_acceptor.accept(client).await {
Ok(x) => Some(Ok(x)),
let incoming_tls_stream = async {
let tcp_stream = match tcp.accept().await {
Ok((tcp_stream, _addr)) => Ok(tcp_stream),
Err(e) => {
eprintln!("Failed to accept client {}", e);
Err(e)
}
};

match tcp_stream {
Ok(stream) => match tls_acceptor.accept(stream).await {
Ok(tls_stream) => Ok(tls_stream),
Err(e) => {
eprintln!("Client connection error {}", e);
StdOut::info("Make sure to use https and `--insecure` with curl");
None
Err(e)
}
}
})
.boxed();
},
Err(e) => Err(e),
}
}
.into_stream()
.boxed();

let server = Server::builder(tls::HyperAcceptor {
acceptor: incoming_tls_stream,
Expand Down
27 changes: 14 additions & 13 deletions src/commands/dev/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ use chrome_devtools as protocol;
use futures_util::future::TryFutureExt;
use futures_util::sink::SinkExt;
use futures_util::stream::{SplitStream, StreamExt};
use tokio_stream::wrappers::UnboundedReceiverStream;

use crate::terminal::message::{Message, StdErr, StdOut};
use tokio::net::TcpStream;
use tokio::sync::mpsc;
use tokio::time::delay_for;
use tokio_native_tls::TlsStream;
use tokio_tungstenite::stream::Stream;
use tokio_tungstenite::{connect_async, tungstenite, WebSocketStream};
use tokio::time::sleep;

use tokio_tungstenite::{connect_async, tungstenite, MaybeTlsStream, WebSocketStream};

use url::Url;

Expand Down Expand Up @@ -46,7 +46,10 @@ pub async fn listen(socket_url: Url) -> Result<(), failure::Error> {

// when the keep alive channel receives a message from the
// heartbeat future, write it to the websocket
let keep_alive_to_ws = keep_alive_rx.map(Ok).forward(write).map_err(Into::into);
let keep_alive_to_ws = UnboundedReceiverStream::new(keep_alive_rx)
.map(Ok)
.forward(write)
.map_err(Into::into);

// parse all incoming messages and print them to stdout
let printer = print_ws_messages(read);
Expand All @@ -61,9 +64,7 @@ pub async fn listen(socket_url: Url) -> Result<(), failure::Error> {

// Endlessly retry connecting to the chrome devtools instance with exponential backoff.
// The backoff maxes out at 60 seconds.
async fn connect_retry(
socket_url: &Url,
) -> WebSocketStream<Stream<TcpStream, TlsStream<TcpStream>>> {
async fn connect_retry(socket_url: &Url) -> WebSocketStream<MaybeTlsStream<TcpStream>> {
let mut wait_seconds = 2;
let maximum_wait_seconds = 60;
let mut failed = false;
Expand All @@ -83,7 +84,7 @@ async fn connect_retry(
"Will retry connection in {} seconds",
wait_seconds
));
delay_for(Duration::from_secs(wait_seconds)).await;
sleep(Duration::from_secs(wait_seconds)).await;
wait_seconds = wait_seconds.pow(2);
if wait_seconds > maximum_wait_seconds {
// max out at 60 seconds
Expand All @@ -96,13 +97,13 @@ async fn connect_retry(
}

async fn print_ws_messages(
mut read: SplitStream<WebSocketStream<Stream<TcpStream, TlsStream<TcpStream>>>>,
mut read: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
) -> Result<(), failure::Error> {
while let Some(message) = read.next().await {
match message {
Ok(message) => {
let message_text = message.into_text().unwrap();
log::info!("{}", message_text);
log::info!("{}", &message_text);

let parsed_message: Result<protocol::Runtime, failure::Error> =
serde_json::from_str(&message_text).map_err(|e| {
Expand Down Expand Up @@ -134,7 +135,7 @@ async fn keep_alive(
tx: mpsc::UnboundedSender<tungstenite::protocol::Message>,
) -> Result<(), failure::Error> {
let duration = Duration::from_millis(1000 * KEEP_ALIVE_INTERVAL);
let mut delay = delay_for(duration);
let mut delay = sleep(duration);

// this is set to 2 because we have already sent an id of 1 to enable the runtime
// eventually this logic should be moved to the chrome-devtools-rs library
Expand All @@ -148,6 +149,6 @@ async fn keep_alive(
let keep_alive_message = tungstenite::protocol::Message::Text(keep_alive_message);
tx.send(keep_alive_message).unwrap();
id += 1;
delay = delay_for(duration);
delay = sleep(duration);
}
}
3 changes: 3 additions & 0 deletions src/http/cf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub fn cf_v4_client(user: &GlobalUser) -> Result<HttpApiClient, failure::Error>
config,
Environment::Production,
)
.map_err(|e| failure::format_err!("{}", e))
}

pub fn featured_cf_v4_client(
Expand All @@ -37,6 +38,7 @@ pub fn featured_cf_v4_client(
config,
Environment::Production,
)
.map_err(|e| failure::format_err!("{}", e))
}

pub fn cf_v4_api_client_async(
Expand All @@ -48,6 +50,7 @@ pub fn cf_v4_api_client_async(
config,
Environment::Production,
)
.map_err(|e| failure::format_err!("{}", e))
}

// Format errors from the cloudflare-rs cli for printing.
Expand Down
1 change: 1 addition & 0 deletions src/kv/bulk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ fn bulk_api_client(user: &GlobalUser) -> Result<HttpApiClient, failure::Error> {
config,
Environment::Production,
)
.map_err(|e| failure::format_err!("{}", e))
caass marked this conversation as resolved.
Show resolved Hide resolved
}

pub fn put(
Expand Down
2 changes: 1 addition & 1 deletion src/tail/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl Tail {
is_cloudflared_installed()?;
print_startup_message(&target.name, tunnel_port, metrics_port);

let mut runtime = TokioRuntime::new()?;
let runtime = TokioRuntime::new()?;

runtime.block_on(async {
// Create three [one-shot](https://docs.rs/tokio/0.2.16/tokio/sync#oneshot-channel)
Expand Down
Loading