diff --git a/Cargo.lock b/Cargo.lock index 5e3a32ae4..1d4728a43 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -761,6 +761,15 @@ dependencies = [ "termcolor", ] +[[package]] +name = "erased-serde" +version = "0.3.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3de9ad4541d99dc22b59134e7ff8dc3d6c988c89ecd7324bf10a8362b07a2afa" +dependencies = [ + "serde 1.0.126", +] + [[package]] name = "error-chain" version = "0.12.4" @@ -925,6 +934,21 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" +[[package]] +name = "futures" +version = "0.3.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7e43a803dae2fa37c1f6a8fe121e1f7bf9548b4dfc0522a42f34145dadfc27" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.15" @@ -932,6 +956,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e682a68b29a882df0545c143dc3646daefe80ba479bcdede94d5a703de2871e2" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -940,6 +965,17 @@ version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0402f765d8a89a26043b889b26ce3c4679d268fa6bb22cd7c6aad98340e179d1" +[[package]] +name = "futures-executor" +version = "0.3.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "badaa6a909fac9e7236d0620a2f57f7664640c56575b71a7552fbd68deafab79" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.15" @@ -978,6 +1014,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "feb5c238d27e2bf94ffdfd27b2c29e3df4a68c4193bb6427384259e2bf191967" dependencies = [ "autocfg", + "futures-channel", 
"futures-core", "futures-io", "futures-macro", @@ -3274,10 +3311,12 @@ dependencies = [ "console 0.14.1", "dirs 3.0.2", "env_logger", + "erased-serde", "eventual", "flate2", "fs2", "fs_extra", + "futures", "futures-util", "globset", "http", diff --git a/Cargo.toml b/Cargo.toml index 475599515..2e8cf820c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ env_logger = "0.8.4" eventual = "0.1.7" flate2 = "1.0.18" fs2 = "0.4.3" +futures = "0.3.15" futures-util = "0.3" globset = "0.4.6" http = "0.2.1" @@ -54,6 +55,7 @@ semver = "1.0.3" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0.60" serde_with = "1.5.1" +erased-serde = "0.3" structopt = "0.3.21" sys-info = "0.9" tempfile = "3.1.0" diff --git a/src/cli/mod.rs b/src/cli/mod.rs index 49f768498..f624cb822 100644 --- a/src/cli/mod.rs +++ b/src/cli/mod.rs @@ -32,8 +32,10 @@ pub mod exec { use std::net::IpAddr; use std::path::PathBuf; +use std::str::FromStr; use crate::commands::dev::Protocol; +use crate::commands::tail::websocket::TailFormat; use crate::preview::HttpMethod; use crate::settings::toml::migrations::{ DurableObjectsMigration, Migration, MigrationConfig, Migrations, RenameClass, TransferClass, @@ -215,23 +217,59 @@ pub enum Command { #[structopt(name = "whoami")] Whoami, - /// Aggregate logs from production worker + /// View a stream of logs from a published worker #[structopt(name = "tail")] Tail { - /// Specify an output format + /// Name of the worker to tail + #[structopt(index = 1)] + name: Option, + + /// Output format for log messages #[structopt(long, short = "f", default_value = "json", possible_values = &["json", "pretty"])] - format: String, + format: TailFormat, + + /// Stops the tail after receiving the first log (useful for testing) + #[structopt(long)] + once: bool, + + /// Adds a sampling rate (0.01 for 1%) + #[structopt(long = "sampling-rate", default_value = "1")] + sampling_rate: f64, + + /// Filter by invocation status + #[structopt(long, possible_values = 
&["ok", "error", "canceled"])] + status: Vec, + + /// Filter by HTTP method + #[structopt(long)] + method: Vec, - /// Port to accept tail log requests - #[structopt(long = "port", short = "p")] + /// Filter by HTTP header + #[structopt(long)] + header: Vec, + + /// Filter by IP address ("self" to filter your own IP address) + #[structopt(long = "ip-address", parse(try_from_str = parse_ip_address))] + ip_address: Vec, + + /// Filter by a text match in console.log messages + #[structopt(long)] + search: Option, + + /// Set the URL to forward log messages + #[structopt(hidden = true)] + url: Option, + + /// Deprecated, no longer used. + #[structopt(hidden = true, long = "port", short = "p")] tunnel_port: Option, - /// Provides endpoint for cloudflared metrics. Used to retrieve tunnel url - #[structopt(long = "metrics")] + /// Deprecated, no longer used. + #[structopt(hidden = true, long = "metrics")] metrics_port: Option, }, - /// Authenticate Wrangler with your Cloudflare username and password + /// Authenticate wrangler with your Cloudflare username and password #[structopt(name = "login")] Login, @@ -318,6 +356,16 @@ impl AdhocMigration { } } +fn parse_ip_address(input: &str) -> Result { + match input { + "self" => Ok(String::from("self")), + address => match IpAddr::from_str(address) { + Ok(_) => Ok(address.to_owned()), + Err(err) => anyhow::bail!("{}: {}", err, input), + }, + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/cli/tail.rs b/src/cli/tail.rs index 39f394b5d..30c1141fb 100644 --- a/src/cli/tail.rs +++ b/src/cli/tail.rs @@ -1,25 +1,69 @@ use super::Cli; use crate::commands; +use crate::commands::tail::filter::*; +use crate::commands::tail::websocket::{TailFormat, TailOptions}; use crate::settings::{global_user::GlobalUser, toml::Manifest}; use anyhow::Result; +use url::Url; +#[allow(clippy::too_many_arguments)] pub fn tail( - format: String, - tunnel_port: Option, - metrics_port: Option, + name: Option, + url: Option, + format: TailFormat, 
+ once: bool, + sampling_rate: f64, + outcomes: Vec, + methods: Vec, + headers: Vec, + client_ips: Vec, + search: Option, cli_params: &Cli, ) -> Result<()> { + let user = GlobalUser::new()?; + + // FIXME: If `name` is defined, allow the command to be run outside a `wrangler.toml` directory. let manifest = Manifest::new(&cli_params.config)?; let target = manifest.get_target(cli_params.environment.as_deref(), false)?; - let user = GlobalUser::new()?; + let account_id = target.account_id.load()?.to_string(); + let script_name = name.unwrap_or(target.name); + + let mut filters: Vec> = vec![]; + if !outcomes.is_empty() { + filters.push(Box::new(OutcomeFilter::from(outcomes))); + } + if !methods.is_empty() { + filters.push(Box::new(MethodFilter::from(methods))); + } + if !client_ips.is_empty() { + filters.push(Box::new(ClientIpFilter::from(client_ips))); + } + for header in headers.into_iter() { + filters.push(Box::new(HeaderFilter::from(header))) + } + if let Some(query) = search { + filters.push(Box::new(QueryFilter::from(query))); + }; + if sampling_rate < 1.0 && sampling_rate > 0.0 { + filters.push(Box::new(SamplingRateFilter::from(sampling_rate))); // Should always be last + }; + + let tail = commands::tail::run( + user, + account_id, + script_name, + url, + TailOptions { + once, + format, + filters, + }, + ); - commands::tail::start( - &target, - &user, - format, - tunnel_port, - metrics_port, - cli_params.verbose, - ) + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + .block_on(tail) } diff --git a/src/commands/tail.rs b/src/commands/tail.rs deleted file mode 100644 index 45202d6fe..000000000 --- a/src/commands/tail.rs +++ /dev/null @@ -1,132 +0,0 @@ -use std::net::{SocketAddr, TcpListener}; - -use anyhow::Result; - -use crate::settings::global_user::GlobalUser; -use crate::settings::toml::Target; -use crate::tail::Tail; - -const DEFAULT_TUNNEL_PORT: u16 = 8080; -const DEFAULT_METRICS_PORT: u16 = 8081; - -pub fn start( - 
target: &Target, - user: &GlobalUser, - format: String, - tunnel_port: Option, - metrics_port: Option, - verbose: bool, -) -> Result<()> { - let tunnel_port = find_open_port(tunnel_port, DEFAULT_TUNNEL_PORT)?; - let metrics_port = find_open_port(metrics_port, DEFAULT_METRICS_PORT)?; - - Tail::run( - target.clone(), - user.clone(), - format, - tunnel_port, - metrics_port, - verbose, - ) -} - -/// Find open port takes two arguments: an Optional requested port, and a default port. -fn find_open_port(requested: Option, default: u16) -> Result { - if let Some(port) = requested { - let addr = format!("127.0.0.1:{}", port); - match TcpListener::bind(addr) { - Ok(socket) => Ok(socket.local_addr()?.port()), - Err(_) => anyhow::bail!("the specified port {} is unavailable", port), - } - } else { - // try to use the default port; else get an ephemeral port from the OS - let addrs = [ - SocketAddr::from(([127, 0, 0, 1], default)), - // Binding to port 0 effectively asks the OS to provide the next available - // ephemeral port: https://www.grc.com/port_0.htm - SocketAddr::from(([127, 0, 0, 1], 0)), - ]; - - let socket = TcpListener::bind(&addrs[..])?; - - Ok(socket.local_addr()?.port()) - } -} - -#[cfg(test)] -mod tests { - // These tests are extremely stateful; since what we are testing is how this function behaves - // when requested ports are unavailable, and since our tests run concurrently, each test uses - // unique ports to avoid collisions. There are two possible solutions to this problem: - // 1. require that these tests be run serially, and find a way to clean up bound ports - // 2. use dependency injection and write a substitute for the TcpListener::bind fn. 
- use super::*; - - #[test] - fn test_find_open_port_no_requested_default_available() { - let requested = None; - let default = 3000; - let port = find_open_port(requested, default).unwrap(); - - // returns default - assert_eq!(port, default); - } - - #[test] - fn test_find_open_port_no_requested_default_unavailable() { - let requested = None; - let default = 3001; - let listener = find_open_port(requested, default); - - // returns random - assert!(listener.is_ok()); - } - - #[test] - fn test_find_open_port_requested_available_default_available() { - let requested = 3002; - let default = 3003; - let port = find_open_port(Some(requested), default).unwrap(); - - // returns requested - assert_eq!(port, requested); - } - - #[test] - fn test_find_open_port_requested_available_default_unavailable() { - let requested = 3004; - let default = 3005; - let _default_listener = - TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], default))).unwrap(); - let port = find_open_port(Some(requested), default).unwrap(); - - // returns requested - assert_eq!(port, requested); - } - - #[test] - fn test_find_open_port_requested_unavailable_default_available() { - let requested = 3006; - let default = 3007; - let _requested_listener = - TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], requested))).unwrap(); - let listener = find_open_port(Some(requested), default); - - // throws error - assert!(listener.is_err()); - } - - #[test] - fn test_find_open_port_requested_unavailable_default_unavailable() { - let requested = 3008; - let default = 3009; - let _requested_listener = - TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], requested))).unwrap(); - let _default_listener = - TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], default))).unwrap(); - let listener = find_open_port(Some(requested), default); - - // throws error - assert!(listener.is_err()); - } -} diff --git a/src/commands/tail/api.rs b/src/commands/tail/api.rs new file mode 100644 index 000000000..c91577293 --- /dev/null 
+++ b/src/commands/tail/api.rs @@ -0,0 +1,149 @@ +use crate::http; +use crate::settings::global_user::GlobalUser; + +use anyhow::Result; +use chrono::{DateTime, Utc}; +use cloudflare::{ + endpoints::workers::{CreateTail, CreateTailParams, DeleteTail, SendTailHeartbeat}, + framework::{async_api::ApiClient, response::ApiFailure}, +}; +use reqwest::StatusCode; +use std::time::{SystemTime, UNIX_EPOCH}; +use tokio::time::{Duration, Instant}; +use url::Url; + +/// A tail captures `TraceEvent`s from a published Worker. +#[derive(Debug, Clone)] +pub struct Tail { + pub user: GlobalUser, + pub account_id: String, + pub script_name: String, + pub expires_at: Instant, + pub url: Option, + pub id: Option, +} + +impl Tail { + /// Sets up a new tail, but does not actually create it. + pub fn new( + user: GlobalUser, + account_id: String, + script_name: String, + url: Option, + ) -> Self { + Self { + user, + account_id, + script_name, + expires_at: Instant::now(), + url, + id: None, + } + } + + /// Tests if the tail is using WebSockets. + pub fn is_web_socket(&self) -> bool { + if let Some(url) = self.url.clone() { + return matches!(url.scheme(), "ws" | "wss"); + } + false + } + + /// Creates the tail and attaches it to a Worker. + /// + /// If successful, the tail must be kept-alive before its expiration time. + pub async fn create(&mut self) -> Result<()> { + match self.id { + None => match http::cf_v4_api_client_async(&self.user)? 
+ .request(&CreateTail { + account_identifier: &self.account_id, + script_name: &self.script_name, + params: CreateTailParams { + url: self.url.clone().map(String::from), + }, + }) + .await + { + Ok(response) => { + let tail = response.result; + log::info!("Created tail: {:?}", tail); + self.id = Some(tail.id); + self.expires_at = to_instant(tail.expires_at); + self.url = Some(Url::parse( + &tail.url.expect("Expected a URL from tail response"), + )?); + Ok(()) + } + Err(err) => { + anyhow::bail!("Failed to create tail: {}", http::format_error(err, None)) + } + }, + _ => Ok(()), + } + } + + /// Sends a keep-alive to the tail. + pub async fn keep_alive(&mut self) -> Result<()> { + match self.id.clone() { + Some(tail_id) => { + match http::cf_v4_api_client_async(&self.user)? + .request(&SendTailHeartbeat { + account_identifier: &self.account_id, + script_name: &self.script_name, + tail_id: &tail_id, + }) + .await + { + Ok(response) => { + log::debug!("Sent tail keep-alive tail: {:?}", response.result); + self.expires_at = to_instant(response.result.expires_at); + Ok(()) + } + Err(err) => { + anyhow::bail!( + "Failed to keep-alive tail: {}", + http::format_error(err, None) + ) + } + } + } + _ => Ok(()), + } + } + + /// Deletes the tail and unattaches it from the Worker. + pub async fn delete(&mut self) -> Result<()> { + match self.id.clone() { + Some(tail_id) => match http::cf_v4_api_client_async(&self.user)? + .request(&DeleteTail { + account_identifier: &self.account_id, + script_name: &self.script_name, + tail_id: &tail_id, + }) + .await + { + Ok(_) | Err(ApiFailure::Error(StatusCode::NOT_FOUND, _)) => { + log::info!("Deleted tail: {}", &tail_id); + self.id = None; + self.url = None; + self.expires_at = Instant::now(); + Ok(()) + } + Err(err) => { + anyhow::bail!("Failed to delete tail: {}", http::format_error(err, None)) + } + }, + _ => Ok(()), + } + } +} + +/// Converts a `chrono::DateTime` into a `tokio::time::Instant`. 
+fn to_instant(datetime: DateTime) -> Instant { + let delta = datetime.timestamp() + - SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Time is going backwards?") + .as_secs() as i64; + Instant::now() + Duration::from_secs(delta as u64) +} diff --git a/src/commands/tail/event.rs b/src/commands/tail/event.rs new file mode 100644 index 000000000..ab1374a91 --- /dev/null +++ b/src/commands/tail/event.rs @@ -0,0 +1,162 @@ +use chrono::{Local, TimeZone}; +use console::style; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use std::fmt::{Display, Formatter, Result}; + +/// A unique protocol ID that is passed by the `Sec-WebSocket-Protocol` header. +/// +/// It is important that this header is provided, so we can safely modify +/// the protocol schema in the future without breaking clients. +pub const PROTOCOL_ID: &str = "trace-v1"; + +/// A trace event. +/// +/// This event is fired by the Workers runtime after another event has completed. +/// Not every field is shown here, only the ones necessary for Display. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TraceEvent { + #[serde(alias = "eventTimestamp")] + pub timestamp: i64, + pub outcome: String, + pub logs: Vec, + pub exceptions: Vec, + pub event: EventItem, +} + +/// An event item. +/// +/// * If `request` is present, it's a fetch event. +/// * If `cron` is present, it's a scheduled event. +/// * Otherwise, the event type is unknown. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EventItem { + pub request: Option, + pub cron: Option, +} + +/// A request item. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RequestItem { + pub url: String, + pub method: String, + pub cf: Option, +} + +/// Cloudflare metadata about an event. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CfMetadata { + pub colo: String, +} + +/// A log item. 
+#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LogItem { + pub level: String, + pub message: Value, +} + +/// An error item. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ExceptionItem { + pub name: String, + pub message: String, + // TODO(soon): we really need to implement stacktraces. +} + +impl Display for TraceEvent { + fn fmt(&self, f: &mut Formatter) -> Result { + let timestamp = style( + Local + .timestamp_millis(self.timestamp) + .format("%Y-%m-%d %H:%M:%S"), + ) + .dim(); + let outcome = match self.outcome.as_ref() { + "ok" => style("Ok").green(), + "canceled" => style("Canceled").yellow(), + "exception" => style("Error").red(), + "exceededCpu" => style("Exceeded Limit").red(), + _ => style("System Error").red(), + }; + match self.event.request.clone() { + Some(request) => { + let colo = style( + request + .cf + .map(|cf| cf.colo) + .unwrap_or_else(|| "?".to_owned()), + ) + .dim(); + let method = style(request.method); + let url = style(request.url).bold(); + write!( + f, + "[{}] [{}] [{}] {} {}", + timestamp, colo, outcome, method, url + ) + } + _ => match self.event.cron.clone() { + // TODO(soon): add colo to scheduled event. + Some(cron) => write!(f, "[{}] [?] [{}] {}", timestamp, outcome, cron), + _ => write!(f, "[{}] [?] [{}] ", timestamp, outcome), + }, + }?; + for log in self.logs.iter() { + let prefix = style("|").dim(); + write!(f, "\n {} {}", prefix, log)?; + } + for err in self.exceptions.iter() { + let prefix = style("!").dim(); + write!(f, "\n {} {}", prefix, err)?; + } + Ok(()) + } +} + +impl Display for LogItem { + fn fmt(&self, f: &mut Formatter) -> Result { + let level = match self.level.as_ref() { + "debug" => style("Debug").blue(), + "warn" => style("Warn").yellow(), + "error" => style("Error").red(), + _ => style("Info").dim(), + }; + write!(f, "[{}] ", level)?; + match &self.message { + // Most console.log() messages are formatted as an array. + // e.g. 
+    // console.log('Hi') // => '["Hi"]' + // console.log('Hello', 'World') // => '["Hello","World"]' + // + // However, we want to format it nicely, similar to how it's done in DevTools. + // e.g. + // Hello World + // + // While a recursive approach might seem like a good idea, the output becomes + // surprisingly unreadable. Instead, we only handle the simple case where the + // top-level is an array and its values are strings. + Value::Array(values) => { + for value in values.iter() { + match value { + Value::String(s) => write!(f, "{}", s), + v => write!(f, "{}", v), + }?; + write!(f, " ")?; + } + Ok(()) + } + Value::String(v) => write!(f, "{}", v), + v => write!(f, "{}", v), + }?; + Ok(()) + } +} + +impl Display for ExceptionItem { + fn fmt(&self, f: &mut Formatter) -> Result { + let name = style(&self.name).red().bold(); + let message = style(&self.message).red(); + write!(f, "[{}] {}", name, message) + } +} diff --git a/src/commands/tail/filter.rs b/src/commands/tail/filter.rs new file mode 100644 index 000000000..c0d293713 --- /dev/null +++ b/src/commands/tail/filter.rs @@ -0,0 +1,115 @@ +use serde::Serialize; +use std::collections::HashSet; +use std::convert::From; + +/// A filter that accepts trace events. +/// +/// wrangler does not handle any of the filtering, +/// it only uploads them to the `WebSocketTail`. 
+pub trait TraceFilter: erased_serde::Serialize {} + +#[derive(Debug, Clone, Serialize)] +pub struct OutcomeFilter { + pub outcome: Vec, +} + +impl TraceFilter for OutcomeFilter {} + +impl From> for OutcomeFilter { + fn from(outcomes: Vec) -> Self { + let mut results = HashSet::new(); + for outcome in outcomes { + match outcome.as_ref() { + "ok" => results.insert("ok"), + "canceled" => results.insert("canceled"), + "error" => { + results.insert("exception"); + results.insert("exceededCpu"); + results.insert("unknown") + } + _ => false, + }; + } + Self { + outcome: results.into_iter().map(String::from).collect(), + } + } +} + +#[derive(Debug, Clone, Serialize)] +pub struct SamplingRateFilter { + pub sampling_rate: f64, +} + +impl TraceFilter for SamplingRateFilter {} + +impl From for SamplingRateFilter { + fn from(sampling_rate: f64) -> Self { + Self { sampling_rate } + } +} + +#[derive(Debug, Clone, Serialize)] +pub struct MethodFilter { + pub method: Vec, +} + +impl TraceFilter for MethodFilter {} + +impl From> for MethodFilter { + fn from(method: Vec) -> Self { + Self { method } + } +} + +#[derive(Debug, Clone, Serialize)] +pub struct HeaderFilter { + pub key: String, + pub query: Option, +} + +impl TraceFilter for HeaderFilter {} + +impl From for HeaderFilter { + fn from(header: String) -> Self { + match header.split_once(":") { + None => Self { + key: header, + query: None, + }, + Some((key, value)) => Self { + key: key.trim_end().to_owned(), + query: Some(value.trim_start().to_owned()), + }, + } + } +} + +#[derive(Debug, Clone, Serialize)] +pub struct ClientIpFilter { + pub client_ip: Vec, +} + +impl TraceFilter for ClientIpFilter {} + +impl From> for ClientIpFilter { + fn from(client_ip: Vec) -> Self { + Self { client_ip } + } +} + +#[derive(Debug, Clone, Serialize)] +pub struct QueryFilter { + pub query: String, +} + +impl TraceFilter for QueryFilter {} + +impl From for QueryFilter { + fn from(query: String) -> Self { + Self { query } + } +} + +// By 
default, serde::Serialize does not handle embedded traits, this fixes that. +serialize_trait_object!(TraceFilter); diff --git a/src/commands/tail/mod.rs b/src/commands/tail/mod.rs new file mode 100644 index 000000000..f311ff5f8 --- /dev/null +++ b/src/commands/tail/mod.rs @@ -0,0 +1,89 @@ +pub mod api; +/// `wrangler tail` allows Workers users to collect logs from their deployed Workers. +/// When a user runs `wrangler tail`, several things happen: +/// 1. wrangler asks the Cloudflare API to send log events to a Durable Object, +/// which will act as a log forwarder. The API returns the URL of the log forwarder. +/// 2. wrangler connects to the log forwarder using a WebSocket. +/// 3. Upon receipt of messages, wrangler prints log events to stdout. +pub mod event; +pub mod filter; +pub mod websocket; + +use crate::settings::global_user::GlobalUser; +use crate::terminal::styles; + +use api::Tail; +use websocket::{TailOptions, WebSocketTail}; + +use anyhow::Result; +use indicatif::{ProgressBar, ProgressStyle}; +use url::Url; + +/// Runs a tail session from creation to deletion. +/// +/// It can be interrupted by: +/// * an API error when creating the tail +/// * a WebSocket error when receiving events +/// * a user typing ctrl-c +/// * an expiration of the tail +/// +/// A fancy progress bar is also updated throughout the session. 
+pub async fn run( + user: GlobalUser, + account_id: String, + script_name: String, + url: Option, + options: TailOptions, +) -> Result<()> { + let progress = &mut ProgressBar::new_spinner() + .with_style(ProgressStyle::default_spinner().template("{spinner} {msg}")); + progress.enable_steady_tick(20); + progress.set_message("Creating tail..."); + + let tail = &mut Tail::new(user, account_id, script_name, url); + tail.create().await?; + + if tail.is_web_socket() { + progress.set_message("Connecting to tail..."); + + match &mut WebSocketTail::connect(tail.clone(), options).await { + Ok(websocket) => { + progress.abandon_with_message(&format!( + "Connected! Streaming logs from {}... (ctrl-c to quit)", + styles::bold(&tail.script_name) + )); + + if let Err(err) = websocket.update().await { + log::warn!("{}", err); + }; + if let Err(err) = websocket.read().await { + log::warn!("{}", err); + } + } + Err(err) => progress.abandon_with_message(&format!("{}", err)), + } + } else { + progress.set_message(&format!( + "Forwarding logs from {} to {} (ctrl-c to quit)", + styles::bold(&tail.script_name), + styles::url( + tail.url + .clone() + .map(String::from) + .unwrap_or_else(|| "an endpoint".to_owned()) + ) + )); + + if let Err(err) = loop { + tokio::select! 
{ + _ = tokio::signal::ctrl_c() => break Ok(()), + _ = tokio::time::sleep_until(tail.expires_at) => if let Err(err) = tail.keep_alive().await { break Err(err) } + } + } { + progress.abandon_with_message(&format!("{}", err)); + } + } + + tail.delete().await?; + Ok(()) +} diff --git a/src/commands/tail/websocket.rs b/src/commands/tail/websocket.rs new file mode 100644 index 000000000..58257ede6 --- /dev/null +++ b/src/commands/tail/websocket.rs @@ -0,0 +1,182 @@ +use crate::http::feature::user_agent; + +use super::api::Tail; +use super::event::{TraceEvent, PROTOCOL_ID}; +use super::filter::TraceFilter; + +use anyhow::Result; +use futures_util::{SinkExt, StreamExt}; +use serde::{Deserialize, Serialize}; +use std::borrow::Cow; +use std::str::FromStr; +use tokio::net::TcpStream; +use tokio_tungstenite::tungstenite::error::Error::{AlreadyClosed, ConnectionClosed}; +use tokio_tungstenite::tungstenite::handshake::client::Request; +use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode; +use tokio_tungstenite::tungstenite::protocol::{CloseFrame, Message}; +use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; + +/// The format to print a `TraceEvent`. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum TailFormat { + Json, + Pretty, +} + +impl FromStr for TailFormat { + type Err = anyhow::Error; + fn from_str(string: &str) -> Result { + match string { + "pretty" => Ok(TailFormat::Pretty), + "json" => Ok(TailFormat::Json), + _ => Ok(TailFormat::Json), + } + } +} + +/// Options that are sent to the `WebSocketTail`. +#[derive(Serialize)] +pub struct TailOptions { + #[serde(skip_serializing)] + pub once: bool, + #[serde(skip_serializing)] + pub format: TailFormat, + #[serde(skip_serializing_if = "Vec::is_empty")] + pub filters: Vec>, +} + +/// A tail that sends `TraceEvent`s to a WebSocket. 
+pub struct WebSocketTail { + pub tail: Tail, + pub options: TailOptions, + pub websocket: WebSocketStream>, + pub closed: bool, +} + +impl WebSocketTail { + /// Connects to WebSocket tail. + pub async fn connect(tail: Tail, options: TailOptions) -> Result { + if tail.id.is_none() && tail.url.is_none() && !tail.is_web_socket() { + anyhow::bail!("Precondition failed for WebSocket tail: {:?}", &tail); + } + let request = Request::builder() + .uri(&tail.url.clone().map(String::from).unwrap()) + .header("User-Agent", user_agent()) + .header("Sec-WebSocket-Protocol", PROTOCOL_ID) + .body(())?; + log::info!("Connecting to WebSocket tail: {:?}", request); + match tokio_tungstenite::connect_async(request).await { + Ok((websocket, _)) => Ok(Self { + tail, + options, + websocket, + closed: false, + }), + Err(err) => anyhow::bail!("Failed to create WebSocket tail: {}", err), + } + } + + /// Reads a message from the WebSocket and prints it. + pub async fn read_once(&mut self) -> Result<()> { + tokio::select! 
{ + frame = self.websocket.next() => { + match frame { + Some(Ok(message)) if message.is_text() || message.is_binary() => { + match self.options.format { + TailFormat::Json => { + println!("{}", message); + Ok(()) + }, + TailFormat::Pretty => match serde_json::from_str::(&message.to_string()) { + Ok(event) => { + println!("{}", event); + Ok(()) + }, + Err(err) => { + log::debug!("Failed to pretty-print tail: {}", err); + self.close(CloseCode::Protocol, "wrangler is closing due to a protocol violation").await + }, + } + } + }, + Some(Ok(message)) if message.is_close() => { + anyhow::bail!("Received close from WebSocket tail: {}", message) + }, + Some(Err(err)) => { + log::debug!("Received error from WebSocket tail: {}", err); + self.close(CloseCode::Abnormal, "wrangler is closing due to an error").await + }, + _ => Ok(()), + } + }, + _ = tokio::signal::ctrl_c() => { + self.close(CloseCode::Away, "wrangler is closing due to ctrl-c").await + }, + _ = tokio::time::sleep_until(self.tail.expires_at) => { + self.close(CloseCode::Normal, "wrangler is closing due to expiration").await + } + } + } + + /// Reads and prints messages from the WebSocket in a loop. + pub async fn read(&mut self) -> Result<()> { + loop { + if self.closed { + break Ok(()); + } + match self.read_once().await { + Err(err) => break Err(err), + Ok(_) if self.options.once => { + break self + .close( + CloseCode::Normal, + "wrangler is closing after receiving first log", + ) + .await + } + _ => {} + }; + } + } + + /// Writes a text message to the WebSocket. + pub async fn write(&mut self, message: String) -> Result<()> { + log::debug!("Sending message to WebSocket tail: {}", message); + match self.websocket.send(Message::Text(message)).await { + Err(err) => anyhow::bail!("Failed to write to WebSocket tail: {}", err), + _ => Ok(()), + } + } + + /// Sends the tail filters to the WebSocket. 
+ pub async fn update(&mut self) -> Result<()> { + match self.options.filters.is_empty() { + false => match serde_json::to_string(&self.options) { + Ok(options) => self.write(options).await, + Err(err) => anyhow::bail!("Failed to deserialize options: {}", err), + }, + true => Ok(()), + } + } + + /// Closes the WebSocket. + pub async fn close(&mut self, code: CloseCode, reason: &str) -> Result<()> { + if self.closed { + return Ok(()); + } else { + self.closed = true; + } + let frame = CloseFrame { + code, + reason: Cow::Borrowed(reason), + }; + match self.websocket.close(Some(frame)).await { + Ok(_) => { + log::info!("Closed WebSocket tail: {}", reason); + Ok(()) + } + Err(AlreadyClosed | ConnectionClosed) => Ok(()), + Err(err) => anyhow::bail!("Failed to close WebSocket tail: {}", err), + } + } +} diff --git a/src/http/cf.rs b/src/http/cf.rs index c2225c445..235d5f2ce 100644 --- a/src/http/cf.rs +++ b/src/http/cf.rs @@ -25,10 +25,12 @@ pub fn cf_v4_client(user: &GlobalUser) -> Result { ) } -pub fn cf_v4_api_client_async( - user: &GlobalUser, - config: HttpApiClientConfig, -) -> Result { +pub fn cf_v4_api_client_async(user: &GlobalUser) -> Result { + let config = HttpApiClientConfig { + http_timeout: Duration::from_secs(DEFAULT_HTTP_TIMEOUT_SECONDS), + default_headers: headers(None), + }; + async_api::Client::new( Credentials::from(user.to_owned()), config, diff --git a/src/http/feature.rs b/src/http/feature.rs index 19e652a8a..870a0e28c 100644 --- a/src/http/feature.rs +++ b/src/http/feature.rs @@ -31,3 +31,7 @@ fn get_user_agent(feature: Option) -> String { } agent } + +pub fn user_agent() -> String { + get_user_agent(None) +} diff --git a/src/lib.rs b/src/lib.rs index c1df1b8c3..2b1c6ac20 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,6 +4,9 @@ #[macro_use] extern crate text_io; +#[macro_use] +extern crate erased_serde; + use cloudflare::framework::response::ApiErrors; mod build; @@ -20,7 +23,6 @@ pub mod login; pub mod reporter; pub mod settings; pub mod sites; 
-pub mod tail; pub mod terminal; pub mod upload; pub mod version; diff --git a/src/main.rs b/src/main.rs index fea787d82..18c1e2078 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,7 +17,9 @@ use anyhow::Result; use structopt::StructOpt; fn main() -> Result<()> { - reporter::init(); + if !cfg!(debug_assertions) { + reporter::init(); + } env_logger::init(); let latest_version_receiver = background_check_for_updates(); @@ -105,10 +107,30 @@ fn run() -> Result<()> { Command::KvKey(key) => exec::kv_key(key, &cli_params), Command::KvBulk(bulk) => exec::kv_bulk(bulk, &cli_params), Command::Tail { + name, + url, + format, + once, + sampling_rate, + status, + method, + header, + ip_address, + search, + .. + } => exec::tail( + name, + url, format, - tunnel_port, - metrics_port, - } => exec::tail(format, tunnel_port, metrics_port, &cli_params), + once, + sampling_rate, + status, + method, + header, + ip_address, + search, + &cli_params, + ), Command::Login => commands::login::run(), Command::Report { log } => commands::report::run(log.as_deref()).map(|_| { eprintln!("Report submission sucessful. Thank you!"); diff --git a/src/tail/log_server.rs b/src/tail/log_server.rs deleted file mode 100644 index c79c52777..000000000 --- a/src/tail/log_server.rs +++ /dev/null @@ -1,198 +0,0 @@ -use crate::terminal::{colored_json_string, emoji, styles}; -use anyhow::Result; -use hyper::server::conn::AddrIncoming; -use hyper::server::Builder; -use hyper::service::{make_service_fn, service_fn}; -use hyper::{Body, Method, Request, Response, Server, StatusCode}; -use serde::{Deserialize, Serialize}; -use std::convert::TryInto; -use tokio::sync::oneshot::Receiver; - -pub struct LogServer { - server: Builder, - shutdown_rx: Receiver<()>, - format: String, -} - -/// LogServer is just a basic HTTP server running locally; it listens for POST requests on the root -/// path and simply prints the JSON body of each request as its own line to STDOUT. 
-impl LogServer { - pub fn new(port: u16, shutdown_rx: Receiver<()>, format: String) -> LogServer { - // Start HTTP echo server that prints whatever is posted to it. - let addr = ([127, 0, 0, 1], port).into(); - - let server = Server::bind(&addr); - - LogServer { - server, - shutdown_rx, - format, - } - } - - pub async fn run(self) -> Result<()> { - let format = self.format; - - let server_fn_gen = |format: String| { - service_fn(move |req: Request| { - let format = format.clone(); - print_logs(req, format) - }) - }; - - let service = make_service_fn(move |_| { - let format = format.clone(); - async move { Ok::<_, hyper::Error>(server_fn_gen(format)) } - }); - - let server = self.server.serve(service); - - // The shutdown receiver listens for a one shot message from our sigint handler as a signal - // to gracefully shut down the hyper server. - let shutdown_rx = self.shutdown_rx; - - let graceful = server.with_graceful_shutdown(async { - shutdown_rx.await.ok(); - }); - - graceful.await?; - - Ok(()) - } -} - -async fn print_logs(req: Request, format: String) -> Result> { - match format.as_str() { - "pretty" => print_logs_pretty(req).await, - "json" => print_logs_json(req).await, - _ => unreachable!(), - } -} - -async fn print_logs_json(req: Request) -> Result> { - match (req.method(), req.uri().path()) { - (&Method::POST, "/") => { - let whole_body = hyper::body::to_bytes(req.into_body()).await?; - println!( - "{}", - std::str::from_utf8(&whole_body).expect("failed to deserialize tail log body") - ); - - Ok(Response::new(Body::from("Success"))) - } - _ => { - let mut bad_request = Response::default(); - *bad_request.status_mut() = StatusCode::BAD_REQUEST; - Ok(bad_request) - } - } -} - -async fn print_logs_pretty(req: Request) -> Result> { - match (req.method(), req.uri().path()) { - (&Method::POST, "/") => { - let whole_body = hyper::body::to_bytes(req.into_body()).await?; - - let parsed = serde_json::from_slice::(&whole_body).map_err(|e| { - println!("{}", 
styles::warning("Error parsing response body!")); - println!( - "This is not a problem with your worker, it's a problem with Wrangler.\nPlease file an issue on our GitHub page, with a minimal reproducible example of\nthe script that caused this error and a description of what happened." - ); - e - })?; - - let secs = (parsed.event_timestamp / 1000).try_into().unwrap(); - - let timestamp = chrono::NaiveDateTime::from_timestamp(secs, 0); - - println!( - "{}{} {} --> {} @ {} UTC", - emoji::EYES, - parsed.event.request.method, - styles::url(parsed.event.request.url), - parsed.outcome.to_uppercase(), - timestamp.time() - ); - - if !parsed.exceptions.is_empty() { - println!(" Exceptions:"); - parsed.exceptions.iter().for_each(|exception| { - println!( - "\t{} {}", - emoji::X, - styles::warning(format!("{}: {}", exception.name, exception.message)) - ); - }); - } - - if !parsed.logs.is_empty() { - println!(" Logs:"); - parsed.logs.iter().for_each(|log| { - let message = colored_json_string(&log.message); - let messages = if let Ok(m) = message { - m - } else { - "Error: Failed to convert encoded message to string".to_string() - }; - - let output = match log.level.as_str() { - "assert" | "error" => format!("{} {}", emoji::X, styles::warning(messages)), - "warn" => format!("{} {}", emoji::WARN, styles::highlight(messages)), - "trace" | "debug" => { - format!("{}{}", emoji::MICROSCOPE, styles::cyan(messages)) - } - _ => format!("{} {}", emoji::FILES, styles::bold(messages)), - }; - - println!("\t{}", output); - }); - } - - Ok(Response::new(Body::from("Success"))) - } - _ => { - let mut bad_request = Response::default(); - *bad_request.status_mut() = StatusCode::BAD_REQUEST; - Ok(bad_request) - } - } -} - -#[derive(Debug, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -struct LogResponse { - outcome: String, - script_name: Option, - // todo: wtf gets served here - exceptions: Vec, - logs: Vec, - event_timestamp: usize, - event: RequestEvent, -} - -#[derive(Debug, 
Serialize, Deserialize)] -struct LogException { - name: String, - message: String, - timestamp: usize, -} - -#[derive(Debug, Serialize, Deserialize)] -struct LogMessage { - message: serde_json::Value, - level: String, - timestamp: usize, -} - -#[derive(Debug, Serialize, Deserialize)] -struct RequestEvent { - request: RequestEventData, -} - -#[derive(Debug, Serialize, Deserialize)] -struct RequestEventData { - url: String, - method: String, - // headers: bruh, - // cf: RequestEventCfData, // lol -} diff --git a/src/tail/mod.rs b/src/tail/mod.rs deleted file mode 100644 index 1aebc099a..000000000 --- a/src/tail/mod.rs +++ /dev/null @@ -1,120 +0,0 @@ -/// `wrangler tail` allows Workers users to collect logs from their deployed Workers. -/// When a user runs `wrangler tail`, several things happen: -/// 1. A simple HTTP server (LogServer) is started and begins listening for requests on localhost:8080 -/// 2. An [Argo Tunnel](https://developers.cloudflare.com/argo-tunnel/) instance (Tunnel) is started -/// using [cloudflared](https://developers.cloudflare.com/argo-tunnel/downloads/), exposing the -/// LogServer to the internet on a randomly generated URL. -/// 3. Wrangler initiates a tail Session by making a request to the Workers API /tail endpoint, -/// providing the Tunnel URL as an argument. -/// 4. The Workers API binds the URL to a [Trace Worker], and directs all `console` and -/// exception logging to the Trace Worker, which POSTs each batch of logs as a JSON -/// payload to the provided Tunnel URL. -/// 5. Upon receipt, the LogServer prints the payload of each POST request to STDOUT. 
-mod log_server; -mod session; -mod shutdown; -mod tunnel; - -use log_server::LogServer; -use session::Session; -use shutdown::ShutdownHandler; -use tunnel::Tunnel; - -use anyhow::Result; -use console::style; -use tokio::runtime::Runtime as TokioRuntime; -use which::which; - -use crate::settings::global_user::GlobalUser; -use crate::settings::toml::Target; -use crate::terminal::emoji; - -pub struct Tail; - -impl Tail { - pub fn run( - target: Target, - user: GlobalUser, - format: String, - tunnel_port: u16, - metrics_port: u16, - verbose: bool, - ) -> Result<()> { - is_cloudflared_installed()?; - print_startup_message(&target.name, tunnel_port, metrics_port); - - // Loading the token creates a nested runtime because it goes through reqwest. - // Make sure it's loaded before creating our own runtime; nested runtimes will panic. - target.account_id.load()?; - - let runtime = TokioRuntime::new()?; - - runtime.block_on(async { - // Create three [one-shot](https://docs.rs/tokio/0.2.16/tokio/sync#oneshot-channel) - // channels for handling ctrl-c. 
Each channel has two parts: - // tx: Transmitter - // rx: Receiver - let (tx, rx) = tokio::sync::oneshot::channel(); // shutdown short circuit - let mut shutdown_handler = ShutdownHandler::new(); - let log_rx = shutdown_handler.subscribe(); - let session_rx = shutdown_handler.subscribe(); - let tunnel_rx = shutdown_handler.subscribe(); - - let listener = tokio::spawn(shutdown_handler.run(rx)); - - // Spin up a local http server to receive logs - let log_server = tokio::spawn(LogServer::new(tunnel_port, log_rx, format).run()); - - // Spin up a new cloudflared tunnel to connect trace worker to local server - let tunnel_process = Tunnel::new(tunnel_port, metrics_port, verbose)?; - let tunnel = tokio::spawn(tunnel_process.run(tunnel_rx)); - - // Register the tail with the Workers API and send periodic heartbeats - let session = tokio::spawn(Session::run( - target, - user, - session_rx, - tx, - metrics_port, - verbose, - )); - - let res = tokio::try_join!( - async { listener.await? }, - async { log_server.await? }, - async { session.await? }, - async { tunnel.await? } - ); - - match res { - Ok(_) => Ok(()), - Err(e) => Err(e), - } - }) - } -} - -fn is_cloudflared_installed() -> Result<()> { - // this can be removed once we automatically install cloudflared - if which("cloudflared").is_err() { - let install_url = style("https://developers.cloudflare.com/cloudflare-one/connections/connect-apps/install-and-setup/installation") - .blue() - .bold(); - anyhow::bail!("You must install cloudflared to use wrangler tail.\n\nInstallation instructions can be found here:\n{}", install_url); - } else { - Ok(()) - } -} - -fn print_startup_message(worker_name: &str, tunnel_port: u16, metrics_port: u16) { - // Note that we use eprintln!() throughout this module; this is because we want any - // helpful output to not be mixed with actual log JSON output, so we use this macro - // to print messages to stderr instead of stdout (where log output is printed). 
- eprintln!( - "{} Setting up log streaming from Worker script \"{}\". Using ports {} and {}.", - emoji::TAIL, - worker_name, - tunnel_port, - metrics_port, - ); -} diff --git a/src/tail/session.rs b/src/tail/session.rs deleted file mode 100644 index 495d9febd..000000000 --- a/src/tail/session.rs +++ /dev/null @@ -1,169 +0,0 @@ -use std::str; -use std::time::Duration; - -use anyhow::{anyhow, bail, Result}; -use regex::Regex; -use tokio::sync::oneshot::{Receiver, Sender}; -use tokio::time::sleep; - -use cloudflare::endpoints::workers::{CreateTail, CreateTailParams, DeleteTail, SendTailHeartbeat}; -use cloudflare::framework::HttpApiClientConfig; -use cloudflare::framework::{async_api, async_api::ApiClient}; - -use crate::http; -use crate::settings::global_user::GlobalUser; -use crate::settings::toml::Target; - -const KEEP_ALIVE_INTERVAL: u64 = 60; - -pub struct Session { - tail_id: Option, -} - -/// Session is responsible for interacting with the Workers API to establish and maintain the tail -/// connection; once the tail session is established, it uses the returned tail_id to send periodic -/// "keepalive" heartbeats to the service. If the service does not receive a heartbeat for ~10 -/// minutes, it will kill the tail session by removing the trace worker. -impl Session { - pub async fn run( - target: Target, - user: GlobalUser, - shutdown_rx: Receiver<()>, - tx: Sender<()>, - metrics_port: u16, - verbose: bool, - ) -> Result<()> { - // During the start process we'll populate the tail with the response from the API. - let mut session = Session { tail_id: None }; - // We need to exit on a shutdown command without waiting for API calls to complete. - tokio::select! 
{ - _ = shutdown_rx => { session.close(&user, &target).await } - result = session.start(&target, &user, tx, metrics_port, verbose) => { result } - } - } - - async fn close(&self, user: &GlobalUser, target: &Target) -> Result<()> { - // The API will clean up tails after about 10 minutes of inactivity, or 24 hours of - // activity but since we limit the number of tails allowed on a single script we should at - // least try to delete them as we go. - if let Some(tail_id) = &self.tail_id { - let client = http::cf_v4_api_client_async(user, HttpApiClientConfig::default())?; - client - .request(&DeleteTail { - account_identifier: target.account_id.load()?, - script_name: &target.name, - tail_id, - }) - .await?; - } - - Ok(()) - } - - async fn start( - &mut self, - target: &Target, - user: &GlobalUser, - tx: Sender<()>, - metrics_port: u16, - _verbose: bool, - ) -> Result<()> { - eprintln!("This may take a few seconds..."); - - let client = http::cf_v4_api_client_async(user, HttpApiClientConfig::default())?; - - // Hit http://localhost:8081/metrics to grab the generated tunnel name. cloudflared - // can take a minute to start, so use exponential backoff - let retry_strategy = tokio_retry::strategy::ExponentialBackoff::from_millis(100).take(5); - let result = - tokio_retry::Retry::spawn(retry_strategy, || get_tunnel_url(metrics_port)).await; - - if let Ok(url_) = result { - let url = Some(url_); - let response = client - .request(&CreateTail { - account_identifier: target.account_id.load()?, - script_name: &target.name, - params: CreateTailParams { url }, - }) - .await; - - match response { - Ok(success) => { - eprintln!("Now prepared to stream logs"); - - let tail_id = success.result.id; - self.tail_id = Some(tail_id.clone()); - - // Loop indefinitely to send "heartbeat" to API and keep log streaming alive. - // This should loop forever until SIGINT is issued or Wrangler process is killed - // through other means. 
- let duration = Duration::from_millis(1000 * KEEP_ALIVE_INTERVAL); - let mut delay = sleep(duration); - - loop { - delay.await; - let heartbeat_result = send_heartbeat(target, &client, &tail_id).await; - if heartbeat_result.is_err() { - return heartbeat_result; - } - delay = sleep(duration); - } - } - Err(e) => { - tx.send(()).unwrap(); - bail!(http::format_error(e, Some(&tail_help))) - } - } - } else { - bail!("Could not extract tunnel url from cloudflared"); - } - } -} - -async fn get_tunnel_url(metrics_port: u16) -> Result { - let metrics_url = format!("http://localhost:{}/metrics", metrics_port); - let url_regex = Regex::new("userHostname=\"(https://[a-z.-]+)\"").unwrap(); - let body = reqwest::get(metrics_url).await?.text().await?; - match url_regex - .captures(&body) - .and_then(|captures| captures.get(1)) - { - Some(url) => Ok(url.as_str().to_string()), - None => Err(anyhow!("Failed to extract capture group!")), - } -} - -async fn send_heartbeat(target: &Target, client: &async_api::Client, tail_id: &str) -> Result<()> { - let response = client - .request(&SendTailHeartbeat { - account_identifier: target.account_id.load()?, - script_name: &target.name, - tail_id, - }) - .await; - - match response { - Ok(_) => Ok(()), - Err(e) => Err(anyhow!(http::format_error(e, Some(&tail_help)))), - } -} - -// tail_help() provides more detailed explanations of Workers KV API error codes. -// See https://api.cloudflare.com/#workers-kv-namespace-errors for details. -fn tail_help(error_code: u16) -> &'static str { - match error_code { - 7003 | 7000 => { - "Your configuration file is likely missing the field \"account_id\", which is required to tail a worker." - } - // unauthorized - 10000 => { - "Make sure your API token has permission to both edit and read workers on your account" - } - // script not found - 10007 => "wrangler can only tail live Worker scripts. 
Run `wrangler publish` before attempting to tail.", // key errors - // limit errors - 10057 | 10058 | 10059 => "See documentation", - _ => "", - } -} diff --git a/src/tail/shutdown.rs b/src/tail/shutdown.rs deleted file mode 100644 index c7d27ec36..000000000 --- a/src/tail/shutdown.rs +++ /dev/null @@ -1,38 +0,0 @@ -use anyhow::Result; -use tokio::sync::oneshot::{channel, Receiver, Sender}; - -pub struct ShutdownHandler { - txs: Vec>, -} - -impl ShutdownHandler { - pub fn new() -> ShutdownHandler { - ShutdownHandler { txs: Vec::new() } - } - - pub fn subscribe(&mut self) -> Receiver<()> { - let (tx, rx) = channel(); - self.txs.push(tx); - - rx - } - - /// ShutdownHandler waits on a ctrl_c from the system, or a short circuit command from the top - /// level error handler, and sends messages to each registered transmitter when it is received. - pub async fn run(self, short_circuit: Receiver<()>) -> Result<()> { - tokio::select! { - _ = tokio::signal::ctrl_c() => {} - _ = short_circuit => {} - } - - eprintln!("Closing tail session..."); - for tx in self.txs { - // if `tx.send()` returns an error, it is because the receiver has gone out of scope, - // likely due to the task returning early for some reason, in which case we don't need - // to tell that task to shut down because it already has. - tx.send(()).ok(); - } - - Ok(()) - } -} diff --git a/src/tail/tunnel.rs b/src/tail/tunnel.rs deleted file mode 100644 index 82c87cc25..000000000 --- a/src/tail/tunnel.rs +++ /dev/null @@ -1,89 +0,0 @@ -use std::path::{Path, PathBuf}; -use std::process::Stdio; -use std::str; - -use tokio::process::Child; -use tokio::process::Command; -use tokio::sync::oneshot::Receiver; - -use anyhow::Result; - -pub struct Tunnel { - child: Child, -} - -/// Tunnel wraps a child process that runs cloudflared and forwards requests from the Trace Worker -/// in the runtime to our local LogServer instance. 
We wrap it in a struct primarily to hold the -/// state of the child process so that upon receipt of a SIGINT message we can more swiftly kill it -/// and wait on its output; otherwise we leave an orphaned process when wrangler exits and this -/// causes problems if it still exists the next time we start up a tail. -impl Tunnel { - pub fn new(tunnel_port: u16, metrics_port: u16, verbose: bool) -> Result { - let tool_name = PathBuf::from("cloudflared"); - // TODO: Finally get cloudflared release binaries distributed on GitHub so we could - // simply uncomment the line below. - // let binary_path = install::install(tool_name, "cloudflare")?.binary(tool_name)?; - - let tunnel_url = format!("localhost:{}", tunnel_port); - let metrics_url = format!("localhost:{}", metrics_port); - let args = ["tunnel", "--url", &tunnel_url, "--metrics", &metrics_url]; - - let mut command = command(&args, &tool_name, verbose); - let command_name = format!("{:?}", command); - - let child = command - .spawn() - .unwrap_or_else(|_| panic!("{} failed to spawn", command_name)); - - Ok(Tunnel { child }) - } - - pub async fn run(self, shutdown_rx: Receiver<()>) -> Result<()> { - shutdown_rx.await?; - self.shutdown().await - } - - /// shutdown is relatively simple, it sends a second `kill` signal to the child process, - /// short-circuiting cloudflared's "graceful shutdown" period. this approach has been endorsed - /// by the team who maintains cloudflared as safe practice. 
- pub async fn shutdown(mut self) -> Result<()> { - if let Err(e) = self.child.kill().await { - let msg = if let Some(pid) = self.child.id() { - format!("failed to kill cloudflared: {}\ncloudflared will eventually exit, or you can explicitly kill it by running `kill {}`", e, pid) - } else { - format!( - "failed to kill cloudflared: {}\ncloudflared will eventually exit.", - e - ) - }; - anyhow::bail!(msg) - } else { - self.child.wait_with_output().await?; - - Ok(()) - } - } -} - -// TODO: let's not clumsily copy this from commands/build/mod.rs -// We definitely want to keep the check for RUST_LOG=info below so we avoid -// spamming user terminal with default cloudflared output (which is pretty darn sizable.) -pub fn command(args: &[&str], binary_path: &Path, verbose: bool) -> Command { - let mut c = if cfg!(target_os = "windows") { - let mut c = Command::new("cmd"); - c.arg("/C"); - c.arg(binary_path); - c - } else { - Command::new(binary_path) - }; - - c.args(args); - // Let user read cloudflared process logs iff RUST_LOG=info. - if !verbose { - c.stderr(Stdio::null()); - c.stdout(Stdio::null()); - } - - c -}