Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make repair metrics less chatty (#9094) #10080

Merged
merged 1 commit into from
May 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions archiver-lib/src/archiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use solana_core::{
gossip_service::GossipService,
packet::{limited_deserialize, PACKET_DATA_SIZE},
repair_service,
repair_service::{RepairService, RepairSlotRange, RepairStrategy},
repair_service::{RepairService, RepairSlotRange, RepairStats, RepairStrategy},
serve_repair::ServeRepair,
shred_fetch_stage::ShredFetchStage,
sigverify_stage::{DisabledSigVerifier, SigVerifyStage},
Expand Down Expand Up @@ -839,13 +839,14 @@ impl Archiver {
repair_service::MAX_REPAIR_LENGTH,
&repair_slot_range,
);
let mut repair_stats = RepairStats::default();
//iter over the repairs and send them
if let Ok(repairs) = repairs {
let reqs: Vec<_> = repairs
.into_iter()
.filter_map(|repair_request| {
serve_repair
.map_repair_request(&repair_request)
.map_repair_request(&repair_request, &mut repair_stats)
.map(|result| ((archiver_info.gossip, result), repair_request))
.ok()
})
Expand Down
46 changes: 44 additions & 2 deletions core/src/repair_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,31 @@ use std::{
sync::{Arc, RwLock},
thread::sleep,
thread::{self, Builder, JoinHandle},
time::Duration,
time::{Duration, Instant},
};

/// Running summary of the slots recorded for one category of repair request.
///
/// `min` and `max` are only meaningful while `count > 0`; a freshly defaulted
/// group reports all three fields as zero.
#[derive(Default)]
pub struct RepairStatsGroup {
    /// Number of slots recorded through [`RepairStatsGroup::update`].
    pub count: u64,
    /// Smallest slot recorded so far.
    pub min: u64,
    /// Largest slot recorded so far.
    pub max: u64,
}

impl RepairStatsGroup {
    /// Records one repair request for `slot`, updating the count and the
    /// observed slot range.
    pub fn update(&mut self, slot: u64) {
        // `Default` zero-initializes `min`, so folding the first sample with
        // `cmp::min` would pin `min` at 0 forever. Seed it from the first
        // recorded slot instead; `count == 0` marks "no samples yet".
        self.min = if self.count == 0 {
            slot
        } else {
            self.min.min(slot)
        };
        self.max = self.max.max(slot);
        self.count += 1;
    }
}

/// Per-request-type statistics for outgoing repair requests.
///
/// Each group is updated with the request's slot when a request of the
/// matching `RepairType` is mapped to its wire form.
#[derive(Default)]
pub struct RepairStats {
/// Stats for `RepairType::Shred` requests.
pub shred: RepairStatsGroup,
/// Stats for `RepairType::HighestShred` requests.
pub highest_shred: RepairStatsGroup,
/// Stats for `RepairType::Orphan` requests.
pub orphan: RepairStatsGroup,
}

pub const MAX_REPAIR_LENGTH: usize = 512;
pub const REPAIR_MS: u64 = 100;
pub const MAX_ORPHANS: usize = 5;
Expand Down Expand Up @@ -107,6 +129,8 @@ impl RepairService {
cluster_info,
);
}
let mut repair_stats = RepairStats::default();
let mut last_stats = Instant::now();
loop {
if exit.load(Ordering::Relaxed) {
break;
Expand Down Expand Up @@ -148,7 +172,7 @@ impl RepairService {
.into_iter()
.filter_map(|repair_request| {
serve_repair
.repair_request(&repair_request)
.repair_request(&repair_request, &mut repair_stats)
.map(|result| (result, repair_request))
.ok()
})
Expand All @@ -161,6 +185,24 @@ impl RepairService {
});
}
}
if last_stats.elapsed().as_secs() > 1 {
let repair_total = repair_stats.shred.count
+ repair_stats.highest_shred.count
+ repair_stats.orphan.count;
if repair_total > 0 {
datapoint_info!(
"serve_repair-repair",
("repair-total", repair_total, i64),
("shred-count", repair_stats.shred.count, i64),
("highest-shred-count", repair_stats.highest_shred.count, i64),
("orphan-count", repair_stats.orphan.count, i64),
("repair-highest-slot", repair_stats.highest_shred.max, i64),
("repair-orphan", repair_stats.orphan.max, i64),
);
}
repair_stats = RepairStats::default();
last_stats = Instant::now();
}
sleep(Duration::from_millis(REPAIR_MS));
}
}
Expand Down
98 changes: 71 additions & 27 deletions core/src/serve_repair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::{
cluster_info::{ClusterInfo, ClusterInfoError},
contact_info::ContactInfo,
packet::Packet,
repair_service::RepairStats,
result::{Error, Result},
};
use bincode::serialize;
Expand Down Expand Up @@ -46,6 +47,16 @@ impl RepairType {
}
}

/// Counters accumulated by the repair listener between periodic reports.
///
/// All fields start at zero via `Default`.
#[derive(Default)]
pub struct ServeRepairStats {
// NOTE(review): the code that writes this field is not visible here; it is
// only read by the periodic debug log. Presumably the number of packets
// received by the listener — confirm against `run_listen`.
pub total_packets: usize,
// Incremented once per successfully deserialized request in `handle_packets`.
pub processed: usize,
// Requests ignored because the sender id equals this node's own id.
pub self_repair: usize,
// `RepairProtocol::WindowIndex` requests handled.
pub window_index: usize,
// `RepairProtocol::HighestWindowIndex` requests handled.
pub highest_window_index: usize,
// `RepairProtocol::Orphan` requests handled.
pub orphan: usize,
}

/// Window protocol messages
#[derive(Serialize, Deserialize, Debug)]
enum RepairProtocol {
Expand Down Expand Up @@ -104,25 +115,22 @@ impl ServeRepair {
from_addr: &SocketAddr,
blockstore: Option<&Arc<Blockstore>>,
request: RepairProtocol,
stats: &mut ServeRepairStats,
) -> Option<Packets> {
let now = Instant::now();

//TODO verify from is signed
let my_id = me.read().unwrap().keypair.pubkey();
let from = Self::get_repair_sender(&request);
if from.id == my_id {
warn!(
"{}: Ignored received repair request from ME {}",
my_id, from.id,
);
inc_new_counter_debug!("serve_repair-handle-repair--eq", 1);
stats.self_repair += 1;
return None;
}

let (res, label) = {
match &request {
RepairProtocol::WindowIndex(from, slot, shred_index) => {
inc_new_counter_debug!("serve_repair-request-window-index", 1);
stats.window_index += 1;
(
Self::run_window_request(
recycler,
Expand All @@ -138,7 +146,7 @@ impl ServeRepair {
}

RepairProtocol::HighestWindowIndex(_, slot, highest_index) => {
inc_new_counter_debug!("serve_repair-request-highest-window-index", 1);
stats.highest_window_index += 1;
(
Self::run_highest_window_request(
recycler,
Expand All @@ -151,7 +159,7 @@ impl ServeRepair {
)
}
RepairProtocol::Orphan(_, slot) => {
inc_new_counter_debug!("serve_repair-request-orphan", 1);
stats.orphan += 1;
(
Self::run_orphan(
recycler,
Expand Down Expand Up @@ -186,6 +194,7 @@ impl ServeRepair {
requests_receiver: &PacketReceiver,
response_sender: &PacketSender,
max_packets: &mut usize,
stats: &mut ServeRepairStats,
) -> Result<()> {
//TODO cache connections
let timeout = Duration::new(1, 0);
Expand All @@ -202,7 +211,7 @@ impl ServeRepair {

let mut time = Measure::start("repair::handle_packets");
for reqs in reqs_v {
Self::handle_packets(obj, &recycler, blockstore, reqs, response_sender);
Self::handle_packets(obj, &recycler, blockstore, reqs, response_sender, stats);
}
time.stop();
if total_packets >= *max_packets {
Expand All @@ -215,6 +224,31 @@ impl ServeRepair {
Ok(())
}

/// Reports the request statistics accumulated in `stats`, then clears them.
///
/// A warning and the `serve_repair-handle-repair--eq` counter are emitted only
/// when at least one repair request from this node itself was ignored. The
/// packet totals and the per-request-type counters are always reported.
/// `stats` is reset to its default state before returning.
fn report_reset_stats(me: &Arc<RwLock<Self>>, stats: &mut ServeRepairStats) {
    let ignored_own_requests = stats.self_repair;
    if ignored_own_requests != 0 {
        // Only take the read lock when there is actually something to warn about.
        let node_id = me.read().unwrap().keypair.pubkey();
        warn!(
            "{}: Ignored received repair requests from ME: {}",
            node_id, ignored_own_requests,
        );
        inc_new_counter_debug!("serve_repair-handle-repair--eq", ignored_own_requests);
    }

    let (received, handled) = (stats.total_packets, stats.processed);
    debug!(
        "repair_listener: total_packets: {} passed: {}",
        received, handled
    );

    let window_requests = stats.window_index;
    let highest_window_requests = stats.highest_window_index;
    let orphan_requests = stats.orphan;
    inc_new_counter_debug!("serve_repair-request-window-index", window_requests);
    inc_new_counter_debug!(
        "serve_repair-request-highest-window-index",
        highest_window_requests
    );
    inc_new_counter_debug!("serve_repair-request-orphan", orphan_requests);

    // Start the next reporting interval from a clean slate.
    *stats = Default::default();
}

pub fn listen(
me: Arc<RwLock<Self>>,
blockstore: Option<Arc<Blockstore>>,
Expand All @@ -228,6 +262,8 @@ impl ServeRepair {
.name("solana-repair-listen".to_string())
.spawn(move || {
let mut max_packets = 1024;
let mut last_print = Instant::now();
let mut stats = ServeRepairStats::default();
loop {
let result = Self::run_listen(
&me,
Expand All @@ -236,6 +272,7 @@ impl ServeRepair {
&requests_receiver,
&response_sender,
&mut max_packets,
&mut stats,
);
match result {
Err(Error::RecvTimeoutError(_)) | Ok(_) => {}
Expand All @@ -244,6 +281,10 @@ impl ServeRepair {
if exit.load(Ordering::Relaxed) {
return;
}
if last_print.elapsed().as_secs() > 2 {
Self::report_reset_stats(&me, &mut stats);
last_print = Instant::now();
}
thread_mem_usage::datapoint("solana-repair-listen");
}
})
Expand All @@ -256,6 +297,7 @@ impl ServeRepair {
blockstore: Option<&Arc<Blockstore>>,
packets: Packets,
response_sender: &PacketSender,
stats: &mut ServeRepairStats,
) {
// iter over the packets, collect pulls separately and process everything else
let allocated = thread_mem_usage::Allocatedp::default();
Expand All @@ -265,7 +307,9 @@ impl ServeRepair {
limited_deserialize(&packet.data[..packet.meta.size])
.into_iter()
.for_each(|request| {
let rsp = Self::handle_repair(me, recycler, &from_addr, blockstore, request);
stats.processed += 1;
let rsp =
Self::handle_repair(me, recycler, &from_addr, blockstore, request, stats);
if let Some(rsp) = rsp {
let _ignore_disconnect = response_sender.send(rsp);
}
Expand Down Expand Up @@ -295,7 +339,11 @@ impl ServeRepair {
Ok(out)
}

pub fn repair_request(&self, repair_request: &RepairType) -> Result<(SocketAddr, Vec<u8>)> {
pub fn repair_request(
&self,
repair_request: &RepairType,
repair_stats: &mut RepairStats,
) -> Result<(SocketAddr, Vec<u8>)> {
// find a peer that appears to be accepting replication and has the desired slot, as indicated
// by a valid tvu port location
let valid: Vec<_> = self
Expand All @@ -308,31 +356,27 @@ impl ServeRepair {
}
let n = thread_rng().gen::<usize>() % valid.len();
let addr = valid[n].serve_repair; // send the request to the peer's serve_repair port
let out = self.map_repair_request(repair_request)?;
let out = self.map_repair_request(repair_request, repair_stats)?;

Ok((addr, out))
}

pub fn map_repair_request(&self, repair_request: &RepairType) -> Result<Vec<u8>> {
pub fn map_repair_request(
&self,
repair_request: &RepairType,
repair_stats: &mut RepairStats,
) -> Result<Vec<u8>> {
match repair_request {
RepairType::Shred(slot, shred_index) => {
datapoint_debug!(
"serve_repair-repair",
("repair-slot", *slot, i64),
("repair-ix", *shred_index, i64)
);
repair_stats.shred.update(*slot);
Ok(self.window_index_request_bytes(*slot, *shred_index)?)
}
RepairType::HighestShred(slot, shred_index) => {
datapoint_info!(
"serve_repair-repair_highest",
("repair-highest-slot", *slot, i64),
("repair-highest-ix", *shred_index, i64)
);
repair_stats.highest_shred.update(*slot);
Ok(self.window_highest_index_request_bytes(*slot, *shred_index)?)
}
RepairType::Orphan(slot) => {
datapoint_info!("serve_repair-repair_orphan", ("repair-orphan", *slot, i64));
repair_stats.orphan.update(*slot);
Ok(self.orphan_bytes(*slot)?)
}
}
Expand Down Expand Up @@ -592,7 +636,7 @@ mod tests {
let me = ContactInfo::new_localhost(&Pubkey::new_rand(), timestamp());
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(me)));
let serve_repair = ServeRepair::new(cluster_info.clone());
let rv = serve_repair.repair_request(&RepairType::Shred(0, 0));
let rv = serve_repair.repair_request(&RepairType::Shred(0, 0), &mut RepairStats::default());
assert_matches!(rv, Err(Error::ClusterInfoError(ClusterInfoError::NoPeers)));

let serve_repair_addr = socketaddr!([127, 0, 0, 1], 1243);
Expand All @@ -613,7 +657,7 @@ mod tests {
};
cluster_info.write().unwrap().insert_info(nxt.clone());
let rv = serve_repair
.repair_request(&RepairType::Shred(0, 0))
.repair_request(&RepairType::Shred(0, 0), &mut RepairStats::default())
.unwrap();
assert_eq!(nxt.serve_repair, serve_repair_addr);
assert_eq!(rv.0, nxt.serve_repair);
Expand All @@ -640,7 +684,7 @@ mod tests {
while !one || !two {
//this randomly picks an option, so eventually it should pick both
let rv = serve_repair
.repair_request(&RepairType::Shred(0, 0))
.repair_request(&RepairType::Shred(0, 0), &mut RepairStats::default())
.unwrap();
if rv.0 == serve_repair_addr {
one = true;
Expand Down