Skip to content

Commit

Permalink
Plumb ability to handle duplicate shreds into shred insertion functions
Browse files Browse the repository at this point in the history
  • Loading branch information
carllin committed Jan 14, 2020
1 parent 699ca5f commit 6a71841
Showing 1 changed file with 92 additions and 71 deletions.
163 changes: 92 additions & 71 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,12 +579,16 @@ impl Blockstore {
recovered_data_shreds
}

pub fn insert_shreds(
pub fn insert_shreds_handle_duplicate<F>(
&self,
shreds: Vec<Shred>,
leader_schedule: Option<&Arc<LeaderScheduleCache>>,
is_trusted: bool,
) -> Result<BlockstoreInsertionMetrics> {
handle_duplicate: &F,
) -> Result<BlockstoreInsertionMetrics>
where
F: Fn(Shred) -> (),
{
let mut total_start = Measure::start("Total elapsed");
let mut start = Measure::start("Blockstore lock");
let _lock = self.insert_shreds_lock.lock().unwrap();
Expand Down Expand Up @@ -615,6 +619,7 @@ impl Blockstore {
&mut just_inserted_data_shreds,
&mut index_meta_time,
is_trusted,
handle_duplicate,
) {
num_inserted += 1;
}
Expand Down Expand Up @@ -658,6 +663,7 @@ impl Blockstore {
&mut just_inserted_data_shreds,
&mut index_meta_time,
is_trusted,
&handle_duplicate,
);
}
}
Expand Down Expand Up @@ -733,6 +739,15 @@ impl Blockstore {
})
}

pub fn insert_shreds(
&self,
shreds: Vec<Shred>,
leader_schedule: Option<&Arc<LeaderScheduleCache>>,
is_trusted: bool,
) -> Result<BlockstoreInsertionMetrics> {
self.insert_shreds_handle_duplicate(shreds, leader_schedule, is_trusted, &|_| {})
}

fn check_insert_coding_shred(
&self,
shred: Shred,
Expand Down Expand Up @@ -819,7 +834,8 @@ impl Blockstore {
}
}

fn check_insert_data_shred(
#[allow(clippy::too_many_arguments)]
fn check_insert_data_shred<F>(
&self,
shred: Shred,
erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>,
Expand All @@ -829,7 +845,11 @@ impl Blockstore {
just_inserted_data_shreds: &mut HashMap<(u64, u64), Shred>,
index_meta_time: &mut u64,
is_trusted: bool,
) -> bool {
handle_duplicate: &F,
) -> bool
where
F: Fn(Shred) -> (),
{
let slot = shred.slot();
let shred_index = u64::from(shred.index());

Expand All @@ -842,34 +862,32 @@ impl Blockstore {

let slot_meta = &mut slot_meta_entry.new_slot_meta.borrow_mut();

if is_trusted
|| Blockstore::should_insert_data_shred(
&shred,
slot_meta,
index_meta.data(),
&self.last_root,
)
if !is_trusted {
if Self::is_data_shred_present(&shred, slot_meta, &index_meta.data()) {
handle_duplicate(shred);
return false;
} else if !Blockstore::should_insert_data_shred(&shred, slot_meta, &self.last_root) {
return false;
}
}

let set_index = u64::from(shred.common_header.fec_set_index);
if let Ok(()) =
self.insert_data_shred(slot_meta, index_meta.data_mut(), &shred, write_batch)
{
let set_index = u64::from(shred.common_header.fec_set_index);
if let Ok(()) =
self.insert_data_shred(slot_meta, index_meta.data_mut(), &shred, write_batch)
{
just_inserted_data_shreds.insert((slot, shred_index), shred);
index_meta_working_set_entry.did_insert_occur = true;
slot_meta_entry.did_insert_occur = true;
if !erasure_metas.contains_key(&(slot, set_index)) {
if let Some(meta) = self
.erasure_meta_cf
.get((slot, set_index))
.expect("Expect database get to succeed")
{
erasure_metas.insert((slot, set_index), meta);
}
just_inserted_data_shreds.insert((slot, shred_index), shred);
index_meta_working_set_entry.did_insert_occur = true;
slot_meta_entry.did_insert_occur = true;
if !erasure_metas.contains_key(&(slot, set_index)) {
if let Some(meta) = self
.erasure_meta_cf
.get((slot, set_index))
.expect("Expect database get to succeed")
{
erasure_metas.insert((slot, set_index), meta);
}
true
} else {
false
}
true
} else {
false
}
Expand Down Expand Up @@ -916,10 +934,15 @@ impl Blockstore {
Ok(())
}

fn is_data_shred_present(shred: &Shred, slot_meta: &SlotMeta, data_index: &ShredIndex) -> bool {
let shred_index = u64::from(shred.index());
// Check that the shred doesn't already exist in blockstore
shred_index < slot_meta.consumed || data_index.is_present(shred_index)
}

fn should_insert_data_shred(
shred: &Shred,
slot_meta: &SlotMeta,
data_index: &ShredIndex,
last_root: &RwLock<u64>,
) -> bool {
let shred_index = u64::from(shred.index());
Expand All @@ -931,11 +954,6 @@ impl Blockstore {
false
};

// Check that the data shred doesn't already exist in blockstore
if shred_index < slot_meta.consumed || data_index.is_present(shred_index) {
return false;
}

// Check that we do not receive shred_index >= than the last_index
// for the slot
let last_index = slot_meta.last_index;
Expand Down Expand Up @@ -3976,52 +3994,19 @@ pub mod tests {
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let index_cf = blockstore.db.column::<cf::Index>();
let last_root = RwLock::new(0);

// Insert the first 5 shreds, we don't have a "is_last" shred yet
blockstore
.insert_shreds(shreds[0..5].to_vec(), None, false)
.unwrap();

// Trying to insert a shred less than `slot_meta.consumed` should fail
let slot_meta = blockstore.meta(0).unwrap().unwrap();
let index = index_cf.get(0).unwrap().unwrap();
assert_eq!(slot_meta.consumed, 5);
assert_eq!(
Blockstore::should_insert_data_shred(
&shreds[1],
&slot_meta,
index.data(),
&last_root
),
false
);

// Trying to insert the same shred again should fail
// skip over shred 5 so the `slot_meta.consumed` doesn't increment
blockstore
.insert_shreds(shreds[6..7].to_vec(), None, false)
.unwrap();
let slot_meta = blockstore.meta(0).unwrap().unwrap();
let index = index_cf.get(0).unwrap().unwrap();
assert_eq!(
Blockstore::should_insert_data_shred(
&shreds[6],
&slot_meta,
index.data(),
&last_root
),
false
);

// Trying to insert another "is_last" shred with index < the received index should fail
// skip over shred 7
blockstore
.insert_shreds(shreds[8..9].to_vec(), None, false)
.unwrap();
let slot_meta = blockstore.meta(0).unwrap().unwrap();
let index = index_cf.get(0).unwrap().unwrap();
assert_eq!(slot_meta.received, 9);
let shred7 = {
if shreds[7].is_data() {
Expand All @@ -4032,15 +4017,14 @@ pub mod tests {
}
};
assert_eq!(
Blockstore::should_insert_data_shred(&shred7, &slot_meta, index.data(), &last_root),
Blockstore::should_insert_data_shred(&shred7, &slot_meta, &last_root),
false
);

// Insert all pending shreds
let mut shred8 = shreds[8].clone();
blockstore.insert_shreds(shreds, None, false).unwrap();
let slot_meta = blockstore.meta(0).unwrap().unwrap();
let index = index_cf.get(0).unwrap().unwrap();

// Trying to insert a shred with index > the "is_last" shred should fail
if shred8.is_data() {
Expand All @@ -4049,13 +4033,50 @@ pub mod tests {
panic!("Shred in unexpected format")
}
assert_eq!(
Blockstore::should_insert_data_shred(&shred7, &slot_meta, index.data(), &last_root),
Blockstore::should_insert_data_shred(&shred7, &slot_meta, &last_root),
false
);
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}

#[test]
pub fn test_is_data_shred_present() {
let (shreds, _) = make_slot_entries(0, 0, 200);
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let index_cf = blockstore.db.column::<cf::Index>();

blockstore
.insert_shreds(shreds[0..5].to_vec(), None, false)
.unwrap();
// Insert a shred less than `slot_meta.consumed`, check that
// it already exists
let slot_meta = blockstore.meta(0).unwrap().unwrap();
let index = index_cf.get(0).unwrap().unwrap();
assert_eq!(slot_meta.consumed, 5);
assert!(Blockstore::is_data_shred_present(
&shreds[1],
&slot_meta,
index.data(),
));

// Insert a shred, check that it already exists
blockstore
.insert_shreds(shreds[6..7].to_vec(), None, false)
.unwrap();
let slot_meta = blockstore.meta(0).unwrap().unwrap();
let index = index_cf.get(0).unwrap().unwrap();
assert!(Blockstore::is_data_shred_present(
&shreds[6],
&slot_meta,
index.data()
),);
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}

#[test]
pub fn test_should_insert_coding_shred() {
let blockstore_path = get_tmp_ledger_path!();
Expand Down

0 comments on commit 6a71841

Please sign in to comment.