Commit

Merge branch 'wen-coding-silent_repair_phase'

wen-coding committed Aug 15, 2023
2 parents 9fb105c + 962db56, commit 5f9348e

Showing 36 changed files with 1,745 additions and 96 deletions.
368 changes: 349 additions & 19 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -103,6 +103,7 @@ members = [
"validator",
"version",
"watchtower",
"wen-restart",
"zk-keygen",
"zk-token-sdk",
]
@@ -361,6 +362,7 @@ solana-turbine = { path = "turbine", version = "=1.17.0" }
solana-udp-client = { path = "udp-client", version = "=1.17.0" }
solana-version = { path = "version", version = "=1.17.0" }
solana-vote-program = { path = "programs/vote", version = "=1.17.0" }
solana-wen-restart = { path = "wen-restart", version = "=1.17.0" }
solana-zk-keygen = { path = "zk-keygen", version = "=1.17.0" }
solana-zk-token-proof-program = { path = "programs/zk-token-proof", version = "=1.17.0" }
solana-zk-token-sdk = { path = "zk-token-sdk", version = "=1.17.0" }
1 change: 1 addition & 0 deletions core/Cargo.toml
@@ -63,6 +63,7 @@ solana-transaction-status = { workspace = true }
solana-turbine = { workspace = true }
solana-version = { workspace = true }
solana-vote-program = { workspace = true }
solana-wen-restart = { workspace = true }
strum = { workspace = true, features = ["derive"] }
strum_macros = { workspace = true }
sys-info = { workspace = true }
28 changes: 23 additions & 5 deletions core/src/repair_service.rs
@@ -33,6 +33,7 @@ use {
timing::timestamp,
},
solana_streamer::sendmmsg::{batch_send, SendPktsError},
solana_wen_restart::wen_restart::RestartSlotsToRepairReceiver,
std::{
collections::{HashMap, HashSet},
iter::Iterator,
@@ -107,6 +108,7 @@ pub struct RepairStats {
pub shred: RepairStatsGroup,
pub highest_shred: RepairStatsGroup,
pub orphan: RepairStatsGroup,
pub wen_restart: RepairStatsGroup,
pub get_best_orphans_us: u64,
pub get_best_shreds_us: u64,
}
@@ -209,6 +211,8 @@ pub struct RepairInfo {
pub repair_validators: Option<HashSet<Pubkey>>,
// Validators which should be given priority when serving
pub repair_whitelist: Arc<RwLock<HashSet<Pubkey>>>,
pub wen_restart_repair_receiver: RestartSlotsToRepairReceiver,
pub in_wen_restart: Arc<AtomicBool>,
}

pub struct RepairSlotRange {
@@ -289,7 +293,9 @@ impl RepairService {
dumped_slots_receiver: DumpedSlotsReceiver,
popular_pruned_forks_sender: PopularPrunedForksSender,
) {
let mut repair_weight = RepairWeight::new(repair_info.bank_forks.read().unwrap().root());
let mut repair_weight = RepairWeight::new(
repair_info.bank_forks.read().unwrap().root(),
);
let serve_repair = ServeRepair::new(
repair_info.cluster_info.clone(),
repair_info.bank_forks.clone(),
@@ -302,6 +308,7 @@
let mut last_stats = Instant::now();
let mut peers_cache = LruCache::new(REPAIR_PEERS_CACHE_CAPACITY);
let mut popular_pruned_forks_requests = HashSet::new();
let mut wen_restart_slots_to_repair: Option<Vec<Slot>> = Some(Vec::new());

loop {
if exit.load(Ordering::Relaxed) {
@@ -376,6 +383,16 @@
);
add_votes_elapsed.stop();

let in_wen_restart = repair_info.in_wen_restart.load(Ordering::Relaxed);
if in_wen_restart {
repair_info.wen_restart_repair_receiver.try_iter().for_each(
|new_slots| {
wen_restart_slots_to_repair = Some(new_slots);
},
)
} else if wen_restart_slots_to_repair.is_some() {
wen_restart_slots_to_repair = None;
}
let repairs = repair_weight.get_best_weighted_repairs(
blockstore,
root_bank.epoch_stakes_map(),
@@ -386,6 +403,7 @@
MAX_CLOSEST_COMPLETION_REPAIRS,
&mut repair_timing,
&mut best_repairs_stats,
&wen_restart_slots_to_repair,
);

let mut popular_pruned_forks = repair_weight.get_popular_pruned_forks(
@@ -880,7 +898,7 @@ mod test {
let (shreds2, _) = make_slot_entries(5, 2, 1, /*merkle_variant:*/ true);
shreds.extend(shreds2);
blockstore.insert_shreds(shreds, None, false).unwrap();
let mut repair_weight = RepairWeight::new(0);
let mut repair_weight = RepairWeight::new(0, false);
assert_eq!(
repair_weight.get_best_weighted_repairs(
&blockstore,
@@ -914,7 +932,7 @@ mod test {
// Write this shred to slot 2, should chain to slot 0, which we haven't received
// any shreds for
blockstore.insert_shreds(shreds, None, false).unwrap();
let mut repair_weight = RepairWeight::new(0);
let mut repair_weight = RepairWeight::new(0, false);

// Check that repair tries to patch the empty slot
assert_eq!(
@@ -975,7 +993,7 @@
})
.collect();

let mut repair_weight = RepairWeight::new(0);
let mut repair_weight = RepairWeight::new(0, false);
sleep_shred_deferment_period();
assert_eq!(
repair_weight.get_best_weighted_repairs(
@@ -1037,7 +1055,7 @@
vec![ShredRepairType::HighestShred(0, num_shreds_per_slot - 1)];

sleep_shred_deferment_period();
let mut repair_weight = RepairWeight::new(0);
let mut repair_weight = RepairWeight::new(0, false);
assert_eq!(
repair_weight.get_best_weighted_repairs(
&blockstore,
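
The repair_service.rs changes above gate repair generation on wen_restart: while in_wen_restart is set, the loop drains the RestartSlotsToRepairReceiver channel, keeps only the most recently received slot list, and hands it to get_best_weighted_repairs; once the flag clears, the list is dropped and normal weighted repair resumes. A minimal standalone sketch of that polling pattern, assuming the receiver is a crossbeam_channel::Receiver<Vec<Slot>> (the helper name poll_wen_restart_slots is illustrative, not part of the crate):

use crossbeam_channel::{unbounded, Receiver};
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};

type Slot = u64;

// Drain the channel and keep only the latest slot list while in wen_restart;
// clear it as soon as the restart phase ends.
fn poll_wen_restart_slots(
    in_wen_restart: &Arc<AtomicBool>,
    receiver: &Receiver<Vec<Slot>>,
    slots_to_repair: &mut Option<Vec<Slot>>,
) {
    if in_wen_restart.load(Ordering::Relaxed) {
        for new_slots in receiver.try_iter() {
            *slots_to_repair = Some(new_slots);
        }
    } else if slots_to_repair.is_some() {
        *slots_to_repair = None;
    }
}

fn main() {
    let (sender, receiver) = unbounded();
    let in_wen_restart = Arc::new(AtomicBool::new(true));
    let mut slots_to_repair: Option<Vec<Slot>> = Some(Vec::new());

    sender.send(vec![100, 101, 102]).unwrap();
    poll_wen_restart_slots(&in_wen_restart, &receiver, &mut slots_to_repair);
    assert_eq!(slots_to_repair, Some(vec![100, 101, 102]));

    in_wen_restart.store(false, Ordering::Relaxed);
    poll_wen_restart_slots(&in_wen_restart, &receiver, &mut slots_to_repair);
    assert_eq!(slots_to_repair, None);
}
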
11 changes: 11 additions & 0 deletions core/src/repair_weight.rs
@@ -213,8 +213,19 @@ impl RepairWeight {
max_closest_completion_repairs: usize,
repair_timing: &mut RepairTiming,
stats: &mut BestRepairsStats,
slots_to_repair_for_wen_restart: &Option<Vec<Slot>>,
) -> Vec<ShredRepairType> {
let mut repairs = vec![];

if let Some(my_slots_to_repair) = slots_to_repair_for_wen_restart {
info!("wen_restart repairing {:?}", my_slots_to_repair);
repairs = my_slots_to_repair
.iter()
.map(|slot| ShredRepairType::WenRestart(*slot))
.collect();
return repairs;
}

let mut processed_slots = HashSet::from([self.root]);
let mut slot_meta_cache = HashMap::default();

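
In other words, when a wen_restart slot list is present, get_best_weighted_repairs skips the weighted fork traversal entirely and emits one WenRestart request per requested slot. A small sketch of that short-circuit, reusing the ShredRepairType variants added in serve_repair.rs below (the helper name wen_restart_repairs is illustrative):

type Slot = u64;

#[allow(dead_code)]
#[derive(Debug, PartialEq, Eq)]
enum ShredRepairType {
    Orphan(Slot),
    HighestShred(Slot, u64),
    Shred(Slot, u64),
    WenRestart(Slot),
}

// If wen_restart requested specific slots, repair exactly those and nothing else;
// None means the caller should fall through to normal weighted selection.
fn wen_restart_repairs(slots: &Option<Vec<Slot>>) -> Option<Vec<ShredRepairType>> {
    slots.as_ref().map(|slots| {
        slots
            .iter()
            .map(|slot| ShredRepairType::WenRestart(*slot))
            .collect()
    })
}

fn main() {
    let requested = Some(vec![7, 8]);
    assert_eq!(
        wen_restart_repairs(&requested),
        Some(vec![
            ShredRepairType::WenRestart(7),
            ShredRepairType::WenRestart(8),
        ])
    );
    assert_eq!(wen_restart_repairs(&None), None);
}
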
21 changes: 11 additions & 10 deletions core/src/replay_stage.rs
@@ -27,7 +27,6 @@ use {
rewards_recorder_service::{RewardsMessage, RewardsRecorderSender},
tower_storage::{SavedTower, SavedTowerVersions, TowerStorage},
unfrozen_gossip_verified_vote_hashes::UnfrozenGossipVerifiedVoteHashes,
validator::ProcessBlockStore,
voting_service::VoteOp,
window_service::DuplicateSlotReceiver,
},
@@ -482,7 +481,7 @@ impl ReplayStage {
ledger_signal_receiver: Receiver<bool>,
duplicate_slots_receiver: DuplicateSlotReceiver,
poh_recorder: Arc<RwLock<PohRecorder>>,
maybe_process_blockstore: Option<ProcessBlockStore>,
prev_tower: Tower,
vote_tracker: Arc<VoteTracker>,
cluster_slots: Arc<ClusterSlots>,
retransmit_slots_sender: Sender<Slot>,
@@ -500,15 +499,9 @@
dumped_slots_sender: DumpedSlotsSender,
banking_tracer: Arc<BankingTracer>,
popular_pruned_forks_receiver: PopularPrunedForksReceiver,
in_wen_restart: Arc<AtomicBool>,
) -> Result<Self, String> {
let mut tower = if let Some(process_blockstore) = maybe_process_blockstore {
let tower = process_blockstore.process_to_create_tower()?;
info!("Tower state: {:?}", tower);
tower
} else {
warn!("creating default tower....");
Tower::default()
};
let mut tower = prev_tower;

let ReplayStageConfig {
vote_account,
@@ -1052,6 +1045,7 @@ impl ReplayStage {
&banking_tracer,
has_new_vote_been_rooted,
transaction_status_sender.is_some(),
in_wen_restart.load(Ordering::Relaxed),
);

let poh_bank = poh_recorder.read().unwrap().bank();
@@ -1840,6 +1834,7 @@ impl ReplayStage {
banking_tracer: &Arc<BankingTracer>,
has_new_vote_been_rooted: bool,
track_transaction_indexes: bool,
in_wen_restart: bool,
) {
// all the individual calls to poh_recorder.read() are designed to
// increase granularity, decrease contention
@@ -1884,6 +1879,11 @@
);

if let Some(next_leader) = leader_schedule_cache.slot_leader_at(poh_slot, Some(&parent)) {
if in_wen_restart {
info!("In wen_restart, skipping my leader slot");
return;
}

if !has_new_vote_been_rooted {
info!("Haven't landed a vote, so skipping my leader slot");
return;
@@ -3715,6 +3715,7 @@ impl ReplayStage {
new_root,
accounts_background_request_sender,
highest_super_majority_root,
false,
);

drop_bank_sender
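
The replay_stage.rs changes have two effects: the stage now receives an already-built Tower (prev_tower) instead of constructing one from ProcessBlockStore, and leader-slot production is refused while the node is in wen_restart, ahead of the existing "no rooted vote" check. A condensed sketch of that guard ordering (the function and flag names are simplified stand-ins, not the crate's API):

// Returns true only when it is safe to start producing a block for this leader slot.
fn should_start_leader(
    is_my_leader_slot: bool,
    in_wen_restart: bool,
    has_new_vote_been_rooted: bool,
) -> bool {
    if !is_my_leader_slot {
        return false;
    }
    if in_wen_restart {
        // During wen_restart the cluster is only repairing and agreeing on a
        // restart slot, so no new blocks should be produced.
        return false;
    }
    if !has_new_vote_been_rooted {
        // Pre-existing rule: without a rooted vote, skip the leader slot.
        return false;
    }
    true
}

fn main() {
    assert!(!should_start_leader(true, true, true));
    assert!(!should_start_leader(true, false, false));
    assert!(should_start_leader(true, false, true));
}
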
39 changes: 35 additions & 4 deletions core/src/serve_repair.rs
@@ -111,6 +111,8 @@ pub enum ShredRepairType {
HighestShred(Slot, u64),
/// Requesting the missing shred at a particular index
Shred(Slot, u64),
/// Request from wen restart
WenRestart(Slot),
}

impl ShredRepairType {
@@ -119,6 +121,7 @@ impl ShredRepairType {
ShredRepairType::Orphan(slot) => *slot,
ShredRepairType::HighestShred(slot, _) => *slot,
ShredRepairType::Shred(slot, _) => *slot,
ShredRepairType::WenRestart(slot) => *slot,
}
}
}
@@ -130,6 +133,7 @@ impl RequestResponse for ShredRepairType {
ShredRepairType::Orphan(_) => (MAX_ORPHAN_REPAIR_RESPONSES) as u32,
ShredRepairType::HighestShred(_, _) => 1,
ShredRepairType::Shred(_, _) => 1,
ShredRepairType::WenRestart(_) => (MAX_ORPHAN_REPAIR_RESPONSES) as u32,
}
}
fn verify_response(&self, response_shred: &Shred) -> bool {
@@ -141,6 +145,7 @@
ShredRepairType::Shred(slot, index) => {
response_shred.slot() == *slot && response_shred.index() as u64 == *index
}
ShredRepairType::WenRestart(slot) => response_shred.slot() == *slot,
}
}
}
@@ -257,6 +262,10 @@ pub enum RepairProtocol {
header: RepairRequestHeader,
slot: Slot,
},
WenRestart {
header: RepairRequestHeader,
slot: Slot,
},
}

const REPAIR_REQUEST_PONG_SERIALIZED_BYTES: usize = PUBKEY_BYTES + HASH_BYTES + SIGNATURE_BYTES;
@@ -299,6 +308,7 @@ impl RepairProtocol {
Self::HighestWindowIndex { header, .. } => &header.sender,
Self::Orphan { header, .. } => &header.sender,
Self::AncestorHashes { header, .. } => &header.sender,
Self::WenRestart { header, .. } => &header.sender,
}
}

@@ -315,6 +325,7 @@
| Self::WindowIndex { .. }
| Self::HighestWindowIndex { .. }
| Self::Orphan { .. }
| Self::WenRestart { .. }
| Self::AncestorHashes { .. } => true,
}
}
@@ -327,9 +338,9 @@
| RepairProtocol::LegacyHighestWindowIndexWithNonce(_, _, _, _)
| RepairProtocol::AncestorHashes { .. }
| RepairProtocol::LegacyAncestorHashes(_, _, _) => 1,
RepairProtocol::Orphan { .. } | RepairProtocol::LegacyOrphanWithNonce(_, _, _) => {
MAX_ORPHAN_REPAIR_RESPONSES
}
RepairProtocol::Orphan { .. }
| RepairProtocol::LegacyOrphanWithNonce(_, _, _)
| RepairProtocol::WenRestart { .. } => MAX_ORPHAN_REPAIR_RESPONSES,
RepairProtocol::Pong(_) => 0, // no response
RepairProtocol::LegacyWindowIndex(_, _, _)
| RepairProtocol::LegacyHighestWindowIndex(_, _, _)
@@ -461,6 +472,13 @@ impl ServeRepair {
RepairProtocol::Orphan {
header: RepairRequestHeader { nonce, .. },
slot,
}
| RepairProtocol::WenRestart {
header: RepairRequestHeader { nonce, .. },
slot,
} => {
stats.orphan += 1;
(
@@ -874,7 +892,8 @@ impl ServeRepair {
RepairProtocol::WindowIndex { header, .. }
| RepairProtocol::HighestWindowIndex { header, .. }
| RepairProtocol::Orphan { header, .. }
| RepairProtocol::AncestorHashes { header, .. } => {
| RepairProtocol::AncestorHashes { header, .. }
| RepairProtocol::WenRestart { header, .. } => {
if &header.recipient != my_id {
return Err(Error::from(RepairVerifyError::IdMismatch));
}
@@ -926,7 +945,12 @@
match request {
RepairProtocol::WindowIndex { .. }
| RepairProtocol::HighestWindowIndex { .. }
| RepairProtocol::Orphan { .. }
| RepairProtocol::WenRestart { .. } => {
let ping = RepairResponse::Ping(ping);
Packet::from_data(Some(from_addr), ping).ok()
}
@@ -1170,6 +1194,13 @@ impl ServeRepair {
slot: *slot,
}
}
ShredRepairType::WenRestart(slot) => {
repair_stats.wen_restart.update(repair_peer_id, *slot, 0);
RepairProtocol::WenRestart {
header,
slot: *slot,
}
}
};
Self::repair_proto_to_bytes(&request_proto, identity_keypair)
}
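
On the wire, WenRestart is handled like Orphan: it carries the same RepairRequestHeader, passes the same signature and ping checks, and may return up to MAX_ORPHAN_REPAIR_RESPONSES packets. The difference is response verification, which only pins the slot. A condensed sketch of that rule (MockShred and the simplified predicates for the other variants are illustrative; the real Shred type and exact checks live elsewhere in the codebase):

type Slot = u64;

// Stand-in for the real shred type: only the fields the check needs.
struct MockShred {
    slot: Slot,
    index: u64,
}

#[allow(dead_code)]
enum ShredRepairType {
    Orphan(Slot),
    HighestShred(Slot, u64),
    Shred(Slot, u64),
    WenRestart(Slot),
}

fn verify_response(request: &ShredRepairType, shred: &MockShred) -> bool {
    match request {
        // Orphan repairs accept shreds from the requested slot or its ancestors
        // (simplified here to a slot comparison).
        ShredRepairType::Orphan(slot) => shred.slot <= *slot,
        ShredRepairType::HighestShred(slot, index) => {
            shred.slot == *slot && shred.index >= *index
        }
        ShredRepairType::Shred(slot, index) => shred.slot == *slot && shred.index == *index,
        // WenRestart only checks the slot: any shred from that slot helps
        // rebuild it during the silent repair phase.
        ShredRepairType::WenRestart(slot) => shred.slot == *slot,
    }
}

fn main() {
    let shred = MockShred { slot: 42, index: 3 };
    assert!(verify_response(&ShredRepairType::WenRestart(42), &shred));
    assert!(!verify_response(&ShredRepairType::WenRestart(43), &shred));
    assert!(verify_response(&ShredRepairType::Shred(42, 3), &shred));
}
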