From 050075a1e63a92a946600261d28be09156cd36ec Mon Sep 17 00:00:00 2001 From: FujiApple Date: Fri, 10 Nov 2023 23:43:32 +0800 Subject: [PATCH] feat: record separate traces per flow (#760) --- src/backend/trace.rs | 272 +++++++++++++++++++------------ src/frontend/render/body.rs | 3 +- src/frontend/render/chart.rs | 7 +- src/frontend/render/header.rs | 3 +- src/frontend/render/histogram.rs | 5 +- src/frontend/render/table.rs | 15 +- src/frontend/render/world.rs | 4 +- src/frontend/tui_app.rs | 14 +- src/report.rs | 12 +- 9 files changed, 206 insertions(+), 129 deletions(-) diff --git a/src/backend/trace.rs b/src/backend/trace.rs index e1aac2de1..4fd944151 100644 --- a/src/backend/trace.rs +++ b/src/backend/trace.rs @@ -1,6 +1,8 @@ -use crate::backend::flows::{Flow, FlowRegistry}; +use crate::backend::flows::{Flow, FlowId, FlowRegistry}; use crate::config::MAX_HOPS; use indexmap::IndexMap; +use std::collections::HashMap; +use std::iter::once; use std::net::{IpAddr, Ipv4Addr}; use std::time::Duration; use trippy::tracing::{Probe, ProbeStatus, TracerRound}; @@ -9,43 +11,31 @@ use trippy::tracing::{Probe, ProbeStatus, TracerRound}; #[derive(Debug, Clone)] pub struct Trace { max_samples: usize, - lowest_ttl: u8, - highest_ttl: u8, - highest_ttl_for_round: u8, - round: Option, - hops: Vec, + flows: HashMap, flow_registry: FlowRegistry, error: Option, } impl Trace { + /// Create a new `Trace`. pub fn new(max_samples: usize) -> Self { Self { + flows: once((FlowId(0), TraceFlow::new(max_samples))) + .collect::>(), max_samples, - lowest_ttl: 0, - highest_ttl: 0, - highest_ttl_for_round: 0, - round: None, - hops: (0..MAX_HOPS).map(|_| Hop::default()).collect(), flow_registry: FlowRegistry::new(), error: None, } } - /// The current round of tracing. - pub fn round(&self) -> Option { - self.round + /// Return the id of the default flow. + pub fn default_flow_id() -> FlowId { + FlowId(0) } /// Information about each hop in the trace. - pub fn hops(&self) -> &[Hop] { - if self.lowest_ttl == 0 || self.highest_ttl == 0 { - &[] - } else { - let start = (self.lowest_ttl as usize) - 1; - let end = self.highest_ttl as usize; - &self.hops[start..end] - } + pub fn hops(&self, flow_id: FlowId) -> &[Hop] { + self.flows[&flow_id].hops() } /// Is a given `Hop` the target hop? @@ -54,24 +44,27 @@ impl Trace { /// /// Note that if the target host does not respond to probes then the the highest `ttl` observed /// will be one greater than the `ttl` of the last host which did respond. - pub fn is_target(&self, hop: &Hop) -> bool { - self.highest_ttl == hop.ttl + pub fn is_target(&self, hop: &Hop, flow_id: FlowId) -> bool { + self.flows[&flow_id].is_target(hop) } /// Is a given `Hop` in the current round? - pub fn is_in_round(&self, hop: &Hop) -> bool { - hop.ttl <= self.highest_ttl_for_round + pub fn is_in_round(&self, hop: &Hop, flow_id: FlowId) -> bool { + self.flows[&flow_id].is_in_round(hop) } /// Return the target `Hop`. - /// - /// TODO Do we guarantee there is always a target hop? - pub fn target_hop(&self) -> &Hop { - if self.highest_ttl > 0 { - &self.hops[usize::from(self.highest_ttl) - 1] - } else { - &self.hops[0] - } + pub fn target_hop(&self, flow_id: FlowId) -> &Hop { + self.flows[&flow_id].target_hop() + } + + /// The current round of tracing. + pub fn round(&self, flow_id: FlowId) -> Option { + self.flows[&flow_id].round() + } + + pub fn probe_count(&self, flow_id: FlowId) -> usize { + self.flows[&flow_id].round_count() } pub fn flow_registry(&self) -> &FlowRegistry { @@ -88,75 +81,20 @@ impl Trace { /// Update the tracing state from a `TracerRound`. pub fn update_from_round(&mut self, round: &TracerRound<'_>) { - self.update_flows(round.probes); - self.highest_ttl = std::cmp::max(self.highest_ttl, round.largest_ttl.0); - self.highest_ttl_for_round = round.largest_ttl.0; - for probe in round.probes { - self.update_from_probe(probe); - } - } - - fn update_from_probe(&mut self, probe: &Probe) { - self.update_lowest_ttl(probe); - self.update_round(probe); - match probe.status { - ProbeStatus::Complete => { - let index = usize::from(probe.ttl.0) - 1; - let hop = &mut self.hops[index]; - hop.ttl = probe.ttl.0; - hop.total_sent += 1; - hop.total_recv += 1; - let dur = probe.duration(); - let dur_ms = dur.as_secs_f64() * 1000_f64; - hop.total_time += dur; - hop.last = Some(dur); - hop.samples.insert(0, dur); - hop.best = hop.best.map_or(Some(dur), |d| Some(d.min(dur))); - hop.worst = hop.worst.map_or(Some(dur), |d| Some(d.max(dur))); - hop.mean += (dur_ms - hop.mean) / hop.total_recv as f64; - hop.m2 += (dur_ms - hop.mean) * (dur_ms - hop.mean); - if hop.samples.len() > self.max_samples { - hop.samples.pop(); - } - let host = probe.host.unwrap_or(IpAddr::V4(Ipv4Addr::UNSPECIFIED)); - *hop.addrs.entry(host).or_default() += 1; - } - ProbeStatus::Awaited => { - let index = usize::from(probe.ttl.0) - 1; - self.hops[index].total_sent += 1; - self.hops[index].ttl = probe.ttl.0; - self.hops[index].samples.insert(0, Duration::default()); - if self.hops[index].samples.len() > self.max_samples { - self.hops[index].samples.pop(); - } - } - ProbeStatus::NotSent => {} - } + let flow = Flow::from_hops(round.probes.iter().map(|p| p.host)); + let flow_id = self.flow_registry.register(flow); + self.update_trace_flow(FlowId(0), round); + self.update_trace_flow(flow_id, round); } - /// Update `lowest_ttl` for valid probes. - fn update_lowest_ttl(&mut self, probe: &Probe) { - if matches!(probe.status, ProbeStatus::Awaited | ProbeStatus::Complete) { - if self.lowest_ttl == 0 { - self.lowest_ttl = probe.ttl.0; - } else { - self.lowest_ttl = self.lowest_ttl.min(probe.ttl.0); - } - } - } - - /// Update `round` for valid probes. - fn update_round(&mut self, probe: &Probe) { - if matches!(probe.status, ProbeStatus::Awaited | ProbeStatus::Complete) { - self.round = match self.round { - None => Some(probe.round.0), - Some(r) => Some(r.max(probe.round.0)), - } - } - } - fn update_flows(&mut self, probes: &[Probe]) { - let flow = Flow::from_hops(probes.iter().map(|p| p.host)); - self.flow_registry.register(flow); + fn update_trace_flow(&mut self, flow_id: FlowId, round: &TracerRound<'_>) { + let flow_trace = self + .flows + .entry(flow_id) + .or_insert_with(|| TraceFlow::new(self.max_samples)); + flow_trace.highest_ttl = std::cmp::max(flow_trace.highest_ttl, round.largest_ttl.0); + flow_trace.highest_ttl_for_round = round.largest_ttl.0; + flow_trace.update_from_round(round); } } @@ -272,3 +210,135 @@ impl Default for Hop { } } } + +/// Data for a single flow within a trace. +#[derive(Debug, Clone)] +struct TraceFlow { + /// The maximum number of samples to record. + max_samples: usize, + /// The lowest ttl observed across all rounds. + lowest_ttl: u8, + /// The highest ttl observed across all rounds. + highest_ttl: u8, + /// The highest ttl observed for the latest round. + highest_ttl_for_round: u8, + /// The latest round received for this flow. + round: Option, + /// The total number of rounds received for this flow. + round_count: usize, + hops: Vec, +} + +impl TraceFlow { + fn new(max_samples: usize) -> Self { + Self { + max_samples, + lowest_ttl: 0, + highest_ttl: 0, + highest_ttl_for_round: 0, + round: None, + round_count: 0, + hops: (0..MAX_HOPS).map(|_| Hop::default()).collect(), + } + } + + fn hops(&self) -> &[Hop] { + if self.lowest_ttl == 0 || self.highest_ttl == 0 { + &[] + } else { + let start = (self.lowest_ttl as usize) - 1; + let end = self.highest_ttl as usize; + &self.hops[start..end] + } + } + + fn is_target(&self, hop: &Hop) -> bool { + self.highest_ttl == hop.ttl + } + + fn is_in_round(&self, hop: &Hop) -> bool { + hop.ttl <= self.highest_ttl_for_round + } + + fn target_hop(&self) -> &Hop { + if self.highest_ttl > 0 { + &self.hops[usize::from(self.highest_ttl) - 1] + } else { + &self.hops[0] + } + } + + fn round(&self) -> Option { + self.round + } + + fn round_count(&self) -> usize { + self.round_count + } + + fn update_from_round(&mut self, round: &TracerRound<'_>) { + self.round_count += 1; + for probe in round.probes { + self.update_from_probe(probe); + } + } + + fn update_from_probe(&mut self, probe: &Probe) { + self.update_lowest_ttl(probe); + self.update_round(probe); + match probe.status { + ProbeStatus::Complete => { + let index = usize::from(probe.ttl.0) - 1; + let hop = &mut self.hops[index]; + hop.ttl = probe.ttl.0; + hop.total_sent += 1; + hop.total_recv += 1; + let dur = probe.duration(); + let dur_ms = dur.as_secs_f64() * 1000_f64; + hop.total_time += dur; + hop.last = Some(dur); + hop.samples.insert(0, dur); + hop.best = hop.best.map_or(Some(dur), |d| Some(d.min(dur))); + hop.worst = hop.worst.map_or(Some(dur), |d| Some(d.max(dur))); + hop.mean += (dur_ms - hop.mean) / hop.total_recv as f64; + hop.m2 += (dur_ms - hop.mean) * (dur_ms - hop.mean); + if hop.samples.len() > self.max_samples { + hop.samples.pop(); + } + let host = probe.host.unwrap_or(IpAddr::V4(Ipv4Addr::UNSPECIFIED)); + *hop.addrs.entry(host).or_default() += 1; + } + ProbeStatus::Awaited => { + let index = usize::from(probe.ttl.0) - 1; + self.hops[index].total_sent += 1; + self.hops[index].ttl = probe.ttl.0; + self.hops[index].samples.insert(0, Duration::default()); + if self.hops[index].samples.len() > self.max_samples { + self.hops[index].samples.pop(); + } + } + ProbeStatus::NotSent => {} + } + } + + /// Update `round` for valid probes. + fn update_round(&mut self, probe: &Probe) { + if matches!(probe.status, ProbeStatus::Awaited | ProbeStatus::Complete) { + self.round = match self.round { + None => Some(probe.round.0), + Some(r) => Some(r.max(probe.round.0)), + } + } + } + + /// Update `lowest_ttl` for valid probes. + fn update_lowest_ttl(&mut self, probe: &Probe) { + if matches!(probe.status, ProbeStatus::Awaited | ProbeStatus::Complete) { + if self.lowest_ttl == 0 { + self.lowest_ttl = probe.ttl.0; + } else { + self.lowest_ttl = self.lowest_ttl.min(probe.ttl.0); + } + } + } +} diff --git a/src/frontend/render/body.rs b/src/frontend/render/body.rs index 817580a10..e61295e78 100644 --- a/src/frontend/render/body.rs +++ b/src/frontend/render/body.rs @@ -1,3 +1,4 @@ +use crate::backend::trace::Trace; use crate::frontend::render::{bsod, chart, splash, table, world}; use crate::frontend::tui_app::TuiApp; use ratatui::layout::Rect; @@ -10,7 +11,7 @@ use ratatui::Frame; pub fn render(f: &mut Frame<'_>, rec: Rect, app: &mut TuiApp) { if let Some(err) = app.selected_tracer_data.error() { bsod::render(f, rec, err); - } else if app.tracer_data().hops().is_empty() { + } else if app.tracer_data().hops(Trace::default_flow_id()).is_empty() { splash::render(f, app, rec); } else if app.show_chart { chart::render(f, app, rec); diff --git a/src/frontend/render/chart.rs b/src/frontend/render/chart.rs index 745c76075..77ea3d986 100644 --- a/src/frontend/render/chart.rs +++ b/src/frontend/render/chart.rs @@ -1,3 +1,4 @@ +use crate::backend::trace::Trace; use crate::frontend::tui_app::TuiApp; use ratatui::layout::{Alignment, Constraint, Rect}; use ratatui::style::Style; @@ -9,13 +10,13 @@ use ratatui::Frame; /// Render the ping history for all hops as a chart. pub fn render(f: &mut Frame<'_>, app: &TuiApp, rect: Rect) { let selected_hop = app.table_state.selected().map_or_else( - || app.tracer_data().target_hop(), - |s| &app.tracer_data().hops()[s], + || app.tracer_data().target_hop(Trace::default_flow_id()), + |s| &app.tracer_data().hops(Trace::default_flow_id())[s], ); let samples = app.tui_config.max_samples / app.zoom_factor; let series_data = app .selected_tracer_data - .hops() + .hops(Trace::default_flow_id()) .iter() .map(|hop| { hop.samples() diff --git a/src/frontend/render/header.rs b/src/frontend/render/header.rs index 8ece2b4dc..43e207bbf 100644 --- a/src/frontend/render/header.rs +++ b/src/frontend/render/header.rs @@ -1,3 +1,4 @@ +use crate::backend::trace::Trace; use crate::frontend::tui_app::TuiApp; use chrono::SecondsFormat; use humantime::format_duration; @@ -94,7 +95,7 @@ pub fn render(f: &mut Frame<'_>, app: &TuiApp, rect: Rect) { Span::raw(render_status(app)), Span::raw(format!( ", discovered {} hops", - app.tracer_data().hops().len() + app.tracer_data().hops(Trace::default_flow_id()).len() )), ]), ]; diff --git a/src/frontend/render/histogram.rs b/src/frontend/render/histogram.rs index 26781b53a..2ad139bd1 100644 --- a/src/frontend/render/histogram.rs +++ b/src/frontend/render/histogram.rs @@ -1,3 +1,4 @@ +use crate::backend::trace::Trace; use crate::frontend::tui_app::TuiApp; use ratatui::layout::Rect; use ratatui::style::{Modifier, Style}; @@ -9,8 +10,8 @@ use std::time::Duration; /// Render a histogram of ping frequencies. pub fn render(f: &mut Frame<'_>, app: &TuiApp, rect: Rect) { let target_hop = app.table_state.selected().map_or_else( - || app.tracer_data().target_hop(), - |s| &app.tracer_data().hops()[s], + || app.tracer_data().target_hop(Trace::default_flow_id()), + |s| &app.tracer_data().hops(Trace::default_flow_id())[s], ); let freq_data = sample_frequency(target_hop.samples()); let freq_data_ref: Vec<_> = freq_data.iter().map(|(b, c)| (b.as_str(), *c)).collect(); diff --git a/src/frontend/render/table.rs b/src/frontend/render/table.rs index 56ec531e4..5bae56f5a 100644 --- a/src/frontend/render/table.rs +++ b/src/frontend/render/table.rs @@ -1,4 +1,4 @@ -use crate::backend::trace::Hop; +use crate::backend::trace::{Hop, Trace}; use crate::config::{AddressMode, AsMode, GeoIpMode}; use crate::frontend::config::TuiConfig; use crate::frontend::theme::Theme; @@ -31,10 +31,11 @@ use trippy::dns::{AsInfo, DnsEntry, DnsResolver, Resolved, Resolver, Unresolved} pub fn render(f: &mut Frame<'_>, app: &mut TuiApp, rect: Rect) { let header = render_table_header(app.tui_config.theme); let selected_style = Style::default().add_modifier(Modifier::REVERSED); - let rows = - app.tracer_data().hops().iter().map(|hop| { - render_table_row(app, hop, &app.resolver, &app.geoip_lookup, &app.tui_config) - }); + let rows = app + .tracer_data() + .hops(Trace::default_flow_id()) + .iter() + .map(|hop| render_table_row(app, hop, &app.resolver, &app.geoip_lookup, &app.tui_config)); let table = Table::new(rows) .header(header) .block( @@ -77,8 +78,8 @@ fn render_table_row( .selected_hop() .map(|h| h.ttl() == hop.ttl()) .unwrap_or_default(); - let is_target = app.tracer_data().is_target(hop); - let is_in_round = app.tracer_data().is_in_round(hop); + let is_target = app.tracer_data().is_target(hop, Trace::default_flow_id()); + let is_in_round = app.tracer_data().is_in_round(hop, Trace::default_flow_id()); let ttl_cell = render_ttl_cell(hop); let (hostname_cell, row_height) = if is_selected_hop && app.show_hop_details { render_hostname_with_details(app, hop, dns, geoip_lookup, config) diff --git a/src/frontend/render/world.rs b/src/frontend/render/world.rs index dbc1b3e12..97a3fdffa 100644 --- a/src/frontend/render/world.rs +++ b/src/frontend/render/world.rs @@ -1,4 +1,4 @@ -use crate::backend::trace::Hop; +use crate::backend::trace::{Hop, Trace}; use crate::frontend::tui_app::TuiApp; use itertools::Itertools; use ratatui::layout::{Alignment, Constraint, Direction, Layout, Margin, Rect}; @@ -186,7 +186,7 @@ struct MapEntry { /// Each entry represent a single `GeoIp` location, which may be associated with multiple hops. fn build_map_entries(app: &TuiApp) -> Vec { let mut geo_map: HashMap = HashMap::new(); - for hop in app.tracer_data().hops() { + for hop in app.tracer_data().hops(Trace::default_flow_id()) { for addr in hop.addrs() { if let Some(geo) = app.geoip_lookup.lookup(*addr).unwrap_or_default() { if let Some((latitude, longitude, radius)) = geo.coordinates() { diff --git a/src/frontend/tui_app.rs b/src/frontend/tui_app.rs index 569edc739..a081d1821 100644 --- a/src/frontend/tui_app.rs +++ b/src/frontend/tui_app.rs @@ -78,15 +78,15 @@ impl TuiApp { pub fn selected_hop_or_target(&self) -> &Hop { self.table_state.selected().map_or_else( - || self.tracer_data().target_hop(), - |s| &self.tracer_data().hops()[s], + || self.tracer_data().target_hop(Trace::default_flow_id()), + |s| &self.tracer_data().hops(Trace::default_flow_id())[s], ) } pub fn selected_hop(&self) -> Option<&Hop> { self.table_state .selected() - .map(|s| &self.tracer_data().hops()[s]) + .map(|s| &self.tracer_data().hops(Trace::default_flow_id())[s]) } pub fn tracer_config(&self) -> &TraceInfo { @@ -94,7 +94,7 @@ impl TuiApp { } pub fn clamp_selected_hop(&mut self) { - let hop_count = self.tracer_data().hops().len(); + let hop_count = self.tracer_data().hops(Trace::default_flow_id()).len(); if let Some(selected) = self.table_state.selected() { if selected > hop_count - 1 { self.table_state.select(Some(hop_count - 1)); @@ -103,7 +103,7 @@ impl TuiApp { } pub fn next_hop(&mut self) { - let hop_count = self.tracer_data().hops().len(); + let hop_count = self.tracer_data().hops(Trace::default_flow_id()).len(); if hop_count == 0 { return; } @@ -123,7 +123,7 @@ impl TuiApp { } pub fn previous_hop(&mut self) { - let hop_count = self.tracer_data().hops().len(); + let hop_count = self.tracer_data().hops(Trace::default_flow_id()).len(); if hop_count == 0 { return; } @@ -299,7 +299,7 @@ impl TuiApp { /// The maximum number of hosts per hop for the currently selected trace. pub fn max_hosts(&self) -> u8 { self.selected_tracer_data - .hops() + .hops(Trace::default_flow_id()) .iter() .map(|h| h.addrs().count()) .max() diff --git a/src/report.rs b/src/report.rs index b7902482f..4e2a85771 100644 --- a/src/report.rs +++ b/src/report.rs @@ -19,7 +19,7 @@ pub fn run_report_csv( ) -> anyhow::Result<()> { let trace = wait_for_round(&info.data, report_cycles)?; println!("Target,TargetIp,Hop,IPs,Addrs,Loss%,Snt,Recv,Last,Avg,Best,Wrst,StdDev,"); - for hop in trace.hops() { + for hop in trace.hops(Trace::default_flow_id()) { let ttl = hop.ttl(); let ips = hop.addrs().join(":"); let ip = if ips.is_empty() { @@ -120,7 +120,7 @@ pub fn run_report_json( ) -> anyhow::Result<()> { let trace = wait_for_round(&info.data, report_cycles)?; let hops: Vec = trace - .hops() + .hops(Trace::default_flow_id()) .iter() .map(|hop| { let hosts: Vec<_> = hop @@ -191,7 +191,7 @@ fn run_report_table( .load_preset(preset) .set_content_arrangement(ContentArrangement::Dynamic) .set_header(columns); - for hop in trace.hops() { + for hop in trace.hops(Trace::default_flow_id()) { let ttl = hop.ttl().to_string(); let ips = hop.addrs().join("\n"); let ip = if ips.is_empty() { @@ -238,7 +238,7 @@ pub fn run_report_stream(info: &TraceInfo) -> anyhow::Result<()> { if let Some(err) = trace_data.error() { return Err(anyhow!("error: {}", err)); } - for hop in trace_data.hops() { + for hop in trace_data.hops(Trace::default_flow_id()) { let ttl = hop.ttl(); let addrs = hop.addrs().collect::>(); let sent = hop.total_sent(); @@ -275,7 +275,9 @@ pub fn run_report_silent(info: &TraceInfo, report_cycles: usize) -> anyhow::Resu /// Block until trace data for round `round` is available. fn wait_for_round(trace_data: &Arc>, report_cycles: usize) -> anyhow::Result { let mut trace = trace_data.read().clone(); - while trace.round().is_none() || trace.round() < Some(report_cycles - 1) { + while trace.round(Trace::default_flow_id()).is_none() + || trace.round(Trace::default_flow_id()) < Some(report_cycles - 1) + { trace = trace_data.read().clone(); if let Some(err) = trace.error() { return Err(anyhow!("error: {}", err));