Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adds socket address for repair service over QUIC #32834

Merged
merged 1 commit into from
Aug 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions core/src/repair/ancestor_hashes_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -869,7 +869,7 @@ mod test {
},
solana_gossip::{
cluster_info::{ClusterInfo, Node},
contact_info::ContactInfo,
contact_info::{ContactInfo, Protocol},
},
solana_ledger::{blockstore::make_many_slot_entries, get_tmp_ledger_path, shred::Nonce},
solana_runtime::{accounts_background_service::AbsRequestSender, bank_forks::BankForks},
Expand Down Expand Up @@ -1400,7 +1400,7 @@ mod test {
nonce,
);
if let Ok(request_bytes) = request_bytes {
let socket = responder_info.serve_repair().unwrap();
let socket = responder_info.serve_repair(Protocol::UDP).unwrap();
let _ = ancestor_hashes_request_socket.send_to(&request_bytes, socket);
}
}
Expand Down Expand Up @@ -1470,7 +1470,7 @@ mod test {
let packet = &mut response_packet[0];
packet
.meta_mut()
.set_socket_addr(&responder_info.serve_repair().unwrap());
.set_socket_addr(&responder_info.serve_repair(Protocol::UDP).unwrap());
let decision = AncestorHashesService::verify_and_process_ancestor_response(
packet,
&ancestor_hashes_request_statuses,
Expand Down Expand Up @@ -1511,7 +1511,7 @@ mod test {
let packet = &mut response_packet[0];
packet
.meta_mut()
.set_socket_addr(&responder_info.serve_repair().unwrap());
.set_socket_addr(&responder_info.serve_repair(Protocol::UDP).unwrap());
let AncestorRequestDecision {
slot,
request_type,
Expand Down Expand Up @@ -1571,7 +1571,7 @@ mod test {
let packet = &mut response_packet[0];
packet
.meta_mut()
.set_socket_addr(&responder_info.serve_repair().unwrap());
.set_socket_addr(&responder_info.serve_repair(Protocol::UDP).unwrap());
let AncestorRequestDecision {
slot,
request_type,
Expand Down Expand Up @@ -1947,7 +1947,7 @@ mod test {
let packet = &mut response_packet[0];
packet
.meta_mut()
.set_socket_addr(&responder_info.serve_repair().unwrap());
.set_socket_addr(&responder_info.serve_repair(Protocol::UDP).unwrap());
let decision = AncestorHashesService::verify_and_process_ancestor_response(
packet,
&ancestor_hashes_request_statuses,
Expand Down Expand Up @@ -2010,7 +2010,7 @@ mod test {
let packet = &mut response_packet[0];
packet
.meta_mut()
.set_socket_addr(&responder_info.serve_repair().unwrap());
.set_socket_addr(&responder_info.serve_repair(Protocol::UDP).unwrap());
let AncestorRequestDecision {
slot,
request_type,
Expand Down
17 changes: 10 additions & 7 deletions core/src/repair/serve_repair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use {
},
solana_gossip::{
cluster_info::{ClusterInfo, ClusterInfoError},
legacy_contact_info::{LegacyContactInfo as ContactInfo, LegacyContactInfo},
contact_info::{LegacyContactInfo as ContactInfo, LegacyContactInfo, Protocol},
ping_pong::{self, PingCache, Pong},
weighted_shuffle::WeightedShuffle,
},
Expand Down Expand Up @@ -214,7 +214,7 @@ pub(crate) type Ping = ping_pong::Ping<[u8; REPAIR_PING_TOKEN_SIZE]>;

/// Window protocol messages
#[derive(Debug, AbiEnumVisitor, AbiExample, Deserialize, Serialize, strum_macros::Display)]
#[frozen_abi(digest = "7vZyACjc13qQYWUsqWbdidLXR3uNXpmqUZaKeV3gKuY2")]
#[frozen_abi(digest = "3VzVe3kMrG6ijkVPyCGeJVA9hQjWcFEZbAQPc5Zizrjm")]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this due to LegacyContactInfo change?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any concern on this "In general, once we add frozen_abi and its change is published in the stable release channel, its digest should never change. If such a change is needed, we should opt for defining a new struct like FooV1. And special release flow like hard forks should be approached." mentioned in https://docs.solana.com/implemented-proposals/abi-management?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no backward compatibility issue here as described in #32834 (comment)

pub enum RepairProtocol {
LegacyWindowIndex(LegacyContactInfo, Slot, u64),
LegacyHighestWindowIndex(LegacyContactInfo, Slot, u64),
Expand Down Expand Up @@ -350,7 +350,7 @@ impl RepairPeers {
.iter()
.zip(weights)
.filter_map(|(peer, &weight)| {
let addr = peer.serve_repair().ok()?;
let addr = peer.serve_repair(Protocol::UDP).ok()?;
Some(((*peer.pubkey(), addr), weight))
})
.unzip();
Expand Down Expand Up @@ -1078,7 +1078,7 @@ impl ServeRepair {
.shuffle(&mut rand::thread_rng())
.map(|i| index[i])
.filter_map(|i| {
let addr = repair_peers[i].serve_repair().ok()?;
let addr = repair_peers[i].serve_repair(Protocol::UDP).ok()?;
Some((*repair_peers[i].pubkey(), addr))
})
.take(get_ancestor_hash_repair_sample_size())
Expand All @@ -1102,7 +1102,10 @@ impl ServeRepair {
.unzip();
let k = WeightedIndex::new(weights)?.sample(&mut rand::thread_rng());
let n = index[k];
Ok((*repair_peers[n].pubkey(), repair_peers[n].serve_repair()?))
Ok((
*repair_peers[n].pubkey(),
repair_peers[n].serve_repair(Protocol::UDP)?,
))
}

pub(crate) fn map_repair_request(
Expand Down Expand Up @@ -1930,8 +1933,8 @@ mod tests {
&identity_keypair,
)
.unwrap();
assert_eq!(nxt.serve_repair().unwrap(), serve_repair_addr);
assert_eq!(rv.0, nxt.serve_repair().unwrap());
assert_eq!(nxt.serve_repair(Protocol::UDP).unwrap(), serve_repair_addr);
assert_eq!(rv.0, nxt.serve_repair(Protocol::UDP).unwrap());

let serve_repair_addr2 = socketaddr!([127, 0, 0, 2], 1243);
let mut nxt = ContactInfo::new(
Expand Down
4 changes: 3 additions & 1 deletion dos/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,9 @@ fn get_target(
Some((*node.pubkey(), node.tpu_forwards(protocol).unwrap()))
}
Mode::Repair => todo!("repair socket is not gossiped anymore!"),
Mode::ServeRepair => Some((*node.pubkey(), node.serve_repair().unwrap())),
Mode::ServeRepair => {
Some((*node.pubkey(), node.serve_repair(Protocol::UDP).unwrap()))
}
Comment on lines +452 to +454
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i assume the plan is to next implement repair over quic?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, that is the plan

Mode::Rpc => None,
};
break;
Expand Down
24 changes: 21 additions & 3 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ pub fn make_accounts_hashes_message(
pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>;

// TODO These messages should go through the gpu pipeline for spam filtering
#[frozen_abi(digest = "4jtxvWyeFwfDQTTGh4yJLyukALzRNVJ9WNnCbFeJUmaS")]
#[frozen_abi(digest = "6T2sn92PMrTijsgncH3bBZL4K5GUowb442cCw4y4DuwV")]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder what cause the ABI change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The digest changes because the struct field name in LegacyContactInfo has changed. But that is totally fine here because this structs are only (de)serialized using bincode which is totally oblivious to struct field names.

#[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)]
#[allow(clippy::large_enum_variant)]
pub(crate) enum Protocol {
Expand Down Expand Up @@ -847,7 +847,7 @@ impl ClusterInfo {
self.addr_to_string(&ip_addr, &node.tpu_forwards(contact_info::Protocol::UDP).ok()),
self.addr_to_string(&ip_addr, &node.tvu(contact_info::Protocol::UDP).ok()),
self.addr_to_string(&ip_addr, &node.tvu(contact_info::Protocol::QUIC).ok()),
self.addr_to_string(&ip_addr, &node.serve_repair().ok()),
self.addr_to_string(&ip_addr, &node.serve_repair(contact_info::Protocol::UDP).ok()),
node.shred_version(),
))
}
Expand Down Expand Up @@ -1345,7 +1345,7 @@ impl ClusterInfo {
node.pubkey() != &self_pubkey
&& node.shred_version() == self_shred_version
&& self.check_socket_addr_space(&node.tvu(contact_info::Protocol::UDP))
&& self.check_socket_addr_space(&node.serve_repair())
&& self.check_socket_addr_space(&node.serve_repair(contact_info::Protocol::UDP))
&& match gossip_crds.get::<&LowestSlot>(*node.pubkey()) {
None => true, // fallback to legacy behavior
Some(lowest_slot) => lowest_slot.lowest <= slot,
Expand Down Expand Up @@ -2799,6 +2799,7 @@ pub struct Sockets {
pub repair: UdpSocket,
pub retransmit_sockets: Vec<UdpSocket>,
pub serve_repair: UdpSocket,
pub serve_repair_quic: UdpSocket,
pub ancestor_hashes_requests: UdpSocket,
pub tpu_quic: UdpSocket,
pub tpu_forwards_quic: UdpSocket,
Expand Down Expand Up @@ -2839,6 +2840,7 @@ impl Node {
let broadcast = vec![UdpSocket::bind(&unspecified_bind_addr).unwrap()];
let retransmit_socket = UdpSocket::bind(&unspecified_bind_addr).unwrap();
let serve_repair = UdpSocket::bind(&localhost_bind_addr).unwrap();
let serve_repair_quic = UdpSocket::bind(&localhost_bind_addr).unwrap();
let ancestor_hashes_requests = UdpSocket::bind(&unspecified_bind_addr).unwrap();

let mut info = ContactInfo::new(
Expand Down Expand Up @@ -2871,6 +2873,11 @@ impl Node {
serve_repair.local_addr().unwrap(),
"serve-repair"
);
set_socket!(
set_serve_repair_quic,
serve_repair_quic.local_addr().unwrap(),
"serve-repair QUIC"
);
Node {
info,
sockets: Sockets {
Expand All @@ -2885,6 +2892,7 @@ impl Node {
repair,
retransmit_sockets: vec![retransmit_socket],
serve_repair,
serve_repair_quic,
ancestor_hashes_requests,
tpu_quic,
tpu_forwards_quic,
Expand Down Expand Up @@ -2930,6 +2938,7 @@ impl Node {
let (_, retransmit_socket) = Self::bind(bind_ip_addr, port_range);
let (_, repair) = Self::bind(bind_ip_addr, port_range);
let (serve_repair_port, serve_repair) = Self::bind(bind_ip_addr, port_range);
let (serve_repair_quic_port, serve_repair_quic) = Self::bind(bind_ip_addr, port_range);
let (_, broadcast) = Self::bind(bind_ip_addr, port_range);
let (_, ancestor_hashes_requests) = Self::bind(bind_ip_addr, port_range);

Expand Down Expand Up @@ -2959,6 +2968,11 @@ impl Node {
set_socket!(set_rpc, rpc_port, "RPC");
set_socket!(set_rpc_pubsub, rpc_pubsub_port, "RPC-pubsub");
set_socket!(set_serve_repair, serve_repair_port, "serve-repair");
set_socket!(
set_serve_repair_quic,
serve_repair_quic_port,
"serve-repair QUIC"
);
trace!("new ContactInfo: {:?}", info);

Node {
Expand All @@ -2975,6 +2989,7 @@ impl Node {
repair,
retransmit_sockets: vec![retransmit_socket],
serve_repair,
serve_repair_quic,
ancestor_hashes_requests,
tpu_quic,
tpu_forwards_quic,
Expand Down Expand Up @@ -3023,6 +3038,7 @@ impl Node {

let (_, repair) = Self::bind(bind_ip_addr, port_range);
let (serve_repair_port, serve_repair) = Self::bind(bind_ip_addr, port_range);
let (serve_repair_quic_port, serve_repair_quic) = Self::bind(bind_ip_addr, port_range);

let (_, broadcast) =
multi_bind_in_range(bind_ip_addr, port_range, 4).expect("broadcast multi_bind");
Expand All @@ -3044,6 +3060,7 @@ impl Node {
);
let _ = info.set_tpu_vote((addr, tpu_vote_port));
let _ = info.set_serve_repair((addr, serve_repair_port));
let _ = info.set_serve_repair((addr, serve_repair_quic_port));
trace!("new ContactInfo: {:?}", info);

Node {
Expand All @@ -3059,6 +3076,7 @@ impl Node {
repair,
retransmit_sockets,
serve_repair,
serve_repair_quic,
ip_echo: Some(ip_echo),
ancestor_hashes_requests,
tpu_quic,
Expand Down
32 changes: 28 additions & 4 deletions gossip/src/contact_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const SOCKET_TAG_GOSSIP: u8 = 0;
const SOCKET_TAG_RPC: u8 = 2;
const SOCKET_TAG_RPC_PUBSUB: u8 = 3;
const SOCKET_TAG_SERVE_REPAIR: u8 = 4;
const SOCKET_TAG_SERVE_REPAIR_QUIC: u8 = 1;
const SOCKET_TAG_TPU: u8 = 5;
const SOCKET_TAG_TPU_FORWARDS: u8 = 6;
const SOCKET_TAG_TPU_FORWARDS_QUIC: u8 = 7;
Comment on lines 29 to 35
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a concern this can get bloated and sparse over time as we add an remove hardcoded sockets? may not be a big issue but readability could suffer. just a thought

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if it becomes too sparse, we can use a hash-map instead of an array in the future.

Expand Down Expand Up @@ -224,7 +225,11 @@ impl ContactInfo {
get_socket!(gossip, SOCKET_TAG_GOSSIP);
get_socket!(rpc, SOCKET_TAG_RPC);
get_socket!(rpc_pubsub, SOCKET_TAG_RPC_PUBSUB);
get_socket!(serve_repair, SOCKET_TAG_SERVE_REPAIR);
get_socket!(
serve_repair,
SOCKET_TAG_SERVE_REPAIR,
SOCKET_TAG_SERVE_REPAIR_QUIC
);
get_socket!(tpu, SOCKET_TAG_TPU, SOCKET_TAG_TPU_QUIC);
get_socket!(
tpu_forwards,
Expand All @@ -238,6 +243,7 @@ impl ContactInfo {
set_socket!(set_rpc, SOCKET_TAG_RPC);
set_socket!(set_rpc_pubsub, SOCKET_TAG_RPC_PUBSUB);
set_socket!(set_serve_repair, SOCKET_TAG_SERVE_REPAIR);
set_socket!(set_serve_repair_quic, SOCKET_TAG_SERVE_REPAIR_QUIC);
set_socket!(set_tpu, SOCKET_TAG_TPU, SOCKET_TAG_TPU_QUIC);
set_socket!(
set_tpu_forwards,
Expand All @@ -248,7 +254,11 @@ impl ContactInfo {
set_socket!(set_tvu, SOCKET_TAG_TVU);
set_socket!(set_tvu_quic, SOCKET_TAG_TVU_QUIC);

remove_socket!(remove_serve_repair, SOCKET_TAG_SERVE_REPAIR);
remove_socket!(
remove_serve_repair,
SOCKET_TAG_SERVE_REPAIR,
SOCKET_TAG_SERVE_REPAIR_QUIC
);
remove_socket!(remove_tpu, SOCKET_TAG_TPU, SOCKET_TAG_TPU_QUIC);
remove_socket!(
remove_tpu_forwards,
Expand Down Expand Up @@ -370,6 +380,8 @@ impl ContactInfo {
node.set_rpc_pubsub((Ipv4Addr::LOCALHOST, DEFAULT_RPC_PUBSUB_PORT))
.unwrap();
node.set_serve_repair((Ipv4Addr::LOCALHOST, 8008)).unwrap();
node.set_serve_repair_quic((Ipv4Addr::LOCALHOST, 8006))
.unwrap();
node
}

Expand All @@ -392,6 +404,7 @@ impl ContactInfo {
node.set_rpc_pubsub((addr, DEFAULT_RPC_PUBSUB_PORT))
.unwrap();
node.set_serve_repair((addr, port + 8)).unwrap();
node.set_serve_repair_quic((addr, port + 4)).unwrap();
node
}
}
Expand Down Expand Up @@ -733,9 +746,13 @@ mod tests {
sockets.get(&SOCKET_TAG_RPC_PUBSUB)
);
assert_eq!(
node.serve_repair().ok().as_ref(),
node.serve_repair(Protocol::UDP).ok().as_ref(),
sockets.get(&SOCKET_TAG_SERVE_REPAIR)
);
assert_eq!(
node.serve_repair(Protocol::QUIC).ok().as_ref(),
sockets.get(&SOCKET_TAG_SERVE_REPAIR_QUIC)
);
assert_eq!(
node.tpu(Protocol::UDP).ok().as_ref(),
sockets.get(&SOCKET_TAG_TPU)
Expand Down Expand Up @@ -813,7 +830,14 @@ mod tests {
assert_eq!(old.gossip().unwrap(), node.gossip().unwrap());
assert_eq!(old.rpc().unwrap(), node.rpc().unwrap());
assert_eq!(old.rpc_pubsub().unwrap(), node.rpc_pubsub().unwrap());
assert_eq!(old.serve_repair().unwrap(), node.serve_repair().unwrap());
assert_eq!(
old.serve_repair(Protocol::QUIC).unwrap(),
node.serve_repair(Protocol::QUIC).unwrap()
);
assert_eq!(
old.serve_repair(Protocol::UDP).unwrap(),
node.serve_repair(Protocol::UDP).unwrap()
);
assert_eq!(
old.tpu(Protocol::QUIC).unwrap(),
node.tpu(Protocol::QUIC).unwrap()
Expand Down
13 changes: 7 additions & 6 deletions gossip/src/legacy_contact_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ pub struct LegacyContactInfo {
tvu: SocketAddr,
/// TVU over QUIC protocol.
tvu_quic: SocketAddr,
unused: SocketAddr,
/// repair service over QUIC protocol.
serve_repair_quic: SocketAddr,
/// transactions address
tpu: SocketAddr,
/// address to forward unprocessed transactions to
Expand Down Expand Up @@ -123,7 +124,7 @@ impl Default for LegacyContactInfo {
gossip: socketaddr_any!(),
tvu: socketaddr_any!(),
tvu_quic: socketaddr_any!(),
unused: socketaddr_any!(),
serve_repair_quic: socketaddr_any!(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ahh i see. ok replacing the old repair socket with a quic repair socket

tpu: socketaddr_any!(),
tpu_forwards: socketaddr_any!(),
tpu_vote: socketaddr_any!(),
Expand All @@ -143,7 +144,7 @@ impl LegacyContactInfo {
gossip: socketaddr!(Ipv4Addr::LOCALHOST, 1234),
tvu: socketaddr!(Ipv4Addr::LOCALHOST, 1235),
tvu_quic: socketaddr!(Ipv4Addr::LOCALHOST, 1236),
unused: socketaddr!(Ipv4Addr::LOCALHOST, 1237),
serve_repair_quic: socketaddr!(Ipv4Addr::LOCALHOST, 1237),
tpu: socketaddr!(Ipv4Addr::LOCALHOST, 1238),
tpu_forwards: socketaddr!(Ipv4Addr::LOCALHOST, 1239),
tpu_vote: socketaddr!(Ipv4Addr::LOCALHOST, 1240),
Expand Down Expand Up @@ -210,7 +211,7 @@ impl LegacyContactInfo {
get_socket!(tpu_vote);
get_socket!(rpc);
get_socket!(rpc_pubsub);
get_socket!(serve_repair);
get_socket!(serve_repair, serve_repair_quic);

set_socket!(set_gossip, gossip);
set_socket!(set_rpc, rpc);
Expand Down Expand Up @@ -272,13 +273,13 @@ impl TryFrom<&ContactInfo> for LegacyContactInfo {
gossip: unwrap_socket!(gossip),
tvu: unwrap_socket!(tvu, Protocol::UDP),
tvu_quic: unwrap_socket!(tvu, Protocol::QUIC),
unused: SOCKET_ADDR_UNSPECIFIED,
serve_repair_quic: unwrap_socket!(serve_repair, Protocol::QUIC),
tpu: unwrap_socket!(tpu, Protocol::UDP),
tpu_forwards: unwrap_socket!(tpu_forwards, Protocol::UDP),
tpu_vote: unwrap_socket!(tpu_vote),
rpc: unwrap_socket!(rpc),
rpc_pubsub: unwrap_socket!(rpc_pubsub),
serve_repair: unwrap_socket!(serve_repair),
serve_repair: unwrap_socket!(serve_repair, Protocol::UDP),
wallclock: node.wallclock(),
shred_version: node.shred_version(),
})
Expand Down
Loading