Skip to content

Commit

Permalink
new compact strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
QlQlqiqi committed Sep 18, 2024
1 parent 0137483 commit af1994c
Show file tree
Hide file tree
Showing 11 changed files with 519 additions and 9 deletions.
34 changes: 34 additions & 0 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -658,3 +658,37 @@ internal-used-unfinished-full-sync :
# https://github.com/OpenAtomFoundation/pika/issues/2886
# default value: true
wash-data: true

# Pika automatic compact compact strategy, a complement to rocksdb compact.
# Trigger the compact background task periodically according to `compact-interval`
# Can choose `full-compact` or `obd-compact`.
# obd-compact https://github.com/OpenAtomFoundation/pika/issues/2255
compaction-strategy : obd-compact

# For OBD_Compact
# According to the number of sst files in rocksdb,
# compact every `compact-every-num-of-files` file.
compact-every-num-of-files : 10

# For OBD_Compact
# In another search, if the file creation time is
# greater than `force-compact-file-age-seconds`,
# a compaction of the upper and lower boundaries
# of the file will be performed at the same time
# `compact-every-num-of-files` -1
force-compact-file-age-seconds : 300

# For OBD_Compact
# According to the number of sst files in rocksdb,
# compact every `compact-every-num-of-files` file.
force-compact-min-delete-ratio : 10

# For OBD_Compact
# According to the number of sst files in rocksdb,
# compact every `compact-every-num-of-files` file.
dont-compact-sst-created-in-seconds : 20

# For OBD_Compact
# According to the number of sst files in rocksdb,
# compact every `compact-every-num-of-files` file.
best-delete-min-ratio : 10
37 changes: 37 additions & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ const uint32_t configReplicationIDSize = 50;
// global class, class members well initialized
class PikaConf : public pstd::BaseConf {
public:
enum CompactionStrategy {
FullCompact,
OldestOrBestDeleteRatioSstCompact
};
PikaConf(const std::string& path);
~PikaConf() override = default;

Expand Down Expand Up @@ -118,6 +122,30 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return max_subcompactions_;
}
int compact_every_num_of_files() {
std::shared_lock l(rwlock_);
return compact_every_num_of_files_;
}
int force_compact_file_age_seconds() {
std::shared_lock l(rwlock_);
return force_compact_file_age_seconds_;
}
int force_compact_min_delete_ratio() {
std::shared_lock l(rwlock_);
return force_compact_min_delete_ratio_;
}
int dont_compact_sst_created_in_seconds() {
std::shared_lock l(rwlock_);
return dont_compact_sst_created_in_seconds_;
}
int best_delete_min_ratio() {
std::shared_lock l(rwlock_);
return best_delete_min_ratio_;
}
CompactionStrategy compaction_strategy() {
std::shared_lock l(rwlock_);
return compaction_strategy_;
}
bool disable_auto_compactions() {
std::shared_lock l(rwlock_);
return disable_auto_compactions_;
Expand Down Expand Up @@ -931,6 +959,15 @@ class PikaConf : public pstd::BaseConf {
std::string compact_interval_;
int max_subcompactions_ = 1;
bool disable_auto_compactions_ = false;

// for obd_compact
int compact_every_num_of_files_;
int force_compact_file_age_seconds_;
int force_compact_min_delete_ratio_;
int dont_compact_sst_created_in_seconds_;
int best_delete_min_ratio_;
CompactionStrategy compaction_strategy_;

int64_t resume_check_interval_ = 60; // seconds
int64_t least_free_disk_to_resume_ = 268435456; // 256 MB
double min_check_resume_ratio_ = 0.7;
Expand Down
1 change: 1 addition & 0 deletions include/pika_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ class DB : public std::enable_shared_from_this<DB>, public pstd::noncopyable {
// Compact use;
void Compact(const storage::DataType& type);
void CompactRange(const storage::DataType& type, const std::string& start, const std::string& end);
void LongestNotCompactiontSstCompact(const storage::DataType& type);

void SetCompactRangeOptions(const bool is_canceled);

Expand Down
1 change: 1 addition & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ enum TaskType {
kStopKeyScan,
kBgSave,
kCompactRangeAll,
kCompactOldestOrBestDeleteRatioSst,
};

struct TaskArg {
Expand Down
71 changes: 71 additions & 0 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,41 @@ int PikaConf::Load() {
max_subcompactions_ = 1;
}

GetConfInt("compact-every-num-of-files", &compact_every_num_of_files_);
if (compact_every_num_of_files_ < 10) {
compact_every_num_of_files_ = 10;
}

GetConfInt("force-compact-file-age-seconds", &force_compact_file_age_seconds_);
if (force_compact_file_age_seconds_ < 300) {
force_compact_file_age_seconds_ = 300;
}

GetConfInt("force-compact-min-delete-ratio", &force_compact_min_delete_ratio_);
if (force_compact_min_delete_ratio_ < 10) {
force_compact_min_delete_ratio_ = 10;
}

GetConfInt("dont-compact-sst-created-in-seconds", &dont_compact_sst_created_in_seconds_);
if (dont_compact_sst_created_in_seconds_ < 600) {
dont_compact_sst_created_in_seconds_ = 600;
}

GetConfInt("best-delete-min-ratio", &best_delete_min_ratio_);
if (best_delete_min_ratio_ < 10) {
best_delete_min_ratio_ = 10;
}

std::string cs_;
GetConfStr("compaction-strategy", &cs_);
if (cs_ == "full-compact") {
compaction_strategy_ = FullCompact;
} else if (cs_ == "obd-compact") {
compaction_strategy_ = OldestOrBestDeleteRatioSstCompact;
} else {
compaction_strategy_ = FullCompact;
}

// least-free-disk-resume-size
GetConfInt64Human("least-free-disk-resume-size", &least_free_disk_to_resume_);
if (least_free_disk_to_resume_ <= 0) {
Expand Down Expand Up @@ -414,6 +449,11 @@ int PikaConf::Load() {
max_write_buffer_num_ = 2; // 1 for immutable memtable, 1 for mutable memtable
}

GetConfInt("min-write-buffer-number-to-merge", &min_write_buffer_number_to_merge_);
if (min_write_buffer_number_to_merge_ < 1) {
min_write_buffer_number_to_merge_ = 1; // 1 for immutable memtable to merge
}

// max_client_response_size
GetConfInt64Human("max-client-response-size", &max_client_response_size_);
if (max_client_response_size_ <= 0) {
Expand Down Expand Up @@ -798,6 +838,37 @@ int PikaConf::ConfigRewrite() {
SetConfInt("db-sync-speed", db_sync_speed_);
SetConfStr("compact-cron", compact_cron_);
SetConfStr("compact-interval", compact_interval_);
SetConfInt("compact-every-num-of-files", compact_every_num_of_files_);
if (compact_every_num_of_files_ < 1) {
compact_every_num_of_files_ = 1;
}
SetConfInt("force-compact-file-age-seconds", force_compact_file_age_seconds_);
if (force_compact_file_age_seconds_ < 300) {
force_compact_file_age_seconds_ = 300;
}
SetConfInt("force-compact-min-delete-ratio", force_compact_min_delete_ratio_);
if (force_compact_min_delete_ratio_ < 5) {
force_compact_min_delete_ratio_ = 5;
}
SetConfInt("dont-compact-sst-created-in-seconds", dont_compact_sst_created_in_seconds_);
if (dont_compact_sst_created_in_seconds_ < 300) {
dont_compact_sst_created_in_seconds_ = 300;
}
SetConfInt("best-delete-min-ratio", best_delete_min_ratio_);
if (best_delete_min_ratio_ < 10) {
best_delete_min_ratio_ = 10;
}

std::string cs_;
SetConfStr("compaction-strategy", cs_);
if (cs_ == "full_compact") {
compaction_strategy_ = FullCompact;
} else if (cs_ == "obd_compact") {
compaction_strategy_ = OldestOrBestDeleteRatioSstCompact;
} else {
compaction_strategy_ = FullCompact;
}

SetConfStr("disable_auto_compactions", disable_auto_compactions_ ? "true" : "false");
SetConfStr("cache-type", scachetype);
SetConfInt64("least-free-disk-resume-size", least_free_disk_to_resume_);
Expand Down
8 changes: 8 additions & 0 deletions src/pika_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,14 @@ void DB::CompactRange(const storage::DataType& type, const std::string& start, c
storage_->CompactRange(type, start, end);
}

void DB::LongestNotCompactiontSstCompact(const storage::DataType& type) {
std::lock_guard rwl(dbs_rw_);
if (!opened_) {
return;
}
storage_->LongestNotCompactiontSstCompact(type);
}

void DB::DoKeyScan(void* arg) {
std::unique_ptr <BgTaskArg> bg_task_arg(static_cast<BgTaskArg*>(arg));
bg_task_arg->db->RunKeyScan();
Expand Down
22 changes: 22 additions & 0 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,9 @@ Status PikaServer::DoSameThingEveryDB(const TaskType& type) {
case TaskType::kCompactAll:
db_item.second->Compact(storage::DataType::kAll);
break;
case TaskType::kCompactOldestOrBestDeleteRatioSst:
db_item.second->LongestNotCompactiontSstCompact(storage::DataType::kAll);
break;
default:
break;
}
Expand Down Expand Up @@ -1220,6 +1223,12 @@ void PikaServer::AutoCompactRange() {
}
}
}

if (g_pika_conf->compaction_strategy() == PikaConf::FullCompact) {
DoSameThingEveryDB(TaskType::kCompactAll);
} else if (g_pika_conf->compaction_strategy() == PikaConf::OldestOrBestDeleteRatioSstCompact) {
DoSameThingEveryDB(TaskType::kCompactOldestOrBestDeleteRatioSst);
}
}

void PikaServer::AutoBinlogPurge() { DoSameThingEveryDB(TaskType::kPurgeLog); }
Expand Down Expand Up @@ -1430,6 +1439,12 @@ void PikaServer::InitStorageOptions() {
storage_options_.options.max_bytes_for_level_base = g_pika_conf->level0_file_num_compaction_trigger() * g_pika_conf->write_buffer_size();
storage_options_.options.max_subcompactions = g_pika_conf->max_subcompactions();
storage_options_.options.target_file_size_base = g_pika_conf->target_file_size_base();
storage_options_.options.level0_file_num_compaction_trigger = g_pika_conf->level0_file_num_compaction_trigger();
storage_options_.options.level0_stop_writes_trigger = g_pika_conf->level0_stop_writes_trigger();
storage_options_.options.level0_slowdown_writes_trigger = g_pika_conf->level0_slowdown_writes_trigger();
storage_options_.options.min_write_buffer_number_to_merge = g_pika_conf->min_write_buffer_number_to_merge();
storage_options_.options.max_bytes_for_level_base = g_pika_conf->level0_file_num_compaction_trigger() * g_pika_conf->write_buffer_size();
storage_options_.options.max_subcompactions = g_pika_conf->max_subcompactions();
storage_options_.options.max_compaction_bytes = g_pika_conf->max_compaction_bytes();
storage_options_.options.max_background_flushes = g_pika_conf->max_background_flushes();
storage_options_.options.max_background_compactions = g_pika_conf->max_background_compactions();
Expand Down Expand Up @@ -1483,6 +1498,13 @@ void PikaServer::InitStorageOptions() {
storage_options_.statistics_max_size = g_pika_conf->max_cache_statistic_keys();
storage_options_.small_compaction_threshold = g_pika_conf->small_compaction_threshold();

// For Storage compaction
storage_options_.compact_param_.best_delete_min_ratio_ = g_pika_conf->best_delete_min_ratio();
storage_options_.compact_param_.dont_compact_sst_created_in_seconds_ = g_pika_conf->dont_compact_sst_created_in_seconds();
storage_options_.compact_param_.force_compact_file_age_seconds_ = g_pika_conf->force_compact_file_age_seconds();
storage_options_.compact_param_.force_compact_min_delete_ratio_ = g_pika_conf->force_compact_min_delete_ratio();
storage_options_.compact_param_.compact_every_num_of_files_ = g_pika_conf->compact_every_num_of_files();

// rocksdb blob
if (g_pika_conf->enable_blob_files()) {
storage_options_.options.enable_blob_files = g_pika_conf->enable_blob_files();
Expand Down
22 changes: 21 additions & 1 deletion src/storage/include/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,15 @@ struct StorageOptions {
bool enable_db_statistics = false;
size_t small_compaction_threshold = 5000;
size_t small_compaction_duration_threshold = 10000;
struct CompactParam {
// for LongestNotCompactiontSstCompact function
int compact_every_num_of_files_;
int force_compact_file_age_seconds_;
int force_compact_min_delete_ratio_;
int dont_compact_sst_created_in_seconds_;
int best_delete_min_ratio_;
};
CompactParam compact_param_;
Status ResetOptions(const OptionType& option_type, const std::unordered_map<std::string, std::string>& options_map);
};

Expand Down Expand Up @@ -156,7 +165,8 @@ enum BitOpType { kBitOpAnd = 1, kBitOpOr, kBitOpXor, kBitOpNot, kBitOpDefault };
enum Operation {
kNone = 0,
kCleanAll,
kCompactRange
kCompactRange,
kCompactOldestOrBestDeleteRatioSst,
};

struct BGTask {
Expand Down Expand Up @@ -1080,6 +1090,14 @@ class Storage {
Status DoCompactRange(const DataType& type, const std::string& start, const std::string& end);
Status DoCompactSpecificKey(const DataType& type, const std::string& key);

/**
* LongestNotCompactiontSstCompact will execute the compact command for any cf in the given type
* @param type. data type like `kStrings`
* @param sync. if true, block function
* @return Status
*/
Status LongestNotCompactiontSstCompact(const DataType &type, bool sync = false);

Status SetMaxCacheStatisticKeys(uint32_t max_cache_statistic_keys);
Status SetSmallCompactionThreshold(uint32_t small_compaction_threshold);
Status SetSmallCompactionDurationThreshold(uint32_t small_compaction_duration_threshold);
Expand All @@ -1103,6 +1121,7 @@ class Storage {
const std::string& db_type, const std::unordered_map<std::string, std::string>& options);
void GetRocksDBInfo(std::string& info);

const StorageOptions& GetStorageOptions();
// get hash cf handle in insts_[idx]
std::vector<rocksdb::ColumnFamilyHandle*> GetHashCFHandles(const int idx);
// get DefaultWriteOptions in insts_[idx]
Expand All @@ -1115,6 +1134,7 @@ class Storage {
int db_instance_num_ = 3;
int slot_num_ = 1024;
bool is_classic_mode_ = true;
StorageOptions storage_options_;

std::unique_ptr<LRUCache<std::string, std::string>> cursors_store_;

Expand Down
Loading

0 comments on commit af1994c

Please sign in to comment.