Skip to content

Commit

Permalink
Cherry-pick 17377 (Naive traffic sim and deadlock fix) (#17397)
Browse files Browse the repository at this point in the history
## Description 

CP #17377

## Test plan 

How did you test the new or updated feature?

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
  • Loading branch information
williampsmith authored Apr 29, 2024
1 parent b9b2267 commit 550718e
Show file tree
Hide file tree
Showing 5 changed files with 358 additions and 29 deletions.
4 changes: 3 additions & 1 deletion crates/sui-core/src/authority_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,9 @@ impl ValidatorService {
proxy_ip: Option<SocketAddr>,
) -> Result<(), tonic::Status> {
if let Some(traffic_controller) = &self.traffic_controller {
if !traffic_controller.check(connection_ip, proxy_ip).await {
let connection = connection_ip.map(|ip| ip.ip());
let proxy = proxy_ip.map(|ip| ip.ip());
if !traffic_controller.check(connection, proxy).await {
// Entity in blocklist
if traffic_controller.dry_run_mode() {
debug!(
Expand Down
283 changes: 261 additions & 22 deletions crates/sui-core/src/traffic_controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use dashmap::DashMap;
use fs::File;
use prometheus::IntGauge;
use std::fs;
use std::net::{IpAddr, SocketAddr};
use std::net::{IpAddr, Ipv4Addr};
use std::ops::Add;
use std::sync::Arc;

use self::metrics::TrafficControllerMetrics;
Expand All @@ -19,8 +20,9 @@ use crate::traffic_controller::policies::{
Policy, PolicyResponse, TrafficControlPolicy, TrafficTally,
};
use mysten_metrics::spawn_monitored_task;
use rand::Rng;
use std::fmt::Debug;
use std::time::{Duration, SystemTime};
use std::time::{Duration, Instant, SystemTime};
use sui_types::traffic_control::{PolicyConfig, RemoteFirewallConfig, Weight};
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
Expand Down Expand Up @@ -130,11 +132,7 @@ impl TrafficController {
}

/// Returns true if the connection is allowed, false if it is blocked
pub async fn check(
&self,
connection_ip: Option<SocketAddr>,
proxy_ip: Option<SocketAddr>,
) -> bool {
pub async fn check(&self, connection_ip: Option<IpAddr>, proxy_ip: Option<IpAddr>) -> bool {
let connection_check = self.check_and_clear_blocklist(
connection_ip,
self.blocklists.connection_ips.clone(),
Expand All @@ -155,28 +153,29 @@ impl TrafficController {

async fn check_and_clear_blocklist(
&self,
ip: Option<SocketAddr>,
ip: Option<IpAddr>,
blocklist: BlocklistT,
metric_gauge: &IntGauge,
blocklist_len_gauge: &IntGauge,
) -> bool {
let ip = match ip {
Some(ip) => ip,
None => return true,
}
.ip();
};
let now = SystemTime::now();
match blocklist.get(&ip) {
Some(expiration) if now >= *expiration => {
metric_gauge.dec();
blocklist.remove(&ip);
true
}
None => true,
_ => {
self.metrics.requests_blocked_at_protocol.inc();
false
// The two blocks below must not be nested, otherwise we will deadlock:
// `get` acquires the lock, which would then still be held across the `remove`.
let (should_block, should_remove) = {
match blocklist.get(&ip) {
Some(expiration) if now >= *expiration => (false, true),
None => (false, false),
_ => (true, false),
}
};
if should_remove {
blocklist_len_gauge.dec();
blocklist.remove(&ip);
}
!should_block
}
}

Expand Down Expand Up @@ -244,7 +243,7 @@ async fn run_tally_loop(
// Dead man's switch - if we suspect something is sinking all traffic to node, disable nodefw
_ = tokio::time::sleep(tokio::time::Duration::from_secs(timeout)) => {
if let Some(fw_config) = &fw_config {
error!("No traffic tallies received in {} seconds.", fw_config.drain_timeout_secs);
error!("No traffic tallies received in {} seconds.", timeout);
if mem_drainfile_present {
continue;
}
Expand Down Expand Up @@ -419,3 +418,243 @@ async fn delegate_policy_response(
}
}
}

/// Counters and timings collected by a traffic simulation.
///
/// One value is produced per simulated client; values are combined with `+`
/// (see the [`Add`] impl below) to obtain the totals for a whole run.
/// The derived `Default` is the all-zero / all-`None` value, which makes it the
/// identity element for that addition.
#[derive(Debug, Clone, Default)]
pub struct TrafficSimMetrics {
    /// Number of requests issued.
    pub num_requests: u64,
    /// Number of requests that were rejected because the client was blocked.
    pub num_blocked: u64,
    /// Time until the first blocked request. When metrics are added together
    /// this field is *summed*, so that the caller can divide it to get an average.
    pub time_to_first_block: Option<Duration>,
    /// Time until the first blocked request. When metrics are added together
    /// this field keeps the *minimum*, i.e. the earliest block among all operands.
    pub abs_time_to_first_block: Option<Duration>,
    /// Accumulated time spent in the blocked state.
    pub total_time_blocked: Duration,
    /// Number of times a transition from allowed to blocked was observed.
    pub num_blocklist_adds: u64,
}

impl Add for TrafficSimMetrics {
    type Output = Self;

    /// Combines two metric sets: counters and `total_time_blocked` are summed,
    /// `time_to_first_block` is summed over the operands that have a value, and
    /// `abs_time_to_first_block` keeps the smaller of the available values.
    fn add(self, other: Self) -> Self {
        /// Merges two optional durations: applies `combine` when both are present,
        /// otherwise returns whichever one is present (or `None`).
        fn merge(
            lhs: Option<Duration>,
            rhs: Option<Duration>,
            combine: impl FnOnce(Duration, Duration) -> Duration,
        ) -> Option<Duration> {
            match (lhs, rhs) {
                (Some(a), Some(b)) => Some(combine(a, b)),
                (a, b) => a.or(b),
            }
        }

        Self {
            num_requests: self.num_requests + other.num_requests,
            num_blocked: self.num_blocked + other.num_blocked,
            time_to_first_block: merge(
                self.time_to_first_block,
                other.time_to_first_block,
                |a, b| a + b,
            ),
            abs_time_to_first_block: merge(
                self.abs_time_to_first_block,
                other.abs_time_to_first_block,
                |a, b| a.min(b),
            ),
            total_time_blocked: self.total_time_blocked + other.total_time_blocked,
            num_blocklist_adds: self.num_blocklist_adds + other.num_blocklist_adds,
        }
    }
}

/// Naive traffic simulator: its associated functions drive a `TrafficController`
/// with synthetic client requests and collect `TrafficSimMetrics`.
pub struct TrafficSim {
/// Controller associated with this simulator.
/// NOTE(review): none of the associated functions in this file read this field;
/// `run` spawns its own test controller — confirm whether the field is still needed.
pub traffic_controller: TrafficController,
}

impl TrafficSim {
pub async fn run(
policy: PolicyConfig,
num_clients: u8,
per_client_tps: usize,
duration: Duration,
report: bool,
) -> TrafficSimMetrics {
assert!(
per_client_tps <= 10_000,
"per_client_tps must be less than 10,000. For higher values, increase num_clients"
);
assert!(num_clients < 20, "num_clients must be greater than 0");
assert!(num_clients > 0);
assert!(per_client_tps > 0);
assert!(duration.as_secs() > 0);

let controller = TrafficController::spawn_for_test(policy.clone(), None);
let tasks = (0..num_clients).map(|task_num| {
tokio::spawn(Self::run_single_client(
controller.clone(),
duration,
task_num,
per_client_tps,
))
});

let status_task = if report {
Some(tokio::spawn(async move {
println!(
"Running naive traffic simulation for {} seconds",
duration.as_secs()
);
println!("Policy: {:#?}", policy);
println!("Num clients: {}", num_clients);
println!("TPS per client: {}", per_client_tps);
println!(
"Target total TPS: {}",
per_client_tps * num_clients as usize
);
println!("\n");
for _ in 0..duration.as_secs() {
print!(".");
tokio::time::sleep(Duration::from_secs(1)).await;
}
println!();
}))
} else {
None
};

let metrics = futures::future::join_all(tasks).await.into_iter().fold(
TrafficSimMetrics::default(),
|acc, run_client_ret| {
if run_client_ret.is_err() {
error!(
"Error running traffic sim client: {:?}",
run_client_ret.err()
);
acc
} else {
let metrics = run_client_ret.unwrap();
acc + metrics
}
},
);

if report {
status_task.unwrap().await.unwrap();
Self::report_metrics(metrics.clone(), duration, per_client_tps, num_clients);
}
metrics
}

/// Simulates a single client hammering `controller` for `duration`.
///
/// The client uses the loopback address `127.0.0.<task_num>` as its connection IP
/// and sends about `per_client_tps` requests per second. Every allowed request is
/// tallied with the controller; every rejected request is counted as blocked.
///
/// `per_client_tps` must be non-zero (the caller asserts this), otherwise the
/// interval computation divides by zero.
async fn run_single_client(
    controller: TrafficController,
    duration: Duration,
    task_num: u8,
    per_client_tps: usize,
) -> TrafficSimMetrics {
    // Do an initial sleep for a random amount of time to smooth
    // out the traffic. This shouldn't be strictly necessary and
    // we can remove if we want more determinism
    let sleep_time = Duration::from_micros(rand::thread_rng().gen_range(0..100));
    tokio::time::sleep(sleep_time).await;

    // Loop invariants: the client's address and the pause between requests.
    let connection_ip = Some(IpAddr::V4(Ipv4Addr::new(127, 0, 0, task_num)));
    let request_interval = Duration::from_micros(1_000_000 / per_client_tps as u64);

    // collectors
    let mut num_requests = 0;
    let mut num_blocked = 0;
    let mut time_to_first_block = None;
    let mut total_time_blocked = Duration::from_micros(0);
    let mut num_blocklist_adds = 0;
    // state: `Some(t)` while the client is blocked, where `t` is when the
    // current blocked interval was first observed
    let mut blocked_since: Option<Instant> = None;
    let start = Instant::now();

    while start.elapsed() < duration {
        let allowed = controller.check(connection_ip, None).await;
        if allowed {
            // Transition blocked -> allowed closes the current blocked interval.
            if let Some(since) = blocked_since.take() {
                total_time_blocked += since.elapsed();
            }
            controller.tally(TrafficTally::new(
                connection_ip,
                // TODO add proxy IP for testing
                None,
                // TODO add weight adjustment
                Weight::one(),
            ));
        } else {
            // Transition allowed -> blocked opens a new blocked interval.
            if blocked_since.is_none() {
                blocked_since = Some(Instant::now());
                num_blocklist_adds += 1;
                if time_to_first_block.is_none() {
                    time_to_first_block = Some(start.elapsed());
                }
            }
            num_blocked += 1;
        }
        num_requests += 1;
        tokio::time::sleep(request_interval).await;
    }

    // If the run ended while still blocked, account for that trailing interval;
    // it was already counted in `num_blocklist_adds`, so leaving it out would
    // understate both the total and the per-add average.
    if let Some(since) = blocked_since {
        total_time_blocked += since.elapsed();
    }

    TrafficSimMetrics {
        num_requests,
        num_blocked,
        time_to_first_block,
        // For a single client both "first block" figures are the same value;
        // they only diverge once metrics of several clients are added together.
        abs_time_to_first_block: time_to_first_block,
        total_time_blocked,
        num_blocklist_adds,
    }
}

/// Prints a human-readable summary of `metrics` to stdout.
///
/// `duration`, `per_client_tps` and `num_clients` are the parameters the simulation
/// was run with; they are used to compute the expected request count and the
/// per-client average. `num_clients` must be non-zero (the caller asserts this).
fn report_metrics(
    metrics: TrafficSimMetrics,
    duration: Duration,
    per_client_tps: usize,
    num_clients: u8,
) {
    println!("TrafficSim metrics:");
    println!("-------------------");
    // The below two should be near equal
    println!(
        "Num expected requests: {}",
        per_client_tps * (num_clients as usize) * duration.as_secs() as usize
    );
    println!("Num actual requests: {}", metrics.num_requests);
    // This reflects the number of requests that were blocked, but note that once a client
    // is added to the blocklist, all subsequent requests from that client are blocked
    // until ttl is expired.
    println!("Num blocked requests: {}", metrics.num_blocked);
    // This metric on the other hand reflects the number of times a client was added to the
    // blocklist and thus can be compared against the expectation based on the policy block
    // threshold and ttl
    println!(
        "Num times added to blocklist: {}",
        metrics.num_blocklist_adds
    );
    // This averages the duration for the first request to be blocked across all clients,
    // which is useful for understanding if the policy is rate limiting based on expectation.
    // NOTE(review): the sum is divided by the total number of clients, including any
    // client that was never blocked, so the average is understated in that case.
    let avg_first_block_time = metrics
        .time_to_first_block
        .map(|ttf| ttf / num_clients as u32);
    println!("Average time to first block: {:?}", avg_first_block_time);
    // This is the time it took for the first request to be blocked across all clients,
    // and is instead more useful for understanding false positives in terms of rate and magnitude.
    println!(
        "Absolute time to first block (across all clients): {:?}",
        metrics.abs_time_to_first_block
    );
    // Useful for ensuring that TTL is respected. `checked_div` yields `None` when
    // nothing was ever added to the blocklist, in which case we report zero.
    let avg_time_blocked = (metrics.total_time_blocked.as_millis() as u64)
        .checked_div(metrics.num_blocklist_adds)
        .unwrap_or(0);
    println!(
        "Average time blocked (ttl): {:?}",
        Duration::from_millis(avg_time_blocked)
    );
}
}
11 changes: 11 additions & 0 deletions crates/sui-core/src/traffic_controller/policies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,17 @@ pub struct TrafficTally {
pub timestamp: SystemTime,
}

impl TrafficTally {
    /// Creates a tally for the given connection/proxy addresses and weight,
    /// timestamped with the system clock at the moment of the call.
    pub fn new(connection_ip: Option<IpAddr>, proxy_ip: Option<IpAddr>, weight: Weight) -> Self {
        let timestamp = SystemTime::now();
        Self {
            timestamp,
            weight,
            proxy_ip,
            connection_ip,
        }
    }
}

#[derive(Clone, Debug, Default)]
pub struct PolicyResponse {
pub block_connection_ip: Option<IpAddr>,
Expand Down
Loading

0 comments on commit 550718e

Please sign in to comment.