Skip to content

Commit

Permalink
Add experimental range filters to stress/crash test
Browse files Browse the repository at this point in the history
Summary: Implemented two key segment extractors that satisfy the
"segment prefix property," one with variable segment widths and one with
fixed. Used these to create a couple of named configs and versions that
are randomly selected by the crash test. On the read side, the required
table_filter is set up everywhere I found the stress test uses
iterator_upper_bound.

Writing filters on new SST files and applying filters on SST files to
range queries are configured independently, to potentially help with
isolating different sides of the functionality.

Not yet implemented / possible follow-up:
* Not yet using categories in the extractors
* Not yet dynamically changing the filtering version

Test Plan: Some stress test trial runs. Inserted some temporary probes
to ensure code was being exercised (more or less) as intended.
  • Loading branch information
pdillinger committed Jun 13, 2024
1 parent 7620315 commit 1e49231
Show file tree
Hide file tree
Showing 14 changed files with 197 additions and 8 deletions.
1 change: 1 addition & 0 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ rocks_cpp_library_wrapper(name="rocksdb_stress_lib", srcs=[
"db_stress_tool/cf_consistency_stress.cc",
"db_stress_tool/db_stress_common.cc",
"db_stress_tool/db_stress_driver.cc",
"db_stress_tool/db_stress_filters.cc",
"db_stress_tool/db_stress_gflags.cc",
"db_stress_tool/db_stress_listener.cc",
"db_stress_tool/db_stress_shared_state.cc",
Expand Down
1 change: 1 addition & 0 deletions db_stress_tool/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ add_executable(db_stress${ARTIFACT_SUFFIX}
db_stress.cc
db_stress_common.cc
db_stress_driver.cc
db_stress_filter.cc
db_stress_gflags.cc
db_stress_listener.cc
db_stress_shared_state.cc
Expand Down
5 changes: 5 additions & 0 deletions db_stress_tool/batched_ops_stress.cc
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,11 @@ class BatchedOpsStressTest : public StressTest {
// For half of the time, set the upper bound to the next prefix
ub_slices[i] = upper_bounds[i];
ro_copies[i].iterate_upper_bound = &(ub_slices[i]);
if (FLAGS_use_sqfc_for_range_queries) {
ro_copies[i].table_filter =
sqfc_factory_->GetTableFilterForRangeQuery(prefix_slices[i],
ub_slices[i]);
}
}

iters[i].reset(db_->NewIterator(ro_copies[i], cfh));
Expand Down
4 changes: 4 additions & 0 deletions db_stress_tool/cf_consistency_stress.cc
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,10 @@ class CfConsistencyStressTest : public StressTest {
if (GetNextPrefix(prefix, &upper_bound) && thread->rand.OneIn(2)) {
ub_slice = Slice(upper_bound);
ro_copy.iterate_upper_bound = &ub_slice;
if (FLAGS_use_sqfc_for_range_queries) {
ro_copy.table_filter =
sqfc_factory_->GetTableFilterForRangeQuery(prefix, ub_slice);
}
}

ColumnFamilyHandle* const cfh =
Expand Down
3 changes: 3 additions & 0 deletions db_stress_tool/db_stress_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ DECLARE_int32(bloom_before_level);
DECLARE_bool(partition_filters);
DECLARE_bool(optimize_filters_for_memory);
DECLARE_bool(detect_filter_construct_corruption);
DECLARE_string(sqfc_name);
DECLARE_uint32(sqfc_version);
DECLARE_bool(use_sqfc_for_range_queries);
DECLARE_int32(index_type);
DECLARE_int32(data_block_index_type);
DECLARE_string(db);
Expand Down
93 changes: 93 additions & 0 deletions db_stress_tool/db_stress_filters.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "db_stress_tool/db_stress_filters.h"

#include <memory>
#include <mutex>

namespace ROCKSDB_NAMESPACE {

#ifdef GFLAGS

using experimental::KeySegmentsExtractor;
using experimental::MakeSharedBytewiseMinMaxSQFC;
using experimental::SelectKeySegment;
using experimental::SstQueryFilterConfigs;
using experimental::SstQueryFilterConfigsManager;

namespace {
class VariableWidthExtractor : public KeySegmentsExtractor {
public:
const char* Name() const override { return "VariableWidthExtractor"; }

void Extract(const Slice& key_or_bound, KeyKind /*kind*/,
Result* result) const override {
uint32_t len = static_cast<uint32_t>(key_or_bound.size());
// This uses as delimiter any zero byte that follows a non-zero byte.
// And in accordance with best practice, the delimiter is part of the
// segment leading up to it.
bool prev_non_zero = false;
for (uint32_t i = 0; i < len; ++i) {
if ((prev_non_zero && key_or_bound[i] == 0) || i + 1 == len) {
result->segment_ends.push_back(i + 1);
}
prev_non_zero = key_or_bound[i] != 0;
}
}
};
const auto kVariableWidthExtractor = std::make_shared<VariableWidthExtractor>();
class FixedWidthExtractor : public KeySegmentsExtractor {
public:
const char* Name() const override { return "FixedWidthExtractor"; }

void Extract(const Slice& key_or_bound, KeyKind /*kind*/,
Result* result) const override {
uint32_t len = static_cast<uint32_t>(key_or_bound.size());
// Fixed 8-byte segments, with any leftovers going into another
// segment.
uint32_t i = 0;
while (i + 8 <= len) {
i += 8;
result->segment_ends.push_back(i);
}
if (i < len) {
result->segment_ends.push_back(len);
}
}
};
const auto kFixedWidthExtractor = std::make_shared<FixedWidthExtractor>();
// NOTE: MinMax filter on segment 0 is not normally useful because of metadata
// on smallest and largest key for each SST file. But it doesn't hurt to test.
const auto kFilter0 = MakeSharedBytewiseMinMaxSQFC(SelectKeySegment(0));
const auto kFilter1 = MakeSharedBytewiseMinMaxSQFC(SelectKeySegment(1));
const auto kFilter2 = MakeSharedBytewiseMinMaxSQFC(SelectKeySegment(2));
const auto kFilter3 = MakeSharedBytewiseMinMaxSQFC(SelectKeySegment(3));
const SstQueryFilterConfigs fooConfigs1{{kFilter0, kFilter2},
kVariableWidthExtractor};
const SstQueryFilterConfigs fooConfigs2{{kFilter1, kFilter3},
kVariableWidthExtractor};
const SstQueryFilterConfigs barConfigs2{{kFilter1, kFilter2},
kFixedWidthExtractor};
const SstQueryFilterConfigsManager::Data data = {
{1, {{"foo", fooConfigs1}}},
{2, {{"foo", fooConfigs2}, {"bar", barConfigs2}}},
};
} // namespace

SstQueryFilterConfigsManager& DbStressSqfcManager() {
std::once_flag flag;
static std::shared_ptr<SstQueryFilterConfigsManager> mgr;
std::call_once(flag, []() {
Status s = SstQueryFilterConfigsManager::MakeShared(data, &mgr);
assert(s.ok());
assert(mgr);
});
return *mgr;
}

#endif // GFLAGS

} // namespace ROCKSDB_NAMESPACE
16 changes: 16 additions & 0 deletions db_stress_tool/db_stress_filters.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#ifdef GFLAGS
#pragma once

#include "rocksdb/experimental.h"

namespace ROCKSDB_NAMESPACE {

experimental::SstQueryFilterConfigsManager& DbStressSqfcManager();

} // namespace ROCKSDB_NAMESPACE
#endif // GFLAGS
10 changes: 10 additions & 0 deletions db_stress_tool/db_stress_gflags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,16 @@ DEFINE_bool(
.detect_filter_construct_corruption,
"Detect corruption during new Bloom Filter and Ribbon Filter construction");

DEFINE_string(sqfc_name, "foo",
"Config name to select from SstQueryFilterConfigsManager.");

DEFINE_uint32(sqfc_version, 0,
"User-defined filtering version to select from "
"SstQueryFilterConfigsManager. 0 = disable writing filters");

DEFINE_bool(use_sqfc_for_range_queries, true,
"Apply SstQueryFilters to range queries");

DEFINE_int32(
index_type,
static_cast<int32_t>(
Expand Down
37 changes: 30 additions & 7 deletions db_stress_tool/db_stress_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "db_stress_tool/db_stress_common.h"
#include "db_stress_tool/db_stress_compaction_filter.h"
#include "db_stress_tool/db_stress_driver.h"
#include "db_stress_tool/db_stress_filters.h"
#include "db_stress_tool/db_stress_table_properties_collector.h"
#include "db_stress_tool/db_stress_wide_merge_operator.h"
#include "options/options_parser.h"
Expand Down Expand Up @@ -90,6 +91,14 @@ StressTest::StressTest()
exit(1);
}
}

Status s = DbStressSqfcManager().MakeSharedFactory(
FLAGS_sqfc_name, FLAGS_sqfc_version, &sqfc_factory_);
if (!s.ok()) {
fprintf(stderr, "Error initializing SstQueryFilterConfig: %s\n",
s.ToString().c_str());
exit(1);
}
}

StressTest::~StressTest() {
Expand Down Expand Up @@ -1405,8 +1414,8 @@ Status StressTest::TestIterate(ThreadState* thread,
}
std::string upper_bound_str;
Slice upper_bound;
if (thread->rand.OneIn(16)) {
// With a 1/16 chance, set an iterator upper bound.
// Prefer no bound with no range query filtering; prefer bound with it
if (FLAGS_use_sqfc_for_range_queries ^ thread->rand.OneIn(16)) {
// Note: upper_bound can be smaller than the seek key.
const int64_t rand_upper_key = GenerateOneKey(thread, FLAGS_ops_per_thread);
upper_bound_str = Key(rand_upper_key);
Expand All @@ -1416,15 +1425,20 @@ Status StressTest::TestIterate(ThreadState* thread,

std::string lower_bound_str;
Slice lower_bound;
if (thread->rand.OneIn(16)) {
// With a 1/16 chance, enable iterator lower bound.
if (FLAGS_use_sqfc_for_range_queries ^ thread->rand.OneIn(16)) {
// Note: lower_bound can be greater than the seek key.
const int64_t rand_lower_key = GenerateOneKey(thread, FLAGS_ops_per_thread);
lower_bound_str = Key(rand_lower_key);
lower_bound = Slice(lower_bound_str);
ro.iterate_lower_bound = &lower_bound;
}

if (FLAGS_use_sqfc_for_range_queries && ro.iterate_upper_bound &&
ro.iterate_lower_bound) {
ro.table_filter = sqfc_factory_->GetTableFilterForRangeQuery(
*ro.iterate_lower_bound, *ro.iterate_upper_bound);
}

std::unique_ptr<Iterator> iter;

if (FLAGS_use_multi_cf_iterator) {
Expand Down Expand Up @@ -1463,17 +1477,21 @@ Status StressTest::TestIterate(ThreadState* thread,
op_logs = "(cleared...)\n";
}

if (ro.iterate_upper_bound != nullptr && thread->rand.OneIn(2)) {
if (!FLAGS_use_sqfc_for_range_queries &&
ro.iterate_upper_bound != nullptr && thread->rand.OneIn(2)) {
// With a 1/2 chance, change the upper bound.
// Not compatible with sqfc range filter.
// It is possible that it is changed before first use, but there is no
// problem with that.
const int64_t rand_upper_key =
GenerateOneKey(thread, FLAGS_ops_per_thread);
upper_bound_str = Key(rand_upper_key);
upper_bound = Slice(upper_bound_str);
}
if (ro.iterate_lower_bound != nullptr && thread->rand.OneIn(4)) {
if (!FLAGS_use_sqfc_for_range_queries &&
ro.iterate_lower_bound != nullptr && thread->rand.OneIn(4)) {
// With a 1/4 chance, change the lower bound.
// Not compatible with sqfc range filter.
// It is possible that it is changed before first use, but there is no
// problem with that.
const int64_t rand_lower_key =
Expand Down Expand Up @@ -3045,7 +3063,7 @@ void StressTest::Open(SharedState* shared, bool reopen) {
if (!InitializeOptionsFromFile(options_)) {
InitializeOptionsFromFlags(cache_, filter_policy_, options_);
}
InitializeOptionsGeneral(cache_, filter_policy_, options_);
InitializeOptionsGeneral(cache_, filter_policy_, sqfc_factory_, options_);

if (FLAGS_prefix_size == 0 && FLAGS_rep_factory == kHashSkipList) {
fprintf(stderr,
Expand Down Expand Up @@ -3928,6 +3946,7 @@ void InitializeOptionsFromFlags(
void InitializeOptionsGeneral(
const std::shared_ptr<Cache>& cache,
const std::shared_ptr<const FilterPolicy>& filter_policy,
const std::shared_ptr<SstQueryFilterConfigsManager::Factory>& sqfc_factory,
Options& options) {
options.create_missing_column_families = true;
options.create_if_missing = true;
Expand Down Expand Up @@ -4001,6 +4020,10 @@ void InitializeOptionsGeneral(

options.table_properties_collector_factories.emplace_back(
std::make_shared<DbStressTablePropertiesCollectorFactory>());

if (sqfc_factory && !sqfc_factory->GetConfigs().IsEmptyNotFound()) {
options.table_properties_collector_factories.emplace_back(sqfc_factory);
}
}

} // namespace ROCKSDB_NAMESPACE
Expand Down
7 changes: 6 additions & 1 deletion db_stress_tool/db_stress_test_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@

#include "db_stress_tool/db_stress_common.h"
#include "db_stress_tool/db_stress_shared_state.h"
#include "rocksdb/experimental.h"

namespace ROCKSDB_NAMESPACE {
class SystemClock;
class Transaction;
class TransactionDB;
class OptimisticTransactionDB;
struct TransactionDBOptions;
using experimental::SstQueryFilterConfigsManager;

class StressTest {
public:
Expand Down Expand Up @@ -303,6 +305,7 @@ class StressTest {
std::unordered_map<std::string, std::vector<std::string>> options_table_;
std::vector<std::string> options_index_;
std::atomic<bool> db_preload_finished_;
std::shared_ptr<SstQueryFilterConfigsManager::Factory> sqfc_factory_;

// Fields used for continuous verification from another thread
DB* cmp_db_;
Expand Down Expand Up @@ -344,7 +347,9 @@ void InitializeOptionsFromFlags(
// from OPTIONS file.
void InitializeOptionsGeneral(
const std::shared_ptr<Cache>& cache,
const std::shared_ptr<const FilterPolicy>& filter_policy, Options& options);
const std::shared_ptr<const FilterPolicy>& filter_policy,
const std::shared_ptr<SstQueryFilterConfigsManager::Factory>& sqfc_factory,
Options& options);

// If no OPTIONS file is specified, set up `options` so that we can test
// user-defined timestamp which requires `-user_timestamp_size=8`.
Expand Down
12 changes: 12 additions & 0 deletions db_stress_tool/multi_ops_txns_stress.cc
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,10 @@ Status MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread,
ropts.iterate_upper_bound = &iter_ub;
ropts.rate_limiter_priority =
FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL;
if (FLAGS_use_sqfc_for_range_queries) {
ropts.table_filter =
sqfc_factory_->GetTableFilterForRangeQuery(old_sk_prefix, iter_ub);
}
it = txn->GetIterator(ropts);

assert(it);
Expand Down Expand Up @@ -1107,6 +1111,10 @@ void MultiOpsTxnsStressTest::VerifyDb(ThreadState* thread) const {
ropts.snapshot = snapshot;
ropts.total_order_seek = true;
ropts.iterate_upper_bound = &iter_ub;
if (FLAGS_use_sqfc_for_range_queries) {
ropts.table_filter =
sqfc_factory_->GetTableFilterForRangeQuery(start_key, iter_ub);
}

std::unique_ptr<Iterator> it(db_->NewIterator(ropts));
for (it->Seek(start_key); it->Valid(); it->Next()) {
Expand Down Expand Up @@ -1615,6 +1623,10 @@ void MultiOpsTxnsStressTest::ScanExistingDb(SharedState* shared, int threads) {
ropts.iterate_lower_bound = &pk_lb;
ropts.iterate_upper_bound = &pk_ub;
ropts.total_order_seek = true;
if (FLAGS_use_sqfc_for_range_queries) {
ropts.table_filter =
sqfc_factory_->GetTableFilterForRangeQuery(pk_lb, pk_ub);
}
std::unique_ptr<Iterator> it(db_->NewIterator(ropts));

for (it->SeekToFirst(); it->Valid(); it->Next()) {
Expand Down
11 changes: 11 additions & 0 deletions db_stress_tool/no_batched_ops_stress.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1375,6 +1375,10 @@ class NonBatchedOpsStressTest : public StressTest {
// For half of the time, set the upper bound to the next prefix
ub_slice = Slice(upper_bound);
ro_copy.iterate_upper_bound = &ub_slice;
if (FLAGS_use_sqfc_for_range_queries) {
ro_copy.table_filter =
sqfc_factory_->GetTableFilterForRangeQuery(prefix, ub_slice);
}
}

std::string read_ts_str;
Expand Down Expand Up @@ -1902,6 +1906,13 @@ class NonBatchedOpsStressTest : public StressTest {
// GetIntVal().
ro.iterate_upper_bound = &max_key_slice;
}
std::string ub_str, lb_str;
if (FLAGS_use_sqfc_for_range_queries) {
ub_str = Key(ub);
lb_str = Key(lb);
ro.table_filter =
sqfc_factory_->GetTableFilterForRangeQuery(lb_str, ub_str);
}

ColumnFamilyHandle* const cfh = column_families_[rand_column_family];
assert(cfh);
Expand Down
1 change: 1 addition & 0 deletions src.mk
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ STRESS_LIB_SOURCES = \
db_stress_tool/cf_consistency_stress.cc \
db_stress_tool/db_stress_common.cc \
db_stress_tool/db_stress_driver.cc \
db_stress_tool/db_stress_filters.cc \
db_stress_tool/db_stress_gflags.cc \
db_stress_tool/db_stress_listener.cc \
db_stress_tool/db_stress_shared_state.cc \
Expand Down
Loading

0 comments on commit 1e49231

Please sign in to comment.