feat: add small time cost compaction policy (#2172)
* add new small compaction policy

* update function definition

* edge case

* update type definition

---------

Co-authored-by: denghao.denghao <[email protected]>
u6th9d and denghao.denghao authored Dec 6, 2023
1 parent ff9adf6 commit 559efc7
Showing 16 changed files with 238 additions and 74 deletions.
1 change: 1 addition & 0 deletions conf/pika.conf
@@ -268,6 +268,7 @@ max-cache-statistic-keys : 0
# a small compaction is triggered automatically if the small compaction feature is enabled.
# small-compaction-threshold default value is 5000 and the value range is [1, 100000].
small-compaction-threshold : 5000
small-compaction-duration-threshold : 10000

# The maximum total size of all live memtables of the RocksDB instance owned by Pika.
# Flushing from memtable to disk will be triggered if the actual memory usage of RocksDB
10 changes: 10 additions & 0 deletions include/pika_conf.h
@@ -220,6 +220,10 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return small_compaction_threshold_;
}
int small_compaction_duration_threshold() {
std::shared_lock l(rwlock_);
return small_compaction_duration_threshold_;
}
int max_background_flushes() {
std::shared_lock l(rwlock_);
return max_background_flushes_;
@@ -425,6 +429,11 @@
TryPushDiffCommands("small-compaction-threshold", std::to_string(value));
small_compaction_threshold_ = value;
}
void SetSmallCompactionDurationThreshold(const int value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("small-compaction-duration-threshold", std::to_string(value));
small_compaction_duration_threshold_ = value;
}
void SetMaxClientResponseSize(const int value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("max-client-response-size", std::to_string(value));
@@ -684,6 +693,7 @@

int max_cache_statistic_keys_ = 0;
int small_compaction_threshold_ = 0;
int small_compaction_duration_threshold_ = 0;
int max_background_flushes_ = 0;
int max_background_compactions_ = 0;
int max_background_jobs_ = 0;
1 change: 1 addition & 0 deletions include/pika_server.h
@@ -218,6 +218,7 @@ class PikaServer : public pstd::noncopyable {
void PrepareSlotTrySync();
void SlotSetMaxCacheStatisticKeys(uint32_t max_cache_statistic_keys);
void SlotSetSmallCompactionThreshold(uint32_t small_compaction_threshold);
void SlotSetSmallCompactionDurationThreshold(uint32_t small_compaction_duration_threshold);
bool GetDBSlotBinlogOffset(const std::string& db_name, uint32_t slot_id, BinlogOffset* boffset);
std::shared_ptr<Slot> GetSlotByDBName(const std::string& db_name);
std::shared_ptr<Slot> GetDBSlotById(const std::string& db_name, uint32_t slot_id);
17 changes: 16 additions & 1 deletion src/pika_admin.cc
@@ -1733,6 +1733,12 @@ void ConfigCmd::ConfigGet(std::string& ret) {
EncodeNumber(&config_body, g_pika_conf->small_compaction_threshold());
}

if (pstd::stringmatch(pattern.data(), "small-compaction-duration-threshold", 1) != 0) {
elements += 2;
EncodeString(&config_body, "small-compaction-duration-threshold");
EncodeNumber(&config_body, g_pika_conf->small_compaction_duration_threshold());
}

if (pstd::stringmatch(pattern.data(), "max-background-flushes", 1) != 0) {
elements += 2;
EncodeString(&config_body, "max-background-flushes");
@@ -2090,7 +2096,7 @@ void ConfigCmd::ConfigGet(std::string& ret) {
void ConfigCmd::ConfigSet(std::string& ret, std::shared_ptr<Slot> slot) {
std::string set_item = config_args_v_[1];
if (set_item == "*") {
ret = "*28\r\n";
ret = "*29\r\n";
EncodeString(&ret, "timeout");
EncodeString(&ret, "requirepass");
EncodeString(&ret, "masterauth");
@@ -2109,6 +2115,7 @@ void ConfigCmd::ConfigSet(std::string& ret, std::shared_ptr<Slot> slot) {
EncodeString(&ret, "write-binlog");
EncodeString(&ret, "max-cache-statistic-keys");
EncodeString(&ret, "small-compaction-threshold");
EncodeString(&ret, "small-compaction-duration-threshold");
EncodeString(&ret, "max-client-response-size");
EncodeString(&ret, "db-sync-speed");
EncodeString(&ret, "compact-cron");
@@ -2241,6 +2248,14 @@ void ConfigCmd::ConfigSet(std::string& ret, std::shared_ptr<Slot> slot) {
g_pika_conf->SetSmallCompactionThreshold(static_cast<int>(ival));
g_pika_server->SlotSetSmallCompactionThreshold(static_cast<int>(ival));
ret = "+OK\r\n";
} else if (set_item == "small-compaction-duration-threshold") {
if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival < 0) {
ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'small-compaction-duration-threshold'\r\n";
return;
}
g_pika_conf->SetSmallCompactionDurationThreshold(static_cast<int>(ival));
g_pika_server->SlotSetSmallCompactionDurationThreshold(static_cast<int>(ival));
ret = "+OK\r\n";
} else if (set_item == "max-client-response-size") {
if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival < 0) {
ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'max-client-response-size'\r\n";
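
The CONFIG handlers above treat the new key exactly like the existing small-compaction-threshold: CONFIG GET matches it against the glob pattern, and CONFIG SET accepts any non-negative integer, stores it in PikaConf, and fans it out to every slot's storage engine through PikaServer. The exchange below is an illustrative client-side view; the reply strings follow the handler code shown in this hunk.

CONFIG SET small-compaction-duration-threshold 20000
+OK
CONFIG SET small-compaction-duration-threshold -1
-ERR Invalid argument '-1' for CONFIG SET 'small-compaction-duration-threshold'
CONFIG GET small-compaction-duration-threshold
1) "small-compaction-duration-threshold"
2) "20000"
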
15 changes: 13 additions & 2 deletions src/pika_conf.cc
@@ -451,8 +451,18 @@ int PikaConf::Load() {

small_compaction_threshold_ = 5000;
GetConfInt("small-compaction-threshold", &small_compaction_threshold_);
if (small_compaction_threshold_ <= 0 || small_compaction_threshold_ >= 100000) {
small_compaction_threshold_ = 5000;
if (small_compaction_threshold_ < 0) {
small_compaction_threshold_ = 0;
} else if (small_compaction_threshold_ >= 100000) {
small_compaction_threshold_ = 100000;
}

small_compaction_duration_threshold_ = 10000;
GetConfInt("small-compaction-duration-threshold", &small_compaction_duration_threshold_);
if (small_compaction_duration_threshold_ < 0) {
small_compaction_duration_threshold_ = 0;
} else if (small_compaction_duration_threshold_ >= 1000000) {
small_compaction_duration_threshold_ = 1000000;
}

max_background_flushes_ = 1;
@@ -724,6 +734,7 @@ int PikaConf::ConfigRewrite() {
SetConfStr("replication-id", replication_id_);
SetConfInt("max-cache-statistic-keys", max_cache_statistic_keys_);
SetConfInt("small-compaction-threshold", small_compaction_threshold_);
SetConfInt("small-compaction-duration-threshold", small_compaction_duration_threshold_);
SetConfInt("max-client-response-size", static_cast<int32_t>(max_client_response_size_));
SetConfInt("db-sync-speed", db_sync_speed_);
SetConfStr("compact-cron", compact_cron_);
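
One behavioral detail worth calling out: Load() previously reset an out-of-range small-compaction-threshold back to the default 5000, while the new code clamps both thresholds into their allowed ranges ([0, 100000] for the modify-count threshold, [0, 1000000] for the duration threshold). A minimal, purely illustrative sketch of the equivalent clamping:

#include <algorithm>
#include <cstdint>

// Equivalent of the range checks in PikaConf::Load() above.
int64_t ClampThreshold(int64_t value, int64_t lo, int64_t hi) {
  return std::clamp(value, lo, hi);
}

// ClampThreshold(-5,     0, 100000)  -> 0       (floor for the modify-count threshold)
// ClampThreshold(250000, 0, 100000)  -> 100000  (ceiling for the modify-count threshold)
// ClampThreshold(20000,  0, 1000000) -> 20000   (duration threshold passes through unchanged)
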
11 changes: 11 additions & 0 deletions src/pika_server.cc
@@ -527,6 +527,17 @@ void PikaServer::SlotSetSmallCompactionThreshold(uint32_t small_compaction_thres
}
}

void PikaServer::SlotSetSmallCompactionDurationThreshold(uint32_t small_compaction_duration_threshold) {
std::shared_lock rwl(dbs_rw_);
for (const auto& db_item : dbs_) {
for (const auto& slot_item : db_item.second->slots_) {
slot_item.second->DbRWLockReader();
slot_item.second->db()->SetSmallCompactionDurationThreshold(small_compaction_duration_threshold);
slot_item.second->DbRWUnLock();
}
}
}

bool PikaServer::GetDBSlotBinlogOffset(const std::string& db_name, uint32_t slot_id,
BinlogOffset* const boffset) {
std::shared_ptr<SyncMasterSlot> slot =
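
SlotSetSmallCompactionDurationThreshold mirrors the existing modify-count fan-out: it walks every DB and slot under the shared dbs_rw_ lock, takes each slot's DB read lock, and pushes the value into that slot's storage engine. The resulting runtime-update path, sketched as a call chain (the Storage-to-Redis hop is not shown in this excerpt and is inferred):

CONFIG SET small-compaction-duration-threshold 20000
  -> ConfigCmd::ConfigSet                                  (validates the value)
  -> PikaConf::SetSmallCompactionDurationThreshold         (updates config, records diff for CONFIG REWRITE)
  -> PikaServer::SlotSetSmallCompactionDurationThreshold   (iterates DBs and slots)
  -> Storage::SetSmallCompactionDurationThreshold          (per slot)
  -> Redis::SetSmallCompactionDurationThreshold            (per data-type instance, atomic store)
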
2 changes: 2 additions & 0 deletions src/storage/include/storage/storage.h
@@ -66,6 +66,7 @@ struct StorageOptions {
bool share_block_cache = false;
size_t statistics_max_size = 0;
size_t small_compaction_threshold = 5000;
size_t small_compaction_duration_threshold = 10000;
Status ResetOptions(const OptionType& option_type, const std::unordered_map<std::string, std::string>& options_map);
};

@@ -1034,6 +1035,7 @@ class Storage {

Status SetMaxCacheStatisticKeys(uint32_t max_cache_statistic_keys);
Status SetSmallCompactionThreshold(uint32_t small_compaction_threshold);
Status SetSmallCompactionDurationThreshold(uint32_t small_compaction_duration_threshold);

std::string GetCurrentTaskType();
Status GetUsage(const std::string& property, uint64_t* result);
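
StorageOptions carries the new threshold down to each underlying Redis instance when a database is opened, alongside the existing statistics options, and Storage exposes a matching runtime setter. A minimal wiring sketch, assuming the usual storage::Storage::Open(StorageOptions, db_path) entry point; the path and the numbers are placeholders, and the duration values are in microseconds (see the pstd::NowMicros-based guard later in this commit):

#include "storage/storage.h"

void OpenWithSmallCompaction() {
  storage::StorageOptions opts;
  opts.statistics_max_size = 10000;                  // capacity of the per-key statistics LRU cache
  opts.small_compaction_threshold = 5000;            // per-key modify-count trigger
  opts.small_compaction_duration_threshold = 10000;  // per-key average scan-duration trigger

  storage::Storage store;
  store.Open(opts, "./db/placeholder");              // placeholder path

  // Both triggers can also be adjusted at runtime:
  store.SetSmallCompactionThreshold(8000);
  store.SetSmallCompactionDurationThreshold(20000);
}
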
40 changes: 29 additions & 11 deletions src/storage/src/redis.cc
@@ -12,8 +12,9 @@ Redis::Redis(Storage* const s, const DataType& type)
: storage_(s),
type_(type),
lock_mgr_(std::make_shared<LockMgr>(1000, 0, std::make_shared<MutexFactoryImpl>())),
small_compaction_threshold_(5000) {
statistics_store_ = std::make_unique<LRUCache<std::string, size_t>>();
small_compaction_threshold_(5000),
small_compaction_duration_threshold_(10000) {
statistics_store_ = std::make_unique<LRUCache<std::string, KeyStatistics>>();
scan_cursors_store_ = std::make_unique<LRUCache<std::string, std::string>>();
scan_cursors_store_->SetCapacity(5000);
default_compact_range_options_.exclusive_manual_compaction = false;
@@ -46,23 +47,40 @@ Status Redis::SetMaxCacheStatisticKeys(size_t max_cache_statistic_keys) {
return Status::OK();
}

Status Redis::SetSmallCompactionThreshold(size_t small_compaction_threshold) {
Status Redis::SetSmallCompactionThreshold(uint64_t small_compaction_threshold) {
small_compaction_threshold_ = small_compaction_threshold;
return Status::OK();
}

Status Redis::UpdateSpecificKeyStatistics(const std::string& key, size_t count) {
if ((statistics_store_->Capacity() != 0U) && (count != 0U)) {
size_t total = 0;
statistics_store_->Lookup(key, &total);
statistics_store_->Insert(key, total + count);
AddCompactKeyTaskIfNeeded(key, total + count);
Status Redis::SetSmallCompactionDurationThreshold(uint64_t small_compaction_duration_threshold) {
small_compaction_duration_threshold_ = small_compaction_duration_threshold;
return Status::OK();
}

Status Redis::UpdateSpecificKeyStatistics(const std::string& key, uint64_t count) {
if ((statistics_store_->Capacity() != 0U) && (count != 0U) && (small_compaction_threshold_ != 0U)) {
KeyStatistics data;
statistics_store_->Lookup(key, &data);
data.AddModifyCount(count);
statistics_store_->Insert(key, data);
AddCompactKeyTaskIfNeeded(key, data.ModifyCount(), data.AvgDuration());
}
return Status::OK();
}

Status Redis::UpdateSpecificKeyDuration(const std::string& key, uint64_t duration) {
if ((statistics_store_->Capacity() != 0U) && (duration != 0U) && (small_compaction_duration_threshold_ != 0U)) {
KeyStatistics data;
statistics_store_->Lookup(key, &data);
data.AddDuration(duration);
statistics_store_->Insert(key, data);
AddCompactKeyTaskIfNeeded(key, data.ModifyCount(), data.AvgDuration());
}
return Status::OK();
}

Status Redis::AddCompactKeyTaskIfNeeded(const std::string& key, size_t total) {
if (total < small_compaction_threshold_) {
Status Redis::AddCompactKeyTaskIfNeeded(const std::string& key, uint64_t count, uint64_t duration) {
if (count < small_compaction_threshold_ || duration < small_compaction_duration_threshold_) {
return Status::OK();
} else {
storage_->AddBGTask({type_, kCompactRange, {key, key}});
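
The two update paths above converge in AddCompactKeyTaskIfNeeded: UpdateSpecificKeyStatistics accumulates a per-key modify count and UpdateSpecificKeyDuration maintains a trimmed average of recent scan durations, both stored in the statistics LRU cache, and a CompactRange background task for the key is queued only when both values have reached their thresholds (the early return fires if either is still below). A standalone sketch of that decision, with illustrative numbers:

#include <cstdint>

// Mirrors the condition in Redis::AddCompactKeyTaskIfNeeded above.
bool ShouldCompactKey(uint64_t modify_count, uint64_t avg_duration,
                      uint64_t count_threshold, uint64_t duration_threshold) {
  return modify_count >= count_threshold && avg_duration >= duration_threshold;
}

// ShouldCompactKey(6000, 12000, 5000, 10000) -> true   (hot and slow: compaction queued)
// ShouldCompactKey(6000,  3000, 5000, 10000) -> false  (heavily modified but scans are still fast)
// ShouldCompactKey( 400, 12000, 5000, 10000) -> false  (slow but rarely modified)
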
70 changes: 65 additions & 5 deletions src/storage/src/redis.h
@@ -14,6 +14,8 @@
#include "rocksdb/slice.h"
#include "rocksdb/status.h"

#include "pstd/include/env.h"

#include "src/lock_mgr.h"
#include "src/lru_cache.h"
#include "src/mutex_impl.h"
@@ -30,6 +32,61 @@ class Redis {

rocksdb::DB* GetDB() { return db_; }

struct KeyStatistics {
size_t window_size;
std::deque<uint64_t> durations;

uint64_t modify_count;

KeyStatistics() : KeyStatistics(10) {}

KeyStatistics(size_t size) : window_size(size + 2), modify_count(0) {}

void AddDuration(uint64_t duration) {
durations.push_back(duration);
while (durations.size() > window_size) {
durations.pop_front();
}
}
uint64_t AvgDuration() {
if (durations.size() < window_size) {
return 0;
}
uint64_t min = durations[0];
uint64_t max = durations[0];
uint64_t sum = 0;
for (auto duration : durations) {
if (duration < min) {
min = duration;
}
if (duration > max) {
max = duration;
}
sum += duration;
}
return (sum - max - min) / (durations.size() - 2);
}
void AddModifyCount(uint64_t count) {
modify_count += count;
}
uint64_t ModifyCount() {
return modify_count;
}
};

struct KeyStatisticsDurationGuard {
Redis* ctx;
std::string key;
uint64_t start_us;
KeyStatisticsDurationGuard(Redis* that, const std::string& key): ctx(that), key(key), start_us(pstd::NowMicros()) {
}
~KeyStatisticsDurationGuard() {
uint64_t end_us = pstd::NowMicros();
uint64_t duration = end_us > start_us ? end_us - start_us : 0;
ctx->UpdateSpecificKeyDuration(key, duration);
}
};

Status SetOptions(const OptionType& option_type, const std::unordered_map<std::string, std::string>& options);
void SetWriteWalOptions(const bool is_wal_disable);

@@ -54,7 +111,8 @@ class Redis {
virtual Status TTL(const Slice& key, int64_t* timestamp) = 0;

Status SetMaxCacheStatisticKeys(size_t max_cache_statistic_keys);
Status SetSmallCompactionThreshold(size_t small_compaction_threshold);
Status SetSmallCompactionThreshold(uint64_t small_compaction_threshold);
Status SetSmallCompactionDurationThreshold(uint64_t small_compaction_duration_threshold);
void GetRocksDBInfo(std::string &info, const char *prefix);

protected:
@@ -75,11 +133,13 @@ class Redis {
Status StoreScanNextPoint(const Slice& key, const Slice& pattern, int64_t cursor, const std::string& next_point);

// For Statistics
std::atomic<size_t> small_compaction_threshold_;
std::unique_ptr<LRUCache<std::string, size_t>> statistics_store_;
std::atomic_uint64_t small_compaction_threshold_;
std::atomic_uint64_t small_compaction_duration_threshold_;
std::unique_ptr<LRUCache<std::string, KeyStatistics>> statistics_store_;

Status UpdateSpecificKeyStatistics(const std::string& key, size_t count);
Status AddCompactKeyTaskIfNeeded(const std::string& key, size_t total);
Status UpdateSpecificKeyStatistics(const std::string& key, uint64_t count);
Status UpdateSpecificKeyDuration(const std::string& key, uint64_t duration);
Status AddCompactKeyTaskIfNeeded(const std::string& key, uint64_t count, uint64_t duration);
};

} // namespace storage
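
KeyStatistics keeps a sliding window of the most recent scan durations (window_size = size + 2, so 12 samples with the default constructor) together with a cumulative modify count. AvgDuration() returns 0 until the window is full and then averages the samples after discarding the single smallest and single largest one, so one outlier scan cannot trigger or suppress a compaction on its own. Durations are recorded by KeyStatisticsDurationGuard via pstd::NowMicros(), so both the samples and small-compaction-duration-threshold are in microseconds. A self-contained sketch of the same trimmed average, with a worked example:

#include <algorithm>
#include <cstdint>
#include <deque>
#include <numeric>

// Same computation as KeyStatistics::AvgDuration above, for the default 10 + 2 window.
uint64_t TrimmedAvg(const std::deque<uint64_t>& durations) {
  if (durations.size() < 12) {
    return 0;  // window not full yet
  }
  uint64_t sum = std::accumulate(durations.begin(), durations.end(), uint64_t{0});
  uint64_t min = *std::min_element(durations.begin(), durations.end());
  uint64_t max = *std::max_element(durations.begin(), durations.end());
  return (sum - min - max) / (durations.size() - 2);
}

// Example: {8, 9, 10, 11, 12, 9, 10, 11, 50, 10, 9, 1} sums to 150;
// dropping the 50 and the 1 gives (150 - 50 - 1) / 10 = 9 (integer division).
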
8 changes: 8 additions & 0 deletions src/storage/src/redis_hashes.cc
@@ -22,6 +22,7 @@ RedisHashes::RedisHashes(Storage* const s, const DataType& type) : Redis(s, type
Status RedisHashes::Open(const StorageOptions& storage_options, const std::string& db_path) {
statistics_store_->SetCapacity(storage_options.statistics_max_size);
small_compaction_threshold_ = storage_options.small_compaction_threshold;
small_compaction_duration_threshold_ = storage_options.small_compaction_duration_threshold;

rocksdb::Options ops(storage_options.options);
Status s = rocksdb::DB::Open(ops, db_path, &db_);
@@ -298,6 +299,7 @@ Status RedisHashes::HGetall(const Slice& key, std::vector<FieldValue>* fvs) {
version = parsed_hashes_meta_value.version();
HashesDataKey hashes_data_key(key, version, "");
Slice prefix = hashes_data_key.Encode();
KeyStatisticsDurationGuard guard(this, key.ToString());
auto iter = db_->NewIterator(read_options, handles_[1]);
for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) {
ParsedHashesDataKey parsed_hashes_data_key(iter->key());
@@ -516,6 +518,7 @@ Status RedisHashes::HKeys(const Slice& key, std::vector<std::string>* fields) {
version = parsed_hashes_meta_value.version();
HashesDataKey hashes_data_key(key, version, "");
Slice prefix = hashes_data_key.Encode();
KeyStatisticsDurationGuard guard(this, key.ToString());
auto iter = db_->NewIterator(read_options, handles_[1]);
for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) {
ParsedHashesDataKey parsed_hashes_data_key(iter->key());
@@ -788,6 +791,7 @@ Status RedisHashes::HVals(const Slice& key, std::vector<std::string>* values) {
version = parsed_hashes_meta_value.version();
HashesDataKey hashes_data_key(key, version, "");
Slice prefix = hashes_data_key.Encode();
KeyStatisticsDurationGuard guard(this, key.ToString());
auto iter = db_->NewIterator(read_options, handles_[1]);
for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) {
values->push_back(iter->value().ToString());
@@ -850,6 +854,7 @@ Status RedisHashes::HScan(const Slice& key, int64_t cursor, const std::string& p
HashesDataKey hashes_data_prefix(key, version, sub_field);
HashesDataKey hashes_start_data_key(key, version, start_point);
std::string prefix = hashes_data_prefix.Encode().ToString();
KeyStatisticsDurationGuard guard(this, key.ToString());
rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[1]);
for (iter->Seek(hashes_start_data_key.Encode()); iter->Valid() && rest > 0 && iter->key().starts_with(prefix);
iter->Next()) {
@@ -900,6 +905,7 @@ Status RedisHashes::HScanx(const Slice& key, const std::string& start_field, con
HashesDataKey hashes_data_prefix(key, version, Slice());
HashesDataKey hashes_start_data_key(key, version, start_field);
std::string prefix = hashes_data_prefix.Encode().ToString();
KeyStatisticsDurationGuard guard(this, key.ToString());
rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[1]);
for (iter->Seek(hashes_start_data_key.Encode()); iter->Valid() && rest > 0 && iter->key().starts_with(prefix);
iter->Next()) {
@@ -956,6 +962,7 @@ Status RedisHashes::PKHScanRange(const Slice& key, const Slice& field_start, con
HashesDataKey hashes_data_prefix(key, version, Slice());
HashesDataKey hashes_start_data_key(key, version, field_start);
std::string prefix = hashes_data_prefix.Encode().ToString();
KeyStatisticsDurationGuard guard(this, key.ToString());
rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[1]);
for (iter->Seek(start_no_limit ? prefix : hashes_start_data_key.Encode());
iter->Valid() && remain > 0 && iter->key().starts_with(prefix); iter->Next()) {
@@ -1016,6 +1023,7 @@ Status RedisHashes::PKHRScanRange(const Slice& key, const Slice& field_start, co
HashesDataKey hashes_data_prefix(key, version, Slice());
HashesDataKey hashes_start_data_key(key, start_key_version, start_key_field);
std::string prefix = hashes_data_prefix.Encode().ToString();
KeyStatisticsDurationGuard guard(this, key.ToString());
rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[1]);
for (iter->SeekForPrev(hashes_start_data_key.Encode().ToString());
iter->Valid() && remain > 0 && iter->key().starts_with(prefix); iter->Prev()) {
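
Each full-prefix iterator walk in these hash scan paths now opens a KeyStatisticsDurationGuard on the stack, so the time spent iterating is attributed to the scanned key when the guard leaves scope. A minimal sketch of the pattern; every name except the guard itself is a placeholder:

#include <memory>
#include <string>

#include "rocksdb/db.h"
#include "src/redis.h"  // storage::Redis and the nested guard

// Placeholder scan routine showing how the guard brackets an iterator walk.
void ScanPrefix(storage::Redis* inst, rocksdb::DB* db, rocksdb::ColumnFamilyHandle* data_cf,
                const rocksdb::ReadOptions& read_options, const std::string& key,
                const rocksdb::Slice& prefix) {
  storage::Redis::KeyStatisticsDurationGuard guard(inst, key);  // timing starts here
  std::unique_ptr<rocksdb::Iterator> iter(db->NewIterator(read_options, data_cf));
  for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) {
    // ... collect fields / values for the reply ...
  }
}  // guard destructor runs here and feeds the elapsed microseconds into the key's statistics
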
1 change: 1 addition & 0 deletions src/storage/src/redis_lists.cc
@@ -26,6 +26,7 @@ RedisLists::RedisLists(Storage* const s, const DataType& type) : Redis(s, type)
Status RedisLists::Open(const StorageOptions& storage_options, const std::string& db_path) {
statistics_store_->SetCapacity(storage_options.statistics_max_size);
small_compaction_threshold_ = storage_options.small_compaction_threshold;
small_compaction_duration_threshold_ = storage_options.small_compaction_duration_threshold;

rocksdb::Options ops(storage_options.options);
Status s = rocksdb::DB::Open(ops, db_path, &db_);