Skip to content

Commit

Permalink
feat: record separate traces per flow (#760)
Browse files Browse the repository at this point in the history
  • Loading branch information
fujiapple852 committed Nov 11, 2023
1 parent 0af9118 commit 050075a
Show file tree
Hide file tree
Showing 9 changed files with 206 additions and 129 deletions.
272 changes: 171 additions & 101 deletions src/backend/trace.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::backend::flows::{Flow, FlowRegistry};
use crate::backend::flows::{Flow, FlowId, FlowRegistry};
use crate::config::MAX_HOPS;
use indexmap::IndexMap;
use std::collections::HashMap;
use std::iter::once;
use std::net::{IpAddr, Ipv4Addr};
use std::time::Duration;
use trippy::tracing::{Probe, ProbeStatus, TracerRound};
Expand All @@ -9,43 +11,31 @@ use trippy::tracing::{Probe, ProbeStatus, TracerRound};
#[derive(Debug, Clone)]
pub struct Trace {
max_samples: usize,
lowest_ttl: u8,
highest_ttl: u8,
highest_ttl_for_round: u8,
round: Option<usize>,
hops: Vec<Hop>,
flows: HashMap<FlowId, TraceFlow>,
flow_registry: FlowRegistry,
error: Option<String>,
}

impl Trace {
/// Create a new `Trace`.
pub fn new(max_samples: usize) -> Self {
Self {
flows: once((FlowId(0), TraceFlow::new(max_samples)))
.collect::<HashMap<FlowId, TraceFlow>>(),
max_samples,
lowest_ttl: 0,
highest_ttl: 0,
highest_ttl_for_round: 0,
round: None,
hops: (0..MAX_HOPS).map(|_| Hop::default()).collect(),
flow_registry: FlowRegistry::new(),
error: None,
}
}

/// The current round of tracing.
pub fn round(&self) -> Option<usize> {
self.round
/// Return the id of the default flow.
pub fn default_flow_id() -> FlowId {
FlowId(0)
}

/// Information about each hop in the trace.
pub fn hops(&self) -> &[Hop] {
if self.lowest_ttl == 0 || self.highest_ttl == 0 {
&[]
} else {
let start = (self.lowest_ttl as usize) - 1;
let end = self.highest_ttl as usize;
&self.hops[start..end]
}
pub fn hops(&self, flow_id: FlowId) -> &[Hop] {
self.flows[&flow_id].hops()
}

/// Is a given `Hop` the target hop?
Expand All @@ -54,24 +44,27 @@ impl Trace {
///
/// Note that if the target host does not respond to probes then the the highest `ttl` observed
/// will be one greater than the `ttl` of the last host which did respond.
pub fn is_target(&self, hop: &Hop) -> bool {
self.highest_ttl == hop.ttl
pub fn is_target(&self, hop: &Hop, flow_id: FlowId) -> bool {
self.flows[&flow_id].is_target(hop)
}

/// Is a given `Hop` in the current round?
pub fn is_in_round(&self, hop: &Hop) -> bool {
hop.ttl <= self.highest_ttl_for_round
pub fn is_in_round(&self, hop: &Hop, flow_id: FlowId) -> bool {
self.flows[&flow_id].is_in_round(hop)
}

/// Return the target `Hop`.
///
/// TODO Do we guarantee there is always a target hop?
pub fn target_hop(&self) -> &Hop {
if self.highest_ttl > 0 {
&self.hops[usize::from(self.highest_ttl) - 1]
} else {
&self.hops[0]
}
pub fn target_hop(&self, flow_id: FlowId) -> &Hop {
self.flows[&flow_id].target_hop()
}

/// The current round of tracing.
pub fn round(&self, flow_id: FlowId) -> Option<usize> {
self.flows[&flow_id].round()
}

pub fn probe_count(&self, flow_id: FlowId) -> usize {
self.flows[&flow_id].round_count()
}

pub fn flow_registry(&self) -> &FlowRegistry {
Expand All @@ -88,75 +81,20 @@ impl Trace {

/// Update the tracing state from a `TracerRound`.
pub fn update_from_round(&mut self, round: &TracerRound<'_>) {
self.update_flows(round.probes);
self.highest_ttl = std::cmp::max(self.highest_ttl, round.largest_ttl.0);
self.highest_ttl_for_round = round.largest_ttl.0;
for probe in round.probes {
self.update_from_probe(probe);
}
}

fn update_from_probe(&mut self, probe: &Probe) {
self.update_lowest_ttl(probe);
self.update_round(probe);
match probe.status {
ProbeStatus::Complete => {
let index = usize::from(probe.ttl.0) - 1;
let hop = &mut self.hops[index];
hop.ttl = probe.ttl.0;
hop.total_sent += 1;
hop.total_recv += 1;
let dur = probe.duration();
let dur_ms = dur.as_secs_f64() * 1000_f64;
hop.total_time += dur;
hop.last = Some(dur);
hop.samples.insert(0, dur);
hop.best = hop.best.map_or(Some(dur), |d| Some(d.min(dur)));
hop.worst = hop.worst.map_or(Some(dur), |d| Some(d.max(dur)));
hop.mean += (dur_ms - hop.mean) / hop.total_recv as f64;
hop.m2 += (dur_ms - hop.mean) * (dur_ms - hop.mean);
if hop.samples.len() > self.max_samples {
hop.samples.pop();
}
let host = probe.host.unwrap_or(IpAddr::V4(Ipv4Addr::UNSPECIFIED));
*hop.addrs.entry(host).or_default() += 1;
}
ProbeStatus::Awaited => {
let index = usize::from(probe.ttl.0) - 1;
self.hops[index].total_sent += 1;
self.hops[index].ttl = probe.ttl.0;
self.hops[index].samples.insert(0, Duration::default());
if self.hops[index].samples.len() > self.max_samples {
self.hops[index].samples.pop();
}
}
ProbeStatus::NotSent => {}
}
let flow = Flow::from_hops(round.probes.iter().map(|p| p.host));
let flow_id = self.flow_registry.register(flow);
self.update_trace_flow(FlowId(0), round);
self.update_trace_flow(flow_id, round);
}

/// Update `lowest_ttl` for valid probes.
fn update_lowest_ttl(&mut self, probe: &Probe) {
if matches!(probe.status, ProbeStatus::Awaited | ProbeStatus::Complete) {
if self.lowest_ttl == 0 {
self.lowest_ttl = probe.ttl.0;
} else {
self.lowest_ttl = self.lowest_ttl.min(probe.ttl.0);
}
}
}

/// Update `round` for valid probes.
fn update_round(&mut self, probe: &Probe) {
if matches!(probe.status, ProbeStatus::Awaited | ProbeStatus::Complete) {
self.round = match self.round {
None => Some(probe.round.0),
Some(r) => Some(r.max(probe.round.0)),
}
}
}
fn update_flows(&mut self, probes: &[Probe]) {
let flow = Flow::from_hops(probes.iter().map(|p| p.host));
self.flow_registry.register(flow);
fn update_trace_flow(&mut self, flow_id: FlowId, round: &TracerRound<'_>) {
let flow_trace = self
.flows
.entry(flow_id)
.or_insert_with(|| TraceFlow::new(self.max_samples));
flow_trace.highest_ttl = std::cmp::max(flow_trace.highest_ttl, round.largest_ttl.0);
flow_trace.highest_ttl_for_round = round.largest_ttl.0;
flow_trace.update_from_round(round);
}
}

Expand Down Expand Up @@ -272,3 +210,135 @@ impl Default for Hop {
}
}
}

/// Data for a single flow within a trace.
#[derive(Debug, Clone)]
struct TraceFlow {
/// The maximum number of samples to record.
max_samples: usize,
/// The lowest ttl observed across all rounds.
lowest_ttl: u8,
/// The highest ttl observed across all rounds.
highest_ttl: u8,
/// The highest ttl observed for the latest round.
highest_ttl_for_round: u8,
/// The latest round received for this flow.
round: Option<usize>,
/// The total number of rounds received for this flow.
round_count: usize,
hops: Vec<Hop>,
}

impl TraceFlow {
fn new(max_samples: usize) -> Self {
Self {
max_samples,
lowest_ttl: 0,
highest_ttl: 0,
highest_ttl_for_round: 0,
round: None,
round_count: 0,
hops: (0..MAX_HOPS).map(|_| Hop::default()).collect(),
}
}

fn hops(&self) -> &[Hop] {
if self.lowest_ttl == 0 || self.highest_ttl == 0 {
&[]
} else {
let start = (self.lowest_ttl as usize) - 1;
let end = self.highest_ttl as usize;
&self.hops[start..end]
}
}

fn is_target(&self, hop: &Hop) -> bool {
self.highest_ttl == hop.ttl
}

fn is_in_round(&self, hop: &Hop) -> bool {
hop.ttl <= self.highest_ttl_for_round
}

fn target_hop(&self) -> &Hop {
if self.highest_ttl > 0 {
&self.hops[usize::from(self.highest_ttl) - 1]
} else {
&self.hops[0]
}
}

fn round(&self) -> Option<usize> {
self.round
}

fn round_count(&self) -> usize {
self.round_count
}

fn update_from_round(&mut self, round: &TracerRound<'_>) {
self.round_count += 1;
for probe in round.probes {
self.update_from_probe(probe);
}
}

fn update_from_probe(&mut self, probe: &Probe) {
self.update_lowest_ttl(probe);
self.update_round(probe);
match probe.status {
ProbeStatus::Complete => {
let index = usize::from(probe.ttl.0) - 1;
let hop = &mut self.hops[index];
hop.ttl = probe.ttl.0;
hop.total_sent += 1;
hop.total_recv += 1;
let dur = probe.duration();
let dur_ms = dur.as_secs_f64() * 1000_f64;
hop.total_time += dur;
hop.last = Some(dur);
hop.samples.insert(0, dur);
hop.best = hop.best.map_or(Some(dur), |d| Some(d.min(dur)));
hop.worst = hop.worst.map_or(Some(dur), |d| Some(d.max(dur)));
hop.mean += (dur_ms - hop.mean) / hop.total_recv as f64;
hop.m2 += (dur_ms - hop.mean) * (dur_ms - hop.mean);
if hop.samples.len() > self.max_samples {
hop.samples.pop();
}
let host = probe.host.unwrap_or(IpAddr::V4(Ipv4Addr::UNSPECIFIED));
*hop.addrs.entry(host).or_default() += 1;
}
ProbeStatus::Awaited => {
let index = usize::from(probe.ttl.0) - 1;
self.hops[index].total_sent += 1;
self.hops[index].ttl = probe.ttl.0;
self.hops[index].samples.insert(0, Duration::default());
if self.hops[index].samples.len() > self.max_samples {
self.hops[index].samples.pop();
}
}
ProbeStatus::NotSent => {}
}
}

/// Update `round` for valid probes.
fn update_round(&mut self, probe: &Probe) {
if matches!(probe.status, ProbeStatus::Awaited | ProbeStatus::Complete) {
self.round = match self.round {
None => Some(probe.round.0),
Some(r) => Some(r.max(probe.round.0)),
}
}
}

/// Update `lowest_ttl` for valid probes.
fn update_lowest_ttl(&mut self, probe: &Probe) {
if matches!(probe.status, ProbeStatus::Awaited | ProbeStatus::Complete) {
if self.lowest_ttl == 0 {
self.lowest_ttl = probe.ttl.0;
} else {
self.lowest_ttl = self.lowest_ttl.min(probe.ttl.0);
}
}
}
}
3 changes: 2 additions & 1 deletion src/frontend/render/body.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::backend::trace::Trace;
use crate::frontend::render::{bsod, chart, splash, table, world};
use crate::frontend::tui_app::TuiApp;
use ratatui::layout::Rect;
Expand All @@ -10,7 +11,7 @@ use ratatui::Frame;
pub fn render(f: &mut Frame<'_>, rec: Rect, app: &mut TuiApp) {
if let Some(err) = app.selected_tracer_data.error() {
bsod::render(f, rec, err);
} else if app.tracer_data().hops().is_empty() {
} else if app.tracer_data().hops(Trace::default_flow_id()).is_empty() {
splash::render(f, app, rec);
} else if app.show_chart {
chart::render(f, app, rec);
Expand Down
7 changes: 4 additions & 3 deletions src/frontend/render/chart.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::backend::trace::Trace;
use crate::frontend::tui_app::TuiApp;
use ratatui::layout::{Alignment, Constraint, Rect};
use ratatui::style::Style;
Expand All @@ -9,13 +10,13 @@ use ratatui::Frame;
/// Render the ping history for all hops as a chart.
pub fn render(f: &mut Frame<'_>, app: &TuiApp, rect: Rect) {
let selected_hop = app.table_state.selected().map_or_else(
|| app.tracer_data().target_hop(),
|s| &app.tracer_data().hops()[s],
|| app.tracer_data().target_hop(Trace::default_flow_id()),
|s| &app.tracer_data().hops(Trace::default_flow_id())[s],
);
let samples = app.tui_config.max_samples / app.zoom_factor;
let series_data = app
.selected_tracer_data
.hops()
.hops(Trace::default_flow_id())
.iter()
.map(|hop| {
hop.samples()
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/render/header.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::backend::trace::Trace;
use crate::frontend::tui_app::TuiApp;
use chrono::SecondsFormat;
use humantime::format_duration;
Expand Down Expand Up @@ -94,7 +95,7 @@ pub fn render(f: &mut Frame<'_>, app: &TuiApp, rect: Rect) {
Span::raw(render_status(app)),
Span::raw(format!(
", discovered {} hops",
app.tracer_data().hops().len()
app.tracer_data().hops(Trace::default_flow_id()).len()
)),
]),
];
Expand Down
5 changes: 3 additions & 2 deletions src/frontend/render/histogram.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::backend::trace::Trace;
use crate::frontend::tui_app::TuiApp;
use ratatui::layout::Rect;
use ratatui::style::{Modifier, Style};
Expand All @@ -9,8 +10,8 @@ use std::time::Duration;
/// Render a histogram of ping frequencies.
pub fn render(f: &mut Frame<'_>, app: &TuiApp, rect: Rect) {
let target_hop = app.table_state.selected().map_or_else(
|| app.tracer_data().target_hop(),
|s| &app.tracer_data().hops()[s],
|| app.tracer_data().target_hop(Trace::default_flow_id()),
|s| &app.tracer_data().hops(Trace::default_flow_id())[s],
);
let freq_data = sample_frequency(target_hop.samples());
let freq_data_ref: Vec<_> = freq_data.iter().map(|(b, c)| (b.as_str(), *c)).collect();
Expand Down
Loading

0 comments on commit 050075a

Please sign in to comment.