diff --git a/db/external_sst_file_basic_test.cc b/db/external_sst_file_basic_test.cc
index 534e8a0bf76..65f2f751051 100644
--- a/db/external_sst_file_basic_test.cc
+++ b/db/external_sst_file_basic_test.cc
@@ -558,8 +558,10 @@ TEST_F(ExternalSSTFileBasicTest, FadviseTrigger) {
 }
 
 TEST_F(ExternalSSTFileBasicTest, IngestionWithRangeDeletions) {
+  const int kNumLevels = 7;
   Options options = CurrentOptions();
   options.disable_auto_compactions = true;
+  options.num_levels = kNumLevels;
   Reopen(options);
 
   std::map<std::string, std::string> true_data;
@@ -567,43 +569,65 @@ TEST_F(ExternalSSTFileBasicTest, IngestionWithRangeDeletions) {
   // prevent range deletions from being dropped due to becoming obsolete.
   const Snapshot* snapshot = db_->GetSnapshot();
 
-  // range del [0, 50) in L0 file, [50, 100) in memtable
-  for (int i = 0; i < 2; i++) {
-    if (i == 1) {
+  // range del [0, 50) in L6 file, [50, 100) in L0 file, [100, 150) in memtable
+  for (int i = 0; i < 3; i++) {
+    if (i != 0) {
       db_->Flush(FlushOptions());
+      if (i == 1) {
+        MoveFilesToLevel(kNumLevels - 1);
+      }
     }
     ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
                                Key(50 * i), Key(50 * (i + 1))));
   }
   ASSERT_EQ(1, NumTableFilesAtLevel(0));
+  ASSERT_EQ(0, NumTableFilesAtLevel(kNumLevels - 2));
+  ASSERT_EQ(1, NumTableFilesAtLevel(kNumLevels - 1));
 
-  // overlaps with L0 file but not memtable, so flush is skipped
+  // overlaps with L0 file but not memtable, so flush is skipped and file is
+  // ingested into L0
   SequenceNumber last_seqno = dbfull()->GetLatestSequenceNumber();
+  ASSERT_OK(GenerateAndAddExternalFile(
+      options, {60, 90}, {ValueType::kTypeValue, ValueType::kTypeValue},
+      file_id++, &true_data));
+  ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), ++last_seqno);
+  ASSERT_EQ(2, NumTableFilesAtLevel(0));
+  ASSERT_EQ(0, NumTableFilesAtLevel(kNumLevels - 2));
+  ASSERT_EQ(1, NumTableFilesAtLevel(options.num_levels - 1));
+
+  // overlaps with L6 file but not memtable or L0 file, so flush is skipped and
+  // file is ingested into L5
   ASSERT_OK(GenerateAndAddExternalFile(
       options, {10, 40}, {ValueType::kTypeValue, ValueType::kTypeValue},
       file_id++, &true_data));
   ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), ++last_seqno);
   ASSERT_EQ(2, NumTableFilesAtLevel(0));
+  ASSERT_EQ(1, NumTableFilesAtLevel(kNumLevels - 2));
+  ASSERT_EQ(1, NumTableFilesAtLevel(options.num_levels - 1));
 
-  // overlaps with memtable, so flush is triggered (thus file count increases by
-  // two at this step).
+  // ingested file overlaps with memtable, so a flush is triggered before the
+  // file is ingested such that the ingested data is considered newest. So L0
+  // file count increases by two.
   ASSERT_OK(GenerateAndAddExternalFile(
-      options, {50, 90}, {ValueType::kTypeValue, ValueType::kTypeValue},
+      options, {100, 140}, {ValueType::kTypeValue, ValueType::kTypeValue},
       file_id++, &true_data));
   ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), ++last_seqno);
   ASSERT_EQ(4, NumTableFilesAtLevel(0));
+  ASSERT_EQ(1, NumTableFilesAtLevel(kNumLevels - 2));
+  ASSERT_EQ(1, NumTableFilesAtLevel(options.num_levels - 1));
 
-  // snapshot unneeded now that both range deletions are persisted
+  // snapshot unneeded now that all range deletions are persisted
   db_->ReleaseSnapshot(snapshot);
 
   // overlaps with nothing, so places at bottom level and skips incrementing
   // seqnum.
   ASSERT_OK(GenerateAndAddExternalFile(
-      options, {101, 125}, {ValueType::kTypeValue, ValueType::kTypeValue},
+      options, {151, 175}, {ValueType::kTypeValue, ValueType::kTypeValue},
      file_id++, &true_data));
   ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno);
   ASSERT_EQ(4, NumTableFilesAtLevel(0));
-  ASSERT_EQ(1, NumTableFilesAtLevel(options.num_levels - 1));
+  ASSERT_EQ(1, NumTableFilesAtLevel(kNumLevels - 2));
+  ASSERT_EQ(2, NumTableFilesAtLevel(options.num_levels - 1));
 }
 
 #endif  // ROCKSDB_LITE
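The scenario this test locks down can be reproduced through the public API alone. Below is a minimal end-to-end sketch, not part of the patch: the path, keys, and values are invented, and error handling is reduced to asserts. It demonstrates the invariant the test cares about: a key ingested on top of an overlapping range deletion must remain visible, which requires ingestion to sequence the file above the tombstone (flushing the memtable first when they overlap).

#include <cassert>
#include <string>

#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/sst_file_writer.h"

int main() {
  rocksdb::DB* db;
  rocksdb::Options options;
  options.create_if_missing = true;
  assert(rocksdb::DB::Open(options, "/tmp/ingest_demo", &db).ok());

  // Write a range tombstone covering [k00, k99).
  assert(db->DeleteRange(rocksdb::WriteOptions(), db->DefaultColumnFamily(),
                         "k00", "k99")
             .ok());

  // Build an external SST whose only key falls inside the deleted range.
  rocksdb::SstFileWriter writer(rocksdb::EnvOptions(), options);
  assert(writer.Open("/tmp/ingest_demo.sst").ok());
  assert(writer.Put("k10", "v10").ok());
  assert(writer.Finish().ok());

  // Ingestion must detect the overlap and assign the file a sequence number
  // above the tombstone's, so the ingested value wins.
  assert(db->IngestExternalFile({"/tmp/ingest_demo.sst"},
                                rocksdb::IngestExternalFileOptions())
             .ok());

  std::string value;
  assert(db->Get(rocksdb::ReadOptions(), "k10", &value).ok());
  assert(value == "v10");
  delete db;
  return 0;
}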
diff --git a/db/external_sst_file_ingestion_job.cc b/db/external_sst_file_ingestion_job.cc
index d52a496da26..e8648891977 100644
--- a/db/external_sst_file_ingestion_job.cc
+++ b/db/external_sst_file_ingestion_job.cc
@@ -376,6 +376,7 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
 
 Status ExternalSstFileIngestionJob::IngestedFilesOverlapWithMemtables(
     SuperVersion* sv, bool* overlap) {
+  *overlap = false;
   // Create an InternalIterator over all memtables
   Arena arena;
   ReadOptions ro;
@@ -391,26 +392,33 @@ Status ExternalSstFileIngestionJob::IngestedFilesOverlapWithMemtables(
     memtable_range_del_iters.push_back(active_range_del_iter);
   }
   sv->imm->AddRangeTombstoneIterators(ro, &memtable_range_del_iters);
-  std::unique_ptr<InternalIterator> memtable_range_del_iter(NewMergingIterator(
-      &cfd_->internal_comparator(),
-      memtable_range_del_iters.empty() ? nullptr : &memtable_range_del_iters[0],
-      static_cast<int>(memtable_range_del_iters.size())));
-
+  RangeDelAggregator range_del_agg(cfd_->internal_comparator(),
+                                   {} /* snapshots */,
+                                   false /* collapse_deletions */);
   Status status;
-  *overlap = false;
-  for (IngestedFileInfo& f : files_to_ingest_) {
-    status =
-        IngestedFileOverlapWithIteratorRange(&f, memtable_iter.get(), overlap);
-    if (!status.ok() || *overlap == true) {
-      break;
-    }
-    status = IngestedFileOverlapWithRangeDeletions(
-        &f, memtable_range_del_iter.get(), overlap);
-    if (!status.ok() || *overlap == true) {
-      break;
+  {
+    std::unique_ptr<InternalIterator> memtable_range_del_iter(
+        NewMergingIterator(&cfd_->internal_comparator(),
+                           memtable_range_del_iters.empty()
+                               ? nullptr
+                               : &memtable_range_del_iters[0],
+                           static_cast<int>(memtable_range_del_iters.size())));
+    status = range_del_agg.AddTombstones(std::move(memtable_range_del_iter));
+  }
+  if (status.ok()) {
+    for (IngestedFileInfo& f : files_to_ingest_) {
+      status = IngestedFileOverlapWithIteratorRange(&f, memtable_iter.get(),
+                                                    overlap);
+      if (!status.ok() || *overlap == true) {
+        break;
+      }
+      if (range_del_agg.IsRangeOverlapped(f.smallest_user_key,
+                                          f.largest_user_key)) {
+        *overlap = true;
+        break;
+      }
     }
   }
-
   return status;
 }
@@ -575,34 +583,6 @@ Status ExternalSstFileIngestionJob::IngestedFileOverlapWithIteratorRange(
   return iter->status();
 }
 
-Status ExternalSstFileIngestionJob::IngestedFileOverlapWithRangeDeletions(
-    const IngestedFileInfo* file_to_ingest, InternalIterator* range_del_iter,
-    bool* overlap) {
-  auto* vstorage = cfd_->current()->storage_info();
-  auto* ucmp = vstorage->InternalComparator()->user_comparator();
-
-  *overlap = false;
-  if (range_del_iter != nullptr) {
-    for (range_del_iter->SeekToFirst(); range_del_iter->Valid();
-         range_del_iter->Next()) {
-      ParsedInternalKey parsed_key;
-      if (!ParseInternalKey(range_del_iter->key(), &parsed_key)) {
-        return Status::Corruption("corrupted range deletion key: " +
-                                  range_del_iter->key().ToString());
-      }
-      RangeTombstone range_del(parsed_key, range_del_iter->value());
-      if (ucmp->Compare(range_del.start_key_,
-                        file_to_ingest->largest_user_key) <= 0 &&
-          ucmp->Compare(file_to_ingest->smallest_user_key,
-                        range_del.end_key_) <= 0) {
-        *overlap = true;
-        break;
-      }
-    }
-  }
-  return Status::OK();
-}
-
 bool ExternalSstFileIngestionJob::IngestedFileFitInLevel(
     const IngestedFileInfo* file_to_ingest, int level) {
   if (level == 0) {
@@ -639,23 +619,26 @@ Status ExternalSstFileIngestionJob::IngestedFileOverlapWithLevel(
   ro.total_order_seek = true;
   MergeIteratorBuilder merge_iter_builder(&cfd_->internal_comparator(),
                                           &arena);
+  // Files are opened lazily when the iterator needs them, so range deletions
+  // are also added lazily to the aggregator. We only need to check for range-
+  // deletion overlap when there is no point-key overlap, and by then we have
+  // already opened the file whose range contains the ingested file's begin
+  // key and iterated through all files up to the one containing the ingested
+  // file's end key. So any file that may contain range deletions overlapping
+  // the ingested file must already have been opened and had its range
+  // deletions added to the aggregator.
+  RangeDelAggregator range_del_agg(cfd_->internal_comparator(),
+                                   {} /* snapshots */,
+                                   false /* collapse_deletions */);
   sv->current->AddIteratorsForLevel(ro, env_options_, &merge_iter_builder, lvl,
-                                    nullptr /* range_del_agg */);
+                                    &range_del_agg);
   ScopedArenaIterator level_iter(merge_iter_builder.Finish());
-
-  std::vector<InternalIterator*> level_range_del_iters;
-  sv->current->AddRangeDelIteratorsForLevel(ro, env_options_, lvl,
-                                            &level_range_del_iters);
-  std::unique_ptr<InternalIterator> level_range_del_iter(NewMergingIterator(
-      &cfd_->internal_comparator(),
-      level_range_del_iters.empty() ? nullptr : &level_range_del_iters[0],
-      static_cast<int>(level_range_del_iters.size())));
-
   Status status = IngestedFileOverlapWithIteratorRange(
       file_to_ingest, level_iter.get(), overlap_with_level);
-  if (status.ok() && *overlap_with_level == false) {
-    status = IngestedFileOverlapWithRangeDeletions(
-        file_to_ingest, level_range_del_iter.get(), overlap_with_level);
+  if (status.ok() && *overlap_with_level == false &&
+      range_del_agg.IsRangeOverlapped(file_to_ingest->smallest_user_key,
+                                      file_to_ingest->largest_user_key)) {
+    *overlap_with_level = true;
   }
   return status;
 }
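Both overlap checks in this file now share the same shape: construct a non-collapsing RangeDelAggregator, feed it the merged range-deletion iterator via AddTombstones(), then ask a single question with IsRangeOverlapped(). A condensed restatement of that shared pattern, using the identifiers from this patch with the iterator plumbing elided; treat it as illustrative, not a drop-in function:

// `range_del_iter` stands for whichever merged tombstone iterator the call
// site builds (memtable tombstones or a level's tombstones), and `f` for the
// IngestedFileInfo under consideration.
RangeDelAggregator range_del_agg(cfd_->internal_comparator(),
                                 {} /* snapshots */,
                                 false /* collapse_deletions */);
Status status = range_del_agg.AddTombstones(std::move(range_del_iter));
if (status.ok() && !*overlap &&
    range_del_agg.IsRangeOverlapped(f.smallest_user_key,
                                    f.largest_user_key)) {
  *overlap = true;  // some tombstone touches [smallest, largest]
}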
diff --git a/db/external_sst_file_ingestion_job.h b/db/external_sst_file_ingestion_job.h
index 2d0fadeed79..e42c50603e5 100644
--- a/db/external_sst_file_ingestion_job.h
+++ b/db/external_sst_file_ingestion_job.h
@@ -139,13 +139,6 @@ class ExternalSstFileIngestionJob {
       const IngestedFileInfo* file_to_ingest, InternalIterator* iter,
       bool* overlap);
 
-  // Check if `file_to_ingest` key range overlaps with any range deletions
-  // specified by `iter`.
-  // REQUIRES: Mutex held
-  Status IngestedFileOverlapWithRangeDeletions(
-      const IngestedFileInfo* file_to_ingest, InternalIterator* range_del_iter,
-      bool* overlap);
-
   // Check if `file_to_ingest` key range overlap with level
   // REQUIRES: Mutex held
   Status IngestedFileOverlapWithLevel(SuperVersion* sv,
diff --git a/db/range_del_aggregator.cc b/db/range_del_aggregator.cc
index e80b5349467..fdd847a7ac3 100644
--- a/db/range_del_aggregator.cc
+++ b/db/range_del_aggregator.cc
@@ -140,6 +140,29 @@ bool RangeDelAggregator::ShouldDeleteImpl(
   return parsed.sequence < tombstone_map_iter->second.seq_;
 }
 
+bool RangeDelAggregator::IsRangeOverlapped(const Slice& start,
+                                           const Slice& end) {
+  // So far only implemented for non-collapsed mode, since file ingestion (the
+  // only client) doesn't use collapsing.
+  assert(!collapse_deletions_);
+  if (rep_ == nullptr) {
+    return false;
+  }
+  for (const auto& seqnum_and_tombstone_map : rep_->stripe_map_) {
+    for (const auto& start_key_and_tombstone :
+         seqnum_and_tombstone_map.second.raw_map) {
+      const auto& tombstone = start_key_and_tombstone.second;
+      if (icmp_.user_comparator()->Compare(start, tombstone.end_key_) < 0 &&
+          icmp_.user_comparator()->Compare(tombstone.start_key_, end) <= 0 &&
+          icmp_.user_comparator()->Compare(tombstone.start_key_,
+                                           tombstone.end_key_) < 0) {
+        return true;
+      }
+    }
+  }
+  return false;
+}
+
 bool RangeDelAggregator::ShouldAddTombstones(
     bool bottommost_level /* = false */) {
   // TODO(andrewkr): can we just open a file and throw it away if it ends up
diff --git a/db/range_del_aggregator.h b/db/range_del_aggregator.h
index 76fe1870a07..f050e8917e0 100644
--- a/db/range_del_aggregator.h
+++ b/db/range_del_aggregator.h
@@ -92,6 +92,15 @@ class RangeDelAggregator {
   bool ShouldDeleteImpl(const Slice& internal_key,
                         RangePositioningMode mode = kFullScan);
 
+  // Checks whether range deletions cover any keys between `start` and `end`,
+  // inclusive.
+  //
+  // @param start User key representing beginning of range to check for overlap.
+  // @param end User key representing end of range to check for overlap. This
+  //    argument is inclusive, so the existence of a range deletion covering
+  //    `end` causes this to return true.
+  bool IsRangeOverlapped(const Slice& start, const Slice& end);
+
   bool ShouldAddTombstones(bool bottommost_level = false);
 
   // Adds tombstones to the tombstone aggregation structure maintained by this
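The comparison inside IsRangeOverlapped() is easy to get subtly wrong: the tombstone interval [start_key_, end_key_) is half-open while the query range [start, end] is inclusive on both sides, and an empty tombstone must never report overlap. A minimal standalone restatement of the predicate, with bytewise std::string comparison standing in for the user comparator (the struct and function names here are hypothetical, not RocksDB API):

#include <cassert>
#include <string>

struct Tombstone {
  std::string start_key;  // inclusive
  std::string end_key;    // exclusive
};

// Overlap iff the query begins before the tombstone ends, the tombstone
// begins no later than the query ends, and the tombstone is non-empty; these
// are the same three comparisons IsRangeOverlapped() performs per tombstone.
bool Overlaps(const Tombstone& t, const std::string& start,
              const std::string& end) {
  return start < t.end_key && t.start_key <= end && t.start_key < t.end_key;
}

int main() {
  Tombstone t{"c", "d"};
  assert(Overlaps(t, "a", "c"));                     // `end` is inclusive
  assert(!Overlaps(t, "d", "e"));                    // `end_key_` is exclusive
  assert(!Overlaps(Tombstone{"c", "c"}, "a", "z"));  // empty tombstone
  return 0;
}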
diff --git a/db/range_del_aggregator_test.cc b/db/range_del_aggregator_test.cc
index 39029bd2a2e..5896a5638ee 100644
--- a/db/range_del_aggregator_test.cc
+++ b/db/range_del_aggregator_test.cc
@@ -28,9 +28,9 @@ enum Direction {
 
 void VerifyRangeDels(const std::vector<RangeTombstone>& range_dels,
                      const std::vector<ExpectedPoint>& expected_points) {
+  auto icmp = InternalKeyComparator(BytewiseComparator());
   // Test same result regardless of which order the range deletions are added.
   for (Direction dir : {kForward, kReverse}) {
-    auto icmp = InternalKeyComparator(BytewiseComparator());
     RangeDelAggregator range_del_agg(icmp, {} /* snapshots */, true);
     std::vector<std::string> keys, values;
     for (const auto& range_del : range_dels) {
@@ -62,6 +62,27 @@ void VerifyRangeDels(const std::vector<RangeTombstone>& range_dels,
       }
     }
   }
+
+  RangeDelAggregator range_del_agg(icmp, {} /* snapshots */,
+                                   false /* collapse_deletions */);
+  std::vector<std::string> keys, values;
+  for (const auto& range_del : range_dels) {
+    auto key_and_value = range_del.Serialize();
+    keys.push_back(key_and_value.first.Encode().ToString());
+    values.push_back(key_and_value.second.ToString());
+  }
+  std::unique_ptr<test::VectorIterator> range_del_iter(
+      new test::VectorIterator(keys, values));
+  range_del_agg.AddTombstones(std::move(range_del_iter));
+  for (size_t i = 1; i < expected_points.size(); ++i) {
+    bool overlapped = range_del_agg.IsRangeOverlapped(
+        expected_points[i - 1].begin, expected_points[i].begin);
+    if (expected_points[i - 1].seq > 0 || expected_points[i].seq > 0) {
+      ASSERT_TRUE(overlapped);
+    } else {
+      ASSERT_FALSE(overlapped);
+    }
+  }
 }
 
 }  // anonymous namespace
@@ -112,9 +133,14 @@ TEST_F(RangeDelAggregatorTest, SameEndKey) {
 }
 
 TEST_F(RangeDelAggregatorTest, GapsBetweenRanges) {
-  VerifyRangeDels(
-      {{"a", "b", 5}, {"c", "d", 10}, {"e", "f", 15}},
-      {{" ", 0}, {"a", 5}, {"b", 0}, {"c", 10}, {"d", 0}, {"e", 15}, {"f", 0}});
+  VerifyRangeDels({{"a", "b", 5}, {"c", "d", 10}, {"e", "f", 15}}, {{" ", 0},
+                                                                    {"a", 5},
+                                                                    {"b", 0},
+                                                                    {"c", 10},
+                                                                    {"d", 0},
+                                                                    {"da", 0},
+                                                                    {"e", 15},
+                                                                    {"f", 0}});
 }
 
 // Note the Cover* tests also test cases where tombstones are inserted under a
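The new non-collapsed pass in VerifyRangeDels() encodes a simple invariant: for each pair of consecutive expected points, IsRangeOverlapped() must return true exactly when at least one of the two endpoints is covered by a tombstone (seq > 0). This holds because expected_points enumerates every tombstone boundary, so no tombstone can lie strictly between consecutive points. The added {"da", 0} point matters because it creates a query range, ["d", "da"], lying strictly inside the gap between the [c, d) and [e, f) tombstones; previously every gap query shared an endpoint with a tombstone boundary. A self-contained model of the invariant over that data, with a brute-force predicate standing in for the aggregator (all names hypothetical):

#include <cassert>
#include <string>
#include <vector>

struct Tombstone { std::string start, end; };          // covers [start, end)
struct ExpectedPoint { std::string begin; int seq; };  // seq 0 = uncovered

// Same predicate as RangeDelAggregator::IsRangeOverlapped(), restated over a
// flat vector instead of the stripe map.
bool RangeOverlapped(const std::vector<Tombstone>& dels,
                     const std::string& start, const std::string& end) {
  for (const auto& t : dels) {
    if (start < t.end && t.start <= end && t.start < t.end) {
      return true;
    }
  }
  return false;
}

int main() {
  // Mirrors the GapsBetweenRanges data from this patch.
  std::vector<Tombstone> dels = {{"a", "b"}, {"c", "d"}, {"e", "f"}};
  std::vector<ExpectedPoint> pts = {{" ", 0}, {"a", 5},  {"b", 0},  {"c", 10},
                                    {"d", 0}, {"da", 0}, {"e", 15}, {"f", 0}};
  for (size_t i = 1; i < pts.size(); ++i) {
    bool expect = pts[i - 1].seq > 0 || pts[i].seq > 0;
    assert(RangeOverlapped(dels, pts[i - 1].begin, pts[i].begin) == expect);
  }
  return 0;
}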
diff --git a/db/version_set.cc b/db/version_set.cc
index f432a56c0b8..b2ec82fbaad 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -874,22 +874,6 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options,
   }
 }
 
-void Version::AddRangeDelIteratorsForLevel(
-    const ReadOptions& read_options, const EnvOptions& soptions, int level,
-    std::vector<InternalIterator*>* range_del_iters) {
-  range_del_iters->clear();
-  for (size_t i = 0; i < storage_info_.LevelFilesBrief(level).num_files; i++) {
-    const auto& file = storage_info_.LevelFilesBrief(level).files[i];
-    auto* range_del_iter = cfd_->table_cache()->NewRangeTombstoneIterator(
-        read_options, soptions, cfd_->internal_comparator(), file.fd,
-        cfd_->internal_stats()->GetFileReadHist(level),
-        false /* skip_filters */, level);
-    if (range_del_iter != nullptr) {
-      range_del_iters->push_back(range_del_iter);
-    }
-  }
-}
-
 VersionStorageInfo::VersionStorageInfo(
     const InternalKeyComparator* internal_comparator,
     const Comparator* user_comparator, int levels,
diff --git a/db/version_set.h b/db/version_set.h
index 7942ce7f18c..583e2d99530 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -525,10 +525,6 @@ class Version {
                             MergeIteratorBuilder* merger_iter_builder,
                             int level, RangeDelAggregator* range_del_agg);
 
-  void AddRangeDelIteratorsForLevel(
-      const ReadOptions& read_options, const EnvOptions& soptions, int level,
-      std::vector<InternalIterator*>* range_del_iters);
-
   // Lookup the value for key. If found, store it in *val and
   // return OK. Else return a non-OK status.
   // Uses *operands to store merge_operator operations to apply later.