Skip to content

Commit

Permalink
Compaction filter optimization
Browse files Browse the repository at this point in the history
compaction_filter: add bottommost_level into context (tikv#160)

Signed-off-by: qupeng <[email protected]>
Signed-off-by: tabokie <[email protected]>

add range for compaction filter context (tikv#192)

* add range for compaction filter context

Signed-off-by: qupeng <[email protected]>
Signed-off-by: tabokie <[email protected]>

allow no_io for VersionSet::GetTableProperties (tikv#211)

* allow no_io for VersionSet::GetTableProperties

Signed-off-by: qupeng <[email protected]>
Signed-off-by: tabokie <[email protected]>

expose seqno from compaction filter and iterator (tikv#215)

This PR adds support for accessing the `seqno` of every key/value pair in a compaction filter or an iterator.
This helps improve GC in TiKV's compaction filter.

Signed-off-by: qupeng <[email protected]>
Signed-off-by: tabokie <[email protected]>

allow to query DB stall status (tikv#226)

This PR adds a new property is-write-stalled to query whether the column family is in write stall or not.

In TiKV there is a compaction filter used for GC, in which DB::write is called. If we can query whether the DB instance is stalled, we can skip creating more compaction filter instances and save some resources.

Signed-off-by: qupeng <[email protected]>
Signed-off-by: tabokie <[email protected]>

Fix compatibility issue with Titan

Signed-off-by: v01dstar <[email protected]>

filter deletion in compaction filter (tikv#344)

Also delay the buffer initialization of the writable file until the first actual write.

---------

Signed-off-by: tabokie <[email protected]>

Adjustments for compatibility with 8.10.facebook

Signed-off-by: v01dstar <[email protected]>

Adjust tikv related changes with upstream

Signed-off-by: v01dstar <[email protected]>
  • Loading branch information
hicqu authored and v01dstar committed Feb 22, 2024
1 parent b27b564 commit 2a93687
Show file tree
Hide file tree
Showing 18 changed files with 1,039 additions and 21 deletions.
3 changes: 3 additions & 0 deletions db/arena_wrapped_db_iter.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ class ArenaWrappedDBIter : public Iterator {
Slice key() const override { return db_iter_->key(); }
Slice value() const override { return db_iter_->value(); }
const WideColumns& columns() const override { return db_iter_->columns(); }
// Delegates to the wrapped iterator: forwards `no` to db_iter_->seqno() and
// returns its result unchanged.
bool seqno(SequenceNumber* no) const override { return db_iter_->seqno(no); }
Status status() const override { return db_iter_->status(); }
Slice timestamp() const override { return db_iter_->timestamp(); }
bool IsBlob() const { return db_iter_->IsBlob(); }
Expand Down
157 changes: 155 additions & 2 deletions db/compact_files_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).


#include <mutex>
#include <string>
#include <thread>
#include <vector>

#include "db/db_impl/db_impl.h"
#include "port/port.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "test_util/sync_point.h"
Expand Down Expand Up @@ -60,6 +60,79 @@ class FlushedFileCollector : public EventListener {
std::mutex mutex_;
};

class TestFilterFactory : public CompactionFilterFactory {
public:
std::shared_ptr<CompactionFilter::Context> context_;
std::shared_ptr<int> compaction_count_;

TestFilterFactory(std::shared_ptr<CompactionFilter::Context> context,
std::shared_ptr<int> compaction_count) {
this->context_ = context;
this->compaction_count_ = compaction_count;
}

~TestFilterFactory() {}

const char* Name() const { return "TestFilterFactory"; }

std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& context) {
context_->start_key = context.start_key;
context_->end_key = context.end_key;
context_->is_end_key_inclusive = context.is_end_key_inclusive;
context_->input_table_properties = context.input_table_properties;
*compaction_count_.get() += 1;
return nullptr;
}
};

// Checks that the context handed to the compaction filter factory carries the
// compaction's start key, the end-key-inclusive flag and the input table
// properties, and that flushes alone never invoke the factory.
TEST_F(CompactFilesTest, FilterContext) {
  Options options;
  // to trigger compaction more easily
  const int kWriteBufferSize = 10000;
  const int kLevel0Trigger = 10;
  options.create_if_missing = true;
  options.compaction_style = kCompactionStyleLevel;
  // Small slowdown and stop trigger for experimental purpose.
  options.level0_slowdown_writes_trigger = 20;
  options.level0_stop_writes_trigger = 20;
  options.write_buffer_size = kWriteBufferSize;
  options.level0_file_num_compaction_trigger = kLevel0Trigger;
  options.compression = kNoCompression;

  // Shared with the factory so the test can inspect what the factory saw.
  auto expected_context = std::make_shared<CompactionFilter::Context>();
  auto compaction_count = std::make_shared<int>(0);
  options.compaction_filter_factory =
      std::make_shared<TestFilterFactory>(expected_context, compaction_count);

  DB* db = nullptr;
  ASSERT_OK(DestroyDB(db_name_, options));
  // Use gtest assertions rather than assert() so the checks survive NDEBUG
  // builds and a failed open is reported instead of dereferencing nullptr.
  ASSERT_OK(DB::Open(options, db_name_, &db));
  ASSERT_NE(db, nullptr);

  // `Flush` is different from `Compaction`: two flushes produce two files
  // without calling the compaction filter factory.
  ASSERT_OK(db->Put(WriteOptions(), std::to_string(1), ""));
  ASSERT_OK(db->Put(WriteOptions(), std::to_string(51), ""));
  ASSERT_OK(db->Flush(FlushOptions()));
  ASSERT_OK(db->Put(WriteOptions(), std::to_string(50), ""));
  ASSERT_OK(db->Put(WriteOptions(), std::to_string(99), ""));
  ASSERT_OK(db->Flush(FlushOptions()));
  ASSERT_EQ(*compaction_count, 0);

  ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  // NOTE(review): kept from the original ("Wait for compaction start");
  // presumably redundant if CompactRange only returns after the compaction
  // has run -- confirm before removing.
  usleep(10000);
  ASSERT_EQ(expected_context->start_key, Slice("1"));
  ASSERT_TRUE(expected_context->is_end_key_inclusive);
  ASSERT_EQ(expected_context->input_table_properties.size(), 2U);
  ASSERT_EQ(*compaction_count, 1);

  delete db;
}

TEST_F(CompactFilesTest, L0ConflictsFiles) {
Options options;
// to trigger compaction more easily
Expand Down Expand Up @@ -485,11 +558,91 @@ TEST_F(CompactFilesTest, GetCompactionJobInfo) {
delete db;
}

// Checks that the "rocksdb.is-write-stalled" property reports a stall once
// compaction is held up by a blocking compaction filter and L0 files pile up
// to the slowdown trigger.
TEST_F(CompactFilesTest, IsWriteStalled) {
  // Compaction filter that spins (sleeping 10ms per iteration) for as long as
  // the shared flag is true, then keeps every entry.
  class SlowFilter : public CompactionFilter {
   public:
    explicit SlowFilter(std::atomic<bool>* would_block)
        : would_block_(would_block) {}

    bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
                std::string* /*new_value*/,
                bool* /*value_changed*/) const override {
      while (would_block_->load(std::memory_order_relaxed)) {
        usleep(10000);
      }
      return false;
    }

    const char* Name() const override { return "SlowFilter"; }

   private:
    std::atomic<bool>* would_block_;
  };

  Options options;
  options.create_if_missing = true;
  options.delayed_write_rate = 1;

  ColumnFamilyOptions cf_options;
  cf_options.level0_slowdown_writes_trigger = 12;
  cf_options.level0_stop_writes_trigger = 15;
  cf_options.write_buffer_size = 1024 * 1024;

  std::atomic<bool> compaction_would_block{true};
  // `compaction_filter` is a raw pointer, so the test owns the filter. It is
  // declared before the DB is opened and the DB is deleted at the end of the
  // test, which avoids the heap leak of the previous `new SlowFilter(...)`.
  SlowFilter slow_filter(&compaction_would_block);
  cf_options.compaction_filter = &slow_filter;

  std::vector<ColumnFamilyDescriptor> cfds;
  cfds.push_back(ColumnFamilyDescriptor("default", cf_options));

  DB* db = nullptr;
  std::vector<ColumnFamilyHandle*> handles;
  ASSERT_OK(DestroyDB(db_name_, options));

  // gtest assertions instead of assert(): they are not compiled out under
  // NDEBUG and stop the test before `db`/`handles` are dereferenced.
  Status s = DB::Open(options, db_name_, cfds, &handles, &db);
  ASSERT_OK(s);
  ASSERT_NE(db, nullptr);

  // Write-and-flush until a no_slowdown write is rejected.
  int flushed_l0_files = 0;
  while (flushed_l0_files < 100) {
    WriteBatch wb;
    for (int j = 0; j < 100; ++j) {
      char key[16];
      // snprintf bounds the write and always NUL-terminates the buffer.
      snprintf(key, sizeof(key), "foo%.2d", j);
      ASSERT_OK(wb.Put(handles[0], key, "bar"));
    }

    WriteOptions wopts;
    wopts.no_slowdown = true;
    s = db->Write(wopts, &wb);
    if (!s.ok()) {
      ASSERT_EQ(s.code(), Status::Code::kIncomplete);
      break;
    }
    FlushOptions fopts;
    fopts.allow_write_stall = true;
    ASSERT_OK(db->Flush(fopts));
    ++flushed_l0_files;
  }

  // The write loop must be terminated by write stall.
  ASSERT_EQ(flushed_l0_files, 12);
  uint64_t stalled = 0;
  ASSERT_TRUE(
      db->GetIntProperty(handles[0], "rocksdb.is-write-stalled", &stalled));
  ASSERT_NE(stalled, 0U);

  // Release the filter so any compaction blocked inside it can finish before
  // the handles and the DB are torn down.
  compaction_would_block.store(false, std::memory_order_relaxed);
  for (ColumnFamilyHandle* handle : handles) {
    delete handle;
  }
  delete db;
}

} // namespace ROCKSDB_NAMESPACE

// Test binary entry point: installs the stack trace handler, hands the
// command line to GoogleTest, then runs every registered test and uses the
// result as the process exit code.
int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  const int exit_code = RUN_ALL_TESTS();
  return exit_code;
}

9 changes: 8 additions & 1 deletion db/compaction/compaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,8 @@ uint64_t Compaction::OutputFilePreallocationSize() const {
preallocation_size + (preallocation_size / 10));
}

std::unique_ptr<CompactionFilter> Compaction::CreateCompactionFilter() const {
std::unique_ptr<CompactionFilter> Compaction::CreateCompactionFilter(
std::optional<Slice> start, std::optional<Slice> end) const {
if (!cfd_->ioptions()->compaction_filter_factory) {
return nullptr;
}
Expand All @@ -830,6 +831,12 @@ std::unique_ptr<CompactionFilter> Compaction::CreateCompactionFilter() const {
context.column_family_id = cfd_->GetID();
context.reason = TableFileCreationReason::kCompaction;
context.input_table_properties = GetInputTableProperties();
context.is_bottommost_level = bottommost_level_;
context.start_key = start == std::nullopt ? GetSmallestUserKey()
: ExtractUserKey(start.value());
context.end_key =
end == std::nullopt ? GetLargestUserKey() : ExtractUserKey(end.value());
context.is_end_key_inclusive = end == std::nullopt;
if (context.input_table_properties.empty()) {
ROCKS_LOG_WARN(
immutable_options_.info_log,
Expand Down
3 changes: 2 additions & 1 deletion db/compaction/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,8 @@ class Compaction {
void ResetNextCompactionIndex();

// Create a CompactionFilter from compaction_filter_factory
std::unique_ptr<CompactionFilter> CreateCompactionFilter() const;
std::unique_ptr<CompactionFilter> CreateCompactionFilter(
std::optional<Slice> start, std::optional<Slice> end) const;

// Create a SstPartitioner from sst_partitioner_factory
std::unique_ptr<SstPartitioner> CreateSstPartitioner() const;
Expand Down
31 changes: 17 additions & 14 deletions db/compaction/compaction_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -229,17 +229,20 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
}

if (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex &&
ikey_.type != kTypeWideColumnEntity) {
ikey_.type != kTypeWideColumnEntity && ikey_.type != kTypeDeletion) {
return true;
}

CompactionFilter::Decision decision =
CompactionFilter::Decision::kUndetermined;
CompactionFilter::ValueType value_type =
ikey_.type == kTypeValue ? CompactionFilter::ValueType::kValue
: ikey_.type == kTypeBlobIndex
? CompactionFilter::ValueType::kBlobIndex
: CompactionFilter::ValueType::kWideColumnEntity;
CompactionFilter::ValueType value_type = CompactionFilter::ValueType::kValue;
if (ikey_.type == kTypeBlobIndex) {
value_type = CompactionFilter::ValueType::kBlobIndex;
} else if (ikey_.type == kTypeWideColumnEntity) {
value_type = CompactionFilter::ValueType::kWideColumnEntity;
} else if (ikey_.type == kTypeDeletion) {
value_type = CompactionFilter::ValueType::kDeletion;
}

// Hack: pass internal key to BlobIndexCompactionFilter since it needs
// to get sequence number.
Expand Down Expand Up @@ -277,7 +280,7 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,

// For integrated BlobDB impl, CompactionIterator reads blob value.
// For Stacked BlobDB impl, the corresponding CompactionFilter's
// FilterV2 method should read the blob value.
// FilterV4 method should read the blob value.
BlobIndex blob_index;
Status s = blob_index.DecodeFrom(value_);
if (!s.ok()) {
Expand Down Expand Up @@ -337,9 +340,9 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
existing_col = &existing_columns;
}

decision = compaction_filter_->FilterV3(
level_, filter_key, value_type, existing_val, existing_col,
&compaction_filter_value_, &new_columns,
decision = compaction_filter_->UnsafeFilter(
level_, filter_key, ikey_.sequence, value_type, existing_val,
existing_col, &compaction_filter_value_, &new_columns,
compaction_filter_skip_until_.rep());
}

Expand All @@ -348,10 +351,10 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
}

if (decision == CompactionFilter::Decision::kUndetermined) {
// Should not reach here, since FilterV2/FilterV3 should never return
// kUndetermined.
// Should not reach here, since FilterV2/FilterV3/FilterV4 should never
// return kUndetermined.
status_ = Status::NotSupported(
"FilterV2/FilterV3 should never return kUndetermined");
"FilterV2/FilterV3/FilterV4 should never return kUndetermined");
validity_info_.Invalidate();
return false;
}
Expand All @@ -360,7 +363,7 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
cmp_->Compare(*compaction_filter_skip_until_.rep(), ikey_.user_key) <=
0) {
// Can't skip to a key smaller than the current one.
// Keep the key as per FilterV2/FilterV3 documentation.
// Keep the key as per FilterV2/FilterV3/FilterV4 documentation.
decision = CompactionFilter::Decision::kKeep;
}

Expand Down
36 changes: 36 additions & 0 deletions db/compaction/compaction_iterator_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,42 @@ TEST_P(CompactionIteratorTest, SingleMergeOperand) {
ASSERT_EQ("cv1cv2", c_iter_->value().ToString());
}

// Feeds the compaction iterator a mix of deletions and one value through a
// filter that removes deletions, and expects nothing to be emitted while the
// filter reports exactly two removals.
TEST_P(CompactionIteratorTest, RemoveAllSingleDeletes) {
  struct Filter : public CompactionFilter {
    // Number of deletion entries this filter has removed; mutable because
    // UnsafeFilter() is const.
    mutable size_t filtered = 0;

    const char* Name() const override {
      return "CompactionIteratorTest.SingleDelete::Filter";
    }

    // Keeps everything except deletions. For a deletion, `skip_until` is set
    // to the key with its last byte incremented and the entry is removed via
    // kRemoveAndSkipUntil.
    Decision UnsafeFilter(
        int /*level*/, const Slice& key, SequenceNumber /*seq*/, ValueType t,
        const Slice* /*existing_value*/, const WideColumns*,
        std::string* /*new_value*/,
        std::vector<std::pair<std::string, std::string>>* /*new_columns*/,
        std::string* skip_until) const override {
      if (t != ValueType::kDeletion) {
        return Decision::kKeep;
      }
      *skip_until = key.ToString();
      ++skip_until->back();
      ++filtered;
      return Decision::kRemoveAndSkipUntil;
    }
  };

  Filter filter;
  InitIterators(
      {test::KeyStr("a", 70, kTypeDeletion), test::KeyStr("a", 50, kTypeValue),
       test::KeyStr("c", 70, kTypeDeletion),
       test::KeyStr("c", 50, kTypeDeletion)},
      {"", "a", "", ""}, {}, {}, kMaxSequenceNumber, kMaxSequenceNumber,
      nullptr, &filter);

  c_iter_->SeekToFirst();
  ASSERT_FALSE(c_iter_->Valid());
  ASSERT_EQ(filter.filtered, 2U);
}

// In bottommost level, values earlier than earliest snapshot can be output
// with sequence = 0.
TEST_P(CompactionIteratorTest, ZeroOutSequenceAtBottomLevel) {
Expand Down
3 changes: 2 additions & 1 deletion db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1095,7 +1095,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
if (compaction_filter == nullptr) {
compaction_filter_from_factory =
sub_compact->compaction->CreateCompactionFilter();
sub_compact->compaction->CreateCompactionFilter(sub_compact->start,
sub_compact->end);
compaction_filter = compaction_filter_from_factory.get();
}
if (compaction_filter != nullptr && !compaction_filter->IgnoreSnapshots()) {
Expand Down
5 changes: 5 additions & 0 deletions db/db_iter.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,11 @@ class DBIter final : public Iterator {
return wide_columns_;
}

// Stores ikey_.sequence into `*no` and returns true.
// Preconditions (checked by assert only): the iterator is valid and `no` is
// not null.
bool seqno(SequenceNumber* no) const override {
  assert(valid_);
  assert(no != nullptr);
  *no = ikey_.sequence;
  return true;
}
Status status() const override {
if (status_.ok()) {
return iter_.status();
Expand Down
Loading

0 comments on commit 2a93687

Please sign in to comment.