Throttle unstaked quic streams for a given connection #34562

Merged
merged 10 commits into from Dec 22, 2023
8 changes: 6 additions & 2 deletions bench-tps/src/send_batch.rs
@@ -248,9 +248,13 @@ where
fn send<C: BenchTpsClient + ?Sized>(&self, client: &Arc<C>) {
let mut send_txs = Measure::start("send_and_clone_txs");
let batch: Vec<_> = self.iter().map(|(_keypair, tx)| tx.clone()).collect();
client.send_batch(batch).expect("transfer");
let result = client.send_batch(batch);
send_txs.stop();
debug!("send {} {}", self.len(), send_txs);
if result.is_err() {
debug!("Failed to send batch {result:?}");
} else {
debug!("send {} {}", self.len(), send_txs);
}
}

fn verify<C: 'static + BenchTpsClient + Send + Sync + ?Sized>(
34 changes: 18 additions & 16 deletions local-cluster/tests/local_cluster.rs
@@ -2928,24 +2928,26 @@ fn setup_transfer_scan_threads(
.get_latest_blockhash_with_commitment(CommitmentConfig::processed())
.unwrap();
for i in 0..starting_keypairs_.len() {
client
.async_transfer(
1,
&starting_keypairs_[i],
&target_keypairs_[i].pubkey(),
blockhash,
)
.unwrap();
let result = client.async_transfer(
1,
&starting_keypairs_[i],
&target_keypairs_[i].pubkey(),
blockhash,
);
if result.is_err() {
debug!("Failed in transfer for starting keypair: {:?}", result);
}
}
for i in 0..starting_keypairs_.len() {
client
.async_transfer(
1,
&target_keypairs_[i],
&starting_keypairs_[i].pubkey(),
blockhash,
)
.unwrap();
let result = client.async_transfer(
1,
&target_keypairs_[i],
&starting_keypairs_[i].pubkey(),
blockhash,
);
if result.is_err() {
debug!("Failed in transfer for starting keypair: {:?}", result);
}
}
}
})
8 changes: 5 additions & 3 deletions quic-client/tests/quic_client.rs
@@ -46,7 +46,7 @@ mod tests {
assert_eq!(p.meta().size, num_bytes);
}
}
assert_eq!(total_packets, num_expected_packets);
assert!(total_packets > 0);
}

fn server_args() -> (UdpSocket, Arc<AtomicBool>, Keypair, IpAddr) {
@@ -139,7 +139,7 @@ mod tests {
assert_eq!(p.meta().size, num_bytes);
}
}
assert_eq!(total_packets, num_expected_packets);
assert!(total_packets > 0);
}

#[tokio::test]
@@ -182,7 +182,9 @@ mod tests {
let num_bytes = PACKET_DATA_SIZE;
let num_expected_packets: usize = 3000;
let packets = vec![vec![0u8; PACKET_DATA_SIZE]; num_expected_packets];
assert!(client.send_data_batch(&packets).await.is_ok());
for packet in packets {
let _ = client.send_data(&packet).await;
Contributor: was the assertion dropped intentionally?

Contributor Author: Yes, send_data can return an error, so we can't assert on is_ok().

}
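For reference, a minimal sketch of the tolerant-send pattern the reply above describes, counting successes instead of asserting on every call; the client, send_data, and PACKET_DATA_SIZE come from the surrounding test, and this is only an illustration, not the PR's code:

let packets = vec![vec![0u8; PACKET_DATA_SIZE]; 3000];
let mut sent = 0usize;
for packet in packets {
    // A single send may legitimately fail (for example when the server
    // throttles the stream), so tolerate per-packet errors.
    if client.send_data(&packet).await.is_ok() {
        sent += 1;
    }
}
// Only require that some packets went through, mirroring the relaxed
// assert!(total_packets > 0) check in the packet-verification helper.
assert!(sent > 0);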

nonblocking_check_packets(receiver, num_bytes, num_expected_packets).await;
exit.store(true, Ordering::Relaxed);
94 changes: 84 additions & 10 deletions streamer/src/nonblocking/quic.rs
@@ -1,6 +1,6 @@
use {
crate::{
quic::{configure_server, QuicServerError, StreamStats},
quic::{configure_server, QuicServerError, StreamStats, MAX_UNSTAKED_CONNECTIONS},
streamer::StakedNodes,
tls_certificates::get_pubkey_from_tls_certificate,
},
@@ -39,6 +39,10 @@ use {
tokio::{task::JoinHandle, time::timeout},
};

/// Limit to 500K PPS
const MAX_STREAMS_PER_100MS: u64 = 500_000 / 10;
const MAX_UNSTAKED_STREAMS_PERCENT: u64 = 20;
const STREAM_THROTTLING_INTERVAL: Duration = Duration::from_millis(100);
const WAIT_FOR_STREAM_TIMEOUT: Duration = Duration::from_millis(100);
pub const DEFAULT_WAIT_FOR_CHUNK_TIMEOUT: Duration = Duration::from_secs(10);

@@ -55,6 +59,7 @@ const CONNECTION_CLOSE_REASON_EXCEED_MAX_STREAM_COUNT: &[u8] = b"exceed_max_stre

const CONNECTION_CLOSE_CODE_TOO_MANY: u32 = 4;
const CONNECTION_CLOSE_REASON_TOO_MANY: &[u8] = b"too_many";
const STREAM_STOP_CODE_THROTTLING: u32 = 15;

// A sequence of bytes that is part of a packet
// along with where in the packet it is
@@ -264,6 +269,7 @@ enum ConnectionHandlerError {
MaxStreamError,
}

#[derive(Clone)]
struct NewConnectionHandlerParams {
// In principle, the code can be made to work with a crossbeam channel
// as long as we're careful never to use a blocking recv or send call
@@ -348,13 +354,11 @@ fn handle_and_cache_new_connection(
drop(connection_table_l);
tokio::spawn(handle_connection(
connection,
params.packet_sender.clone(),
remote_addr,
params.remote_pubkey,
last_update,
connection_table,
stream_exit,
params.stats.clone(),
params.clone(),
peer_type,
wait_for_chunk_timeout,
));
@@ -681,19 +685,42 @@ async fn packet_batch_sender(
}
}

#[allow(clippy::too_many_arguments)]
fn max_streams_for_connection_in_100ms(
connection_type: ConnectionPeerType,
stake: u64,
total_stake: u64,
) -> u64 {
if matches!(connection_type, ConnectionPeerType::Unstaked) || stake == 0 {
Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT)
.apply_to(MAX_STREAMS_PER_100MS)
.saturating_div(MAX_UNSTAKED_CONNECTIONS as u64)
} else {
let max_total_staked_streams: u64 = MAX_STREAMS_PER_100MS
- Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT).apply_to(MAX_STREAMS_PER_100MS);
((max_total_staked_streams as f64 / total_stake as f64) * stake as f64) as u64
}
}
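With the constants above, MAX_STREAMS_PER_100MS = 500_000 / 10 = 50_000 streams per 100ms. Working the formula through (matching the tests at the end of this file): an unstaked or zero-stake connection gets 20% of 50_000 = 10_000 streams per 100ms, split across MAX_UNSTAKED_CONNECTIONS (500), i.e. 20 streams per 100ms per connection. A staked connection shares the remaining 40_000 streams per 100ms in proportion to stake / total_stake; for example, stake 15 out of a total of 10_000 yields 40_000 * 15 / 10_000 = 60 streams per 100ms.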

fn reset_throttling_params_if_needed(last_instant: &mut tokio::time::Instant) -> bool {
if tokio::time::Instant::now().duration_since(*last_instant) > STREAM_THROTTLING_INTERVAL {
*last_instant = tokio::time::Instant::now();
true
} else {
false
}
}

async fn handle_connection(
connection: Connection,
packet_sender: AsyncSender<PacketAccumulator>,
remote_addr: SocketAddr,
remote_pubkey: Option<Pubkey>,
last_update: Arc<AtomicU64>,
connection_table: Arc<Mutex<ConnectionTable>>,
stream_exit: Arc<AtomicBool>,
stats: Arc<StreamStats>,
params: NewConnectionHandlerParams,
peer_type: ConnectionPeerType,
wait_for_chunk_timeout: Duration,
) {
let stats = params.stats;
debug!(
"quic new connection {} streams: {} connections: {}",
remote_addr,
@@ -702,17 +729,28 @@ async fn handle_connection(
);
let stable_id = connection.stable_id();
stats.total_connections.fetch_add(1, Ordering::Relaxed);
let max_streams_per_100ms =
max_streams_for_connection_in_100ms(peer_type, params.stake, params.total_stake);
let mut last_throttling_instant = tokio::time::Instant::now();
let mut streams_in_current_interval = 0;
while !stream_exit.load(Ordering::Relaxed) {
if let Ok(stream) =
tokio::time::timeout(WAIT_FOR_STREAM_TIMEOUT, connection.accept_uni()).await
{
match stream {
Ok(mut stream) => {
if reset_throttling_params_if_needed(&mut last_throttling_instant) {
streams_in_current_interval = 0;
} else if streams_in_current_interval >= max_streams_per_100ms {
let _ = stream.stop(VarInt::from_u32(STREAM_STOP_CODE_THROTTLING));
continue;
}
streams_in_current_interval = streams_in_current_interval.saturating_add(1);
stats.total_streams.fetch_add(1, Ordering::Relaxed);
stats.total_new_streams.fetch_add(1, Ordering::Relaxed);
let stream_exit = stream_exit.clone();
let stats = stats.clone();
let packet_sender = packet_sender.clone();
let packet_sender = params.packet_sender.clone();
let last_update = last_update.clone();
tokio::spawn(async move {
let mut maybe_batch = None;
@@ -765,7 +803,7 @@
}

let removed_connection_count = connection_table.lock().unwrap().remove_connection(
ConnectionTableKey::new(remote_addr.ip(), remote_pubkey),
ConnectionTableKey::new(remote_addr.ip(), params.remote_pubkey),
remote_addr.port(),
stable_id,
);
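For reference, a simplified, self-contained sketch of the fixed-window throttle that reset_throttling_params_if_needed and the per-connection stream counter implement above; it uses std::time instead of tokio::time, and the type and method names here are illustrative, not part of the PR:

use std::time::{Duration, Instant};

struct StreamThrottle {
    // e.g. STREAM_THROTTLING_INTERVAL (100ms)
    interval: Duration,
    // e.g. the value returned by max_streams_for_connection_in_100ms()
    max_streams_per_interval: u64,
    window_start: Instant,
    streams_in_window: u64,
}

impl StreamThrottle {
    fn new(interval: Duration, max_streams_per_interval: u64) -> Self {
        Self {
            interval,
            max_streams_per_interval,
            window_start: Instant::now(),
            streams_in_window: 0,
        }
    }

    /// Returns true if the stream may proceed, false if the caller should
    /// stop it (the connection handler sends STREAM_STOP_CODE_THROTTLING).
    fn try_accept_stream(&mut self) -> bool {
        if self.window_start.elapsed() > self.interval {
            // New window: reset the counter, as reset_throttling_params_if_needed does.
            self.window_start = Instant::now();
            self.streams_in_window = 0;
        } else if self.streams_in_window >= self.max_streams_per_interval {
            // Over budget for this window: throttle the stream.
            return false;
        }
        self.streams_in_window = self.streams_in_window.saturating_add(1);
        true
    }
}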
@@ -1989,4 +2027,40 @@ pub mod test {
compute_receive_window_ratio_for_staked_node(max_stake, min_stake, max_stake + 10);
assert_eq!(ratio, max_ratio);
}

#[test]
fn test_max_streams_for_connection_in_100ms() {
// 50K streams per 100ms * 20% / 500 max unstaked connections
assert_eq!(
max_streams_for_connection_in_100ms(ConnectionPeerType::Unstaked, 0, 10000),
20
);

// 50K streams per 100ms * 20% / 500 max unstaked connections
assert_eq!(
max_streams_for_connection_in_100ms(ConnectionPeerType::Unstaked, 10, 10000),
20
);

// If stake is 0, same limits as unstaked connections will apply.
// 50K streams per 100ms * 20% / 500 max unstaked connections
assert_eq!(
max_streams_for_connection_in_100ms(ConnectionPeerType::Staked, 0, 10000),
20
);

// max staked streams = 50K streams per 100ms * 80% = 40K
// function = 40K * stake / total_stake
assert_eq!(
max_streams_for_connection_in_100ms(ConnectionPeerType::Staked, 15, 10000),
60
);

// max staked streams = 50K streams per 100ms * 80% = 40K
// function = 40K * stake / total_stake
assert_eq!(
max_streams_for_connection_in_100ms(ConnectionPeerType::Staked, 1000, 10000),
4000
);
}
}