Plumb ability to handle duplicate shreds into shred insertion functions #7784

Merged
merged 1 commit on Jan 14, 2020
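
For readers skimming the diff, here is a minimal sketch of how a caller might use the new entry point. The helper function, the crossbeam channel, and the import paths are illustrative assumptions only; they are not part of this PR, which touches only ledger/src/blockstore.rs.

    use std::sync::Arc;

    use crossbeam_channel::Sender;
    use solana_ledger::{
        blockstore::Blockstore, leader_schedule_cache::LeaderScheduleCache, shred::Shred,
    };

    // Hypothetical caller: report duplicate shreds over a channel instead of
    // silently dropping them.
    fn insert_and_report_duplicates(
        blockstore: &Blockstore,
        shreds: Vec<Shred>,
        leader_schedule_cache: &Arc<LeaderScheduleCache>,
        duplicate_sender: &Sender<Shred>, // assumed channel for duplicate shreds
    ) {
        blockstore
            .insert_shreds_handle_duplicate(
                shreds,
                Some(leader_schedule_cache),
                false, // is_trusted: false, so the duplicate/sanity checks run
                &|shred| {
                    // Invoked once for every untrusted data shred that is
                    // already present in the blockstore.
                    let _ = duplicate_sender.send(shred);
                },
            )
            .expect("blockstore shred insertion failed");
    }
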
ledger/src/blockstore.rs (163 changes: 92 additions & 71 deletions)
@@ -579,12 +579,16 @@ impl Blockstore {
recovered_data_shreds
}

pub fn insert_shreds(
pub fn insert_shreds_handle_duplicate<F>(
&self,
shreds: Vec<Shred>,
leader_schedule: Option<&Arc<LeaderScheduleCache>>,
is_trusted: bool,
) -> Result<BlockstoreInsertionMetrics> {
handle_duplicate: &F,
) -> Result<BlockstoreInsertionMetrics>
where
F: Fn(Shred) -> (),
{
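// `handle_duplicate` is invoked for every untrusted data shred that turns
// out to already be present in the blockstore (see check_insert_data_shred).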
let mut total_start = Measure::start("Total elapsed");
let mut start = Measure::start("Blockstore lock");
let _lock = self.insert_shreds_lock.lock().unwrap();
@@ -615,6 +619,7 @@ impl Blockstore {
&mut just_inserted_data_shreds,
&mut index_meta_time,
is_trusted,
handle_duplicate,
) {
num_inserted += 1;
}
@@ -658,6 +663,7 @@ impl Blockstore {
&mut just_inserted_data_shreds,
&mut index_meta_time,
is_trusted,
&handle_duplicate,
);
}
}
@@ -733,6 +739,15 @@ impl Blockstore {
})
}

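// The original `insert_shreds` signature is kept below as a thin wrapper
// that forwards to `insert_shreds_handle_duplicate` with a no-op
// `handle_duplicate` closure, so existing callers are unchanged.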
pub fn insert_shreds(
&self,
shreds: Vec<Shred>,
leader_schedule: Option<&Arc<LeaderScheduleCache>>,
is_trusted: bool,
) -> Result<BlockstoreInsertionMetrics> {
self.insert_shreds_handle_duplicate(shreds, leader_schedule, is_trusted, &|_| {})
}

fn check_insert_coding_shred(
&self,
shred: Shred,
@@ -819,7 +834,8 @@ impl Blockstore {
}
}

fn check_insert_data_shred(
#[allow(clippy::too_many_arguments)]
fn check_insert_data_shred<F>(
&self,
shred: Shred,
erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>,
@@ -829,7 +845,11 @@
just_inserted_data_shreds: &mut HashMap<(u64, u64), Shred>,
index_meta_time: &mut u64,
is_trusted: bool,
) -> bool {
handle_duplicate: &F,
) -> bool
where
F: Fn(Shred) -> (),
{
let slot = shred.slot();
let shred_index = u64::from(shred.index());

@@ -842,34 +862,32 @@

let slot_meta = &mut slot_meta_entry.new_slot_meta.borrow_mut();

if is_trusted
|| Blockstore::should_insert_data_shred(
&shred,
slot_meta,
index_meta.data(),
&self.last_root,
)
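// A previously-seen untrusted shred is reported via `handle_duplicate`
// rather than silently ignored; the remaining insertion checks only run
// for shreds that are not duplicates.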
if !is_trusted {
if Self::is_data_shred_present(&shred, slot_meta, &index_meta.data()) {
handle_duplicate(shred);
return false;
} else if !Blockstore::should_insert_data_shred(&shred, slot_meta, &self.last_root) {
return false;
}
}

let set_index = u64::from(shred.common_header.fec_set_index);
if let Ok(()) =
self.insert_data_shred(slot_meta, index_meta.data_mut(), &shred, write_batch)
{
let set_index = u64::from(shred.common_header.fec_set_index);
if let Ok(()) =
self.insert_data_shred(slot_meta, index_meta.data_mut(), &shred, write_batch)
{
just_inserted_data_shreds.insert((slot, shred_index), shred);
index_meta_working_set_entry.did_insert_occur = true;
slot_meta_entry.did_insert_occur = true;
if !erasure_metas.contains_key(&(slot, set_index)) {
if let Some(meta) = self
.erasure_meta_cf
.get((slot, set_index))
.expect("Expect database get to succeed")
{
erasure_metas.insert((slot, set_index), meta);
}
just_inserted_data_shreds.insert((slot, shred_index), shred);
index_meta_working_set_entry.did_insert_occur = true;
slot_meta_entry.did_insert_occur = true;
if !erasure_metas.contains_key(&(slot, set_index)) {
if let Some(meta) = self
.erasure_meta_cf
.get((slot, set_index))
.expect("Expect database get to succeed")
{
erasure_metas.insert((slot, set_index), meta);
}
true
} else {
false
}
true
} else {
false
}
@@ -916,10 +934,15 @@ impl Blockstore {
Ok(())
}

fn is_data_shred_present(shred: &Shred, slot_meta: &SlotMeta, data_index: &ShredIndex) -> bool {
let shred_index = u64::from(shred.index());
// Check that the shred doesn't already exist in blockstore
shred_index < slot_meta.consumed || data_index.is_present(shred_index)
}

fn should_insert_data_shred(
shred: &Shred,
slot_meta: &SlotMeta,
data_index: &ShredIndex,
last_root: &RwLock<u64>,
) -> bool {
let shred_index = u64::from(shred.index());
@@ -931,11 +954,6 @@ impl Blockstore {
false
};

// Check that the data shred doesn't already exist in blockstore
if shred_index < slot_meta.consumed || data_index.is_present(shred_index) {
return false;
}

// Check that we do not receive a shred_index >= the last_index
// for the slot
let last_index = slot_meta.last_index;
@@ -3976,52 +3994,19 @@ pub mod tests {
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let index_cf = blockstore.db.column::<cf::Index>();
let last_root = RwLock::new(0);

// Insert the first 5 shreds; we don't have an "is_last" shred yet
blockstore
.insert_shreds(shreds[0..5].to_vec(), None, false)
.unwrap();

// Trying to insert a shred less than `slot_meta.consumed` should fail
let slot_meta = blockstore.meta(0).unwrap().unwrap();
let index = index_cf.get(0).unwrap().unwrap();
assert_eq!(slot_meta.consumed, 5);
assert_eq!(
Blockstore::should_insert_data_shred(
&shreds[1],
&slot_meta,
index.data(),
&last_root
),
false
);

// Trying to insert the same shred again should fail
// skip over shred 5 so the `slot_meta.consumed` doesn't increment
blockstore
.insert_shreds(shreds[6..7].to_vec(), None, false)
.unwrap();
let slot_meta = blockstore.meta(0).unwrap().unwrap();
let index = index_cf.get(0).unwrap().unwrap();
assert_eq!(
Blockstore::should_insert_data_shred(
&shreds[6],
&slot_meta,
index.data(),
&last_root
),
false
);

// Trying to insert another "is_last" shred with index < the received index should fail
// skip over shred 7
blockstore
.insert_shreds(shreds[8..9].to_vec(), None, false)
.unwrap();
let slot_meta = blockstore.meta(0).unwrap().unwrap();
let index = index_cf.get(0).unwrap().unwrap();
assert_eq!(slot_meta.received, 9);
let shred7 = {
if shreds[7].is_data() {
@@ -4032,15 +4017,14 @@ pub mod tests {
}
};
assert_eq!(
Blockstore::should_insert_data_shred(&shred7, &slot_meta, index.data(), &last_root),
Blockstore::should_insert_data_shred(&shred7, &slot_meta, &last_root),
false
);

// Insert all pending shreds
let mut shred8 = shreds[8].clone();
blockstore.insert_shreds(shreds, None, false).unwrap();
let slot_meta = blockstore.meta(0).unwrap().unwrap();
let index = index_cf.get(0).unwrap().unwrap();

// Trying to insert a shred with index > the "is_last" shred should fail
if shred8.is_data() {
@@ -4049,13 +4033,50 @@ pub mod tests {
panic!("Shred in unexpected format")
}
assert_eq!(
Blockstore::should_insert_data_shred(&shred7, &slot_meta, index.data(), &last_root),
Blockstore::should_insert_data_shred(&shred7, &slot_meta, &last_root),
false
);
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}

#[test]
pub fn test_is_data_shred_present() {
let (shreds, _) = make_slot_entries(0, 0, 200);
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let index_cf = blockstore.db.column::<cf::Index>();

blockstore
.insert_shreds(shreds[0..5].to_vec(), None, false)
.unwrap();
// A shred with index less than `slot_meta.consumed` should be
// reported as already present
let slot_meta = blockstore.meta(0).unwrap().unwrap();
let index = index_cf.get(0).unwrap().unwrap();
assert_eq!(slot_meta.consumed, 5);
assert!(Blockstore::is_data_shred_present(
&shreds[1],
&slot_meta,
index.data(),
));

// Insert a shred and check that it is then reported as present
blockstore
.insert_shreds(shreds[6..7].to_vec(), None, false)
.unwrap();
let slot_meta = blockstore.meta(0).unwrap().unwrap();
let index = index_cf.get(0).unwrap().unwrap();
assert!(Blockstore::is_data_shred_present(
&shreds[6],
&slot_meta,
index.data()
));
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}

#[test]
pub fn test_should_insert_coding_shred() {
let blockstore_path = get_tmp_ledger_path!();