optimize file ingestion checks for range deletion overlap #3179

Closed
wants to merge 4 commits
44 changes: 34 additions & 10 deletions db/external_sst_file_basic_test.cc
@@ -558,52 +558,76 @@ TEST_F(ExternalSSTFileBasicTest, FadviseTrigger) {
}

TEST_F(ExternalSSTFileBasicTest, IngestionWithRangeDeletions) {
int kNumLevels = 7;
Options options = CurrentOptions();
options.disable_auto_compactions = true;
options.num_levels = kNumLevels;
Reopen(options);

std::map<std::string, std::string> true_data;
int file_id = 1;
// prevent range deletions from being dropped due to becoming obsolete.
const Snapshot* snapshot = db_->GetSnapshot();

// range del [0, 50) in L0 file, [50, 100) in memtable
for (int i = 0; i < 2; i++) {
if (i == 1) {
// range del [0, 50) in L6 file, [50, 100) in L0 file, [100, 150) in memtable
for (int i = 0; i < 3; i++) {
if (i != 0) {
db_->Flush(FlushOptions());
if (i == 1) {
MoveFilesToLevel(kNumLevels - 1);
}
}
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
Key(50 * i), Key(50 * (i + 1))));
}
ASSERT_EQ(1, NumTableFilesAtLevel(0));
ASSERT_EQ(0, NumTableFilesAtLevel(kNumLevels - 2));
ASSERT_EQ(1, NumTableFilesAtLevel(kNumLevels - 1));

// overlaps with L0 file but not memtable, so flush is skipped
// overlaps with L0 file but not memtable, so flush is skipped and file is
// ingested into L0
SequenceNumber last_seqno = dbfull()->GetLatestSequenceNumber();
ASSERT_OK(GenerateAndAddExternalFile(
options, {60, 90}, {ValueType::kTypeValue, ValueType::kTypeValue},
file_id++, &true_data));
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), ++last_seqno);
ASSERT_EQ(2, NumTableFilesAtLevel(0));
ASSERT_EQ(0, NumTableFilesAtLevel(kNumLevels - 2));
ASSERT_EQ(1, NumTableFilesAtLevel(options.num_levels - 1));

// overlaps with L6 file but not memtable or L0 file, so flush is skipped and
// file is ingested into L5
ASSERT_OK(GenerateAndAddExternalFile(
options, {10, 40}, {ValueType::kTypeValue, ValueType::kTypeValue},
file_id++, &true_data));
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), ++last_seqno);
ASSERT_EQ(2, NumTableFilesAtLevel(0));
ASSERT_EQ(1, NumTableFilesAtLevel(kNumLevels - 2));
ASSERT_EQ(1, NumTableFilesAtLevel(options.num_levels - 1));

// overlaps with memtable, so flush is triggered (thus file count increases by
// two at this step).
// ingested file overlaps with memtable, so flush is triggered before the file
// is ingested such that the ingested data is considered newest. So L0 file
// count increases by two.
ASSERT_OK(GenerateAndAddExternalFile(
options, {50, 90}, {ValueType::kTypeValue, ValueType::kTypeValue},
options, {100, 140}, {ValueType::kTypeValue, ValueType::kTypeValue},
file_id++, &true_data));
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), ++last_seqno);
ASSERT_EQ(4, NumTableFilesAtLevel(0));
ASSERT_EQ(1, NumTableFilesAtLevel(kNumLevels - 2));
ASSERT_EQ(1, NumTableFilesAtLevel(options.num_levels - 1));

// snapshot unneeded now that both range deletions are persisted
// snapshot unneeded now that all range deletions are persisted
db_->ReleaseSnapshot(snapshot);

// overlaps with nothing, so places at bottom level and skips incrementing
// seqnum.
ASSERT_OK(GenerateAndAddExternalFile(
options, {101, 125}, {ValueType::kTypeValue, ValueType::kTypeValue},
options, {151, 175}, {ValueType::kTypeValue, ValueType::kTypeValue},
file_id++, &true_data));
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno);
ASSERT_EQ(4, NumTableFilesAtLevel(0));
ASSERT_EQ(1, NumTableFilesAtLevel(options.num_levels - 1));
ASSERT_EQ(1, NumTableFilesAtLevel(kNumLevels - 2));
ASSERT_EQ(2, NumTableFilesAtLevel(options.num_levels - 1));
}

#endif // ROCKSDB_LITE
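
For context, a rough sketch of the client-side call these tests exercise, using the public ingestion API (current API surface; not part of this diff):

#include <string>
#include <vector>
#include "rocksdb/db.h"

// Ingest one externally built SST file. The overlap checks optimized in this
// PR run inside IngestExternalFile(): the file's key range is compared
// against memtable point keys and range deletions, and against each LSM
// level, to decide whether a flush is needed and which level can receive the
// file.
rocksdb::Status IngestOne(rocksdb::DB* db, const std::string& sst_path) {
  rocksdb::IngestExternalFileOptions ifo;  // defaults, e.g. move_files=false
  return db->IngestExternalFile({sst_path}, ifo);
}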
99 changes: 41 additions & 58 deletions db/external_sst_file_ingestion_job.cc
@@ -376,6 +376,7 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(

Status ExternalSstFileIngestionJob::IngestedFilesOverlapWithMemtables(
SuperVersion* sv, bool* overlap) {
*overlap = false;
// Create an InternalIterator over all memtables
Arena arena;
ReadOptions ro;
@@ -391,26 +392,33 @@ Status ExternalSstFileIngestionJob::IngestedFilesOverlapWithMemtables(
memtable_range_del_iters.push_back(active_range_del_iter);
}
sv->imm->AddRangeTombstoneIterators(ro, &memtable_range_del_iters);
std::unique_ptr<InternalIterator> memtable_range_del_iter(NewMergingIterator(
&cfd_->internal_comparator(),
memtable_range_del_iters.empty() ? nullptr : &memtable_range_del_iters[0],
static_cast<int>(memtable_range_del_iters.size())));

RangeDelAggregator range_del_agg(cfd_->internal_comparator(),
{} /* snapshots */,
false /* collapse_deletions */);
Status status;
*overlap = false;
for (IngestedFileInfo& f : files_to_ingest_) {
status =
IngestedFileOverlapWithIteratorRange(&f, memtable_iter.get(), overlap);
if (!status.ok() || *overlap == true) {
break;
}
status = IngestedFileOverlapWithRangeDeletions(
&f, memtable_range_del_iter.get(), overlap);
if (!status.ok() || *overlap == true) {
break;
{
Contributor: As no new locks are created here, why do you need a new scope?

Contributor Author: It was to make memtable_range_del_iter go out of scope asap since it's been std::moved and shouldn't be used. IDK if this is a good practice or not.
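
A minimal standalone sketch of the idiom described above (hypothetical names; not code from this PR): limiting a moved-from std::unique_ptr to a small scope so it cannot be touched after the move.

#include <memory>
#include <utility>

void Consume(std::unique_ptr<int> p);  // takes ownership

void Example() {
  {
    std::unique_ptr<int> tmp(new int(42));
    Consume(std::move(tmp));
    // `tmp` is now moved-from; ending the scope here turns any later
    // accidental use into a compile error instead of a latent bug.
  }
  // `tmp` is no longer visible at this point.
}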

std::unique_ptr<InternalIterator> memtable_range_del_iter(
NewMergingIterator(&cfd_->internal_comparator(),
memtable_range_del_iters.empty()
? nullptr
: &memtable_range_del_iters[0],
static_cast<int>(memtable_range_del_iters.size())));
status = range_del_agg.AddTombstones(std::move(memtable_range_del_iter));
}
if (status.ok()) {
for (IngestedFileInfo& f : files_to_ingest_) {
status = IngestedFileOverlapWithIteratorRange(&f, memtable_iter.get(),
overlap);
if (!status.ok() || *overlap == true) {
break;
}
if (range_del_agg.IsRangeOverlapped(f.smallest_user_key,
f.largest_user_key)) {
*overlap = true;
break;
}
}
}

return status;
}

@@ -575,34 +583,6 @@ Status ExternalSstFileIngestionJob::IngestedFileOverlapWithIteratorRange(
return iter->status();
}

Status ExternalSstFileIngestionJob::IngestedFileOverlapWithRangeDeletions(
const IngestedFileInfo* file_to_ingest, InternalIterator* range_del_iter,
bool* overlap) {
auto* vstorage = cfd_->current()->storage_info();
auto* ucmp = vstorage->InternalComparator()->user_comparator();

*overlap = false;
if (range_del_iter != nullptr) {
for (range_del_iter->SeekToFirst(); range_del_iter->Valid();
range_del_iter->Next()) {
ParsedInternalKey parsed_key;
if (!ParseInternalKey(range_del_iter->key(), &parsed_key)) {
return Status::Corruption("corrupted range deletion key: " +
range_del_iter->key().ToString());
}
RangeTombstone range_del(parsed_key, range_del_iter->value());
if (ucmp->Compare(range_del.start_key_,
file_to_ingest->largest_user_key) <= 0 &&
ucmp->Compare(file_to_ingest->smallest_user_key,
range_del.end_key_) <= 0) {
*overlap = true;
break;
}
}
}
return Status::OK();
}

bool ExternalSstFileIngestionJob::IngestedFileFitInLevel(
const IngestedFileInfo* file_to_ingest, int level) {
if (level == 0) {
@@ -639,23 +619,26 @@ Status ExternalSstFileIngestionJob::IngestedFileOverlapWithLevel(
ro.total_order_seek = true;
MergeIteratorBuilder merge_iter_builder(&cfd_->internal_comparator(),
&arena);
// Files are opened lazily when the iterator needs them, so range deletions
// are likewise added lazily to the aggregator. We only need to check range
// deletion overlap when there is no point-key overlap; by then, we have
// already opened the file whose range contains the ingested file's begin key
// and iterated through every file up to the one containing the ingested
// file's end key. So any file that could contain range deletions overlapping
// the ingested file must already have been opened, with its range deletions
// added to the aggregator.
RangeDelAggregator range_del_agg(cfd_->internal_comparator(),
{} /* snapshots */,
false /* collapse_deletions */);
sv->current->AddIteratorsForLevel(ro, env_options_, &merge_iter_builder, lvl,
nullptr /* range_del_agg */);
&range_del_agg);
ScopedArenaIterator level_iter(merge_iter_builder.Finish());

std::vector<InternalIterator*> level_range_del_iters;
sv->current->AddRangeDelIteratorsForLevel(ro, env_options_, lvl,
&level_range_del_iters);
std::unique_ptr<InternalIterator> level_range_del_iter(NewMergingIterator(
&cfd_->internal_comparator(),
level_range_del_iters.empty() ? nullptr : &level_range_del_iters[0],
static_cast<int>(level_range_del_iters.size())));

Status status = IngestedFileOverlapWithIteratorRange(
file_to_ingest, level_iter.get(), overlap_with_level);
if (status.ok() && *overlap_with_level == false) {
status = IngestedFileOverlapWithRangeDeletions(
file_to_ingest, level_range_del_iter.get(), overlap_with_level);
if (status.ok() && *overlap_with_level == false &&
range_del_agg.IsRangeOverlapped(file_to_ingest->smallest_user_key,
file_to_ingest->largest_user_key)) {
*overlap_with_level = true;
}
return status;
}
7 changes: 0 additions & 7 deletions db/external_sst_file_ingestion_job.h
@@ -139,13 +139,6 @@ class ExternalSstFileIngestionJob {
const IngestedFileInfo* file_to_ingest, InternalIterator* iter,
bool* overlap);

// Check if `file_to_ingest` key range overlaps with any range deletions
// specified by `iter`.
// REQUIRES: Mutex held
Status IngestedFileOverlapWithRangeDeletions(
const IngestedFileInfo* file_to_ingest, InternalIterator* range_del_iter,
bool* overlap);

// Check if `file_to_ingest` key range overlap with level
// REQUIRES: Mutex held
Status IngestedFileOverlapWithLevel(SuperVersion* sv,
23 changes: 23 additions & 0 deletions db/range_del_aggregator.cc
@@ -140,6 +140,29 @@ bool RangeDelAggregator::ShouldDeleteImpl(
return parsed.sequence < tombstone_map_iter->second.seq_;
}

bool RangeDelAggregator::IsRangeOverlapped(const Slice& start,
const Slice& end) {
// So far this is only implemented for non-collapsed mode, since file
// ingestion (the only client) doesn't use collapsing.
assert(!collapse_deletions_);
if (rep_ == nullptr) {
return false;
}
for (const auto& seqnum_and_tombstone_map : rep_->stripe_map_) {
for (const auto& start_key_and_tombstone :
seqnum_and_tombstone_map.second.raw_map) {
const auto& tombstone = start_key_and_tombstone.second;
if (icmp_.user_comparator()->Compare(start, tombstone.end_key_) < 0 &&
icmp_.user_comparator()->Compare(tombstone.start_key_, end) <= 0 &&
icmp_.user_comparator()->Compare(tombstone.start_key_,
tombstone.end_key_) < 0) {
return true;
}
}
}
return false;
}

bool RangeDelAggregator::ShouldAddTombstones(
bool bottommost_level /* = false */) {
// TODO(andrewkr): can we just open a file and throw it away if it ends up
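
The overlap test above boils down to one comparison chain per tombstone. A standalone sketch of the predicate, with std::string keys standing in for user keys under the bytewise comparator (illustrative only):

#include <string>

// The query range [start, end] is inclusive on both ends; a tombstone covers
// the half-open interval [ts_start, ts_end).
bool Overlapped(const std::string& start, const std::string& end,
                const std::string& ts_start, const std::string& ts_end) {
  return start < ts_end && ts_start <= end && ts_start < ts_end;
}

// Overlapped("a", "c", "c", "d") == true:  "c" is covered ("end" inclusive).
// Overlapped("d", "e", "c", "d") == false: the tombstone end is exclusive.
// Overlapped("a", "b", "b", "b") == false: an empty tombstone covers nothing.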
9 changes: 9 additions & 0 deletions db/range_del_aggregator.h
@@ -92,6 +92,15 @@ class RangeDelAggregator {
bool ShouldDeleteImpl(const Slice& internal_key,
RangePositioningMode mode = kFullScan);

// Checks whether range deletions cover any keys between `start` and `end`,
// inclusive.
//
// @param start User key representing beginning of range to check for overlap.
// @param end User key representing end of range to check for overlap. This
// argument is inclusive, so the existence of a range deletion covering
// `end` causes this to return true.
bool IsRangeOverlapped(const Slice& start, const Slice& end);

bool ShouldAddTombstones(bool bottommost_level = false);

// Adds tombstones to the tombstone aggregation structure maintained by this
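
A hypothetical call site (names illustrative): the aggregator must be constructed in non-collapsed mode, since IsRangeOverlapped asserts !collapse_deletions_.

RangeDelAggregator range_del_agg(internal_comparator, {} /* snapshots */,
                                 false /* collapse_deletions */);
Status s = range_del_agg.AddTombstones(std::move(tombstone_iter));
if (s.ok() && range_del_agg.IsRangeOverlapped(file_smallest_user_key,
                                              file_largest_user_key)) {
  // Some tombstone covers a key in the file's range, so the ingestion job
  // treats the file as overlapping at this point in the LSM.
}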
34 changes: 30 additions & 4 deletions db/range_del_aggregator_test.cc
@@ -28,9 +28,9 @@ enum Direction {

void VerifyRangeDels(const std::vector<RangeTombstone>& range_dels,
const std::vector<ExpectedPoint>& expected_points) {
auto icmp = InternalKeyComparator(BytewiseComparator());
// Test same result regardless of which order the range deletions are added.
for (Direction dir : {kForward, kReverse}) {
auto icmp = InternalKeyComparator(BytewiseComparator());
RangeDelAggregator range_del_agg(icmp, {} /* snapshots */, true);
std::vector<std::string> keys, values;
for (const auto& range_del : range_dels) {
@@ -62,6 +62,27 @@ void VerifyRangeDels(const std::vector<RangeTombstone>& range_dels,
}
}
}

RangeDelAggregator range_del_agg(icmp, {} /* snapshots */,
false /* collapse_deletions */);
std::vector<std::string> keys, values;
for (const auto& range_del : range_dels) {
auto key_and_value = range_del.Serialize();
keys.push_back(key_and_value.first.Encode().ToString());
values.push_back(key_and_value.second.ToString());
}
std::unique_ptr<test::VectorIterator> range_del_iter(
new test::VectorIterator(keys, values));
range_del_agg.AddTombstones(std::move(range_del_iter));
for (size_t i = 1; i < expected_points.size(); ++i) {
bool overlapped = range_del_agg.IsRangeOverlapped(
expected_points[i - 1].begin, expected_points[i].begin);
if (expected_points[i - 1].seq > 0 || expected_points[i].seq > 0) {
ASSERT_TRUE(overlapped);
} else {
ASSERT_FALSE(overlapped);
}
}
}

} // anonymous namespace
@@ -112,9 +133,14 @@ TEST_F(RangeDelAggregatorTest, SameEndKey) {
}

TEST_F(RangeDelAggregatorTest, GapsBetweenRanges) {
VerifyRangeDels(
{{"a", "b", 5}, {"c", "d", 10}, {"e", "f", 15}},
{{" ", 0}, {"a", 5}, {"b", 0}, {"c", 10}, {"d", 0}, {"e", 15}, {"f", 0}});
VerifyRangeDels({{"a", "b", 5}, {"c", "d", 10}, {"e", "f", 15}}, {{" ", 0},
{"a", 5},
{"b", 0},
{"c", 10},
{"d", 0},
{"da", 0},
{"e", 15},
{"f", 0}});
}

// Note the Cover* tests also test cases where tombstones are inserted under a
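
Tracing IsRangeOverlapped against the GapsBetweenRanges tombstones {a,b,5}, {c,d,10}, {e,f,15} shows why the extra {"da", 0} point matters: without it, no checked pair of consecutive points falls strictly inside the gap between [c, d) and [e, f), so the non-overlap case there would go unverified. Expected results (illustrative):

IsRangeOverlapped("b", "c")  -> true   ("c" is covered; the end is inclusive)
IsRangeOverlapped("d", "da") -> false  (lies entirely in the gap after [c, d))
IsRangeOverlapped("da", "e") -> true   ("e" is covered by [e, f))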
16 changes: 0 additions & 16 deletions db/version_set.cc
@@ -874,22 +874,6 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options,
}
}

void Version::AddRangeDelIteratorsForLevel(
const ReadOptions& read_options, const EnvOptions& soptions, int level,
std::vector<InternalIterator*>* range_del_iters) {
range_del_iters->clear();
for (size_t i = 0; i < storage_info_.LevelFilesBrief(level).num_files; i++) {
const auto& file = storage_info_.LevelFilesBrief(level).files[i];
auto* range_del_iter = cfd_->table_cache()->NewRangeTombstoneIterator(
read_options, soptions, cfd_->internal_comparator(), file.fd,
cfd_->internal_stats()->GetFileReadHist(level),
false /* skip_filters */, level);
if (range_del_iter != nullptr) {
range_del_iters->push_back(range_del_iter);
}
}
}

VersionStorageInfo::VersionStorageInfo(
const InternalKeyComparator* internal_comparator,
const Comparator* user_comparator, int levels,
4 changes: 0 additions & 4 deletions db/version_set.h
@@ -525,10 +525,6 @@ class Version {
MergeIteratorBuilder* merger_iter_builder,
int level, RangeDelAggregator* range_del_agg);

void AddRangeDelIteratorsForLevel(
const ReadOptions& read_options, const EnvOptions& soptions, int level,
std::vector<InternalIterator*>* range_del_iters);

// Lookup the value for key. If found, store it in *val and
// return OK. Else return a non-OK status.
// Uses *operands to store merge_operator operations to apply later.