add range for compaction filter context (tikv#192)
* add range for compaction filter context

Signed-off-by: qupeng <[email protected]>
Signed-off-by: tabokie <[email protected]>
hicqu authored and tabokie committed May 11, 2022
1 parent d95dc08 commit 21ec181
Showing 5 changed files with 123 additions and 10 deletions.
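
For orientation before the diff: a minimal sketch of how a downstream CompactionFilterFactory could consume the key range this commit adds to CompactionFilter::Context. Everything named RangeAwareFilterFactory, KeepAllFilter, region_start_, or region_end_ is a hypothetical illustration, not code from this commit; only the context.start_key / context.end_key fields come from the change itself.

// Hypothetical factory that skips filtering when a compaction cannot
// overlap the key region this node cares about (illustration only).
#include <memory>
#include <string>
#include <utility>

#include "rocksdb/compaction_filter.h"
#include "rocksdb/slice.h"

using ROCKSDB_NAMESPACE::CompactionFilter;
using ROCKSDB_NAMESPACE::CompactionFilterFactory;
using ROCKSDB_NAMESPACE::Slice;

// Trivial filter that keeps every key; stands in for real filtering logic.
class KeepAllFilter : public CompactionFilter {
 public:
  const char* Name() const override { return "KeepAllFilter"; }
  bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
              std::string* /*new_value*/,
              bool* /*value_changed*/) const override {
    return false;  // false = keep the key-value pair
  }
};

class RangeAwareFilterFactory : public CompactionFilterFactory {
 public:
  RangeAwareFilterFactory(std::string region_start, std::string region_end)
      : region_start_(std::move(region_start)),
        region_end_(std::move(region_end)) {}

  const char* Name() const override { return "RangeAwareFilterFactory"; }

  std::unique_ptr<CompactionFilter> CreateCompactionFilter(
      const CompactionFilter::Context& context) override {
    // New in this commit: the context carries the compaction's key range.
    if (context.end_key.compare(Slice(region_start_)) < 0 ||
        context.start_key.compare(Slice(region_end_)) >= 0) {
      return nullptr;  // no overlap with our region, so skip filtering
    }
    return std::unique_ptr<CompactionFilter>(new KeepAllFilter);
  }

 private:
  std::string region_start_;
  std::string region_end_;
};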
81 changes: 81 additions & 0 deletions db/compact_files_test.cc
@@ -12,6 +12,7 @@

#include "db/db_impl/db_impl.h"
#include "port/port.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "test_util/sync_point.h"
@@ -61,6 +62,86 @@ class FlushedFileCollector : public EventListener {
  std::mutex mutex_;
};

class TestFilterFactory : public CompactionFilterFactory {
 public:
  std::shared_ptr<CompactionFilter::Context> context_;
  std::shared_ptr<int> compaction_count_;

  TestFilterFactory(std::shared_ptr<CompactionFilter::Context> context,
                    std::shared_ptr<int> compaction_count) {
    this->context_ = context;
    this->compaction_count_ = compaction_count;
  }

  ~TestFilterFactory() {}

  const char* Name() const override { return "TestFilterFactory"; }

  std::unique_ptr<CompactionFilter> CreateCompactionFilter(
      const CompactionFilter::Context& context) override {
    context_->start_key = context.start_key;
    context_->end_key = context.end_key;
    context_->is_end_key_inclusive = context.is_end_key_inclusive;
    context_->file_numbers.clear();
    context_->table_properties.clear();
    for (size_t i = 0; i < context.file_numbers.size(); ++i) {
      context_->file_numbers.push_back(context.file_numbers[i]);
      context_->table_properties.push_back(context.table_properties[i]);
    }
    *compaction_count_.get() += 1;
    return nullptr;
  }
};

TEST_F(CompactFilesTest, FilterContext) {
  Options options;
  // to trigger compaction more easily
  const int kWriteBufferSize = 10000;
  const int kLevel0Trigger = 10;
  options.create_if_missing = true;
  options.compaction_style = kCompactionStyleLevel;
  // Small slowdown and stop triggers for experimental purposes.
  options.level0_slowdown_writes_trigger = 20;
  options.level0_stop_writes_trigger = 20;
  options.write_buffer_size = kWriteBufferSize;
  options.level0_file_num_compaction_trigger = kLevel0Trigger;
  options.compression = kNoCompression;

  std::shared_ptr<CompactionFilter::Context> expected_context(
      new CompactionFilter::Context);
  std::shared_ptr<int> compaction_count(new int(0));
  CompactionFilterFactory* factory =
      new TestFilterFactory(expected_context, compaction_count);
  options.compaction_filter_factory =
      std::shared_ptr<CompactionFilterFactory>(factory);

  DB* db = nullptr;
  DestroyDB(db_name_, options);
  Status s = DB::Open(options, db_name_, &db);
  assert(s.ok());
  assert(db);

  // `Flush` alone must not invoke the compaction filter factory.
  db->Put(WriteOptions(), ToString(1), "");
  db->Put(WriteOptions(), ToString(51), "");
  db->Flush(FlushOptions());
  db->Put(WriteOptions(), ToString(50), "");
  db->Put(WriteOptions(), ToString(99), "");
  db->Flush(FlushOptions());
  ASSERT_EQ(*compaction_count.get(), 0);

  db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
  usleep(10000);  // Wait for the compaction to start.
  ASSERT_EQ(expected_context->start_key, Slice("1"));
  ASSERT_EQ(expected_context->end_key, Slice("99"));
  ASSERT_EQ(expected_context->is_end_key_inclusive, 1);
  ASSERT_EQ(expected_context->file_numbers[0], 11);
  ASSERT_EQ(*compaction_count.get(), 1);

  delete db;
}

TEST_F(CompactFilesTest, L0ConflictsFiles) {
  Options options;
  // to trigger compaction more easily
21 changes: 18 additions & 3 deletions db/compaction/compaction.cc
@@ -323,8 +323,8 @@ bool Compaction::IsTrivialMove() const {
  }

  if (!(start_level_ != output_level_ && num_input_levels() == 1 &&
        input(0, 0)->fd.GetPathId() == output_path_id() &&
        InputCompressionMatchesOutput())) {
    return false;
  }

@@ -529,7 +529,8 @@ uint64_t Compaction::OutputFilePreallocationSize() const {
preallocation_size + (preallocation_size / 10));
}

-std::unique_ptr<CompactionFilter> Compaction::CreateCompactionFilter() const {
+std::unique_ptr<CompactionFilter> Compaction::CreateCompactionFilter(
+    const Slice* start, const Slice* end) const {
  if (!cfd_->ioptions()->compaction_filter_factory) {
    return nullptr;
  }
@@ -544,6 +545,20 @@ std::unique_ptr<CompactionFilter> Compaction::CreateCompactionFilter() const {
  context.is_full_compaction = is_full_compaction_;
  context.is_manual_compaction = is_manual_compaction_;
  context.is_bottommost_level = bottommost_level_;
  context.start_key =
      (start == nullptr) ? GetSmallestUserKey() : ExtractUserKey(*start);
  context.end_key =
      (end == nullptr) ? GetLargestUserKey() : ExtractUserKey(*end);
  context.is_end_key_inclusive = (end == nullptr);
  for (auto l = inputs_.begin(); l != inputs_.end(); ++l) {
    for (auto f = l->files.begin(); f != l->files.end(); ++f) {
      std::shared_ptr<const TableProperties> tp;
      Status s = input_version_->GetTableProperties(&tp, *f);
      assert(s.ok());
      context.file_numbers.push_back((*f)->fd.GetNumber());
      context.table_properties.push_back(tp);
    }
  }
  context.column_family_id = cfd_->GetID();
  context.reason = TableFileCreationReason::kCompaction;
  return cfd_->ioptions()->compaction_filter_factory->CreateCompactionFilter(
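
To restate the boundary rule added above in isolation (a sketch, not code from this commit; FilterKeyRange and ResolveFilterRange are illustrative names, and Slice/ExtractUserKey are assumed from the surrounding code base): a null subcompaction boundary falls back to the compaction's own smallest or largest user key, and only the fallback end is inclusive — which is why the test above expects is_end_key_inclusive to be true for a full-range CompactRange.

// Illustrative paraphrase of the range resolution in CreateCompactionFilter.
struct FilterKeyRange {
  Slice start;
  Slice end;
  bool end_inclusive;
};

FilterKeyRange ResolveFilterRange(const Slice* start, const Slice* end,
                                  const Slice& smallest_user_key,
                                  const Slice& largest_user_key) {
  FilterKeyRange r;
  // Null start: the filter range begins at the compaction's smallest key.
  r.start = (start == nullptr) ? smallest_user_key : ExtractUserKey(*start);
  // Null end: the range runs through the largest key, inclusively; an
  // explicit subcompaction boundary is treated as exclusive.
  r.end = (end == nullptr) ? largest_user_key : ExtractUserKey(*end);
  r.end_inclusive = (end == nullptr);
  return r;
}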
14 changes: 8 additions & 6 deletions db/compaction/compaction.h
@@ -256,7 +256,8 @@ class Compaction {
  void ResetNextCompactionIndex();

  // Create a CompactionFilter from compaction_filter_factory
-  std::unique_ptr<CompactionFilter> CreateCompactionFilter() const;
+  std::unique_ptr<CompactionFilter> CreateCompactionFilter(
+      const Slice* start, const Slice* end) const;

  // Create a SstPartitioner from sst_partitioner_factory
  std::unique_ptr<SstPartitioner> CreateSstPartitioner() const;
@@ -330,8 +331,9 @@

  // Get the atomic file boundaries for all files in the compaction. Necessary
  // in order to avoid the scenario described in
-  // https://github.com/facebook/rocksdb/pull/4432#discussion_r221072219 and plumb
-  // down appropriate key boundaries to RangeDelAggregator during compaction.
+  // https://github.com/facebook/rocksdb/pull/4432#discussion_r221072219 and
+  // plumb down appropriate key boundaries to RangeDelAggregator during
+  // compaction.
  static std::vector<CompactionInputFiles> PopulateWithAtomicBoundaries(
      VersionStorageInfo* vstorage, std::vector<CompactionInputFiles> inputs);

@@ -346,7 +348,7 @@

VersionStorageInfo* input_vstorage_;

const int start_level_; // the lowest level to be compacted
const int start_level_; // the lowest level to be compacted
const int output_level_; // levels to which output files are stored
uint64_t max_output_file_size_;
uint64_t max_compaction_bytes_;
Expand All @@ -357,7 +359,7 @@ class Compaction {
VersionEdit edit_;
const int number_levels_;
ColumnFamilyData* cfd_;
  Arena arena_;  // Arena used to allocate space for file_levels_

const uint32_t output_path_id_;
CompressionType output_compression_;
@@ -375,7 +377,7 @@
// State used to check for number of overlapping grandparent files
// (grandparent == "output_level_ + 1")
std::vector<FileMetaData*> grandparents_;
  const double score_;  // score that was used to pick this compaction.

// Is this compaction creating a file in the bottom most level?
const bool bottommost_level_;
3 changes: 2 additions & 1 deletion db/compaction/compaction_job.cc
@@ -1239,7 +1239,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
  std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
  if (compaction_filter == nullptr) {
    compaction_filter_from_factory =
-        sub_compact->compaction->CreateCompactionFilter();
+        sub_compact->compaction->CreateCompactionFilter(sub_compact->start,
+                                                        sub_compact->end);
    compaction_filter = compaction_filter_from_factory.get();
  }
  if (compaction_filter != nullptr && !compaction_filter->IgnoreSnapshots()) {
14 changes: 14 additions & 0 deletions include/rocksdb/compaction_filter.h
@@ -15,6 +15,8 @@

#include "rocksdb/customizable.h"
#include "rocksdb/rocksdb_namespace.h"
#include "rocksdb/slice.h"
#include "rocksdb/table_properties.h"
#include "rocksdb/types.h"

namespace ROCKSDB_NAMESPACE {
@@ -58,6 +60,18 @@ class CompactionFilter : public Customizable {
    bool is_manual_compaction;
    // Whether output files are in bottommost level or not.
    bool is_bottommost_level;

    // The key range covered by the compaction.
    Slice start_key;
    Slice end_key;
    bool is_end_key_inclusive;

    // File numbers of all involved SST files.
    std::vector<uint64_t> file_numbers;

    // Properties of all involved SST files.
    std::vector<std::shared_ptr<const TableProperties>> table_properties;

    // The column family that will contain the created table file.
    uint32_t column_family_id;
    // Reason this table file is being created.
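
A second hedged sketch, this time for the file metadata: ShouldCreateFilter is a hypothetical helper (not part of this commit) showing how a factory might consult the table_properties now exposed on the context, for example to skip building a filter when the input files report no deletion entries.

// Hypothetical decision helper built on the new Context fields
// (illustration only; num_deletions is a standard TableProperties member).
#include <cstdint>

#include "rocksdb/compaction_filter.h"
#include "rocksdb/table_properties.h"

bool ShouldCreateFilter(
    const ROCKSDB_NAMESPACE::CompactionFilter::Context& context) {
  uint64_t deletions = 0;
  for (const auto& tp : context.table_properties) {
    if (tp != nullptr) {
      deletions += tp->num_deletions;
    }
  }
  return deletions > 0;  // only filter when there is something to reclaim
}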
