Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat:add small time cost compaction policy #2172

Merged
merged 4 commits into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ max-cache-statistic-keys : 0
# a small compact is triggered automatically if the small compaction feature is enabled.
# small-compaction-threshold default value is 5000 and the value range is [1, 100000].
small-compaction-threshold : 5000
small-compaction-duration-threshold : 10000

# The maximum total size of all live memtables of the RocksDB instance that owned by Pika.
# Flushing from memtable to disk will be triggered if the actual memory usage of RocksDB
Expand Down
10 changes: 10 additions & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,10 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return small_compaction_threshold_;
}
int small_compaction_duration_threshold() {
std::shared_lock l(rwlock_);
return small_compaction_duration_threshold_;
}
int max_background_flushes() {
std::shared_lock l(rwlock_);
return max_background_flushes_;
Expand Down Expand Up @@ -408,6 +412,11 @@ class PikaConf : public pstd::BaseConf {
TryPushDiffCommands("small-compaction-threshold", std::to_string(value));
small_compaction_threshold_ = value;
}
void SetSmallCompactionDurationThreshold(const int value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("small-compaction-duration-threshold", std::to_string(value));
small_compaction_duration_threshold_ = value;
}
void SetMaxClientResponseSize(const int value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("max-client-response-size", std::to_string(value));
Expand Down Expand Up @@ -660,6 +669,7 @@ class PikaConf : public pstd::BaseConf {

int max_cache_statistic_keys_ = 0;
int small_compaction_threshold_ = 0;
int small_compaction_duration_threshold_ = 0;
int max_background_flushes_ = 0;
int max_background_compactions_ = 0;
int max_background_jobs_ = 0;
Expand Down
1 change: 1 addition & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ class PikaServer : public pstd::noncopyable {
void PrepareSlotTrySync();
void SlotSetMaxCacheStatisticKeys(uint32_t max_cache_statistic_keys);
void SlotSetSmallCompactionThreshold(uint32_t small_compaction_threshold);
void SlotSetSmallCompactionDurationThreshold(uint32_t small_compaction_duration_threshold);
bool GetDBSlotBinlogOffset(const std::string& db_name, uint32_t slot_id, BinlogOffset* boffset);
std::shared_ptr<Slot> GetSlotByDBName(const std::string& db_name);
std::shared_ptr<Slot> GetDBSlotById(const std::string& db_name, uint32_t slot_id);
Expand Down
17 changes: 16 additions & 1 deletion src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1658,6 +1658,12 @@ void ConfigCmd::ConfigGet(std::string& ret) {
EncodeNumber(&config_body, g_pika_conf->small_compaction_threshold());
}

if (pstd::stringmatch(pattern.data(), "small-compaction-duration-threshold", 1) != 0) {
elements += 2;
EncodeString(&config_body, "small-compaction-duration-threshold");
EncodeNumber(&config_body, g_pika_conf->small_compaction_duration_threshold());
}

if (pstd::stringmatch(pattern.data(), "max-background-flushes", 1) != 0) {
elements += 2;
EncodeString(&config_body, "max-background-flushes");
Expand Down Expand Up @@ -2015,7 +2021,7 @@ void ConfigCmd::ConfigGet(std::string& ret) {
void ConfigCmd::ConfigSet(std::string& ret) {
std::string set_item = config_args_v_[1];
if (set_item == "*") {
ret = "*28\r\n";
ret = "*29\r\n";
EncodeString(&ret, "timeout");
EncodeString(&ret, "requirepass");
EncodeString(&ret, "masterauth");
Expand All @@ -2034,6 +2040,7 @@ void ConfigCmd::ConfigSet(std::string& ret) {
EncodeString(&ret, "write-binlog");
EncodeString(&ret, "max-cache-statistic-keys");
EncodeString(&ret, "small-compaction-threshold");
EncodeString(&ret, "small-compaction-duration-threshold");
EncodeString(&ret, "max-client-response-size");
EncodeString(&ret, "db-sync-speed");
EncodeString(&ret, "compact-cron");
Expand Down Expand Up @@ -2166,6 +2173,14 @@ void ConfigCmd::ConfigSet(std::string& ret) {
g_pika_conf->SetSmallCompactionThreshold(static_cast<int>(ival));
g_pika_server->SlotSetSmallCompactionThreshold(static_cast<int>(ival));
ret = "+OK\r\n";
} else if (set_item == "small-compaction-duration-threshold") {
if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival < 0) {
ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'small-compaction-duration-threshold'\r\n";
return;
}
g_pika_conf->SetSmallCompactionDurationThreshold(static_cast<int>(ival));
g_pika_server->SlotSetSmallCompactionDurationThreshold(static_cast<int>(ival));
ret = "+OK\r\n";
} else if (set_item == "max-client-response-size") {
if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival < 0) {
ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'max-client-response-size'\r\n";
Expand Down
15 changes: 13 additions & 2 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -450,8 +450,18 @@ int PikaConf::Load() {

small_compaction_threshold_ = 5000;
GetConfInt("small-compaction-threshold", &small_compaction_threshold_);
if (small_compaction_threshold_ <= 0 || small_compaction_threshold_ >= 100000) {
small_compaction_threshold_ = 5000;
if (small_compaction_threshold_ < 0) {
small_compaction_threshold_ = 0;
} else if (small_compaction_threshold_ >= 100000) {
small_compaction_threshold_ = 100000;
}

small_compaction_duration_threshold_ = 10000;
GetConfInt("small-compaction-duration-threshold", &small_compaction_duration_threshold_);
if (small_compaction_duration_threshold_ < 0) {
small_compaction_duration_threshold_ = 0;
} else if (small_compaction_duration_threshold_ >= 1000000) {
small_compaction_duration_threshold_ = 1000000;
}

max_background_flushes_ = 1;
Expand Down Expand Up @@ -655,6 +665,7 @@ int PikaConf::ConfigRewrite() {
SetConfStr("replication-id", replication_id_);
SetConfInt("max-cache-statistic-keys", max_cache_statistic_keys_);
SetConfInt("small-compaction-threshold", small_compaction_threshold_);
SetConfInt("small-compaction-duration-threshold", small_compaction_duration_threshold_);
SetConfInt("max-client-response-size", static_cast<int32_t>(max_client_response_size_));
SetConfInt("db-sync-speed", db_sync_speed_);
SetConfStr("compact-cron", compact_cron_);
Expand Down
11 changes: 11 additions & 0 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,17 @@ void PikaServer::SlotSetSmallCompactionThreshold(uint32_t small_compaction_thres
}
}

void PikaServer::SlotSetSmallCompactionDurationThreshold(uint32_t small_compaction_duration_threshold) {
std::shared_lock rwl(dbs_rw_);
for (const auto& db_item : dbs_) {
for (const auto& slot_item : db_item.second->slots_) {
slot_item.second->DbRWLockReader();
slot_item.second->db()->SetSmallCompactionDurationThreshold(small_compaction_duration_threshold);
slot_item.second->DbRWUnLock();
}
}
}

bool PikaServer::GetDBSlotBinlogOffset(const std::string& db_name, uint32_t slot_id,
BinlogOffset* const boffset) {
std::shared_ptr<SyncMasterSlot> slot =
Expand Down
2 changes: 2 additions & 0 deletions src/storage/include/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ struct StorageOptions {
bool share_block_cache = false;
size_t statistics_max_size = 0;
size_t small_compaction_threshold = 5000;
size_t small_compaction_duration_threshold = 10000;
Status ResetOptions(const OptionType& option_type, const std::unordered_map<std::string, std::string>& options_map);
};

Expand Down Expand Up @@ -1018,6 +1019,7 @@ class Storage {

Status SetMaxCacheStatisticKeys(uint32_t max_cache_statistic_keys);
Status SetSmallCompactionThreshold(uint32_t small_compaction_threshold);
Status SetSmallCompactionDurationThreshold(uint32_t small_compaction_duration_threshold);

std::string GetCurrentTaskType();
Status GetUsage(const std::string& property, uint64_t* result);
Expand Down
40 changes: 29 additions & 11 deletions src/storage/src/redis.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ Redis::Redis(Storage* const s, const DataType& type)
: storage_(s),
type_(type),
lock_mgr_(std::make_shared<LockMgr>(1000, 0, std::make_shared<MutexFactoryImpl>())),
small_compaction_threshold_(5000) {
statistics_store_ = std::make_unique<LRUCache<std::string, size_t>>();
small_compaction_threshold_(5000),
small_compaction_duration_threshold_(10000) {
statistics_store_ = std::make_unique<LRUCache<std::string, KeyStatistics>>();
scan_cursors_store_ = std::make_unique<LRUCache<std::string, std::string>>();
scan_cursors_store_->SetCapacity(5000);
default_compact_range_options_.exclusive_manual_compaction = false;
Expand Down Expand Up @@ -46,23 +47,40 @@ Status Redis::SetMaxCacheStatisticKeys(size_t max_cache_statistic_keys) {
return Status::OK();
}

Status Redis::SetSmallCompactionThreshold(size_t small_compaction_threshold) {
Status Redis::SetSmallCompactionThreshold(uint64_t small_compaction_threshold) {
small_compaction_threshold_ = small_compaction_threshold;
return Status::OK();
}

Status Redis::UpdateSpecificKeyStatistics(const std::string& key, size_t count) {
if ((statistics_store_->Capacity() != 0U) && (count != 0U)) {
size_t total = 0;
statistics_store_->Lookup(key, &total);
statistics_store_->Insert(key, total + count);
AddCompactKeyTaskIfNeeded(key, total + count);
Status Redis::SetSmallCompactionDurationThreshold(uint64_t small_compaction_duration_threshold) {
small_compaction_duration_threshold_ = small_compaction_duration_threshold;
return Status::OK();
}

Status Redis::UpdateSpecificKeyStatistics(const std::string& key, uint64_t count) {
if ((statistics_store_->Capacity() != 0U) && (count != 0U) && (small_compaction_threshold_ != 0U)) {
KeyStatistics data;
statistics_store_->Lookup(key, &data);
data.AddModifyCount(count);
statistics_store_->Insert(key, data);
AddCompactKeyTaskIfNeeded(key, data.ModifyCount(), data.AvgDuration());
}
return Status::OK();
}

Status Redis::UpdateSpecificKeyDuration(const std::string& key, uint64_t duration) {
if ((statistics_store_->Capacity() != 0U) && (duration != 0U) && (small_compaction_duration_threshold_ != 0U)) {
KeyStatistics data;
statistics_store_->Lookup(key, &data);
data.AddDuration(duration);
statistics_store_->Insert(key, data);
AddCompactKeyTaskIfNeeded(key, data.ModifyCount(), data.AvgDuration());
}
return Status::OK();
}

Status Redis::AddCompactKeyTaskIfNeeded(const std::string& key, size_t total) {
if (total < small_compaction_threshold_) {
Status Redis::AddCompactKeyTaskIfNeeded(const std::string& key, uint64_t count, uint64_t duration) {
if (count < small_compaction_threshold_ || duration < small_compaction_duration_threshold_) {
return Status::OK();
} else {
storage_->AddBGTask({type_, kCompactRange, {key, key}});
Expand Down
70 changes: 65 additions & 5 deletions src/storage/src/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
#include "rocksdb/slice.h"
#include "rocksdb/status.h"

#include "pstd/include/env.h"

#include "src/lock_mgr.h"
#include "src/lru_cache.h"
#include "src/mutex_impl.h"
Expand All @@ -30,6 +32,61 @@ class Redis {

rocksdb::DB* GetDB() { return db_; }

struct KeyStatistics {
size_t window_size;
std::deque<uint64_t> durations;

uint64_t modify_count;

KeyStatistics() : KeyStatistics(10) {}

KeyStatistics(size_t size) : window_size(size + 2), modify_count(0) {}

void AddDuration(uint64_t duration) {
durations.push_back(duration);
while (durations.size() > window_size) {
durations.pop_front();
}
}
uint64_t AvgDuration() {
if (durations.size () < window_size) {
return 0;
}
uint64_t min = durations[0];
uint64_t max = durations[0];
uint64_t sum = 0;
for (auto duration : durations) {
if (duration < min) {
min = duration;
}
if (duration > max) {
max = duration;
}
sum += duration;
}
return (sum - max - min) / (durations.size() - 2);
}
void AddModifyCount(uint64_t count) {
modify_count += count;
}
uint64_t ModifyCount() {
return modify_count;
}
};

struct KeyStatisticsDurationGuard {
Redis* ctx;
std::string key;
uint64_t start_us;
KeyStatisticsDurationGuard(Redis* that, const std::string& key): ctx(that), key(key), start_us(pstd::NowMicros()) {
}
~KeyStatisticsDurationGuard() {
uint64_t end_us = pstd::NowMicros();
uint64_t duration = end_us > start_us ? end_us - start_us : 0;
ctx->UpdateSpecificKeyDuration(key, duration);
}
};

Status SetOptions(const OptionType& option_type, const std::unordered_map<std::string, std::string>& options);
void SetWriteWalOptions(const bool is_wal_disable);

Expand All @@ -54,7 +111,8 @@ class Redis {
virtual Status TTL(const Slice& key, int64_t* timestamp) = 0;

Status SetMaxCacheStatisticKeys(size_t max_cache_statistic_keys);
Status SetSmallCompactionThreshold(size_t small_compaction_threshold);
Status SetSmallCompactionThreshold(uint64_t small_compaction_threshold);
Status SetSmallCompactionDurationThreshold(uint64_t small_compaction_duration_threshold);
void GetRocksDBInfo(std::string &info, const char *prefix);

protected:
Expand All @@ -75,11 +133,13 @@ class Redis {
Status StoreScanNextPoint(const Slice& key, const Slice& pattern, int64_t cursor, const std::string& next_point);

// For Statistics
std::atomic<size_t> small_compaction_threshold_;
std::unique_ptr<LRUCache<std::string, size_t>> statistics_store_;
std::atomic_uint64_t small_compaction_threshold_;
std::atomic_uint64_t small_compaction_duration_threshold_;
std::unique_ptr<LRUCache<std::string, KeyStatistics>> statistics_store_;

Status UpdateSpecificKeyStatistics(const std::string& key, size_t count);
Status AddCompactKeyTaskIfNeeded(const std::string& key, size_t total);
Status UpdateSpecificKeyStatistics(const std::string& key, uint64_t count);
Status UpdateSpecificKeyDuration(const std::string& key, uint64_t duration);
Status AddCompactKeyTaskIfNeeded(const std::string& key, uint64_t count, uint64_t duration);
};

} // namespace storage
Expand Down
8 changes: 8 additions & 0 deletions src/storage/src/redis_hashes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ RedisHashes::RedisHashes(Storage* const s, const DataType& type) : Redis(s, type
Status RedisHashes::Open(const StorageOptions& storage_options, const std::string& db_path) {
statistics_store_->SetCapacity(storage_options.statistics_max_size);
small_compaction_threshold_ = storage_options.small_compaction_threshold;
small_compaction_duration_threshold_ = storage_options.small_compaction_duration_threshold;

rocksdb::Options ops(storage_options.options);
Status s = rocksdb::DB::Open(ops, db_path, &db_);
Expand Down Expand Up @@ -298,6 +299,7 @@ Status RedisHashes::HGetall(const Slice& key, std::vector<FieldValue>* fvs) {
version = parsed_hashes_meta_value.version();
HashesDataKey hashes_data_key(key, version, "");
Slice prefix = hashes_data_key.Encode();
KeyStatisticsDurationGuard guard(this, key.ToString());
auto iter = db_->NewIterator(read_options, handles_[1]);
for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) {
ParsedHashesDataKey parsed_hashes_data_key(iter->key());
Expand Down Expand Up @@ -473,6 +475,7 @@ Status RedisHashes::HKeys(const Slice& key, std::vector<std::string>* fields) {
version = parsed_hashes_meta_value.version();
HashesDataKey hashes_data_key(key, version, "");
Slice prefix = hashes_data_key.Encode();
KeyStatisticsDurationGuard guard(this, key.ToString());
auto iter = db_->NewIterator(read_options, handles_[1]);
for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) {
ParsedHashesDataKey parsed_hashes_data_key(iter->key());
Expand Down Expand Up @@ -745,6 +748,7 @@ Status RedisHashes::HVals(const Slice& key, std::vector<std::string>* values) {
version = parsed_hashes_meta_value.version();
HashesDataKey hashes_data_key(key, version, "");
Slice prefix = hashes_data_key.Encode();
KeyStatisticsDurationGuard guard(this, key.ToString());
auto iter = db_->NewIterator(read_options, handles_[1]);
for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) {
values->push_back(iter->value().ToString());
Expand Down Expand Up @@ -807,6 +811,7 @@ Status RedisHashes::HScan(const Slice& key, int64_t cursor, const std::string& p
HashesDataKey hashes_data_prefix(key, version, sub_field);
HashesDataKey hashes_start_data_key(key, version, start_point);
std::string prefix = hashes_data_prefix.Encode().ToString();
KeyStatisticsDurationGuard guard(this, key.ToString());
rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[1]);
for (iter->Seek(hashes_start_data_key.Encode()); iter->Valid() && rest > 0 && iter->key().starts_with(prefix);
iter->Next()) {
Expand Down Expand Up @@ -857,6 +862,7 @@ Status RedisHashes::HScanx(const Slice& key, const std::string& start_field, con
HashesDataKey hashes_data_prefix(key, version, Slice());
HashesDataKey hashes_start_data_key(key, version, start_field);
std::string prefix = hashes_data_prefix.Encode().ToString();
KeyStatisticsDurationGuard guard(this, key.ToString());
rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[1]);
for (iter->Seek(hashes_start_data_key.Encode()); iter->Valid() && rest > 0 && iter->key().starts_with(prefix);
iter->Next()) {
Expand Down Expand Up @@ -913,6 +919,7 @@ Status RedisHashes::PKHScanRange(const Slice& key, const Slice& field_start, con
HashesDataKey hashes_data_prefix(key, version, Slice());
HashesDataKey hashes_start_data_key(key, version, field_start);
std::string prefix = hashes_data_prefix.Encode().ToString();
KeyStatisticsDurationGuard guard(this, key.ToString());
rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[1]);
for (iter->Seek(start_no_limit ? prefix : hashes_start_data_key.Encode());
iter->Valid() && remain > 0 && iter->key().starts_with(prefix); iter->Next()) {
Expand Down Expand Up @@ -973,6 +980,7 @@ Status RedisHashes::PKHRScanRange(const Slice& key, const Slice& field_start, co
HashesDataKey hashes_data_prefix(key, version, Slice());
HashesDataKey hashes_start_data_key(key, start_key_version, start_key_field);
std::string prefix = hashes_data_prefix.Encode().ToString();
KeyStatisticsDurationGuard guard(this, key.ToString());
rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[1]);
for (iter->SeekForPrev(hashes_start_data_key.Encode().ToString());
iter->Valid() && remain > 0 && iter->key().starts_with(prefix); iter->Prev()) {
Expand Down
1 change: 1 addition & 0 deletions src/storage/src/redis_lists.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ RedisLists::RedisLists(Storage* const s, const DataType& type) : Redis(s, type)
Status RedisLists::Open(const StorageOptions& storage_options, const std::string& db_path) {
statistics_store_->SetCapacity(storage_options.statistics_max_size);
small_compaction_threshold_ = storage_options.small_compaction_threshold;
small_compaction_duration_threshold_ = storage_options.small_compaction_duration_threshold;

rocksdb::Options ops(storage_options.options);
Status s = rocksdb::DB::Open(ops, db_path, &db_);
Expand Down
Loading