Skip to content

Commit

Permalink
feat: Statistics ticker count (OpenAtomFoundation#2769)
Browse files Browse the repository at this point in the history
* add statistics ticker count
  • Loading branch information
baixin01 authored Aug 9, 2024
1 parent defcca0 commit 269b420
Show file tree
Hide file tree
Showing 8 changed files with 205 additions and 32 deletions.
6 changes: 6 additions & 0 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,12 @@ level0-slowdown-writes-trigger : 20
# rocksdb level0_file_num_compaction_trigger
level0-file-num-compaction-trigger : 4

# enable db statistics [yes | no] default no
enable-db-statistics : no
# see rocksdb/include/rocksdb/statistics.h enum StatsLevel for more details
# only use ticker counter should set db-statistics-level to 2
db-statistics-level : 2

# The maximum size of the response package to client to prevent memory
# exhaustion caused by commands like 'keys *' and 'Scan' which can generate huge response.
# Supported Units [K|M|G]. The default unit is in [bytes].
Expand Down
9 changes: 9 additions & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,13 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return max_total_wal_size_;
}
bool enable_db_statistics() {
return enable_db_statistics_;
}
int db_statistics_level() {
std::shared_lock l(rwlock_);
return db_statistics_level_;
}
int64_t max_client_response_size() {
std::shared_lock l(rwlock_);
return max_client_response_size_;
Expand Down Expand Up @@ -929,6 +936,8 @@ class PikaConf : public pstd::BaseConf {
int64_t thread_migrate_keys_num_ = 0;
int64_t max_write_buffer_size_ = 0;
int64_t max_total_wal_size_ = 0;
bool enable_db_statistics_ = false;
int db_statistics_level_ = 0;
int max_write_buffer_num_ = 0;
int min_write_buffer_number_to_merge_ = 1;
int level0_stop_writes_trigger_ = 36;
Expand Down
12 changes: 12 additions & 0 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2241,6 +2241,18 @@ void ConfigCmd::ConfigGet(std::string& ret) {
: EncodeString(&config_body, "resetchannels");
}

if (pstd::stringmatch(pattern.data(), "enable-db-statistics", 1)) {
elements += 2;
EncodeString(&config_body, "enable-db-statistics");
EncodeString(&config_body, g_pika_conf->enable_db_statistics() ? "yes" : "no");
}

if (pstd::stringmatch(pattern.data(), "db-statistics-level", 1)) {
elements += 2;
EncodeString(&config_body, "db-statistics-level");
EncodeNumber(&config_body, g_pika_conf->db_statistics_level());
}

std::stringstream resp;
resp << "*" << std::to_string(elements) << "\r\n" << config_body;
ret = resp.str();
Expand Down
13 changes: 13 additions & 0 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,17 @@ int PikaConf::Load() {
max_rsync_parallel_num_ = kMaxRsyncParallelNum;
}

// rocksdb_statistics_tickers
std::string open_tickers;
GetConfStr("enable-db-statistics", &open_tickers);
enable_db_statistics_ = open_tickers == "yes";

db_statistics_level_ = 0;
GetConfInt("db-statistics-level", &db_statistics_level_);
if (db_statistics_level_ < 0) {
db_statistics_level_ = 0;
}

int64_t tmp_rsync_timeout_ms = -1;
GetConfInt64("rsync-timeout-ms", &tmp_rsync_timeout_ms);
if(tmp_rsync_timeout_ms <= 0){
Expand Down Expand Up @@ -816,6 +827,8 @@ int PikaConf::ConfigRewrite() {
SetConfStr("slotmigrate", slotmigrate_.load() ? "yes" : "no");
SetConfInt64("slotmigrate-thread-num", slotmigrate_thread_num_);
SetConfInt64("thread-migrate-keys-num", thread_migrate_keys_num_);
SetConfStr("enable-db-statistics", enable_db_statistics_ ? "yes" : "no");
SetConfInt("db-statistics-level", db_statistics_level_);
// slaveof config item is special
SetConfStr("slaveof", slaveof_);
// cache config
Expand Down
3 changes: 3 additions & 0 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1501,6 +1501,9 @@ void PikaServer::InitStorageOptions() {
storage_options_.table_options.pin_top_level_index_and_filter = true;
storage_options_.table_options.optimize_filters_for_memory = true;
}
// For statistics
storage_options_.enable_db_statistics = g_pika_conf->enable_db_statistics();
storage_options_.db_statistics_level = g_pika_conf->db_statistics_level();
}

storage::Status PikaServer::RewriteStorageOptions(const storage::OptionType& option_type,
Expand Down
2 changes: 2 additions & 0 deletions src/storage/include/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ struct StorageOptions {
size_t block_cache_size = 0;
bool share_block_cache = false;
size_t statistics_max_size = 0;
int db_statistics_level = 0;
bool enable_db_statistics = false;
size_t small_compaction_threshold = 5000;
size_t small_compaction_duration_threshold = 10000;
Status ResetOptions(const OptionType& option_type, const std::unordered_map<std::string, std::string>& options_map);
Expand Down
191 changes: 159 additions & 32 deletions src/storage/src/redis.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,11 @@ Status Redis::Open(const StorageOptions& storage_options, const std::string& db_

rocksdb::DBOptions db_ops(storage_options.options);
db_ops.create_missing_column_families = true;
// db_ops.env = env_;
if (storage_options.enable_db_statistics) {
db_statistics_ = rocksdb::CreateDBStatistics();
db_statistics_->set_stats_level(static_cast<rocksdb::StatsLevel>(storage_options.db_statistics_level));
db_ops.statistics = db_statistics_;
}

/*
* Because zset, set, the hash, list, stream type meta
Expand Down Expand Up @@ -270,10 +274,32 @@ void Redis::GetRocksDBInfo(std::string& info, const char* prefix) {
std::ostringstream string_stream;
string_stream << "#" << prefix << "RocksDB" << "\r\n";

auto write_stream_key_value=[&](const Slice& property, const char *metric) {
uint64_t value;
db_->GetAggregatedIntProperty(property, &value);
string_stream << prefix << metric << ':' << value << "\r\n";
auto write_aggregated_int_property=[&](const Slice& property, const char *metric) {
uint64_t value = 0;
db_->GetAggregatedIntProperty(property, &value);
string_stream << prefix << metric << ':' << value << "\r\n";
};

auto write_property=[&](const Slice& property, const char *metric) {
if (handles_.size() == 0) {
std::string value;
db_->GetProperty(db_->DefaultColumnFamily(), property, &value);
string_stream << prefix << metric << "_" << db_->DefaultColumnFamily()->GetName() << ':' << value << "\r\n";
} else {
for (auto handle : handles_) {
std::string value;
db_->GetProperty(handle, property, &value);
string_stream << prefix << metric << "_" << handle->GetName() << ':' << value << "\r\n";
}
}
};

auto write_ticker_count = [&](uint32_t tick_type, const char *metric) {
if (db_statistics_ == nullptr) {
return;
}
uint64_t count = db_statistics_->getTickerCount(tick_type);
string_stream << prefix << metric << ':' << count << "\r\n";
};

auto mapToString=[&](const std::map<std::string, std::string>& map_data, const char *prefix) {
Expand All @@ -285,57 +311,158 @@ void Redis::GetRocksDBInfo(std::string& info, const char* prefix) {
};

// memtables num
write_stream_key_value(rocksdb::DB::Properties::kNumImmutableMemTable, "num_immutable_mem_table");
write_stream_key_value(rocksdb::DB::Properties::kNumImmutableMemTableFlushed, "num_immutable_mem_table_flushed");
write_stream_key_value(rocksdb::DB::Properties::kMemTableFlushPending, "mem_table_flush_pending");
write_stream_key_value(rocksdb::DB::Properties::kNumRunningFlushes, "num_running_flushes");
write_aggregated_int_property(rocksdb::DB::Properties::kNumImmutableMemTable, "num_immutable_mem_table");
write_aggregated_int_property(rocksdb::DB::Properties::kNumImmutableMemTableFlushed, "num_immutable_mem_table_flushed");
write_aggregated_int_property(rocksdb::DB::Properties::kMemTableFlushPending, "mem_table_flush_pending");
write_aggregated_int_property(rocksdb::DB::Properties::kNumRunningFlushes, "num_running_flushes");

// compaction
write_stream_key_value(rocksdb::DB::Properties::kCompactionPending, "compaction_pending");
write_stream_key_value(rocksdb::DB::Properties::kNumRunningCompactions, "num_running_compactions");
write_aggregated_int_property(rocksdb::DB::Properties::kCompactionPending, "compaction_pending");
write_aggregated_int_property(rocksdb::DB::Properties::kNumRunningCompactions, "num_running_compactions");

// background errors
write_stream_key_value(rocksdb::DB::Properties::kBackgroundErrors, "background_errors");
write_aggregated_int_property(rocksdb::DB::Properties::kBackgroundErrors, "background_errors");

// memtables size
write_stream_key_value(rocksdb::DB::Properties::kCurSizeActiveMemTable, "cur_size_active_mem_table");
write_stream_key_value(rocksdb::DB::Properties::kCurSizeAllMemTables, "cur_size_all_mem_tables");
write_stream_key_value(rocksdb::DB::Properties::kSizeAllMemTables, "size_all_mem_tables");
write_aggregated_int_property(rocksdb::DB::Properties::kCurSizeActiveMemTable, "cur_size_active_mem_table");
write_aggregated_int_property(rocksdb::DB::Properties::kCurSizeAllMemTables, "cur_size_all_mem_tables");
write_aggregated_int_property(rocksdb::DB::Properties::kSizeAllMemTables, "size_all_mem_tables");

// keys
write_stream_key_value(rocksdb::DB::Properties::kEstimateNumKeys, "estimate_num_keys");
write_aggregated_int_property(rocksdb::DB::Properties::kEstimateNumKeys, "estimate_num_keys");

// table readers mem
write_stream_key_value(rocksdb::DB::Properties::kEstimateTableReadersMem, "estimate_table_readers_mem");
write_aggregated_int_property(rocksdb::DB::Properties::kEstimateTableReadersMem, "estimate_table_readers_mem");

// snapshot
write_stream_key_value(rocksdb::DB::Properties::kNumSnapshots, "num_snapshots");
write_aggregated_int_property(rocksdb::DB::Properties::kNumSnapshots, "num_snapshots");

// version
write_stream_key_value(rocksdb::DB::Properties::kNumLiveVersions, "num_live_versions");
write_stream_key_value(rocksdb::DB::Properties::kCurrentSuperVersionNumber, "current_super_version_number");
write_aggregated_int_property(rocksdb::DB::Properties::kNumLiveVersions, "num_live_versions");
write_aggregated_int_property(rocksdb::DB::Properties::kCurrentSuperVersionNumber, "current_super_version_number");

// live data size
write_stream_key_value(rocksdb::DB::Properties::kEstimateLiveDataSize, "estimate_live_data_size");
write_aggregated_int_property(rocksdb::DB::Properties::kEstimateLiveDataSize, "estimate_live_data_size");

// sst files
write_stream_key_value(rocksdb::DB::Properties::kTotalSstFilesSize, "total_sst_files_size");
write_stream_key_value(rocksdb::DB::Properties::kLiveSstFilesSize, "live_sst_files_size");
write_property(rocksdb::DB::Properties::kNumFilesAtLevelPrefix+"0", "num_files_at_level0");
write_property(rocksdb::DB::Properties::kNumFilesAtLevelPrefix+"1", "num_files_at_level1");
write_property(rocksdb::DB::Properties::kNumFilesAtLevelPrefix+"2", "num_files_at_level2");
write_property(rocksdb::DB::Properties::kNumFilesAtLevelPrefix+"3", "num_files_at_level3");
write_property(rocksdb::DB::Properties::kNumFilesAtLevelPrefix+"4", "num_files_at_level4");
write_property(rocksdb::DB::Properties::kNumFilesAtLevelPrefix+"5", "num_files_at_level5");
write_property(rocksdb::DB::Properties::kNumFilesAtLevelPrefix+"6", "num_files_at_level6");
write_property(rocksdb::DB::Properties::kCompressionRatioAtLevelPrefix+"0", "compression_ratio_at_level0");
write_property(rocksdb::DB::Properties::kCompressionRatioAtLevelPrefix+"1", "compression_ratio_at_level1");
write_property(rocksdb::DB::Properties::kCompressionRatioAtLevelPrefix+"2", "compression_ratio_at_level2");
write_property(rocksdb::DB::Properties::kCompressionRatioAtLevelPrefix+"3", "compression_ratio_at_level3");
write_property(rocksdb::DB::Properties::kCompressionRatioAtLevelPrefix+"4", "compression_ratio_at_level4");
write_property(rocksdb::DB::Properties::kCompressionRatioAtLevelPrefix+"5", "compression_ratio_at_level5");
write_property(rocksdb::DB::Properties::kCompressionRatioAtLevelPrefix+"6", "compression_ratio_at_level6");
write_aggregated_int_property(rocksdb::DB::Properties::kTotalSstFilesSize, "total_sst_files_size");
write_aggregated_int_property(rocksdb::DB::Properties::kLiveSstFilesSize, "live_sst_files_size");

// pending compaction bytes
write_stream_key_value(rocksdb::DB::Properties::kEstimatePendingCompactionBytes, "estimate_pending_compaction_bytes");
write_aggregated_int_property(rocksdb::DB::Properties::kEstimatePendingCompactionBytes, "estimate_pending_compaction_bytes");

// block cache
write_stream_key_value(rocksdb::DB::Properties::kBlockCacheCapacity, "block_cache_capacity");
write_stream_key_value(rocksdb::DB::Properties::kBlockCacheUsage, "block_cache_usage");
write_stream_key_value(rocksdb::DB::Properties::kBlockCachePinnedUsage, "block_cache_pinned_usage");
write_aggregated_int_property(rocksdb::DB::Properties::kBlockCacheCapacity, "block_cache_capacity");
write_aggregated_int_property(rocksdb::DB::Properties::kBlockCacheUsage, "block_cache_usage");
write_aggregated_int_property(rocksdb::DB::Properties::kBlockCachePinnedUsage, "block_cache_pinned_usage");

// blob files
write_stream_key_value(rocksdb::DB::Properties::kNumBlobFiles, "num_blob_files");
write_stream_key_value(rocksdb::DB::Properties::kBlobStats, "blob_stats");
write_stream_key_value(rocksdb::DB::Properties::kTotalBlobFileSize, "total_blob_file_size");
write_stream_key_value(rocksdb::DB::Properties::kLiveBlobFileSize, "live_blob_file_size");

write_aggregated_int_property(rocksdb::DB::Properties::kNumBlobFiles, "num_blob_files");
write_aggregated_int_property(rocksdb::DB::Properties::kBlobStats, "blob_stats");
write_aggregated_int_property(rocksdb::DB::Properties::kTotalBlobFileSize, "total_blob_file_size");
write_aggregated_int_property(rocksdb::DB::Properties::kLiveBlobFileSize, "live_blob_file_size");

write_aggregated_int_property(rocksdb::DB::Properties::kBlobCacheCapacity, "blob_cache_capacity");
write_aggregated_int_property(rocksdb::DB::Properties::kBlobCacheUsage, "blob_cache_usage");
write_aggregated_int_property(rocksdb::DB::Properties::kBlobCachePinnedUsage, "blob_cache_pinned_usage");

//rocksdb ticker
{
// memtables num
write_ticker_count(rocksdb::Tickers::MEMTABLE_HIT, "memtable_hit");
write_ticker_count(rocksdb::Tickers::MEMTABLE_MISS, "memtable_miss");

write_ticker_count(rocksdb::Tickers::BYTES_WRITTEN, "bytes_written");
write_ticker_count(rocksdb::Tickers::BYTES_READ, "bytes_read");
write_ticker_count(rocksdb::Tickers::ITER_BYTES_READ, "iter_bytes_read");
write_ticker_count(rocksdb::Tickers::GET_HIT_L0, "get_hit_l0");
write_ticker_count(rocksdb::Tickers::GET_HIT_L1, "get_hit_l1");
write_ticker_count(rocksdb::Tickers::GET_HIT_L2_AND_UP, "get_hit_l2_and_up");

write_ticker_count(rocksdb::Tickers::BLOOM_FILTER_USEFUL, "bloom_filter_useful");
write_ticker_count(rocksdb::Tickers::BLOOM_FILTER_FULL_POSITIVE, "bloom_filter_full_positive");
write_ticker_count(rocksdb::Tickers::BLOOM_FILTER_FULL_TRUE_POSITIVE, "bloom_filter_full_true_positive");
write_ticker_count(rocksdb::Tickers::BLOOM_FILTER_PREFIX_CHECKED, "bloom_filter_prefix_checked");
write_ticker_count(rocksdb::Tickers::BLOOM_FILTER_PREFIX_USEFUL, "bloom_filter_prefix_useful");

// compaction
write_ticker_count(rocksdb::Tickers::COMPACTION_KEY_DROP_NEWER_ENTRY, "compaction_key_drop_newer_entry");
write_ticker_count(rocksdb::Tickers::COMPACTION_KEY_DROP_OBSOLETE, "compaction_key_drop_obsolete");
write_ticker_count(rocksdb::Tickers::COMPACTION_KEY_DROP_USER, "compaction_key_drop_user");
write_ticker_count(rocksdb::Tickers::COMPACTION_OPTIMIZED_DEL_DROP_OBSOLETE, "compaction_optimized_del_drop_obsolete");
write_ticker_count(rocksdb::Tickers::COMPACT_READ_BYTES, "compact_read_bytes");
write_ticker_count(rocksdb::Tickers::COMPACT_WRITE_BYTES, "compact_write_bytes");
write_ticker_count(rocksdb::Tickers::FLUSH_WRITE_BYTES, "flush_write_bytes");

// keys
write_ticker_count(rocksdb::Tickers::NUMBER_KEYS_READ, "number_keys_read");
write_ticker_count(rocksdb::Tickers::NUMBER_KEYS_WRITTEN, "number_keys_written");
write_ticker_count(rocksdb::Tickers::NUMBER_KEYS_UPDATED, "number_keys_updated");
write_ticker_count(rocksdb::Tickers::NUMBER_OF_RESEEKS_IN_ITERATION, "number_of_reseeks_in_iteration");

write_ticker_count(rocksdb::Tickers::NUMBER_DB_SEEK, "number_db_seek");
write_ticker_count(rocksdb::Tickers::NUMBER_DB_NEXT, "number_db_next");
write_ticker_count(rocksdb::Tickers::NUMBER_DB_PREV, "number_db_prev");
write_ticker_count(rocksdb::Tickers::NUMBER_DB_SEEK_FOUND, "number_db_seek_found");
write_ticker_count(rocksdb::Tickers::NUMBER_DB_NEXT_FOUND, "number_db_next_found");
write_ticker_count(rocksdb::Tickers::NUMBER_DB_PREV_FOUND, "number_db_prev_found");
write_ticker_count(rocksdb::Tickers::LAST_LEVEL_READ_BYTES, "last_level_read_bytes");
write_ticker_count(rocksdb::Tickers::LAST_LEVEL_READ_COUNT, "last_level_read_count");
write_ticker_count(rocksdb::Tickers::NON_LAST_LEVEL_READ_BYTES, "non_last_level_read_bytes");
write_ticker_count(rocksdb::Tickers::NON_LAST_LEVEL_READ_COUNT, "non_last_level_read_count");

// background errors
write_ticker_count(rocksdb::Tickers::STALL_MICROS, "stall_micros");

// sst files
write_ticker_count(rocksdb::Tickers::NO_FILE_OPENS, "no_file_opens");
write_ticker_count(rocksdb::Tickers::NO_FILE_ERRORS, "no_file_errors");

// block cache
write_ticker_count(rocksdb::Tickers::BLOCK_CACHE_INDEX_HIT, "block_cache_index_hit");
write_ticker_count(rocksdb::Tickers::BLOCK_CACHE_INDEX_MISS, "block_cache_index_miss");
write_ticker_count(rocksdb::Tickers::BLOCK_CACHE_FILTER_HIT, "block_cache_filter_hit");
write_ticker_count(rocksdb::Tickers::BLOCK_CACHE_FILTER_MISS, "block_cache_filter_miss");
write_ticker_count(rocksdb::Tickers::BLOCK_CACHE_DATA_HIT, "block_cache_data_hit");
write_ticker_count(rocksdb::Tickers::BLOCK_CACHE_DATA_MISS, "block_cache_data_miss");
write_ticker_count(rocksdb::Tickers::BLOCK_CACHE_BYTES_READ, "block_cache_bytes_read");
write_ticker_count(rocksdb::Tickers::BLOCK_CACHE_BYTES_WRITE, "block_cache_bytes_write");

// blob files
write_ticker_count(rocksdb::Tickers::BLOB_DB_NUM_KEYS_WRITTEN, "blob_db_num_keys_written");
write_ticker_count(rocksdb::Tickers::BLOB_DB_NUM_KEYS_READ, "blob_db_num_keys_read");
write_ticker_count(rocksdb::Tickers::BLOB_DB_BYTES_WRITTEN, "blob_db_bytes_written");
write_ticker_count(rocksdb::Tickers::BLOB_DB_BYTES_READ, "blob_db_bytes_read");
write_ticker_count(rocksdb::Tickers::BLOB_DB_NUM_SEEK, "blob_db_num_seek");
write_ticker_count(rocksdb::Tickers::BLOB_DB_NUM_NEXT, "blob_db_num_next");
write_ticker_count(rocksdb::Tickers::BLOB_DB_NUM_PREV, "blob_db_num_prev");
write_ticker_count(rocksdb::Tickers::BLOB_DB_BLOB_FILE_BYTES_WRITTEN, "blob_db_blob_file_bytes_written");
write_ticker_count(rocksdb::Tickers::BLOB_DB_BLOB_FILE_BYTES_READ, "blob_db_blob_file_bytes_read");

write_ticker_count(rocksdb::Tickers::BLOB_DB_GC_NUM_FILES, "blob_db_gc_num_files");
write_ticker_count(rocksdb::Tickers::BLOB_DB_GC_NUM_NEW_FILES, "blob_db_gc_num_new_files");
write_ticker_count(rocksdb::Tickers::BLOB_DB_GC_NUM_KEYS_RELOCATED, "blob_db_gc_num_keys_relocated");
write_ticker_count(rocksdb::Tickers::BLOB_DB_GC_BYTES_RELOCATED, "blob_db_gc_bytes_relocated");

write_ticker_count(rocksdb::Tickers::BLOB_DB_CACHE_MISS, "blob_db_cache_miss");
write_ticker_count(rocksdb::Tickers::BLOB_DB_CACHE_HIT, "blob_db_cache_hit");
write_ticker_count(rocksdb::Tickers::BLOB_DB_CACHE_BYTES_READ, "blob_db_cache_bytes_read");
write_ticker_count(rocksdb::Tickers::BLOB_DB_CACHE_BYTES_WRITE, "blob_db_cache_bytes_write");
}
// column family stats
std::map<std::string, std::string> mapvalues;
db_->rocksdb::DB::GetMapProperty(rocksdb::DB::Properties::kCFStats,&mapvalues);
Expand Down
1 change: 1 addition & 0 deletions src/storage/src/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,7 @@ class Redis {
Storage* const storage_;
std::shared_ptr<LockMgr> lock_mgr_;
rocksdb::DB* db_ = nullptr;
std::shared_ptr<rocksdb::Statistics> db_statistics_ = nullptr;
//TODO(wangshaoyi): seperate env for each rocksdb instance
// rocksdb::Env* env_ = nullptr;

Expand Down

0 comments on commit 269b420

Please sign in to comment.