Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Internal Review: QUIC send back errors backport 1.14.12 #20

Closed
wants to merge 16 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
d7f6ef8
if client opens a bidirectional quic channel send back the errors
godmodegalactus Dec 18, 2022
d627dd4
Adapting quic client to recieve errors from bi channel and storing it…
godmodegalactus Jan 12, 2023
40cb85c
making a bidirectional messaging service
godmodegalactus Jan 14, 2023
7569a69
Completing the bidirectional reply service
godmodegalactus Jan 15, 2023
c102f73
final touches to quic bidirectional reply service and adding first tests
godmodegalactus Jan 15, 2023
0e5b76e
adding some unittests for quic bidirectional service
godmodegalactus Jan 16, 2023
db06a42
Adding bidirectional message handler on client side and sending messa…
godmodegalactus Jan 17, 2023
f85b33a
quic bidirectional messaging fmt and forgotten files
godmodegalactus Jan 17, 2023
1673ef0
quic bidirectional channel updating unittests
godmodegalactus Jan 17, 2023
3aa9c38
quic bidirectional reply, update after Maxs comments
godmodegalactus Jan 18, 2023
6657349
bidirectional quic channel fixing unit tests
godmodegalactus Jan 19, 2023
21dc8c7
Quic bidirectional reply fixing the rest issues, removing print state…
godmodegalactus Jan 19, 2023
2c9bf55
bidirectional quic replies, solving requirement of tokio multithread …
godmodegalactus Jan 19, 2023
eee9039
bidirectional reply channel, adding metrics, support for multiple sen…
godmodegalactus Jan 20, 2023
f801093
bidirectional reply channel adding more unit tests
godmodegalactus Jan 21, 2023
e606120
bidirectional quic replies, handling currently bidirectioanl message …
godmodegalactus Jan 22, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion banking-bench/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#![allow(clippy::integer_arithmetic)]

use {
clap::{crate_description, crate_name, Arg, ArgEnum, Command},
crossbeam_channel::{unbounded, Receiver},
Expand Down Expand Up @@ -28,7 +29,9 @@ use {
timing::{duration_as_us, timestamp},
transaction::Transaction,
},
solana_streamer::socket::SocketAddrSpace,
solana_streamer::{
bidirectional_channel::QuicBidirectionalReplyService, socket::SocketAddrSpace,
},
std::{
sync::{atomic::Ordering, Arc, RwLock},
thread::sleep,
Expand Down Expand Up @@ -359,6 +362,7 @@ fn main() {
None,
Arc::new(connection_cache),
bank_forks.clone(),
QuicBidirectionalReplyService::new(),
);
poh_recorder.write().unwrap().set_bank(&bank, false);

Expand Down
139 changes: 139 additions & 0 deletions client/src/bidirectional_channel_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
use {
crossbeam_channel::{Receiver, Sender},
log::debug,
quinn::RecvStream,
solana_sdk::signature::Signature,
solana_streamer::bidirectional_channel::{
QuicReplyMessage, QUIC_REPLY_MESSAGE_OFFSET, QUIC_REPLY_MESSAGE_SIGNATURE_OFFSET,
QUIC_REPLY_MESSAGE_SIZE,
},
std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::{Duration, Instant},
},
};

/// Maximum number of bytes requested from the reply stream in a single
/// `read_chunk` call made by `BidirectionalChannelHandler::start_serving`.
// NOTE(review): `1280 - 40 - 8` presumably mirrors the project's packet-size
// constant (minimum IPv6 MTU minus header overhead) — confirm, and consider
// re-using that constant instead of redefining it here.
pub const PACKET_DATA_SIZE: usize = 1280 - 40 - 8;

/// Client-side handler for the reply messages a QUIC server sends back over a
/// bidirectional stream.
///
/// Decoded `QuicReplyMessage`s are pushed into a crossbeam channel whose
/// receiving half is exposed as `reciever`. The channel is created with
/// `crossbeam_channel::unbounded()` in `new`, so the number of queued replies
/// is not capped by this type; consumers are expected to drain it.
///
/// The handler also owns the receive half of the stream opened by the QUIC
/// client once it is handed over through `start_serving`.
///
/// Cloning is cheap: every clone shares the same channel and the same
/// "is serving" flag.
#[derive(Clone)]
pub struct BidirectionalChannelHandler {
/// Producer side of the reply queue; cloned into the receive task.
sender: Arc<Sender<QuicReplyMessage>>,
/// Consumer side of the reply queue, read by users of the handler.
pub reciever: Receiver<QuicReplyMessage>,
/// `true` while a receive task spawned by `start_serving` is running.
recv_channel_is_set: Arc<AtomicBool>,
}

impl BidirectionalChannelHandler {
pub fn new() -> Self {
let (sender, reciever) = crossbeam_channel::unbounded();
Self {
sender: Arc::new(sender),
reciever,
recv_channel_is_set: Arc::new(AtomicBool::new(false)),
}
}

pub fn is_serving(&self) -> bool {
self.recv_channel_is_set.load(Ordering::Relaxed)
}

pub fn start_serving(&self, recv_stream: RecvStream) {
if self.is_serving() {
return;
}

let recv_channel_is_set = self.recv_channel_is_set.clone();
let sender = self.sender.clone();

recv_channel_is_set.store(true, Ordering::Relaxed);
// create task to fetch errors from the leader
tokio::spawn(async move {
// wait for 10 s max
let mut timeout: u64 = 10_000;
let mut start = Instant::now();

const LAST_BUFFER_SIZE: usize = QUIC_REPLY_MESSAGE_SIZE + 1;
let mut last_buffer: [u8; LAST_BUFFER_SIZE] = [0; LAST_BUFFER_SIZE];
let mut buffer_written = 0;
let mut recv_stream = recv_stream;
loop {
if let Ok(chunk) = tokio::time::timeout(
Duration::from_millis(timeout),
recv_stream.read_chunk(PACKET_DATA_SIZE, false),
)
.await
{
match chunk {
Ok(maybe_chunk) => {
match maybe_chunk {
Some(chunk) => {
// move data into current buffer
let mut buffer = vec![0; buffer_written + chunk.bytes.len()];
if buffer_written > 0 {
// copy remaining data from previous buffer
buffer[0..buffer_written]
.copy_from_slice(&last_buffer[0..buffer_written]);
}
buffer[buffer_written..buffer_written + chunk.bytes.len()]
.copy_from_slice(&chunk.bytes);
buffer_written = buffer_written + chunk.bytes.len();

while buffer_written >= QUIC_REPLY_MESSAGE_SIZE {
let signature = bincode::deserialize::<Signature>(
&buffer[QUIC_REPLY_MESSAGE_SIGNATURE_OFFSET
..QUIC_REPLY_MESSAGE_OFFSET],
);
let message: [u8; 128] = buffer
[QUIC_REPLY_MESSAGE_OFFSET..QUIC_REPLY_MESSAGE_SIZE]
.try_into()
.unwrap();
if let Ok(signature) = signature {
if let Err(_) =
sender.send(QuicReplyMessage::new_with_bytes(
signature, message,
))
{
// crossbeam channel closed
break;
}
} else {
// deserializing error
debug!("deserializing error on BidirectionalChannelHandler");
}
buffer.copy_within(QUIC_REPLY_MESSAGE_SIZE.., 0);
buffer_written -= QUIC_REPLY_MESSAGE_SIZE;
}
if buffer_written > 0 {
// move remianing data into last buffer
last_buffer[0..buffer_written]
.copy_from_slice(&buffer[0..buffer_written]);
}
}
None => {
// done receiving chunks
break;
}
}
}
Err(e) => {
debug!("BidirectionalChannelHandler recieved error {}", e);
break;
}
}
} else {
break;
}

timeout = timeout.saturating_sub((Instant::now() - start).as_millis() as u64);
start = Instant::now();
}
recv_channel_is_set.store(false, Ordering::Relaxed);
println!("stopping recv stream");
});
}
}
20 changes: 20 additions & 0 deletions client/src/connection_cache.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use {
crate::{
bidirectional_channel_handler::BidirectionalChannelHandler,
nonblocking::{
quic_client::{QuicClient, QuicClientCertificate, QuicLazyInitializedEndpoint},
tpu_connection::NonblockingConnection,
Expand Down Expand Up @@ -57,6 +58,9 @@ pub struct ConnectionCacheStats {
// Need to track these separately per-connection
// because we need to track the base stat value from quinn
pub total_client_stats: ClientStats,

// getting quic errors from leader
pub server_reply_channel: Option<BidirectionalChannelHandler>,
}

const CONNECTION_STAT_SUBMISSION_INTERVAL: u64 = 2000;
Expand Down Expand Up @@ -287,6 +291,22 @@ impl ConnectionCache {
}
}

/// Builds a QUIC-enabled connection cache whose connections forward server
/// reply messages to `reply_channel`.
///
/// `connection_pool_size` is raised to 1 when 0 is passed. All other settings
/// take their `Default` values.
pub fn new_with_replies_from_tpu(
    connection_pool_size: usize,
    reply_channel: BidirectionalChannelHandler,
) -> Self {
    Self {
        use_quic: true,
        // a pool always holds at least one connection
        connection_pool_size: connection_pool_size.max(1),
        stats: Arc::new(ConnectionCacheStats {
            // `reply_channel` is owned by this function, so it is moved in
            // rather than cloned.
            server_reply_channel: Some(reply_channel),
            ..Default::default()
        }),
        ..Self::default()
    }
}

pub fn update_client_certificate(
&mut self,
keypair: &Keypair,
Expand Down
1 change: 1 addition & 0 deletions client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ extern crate serde_derive;
#[macro_use]
extern crate solana_metrics;

pub mod bidirectional_channel_handler;
pub mod blockhash_query;
pub mod client_error;
pub mod connection_cache;
Expand Down
69 changes: 49 additions & 20 deletions client/src/nonblocking/quic_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
//! server's flow control.
use {
crate::{
client_error::ClientErrorKind, connection_cache::ConnectionCacheStats,
nonblocking::tpu_connection::TpuConnection, tpu_connection::ClientStats,
bidirectional_channel_handler::BidirectionalChannelHandler, client_error::ClientErrorKind,
connection_cache::ConnectionCacheStats, nonblocking::tpu_connection::TpuConnection,
tpu_connection::ClientStats,
},
async_mutex::Mutex,
async_trait::async_trait,
Expand All @@ -19,8 +20,8 @@ use {
solana_net_utils::VALIDATOR_PORT_RANGE,
solana_sdk::{
quic::{
QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS, QUIC_KEEP_ALIVE_MS, QUIC_MAX_TIMEOUT_MS,
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS,
QUIC_CHUNK_SIZE, QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS, QUIC_KEEP_ALIVE_MS,
QUIC_MAX_TIMEOUT_MS,
},
signature::Keypair,
transport::Result as TransportResult,
Expand Down Expand Up @@ -187,6 +188,10 @@ impl QuicNewConnection {
.await
{
if connecting_result.is_err() {
println!(
"error while connecting is {} ",
connecting_result.as_ref().err().unwrap()
);
stats.connection_errors.fetch_add(1, Ordering::Relaxed);
}
make_connection_measure.stop();
Expand Down Expand Up @@ -284,11 +289,25 @@ impl QuicClient {
async fn _send_buffer_using_conn(
data: &[u8],
connection: &NewConnection,
server_reply_channel: Option<BidirectionalChannelHandler>,
) -> Result<(), QuicError> {
let mut send_stream = connection.connection.open_uni().await?;
let create_bichannel = match &server_reply_channel {
Some(x) => !x.is_serving(),
None => false,
};
if create_bichannel {
let (mut send_stream, recv_stream) = connection.connection.open_bi().await?;

send_stream.write_all(data).await?;
send_stream.finish().await?;
let server_reply_channel = server_reply_channel.unwrap();
server_reply_channel.start_serving(recv_stream);
} else {
let mut send_stream = connection.connection.open_uni().await?;

send_stream.write_all(data).await?;
send_stream.finish().await?;
send_stream.write_all(data).await?;
send_stream.finish().await?;
godmodegalactus marked this conversation as resolved.
Show resolved Hide resolved
}
Ok(())
}

Expand Down Expand Up @@ -397,7 +416,13 @@ impl QuicClient {
.update_stat(&self.stats.tx_acks, new_stats.frame_tx.acks);

last_connection_id = connection.connection.stable_id();
match Self::_send_buffer_using_conn(data, &connection).await {
match Self::_send_buffer_using_conn(
data,
&connection,
stats.server_reply_channel.clone(),
)
.await
{
Ok(()) => {
return Ok(connection);
}
Expand Down Expand Up @@ -479,11 +504,13 @@ impl QuicClient {
let futures: Vec<_> = chunks
.into_iter()
.map(|buffs| {
join_all(
buffs
.into_iter()
.map(|buf| Self::_send_buffer_using_conn(buf.as_ref(), connection_ref)),
)
join_all(buffs.into_iter().map(|buf| {
Self::_send_buffer_using_conn(
buf.as_ref(),
connection_ref,
stats.server_reply_channel.clone(),
)
}))
})
.collect();

Expand Down Expand Up @@ -521,11 +548,7 @@ impl QuicTpuConnection {
addr: SocketAddr,
connection_stats: Arc<ConnectionCacheStats>,
) -> Self {
let client = Arc::new(QuicClient::new(
endpoint,
addr,
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS,
));
let client = Arc::new(QuicClient::new(endpoint, addr, QUIC_CHUNK_SIZE));
Self::new_with_client(client, connection_stats)
}

Expand All @@ -550,7 +573,10 @@ impl TpuConnection for QuicTpuConnection {
where
T: AsRef<[u8]> + Send + Sync,
{
let stats = ClientStats::default();
let stats = ClientStats {
server_reply_channel: self.connection_stats.server_reply_channel.clone(),
..Default::default()
};
let len = buffers.len();
let res = self
.client
Expand All @@ -566,7 +592,10 @@ impl TpuConnection for QuicTpuConnection {
where
T: AsRef<[u8]> + Send + Sync,
{
let stats = Arc::new(ClientStats::default());
let stats = ClientStats {
server_reply_channel: self.connection_stats.server_reply_channel.clone(),
..Default::default()
};
let send_buffer =
self.client
.send_buffer(wire_transaction, &stats, self.connection_stats.clone());
Expand Down
7 changes: 6 additions & 1 deletion client/src/tpu_connection.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use {
crate::{quic_client::QuicTpuConnection, udp_client::UdpTpuConnection},
crate::{
bidirectional_channel_handler::BidirectionalChannelHandler, quic_client::QuicTpuConnection,
udp_client::UdpTpuConnection,
},
enum_dispatch::enum_dispatch,
rayon::iter::{IntoParallelIterator, ParallelIterator},
solana_metrics::MovingStat,
Expand All @@ -22,6 +25,8 @@ pub struct ClientStats {
pub tx_acks: MovingStat,
pub make_connection_ms: AtomicU64,
pub send_timeout: AtomicU64,

pub server_reply_channel: Option<BidirectionalChannelHandler>,
}

#[enum_dispatch]
Expand Down
Loading