Skip to content

Commit

Permalink
fix:bitmap cache (OpenAtomFoundation#2253)
Browse files Browse the repository at this point in the history
* fix:bitmap cache

Co-authored-by: chejinge <[email protected]>
  • Loading branch information
chejinge and brother-jin authored Jan 24, 2024
1 parent 970a81e commit c7373ba
Show file tree
Hide file tree
Showing 14 changed files with 119 additions and 51 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pika.yml
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ jobs:
- name: Install Deps
run: |
brew update
brew install --overwrite python autoconf protobuf llvm wget git
brew install --overwrite autoconf protobuf llvm wget git
brew install gcc@10 automake cmake make binutils
- name: Configure CMake
Expand Down
2 changes: 1 addition & 1 deletion conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ cache-num : 16
# cache-model 0:cache_none 1:cache_read
cache-model : 1
# cache-type: string, set, zset, list, hash, bit
cache-type: string, set, zset, list, hash
cache-type: string, set, zset, list, hash, bit

# Maximum number of keys in the zset redis cache
# On the disk DB, a zset field may have many fields. In the memory cache, we limit the maximum
Expand Down
7 changes: 6 additions & 1 deletion include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,11 @@ class PikaConf : public pstd::BaseConf {
acl_Log_max_len_ = value;
}

const std::string scache_type() {
std::lock_guard l(rwlock_);
return pstd::StringConcat(cache_type_, COMMA);
}

int64_t cache_maxmemory() { return cache_maxmemory_; }
void SetSlowCmd(const std::string& value) {
std::lock_guard l(rwlock_);
Expand Down Expand Up @@ -686,7 +691,7 @@ class PikaConf : public pstd::BaseConf {
std::string masterauth_;
std::atomic<bool> classic_mode_;
int databases_ = 0;
int default_slot_num_ = 0;
int default_slot_num_ = 1;
std::vector<DBStruct> db_structs_;
std::string default_db_;
std::string bgsave_path_;
Expand Down
2 changes: 1 addition & 1 deletion include/pika_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -372,12 +372,12 @@ const std::string kDBSyncModule = "document";

const std::string kBgsaveInfoFile = "info";

// prefix of pika cache
const std::string PCacheKeyPrefixK = "K";
const std::string PCacheKeyPrefixH = "H";
const std::string PCacheKeyPrefixS = "S";
const std::string PCacheKeyPrefixZ = "Z";
const std::string PCacheKeyPrefixL = "L";
const std::string PCacheKeyPrefixB = "B";


/*
Expand Down
1 change: 0 additions & 1 deletion include/pika_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,6 @@ class LPushxCmd : public Cmd {

private:
std::string key_;
std::string value_;
rocksdb::Status s_;
std::vector<std::string> values_;
void DoInitial() override;
Expand Down
68 changes: 64 additions & 4 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2066,6 +2066,61 @@ void ConfigCmd::ConfigGet(std::string& ret) {
EncodeString(&config_body, g_pika_conf->replication_id());
}


if (pstd::stringmatch(pattern.data(), "cache-num", 1)) {
elements += 2;
EncodeString(&config_body, "cache-num");
EncodeNumber(&config_body, g_pika_conf->GetCacheNum());
}

if (pstd::stringmatch(pattern.data(), "cache-model", 1)) {
elements += 2;
EncodeString(&config_body, "cache-model");
EncodeNumber(&config_body, g_pika_conf->cache_model());
}

if (pstd::stringmatch(pattern.data(), "cache-type", 1)) {
elements += 2;
EncodeString(&config_body, "cache-type");
EncodeString(&config_body, g_pika_conf->scache_type());
}

if (pstd::stringmatch(pattern.data(), "zset-cache-start-direction", 1)) {
elements += 2;
EncodeString(&config_body, "zset-cache-start-direction");
EncodeNumber(&config_body, g_pika_conf->zset_cache_start_pos());
}

if (pstd::stringmatch(pattern.data(), "zset-cache-field-num-per-key", 1)) {
elements += 2;
EncodeString(&config_body, "zset-cache-field-num-per-key");
EncodeNumber(&config_body, g_pika_conf->zset_cache_field_num_per_key());
}

if (pstd::stringmatch(pattern.data(), "cache-maxmemory", 1)) {
elements += 2;
EncodeString(&config_body, "cache-maxmemory");
EncodeNumber(&config_body, g_pika_conf->cache_maxmemory());
}

if (pstd::stringmatch(pattern.data(), "cache-maxmemory-policy", 1)) {
elements += 2;
EncodeString(&config_body, "cache-maxmemory-policy");
EncodeNumber(&config_body, g_pika_conf->cache_maxmemory_policy());
}

if (pstd::stringmatch(pattern.data(), "cache-maxmemory-samples", 1)) {
elements += 2;
EncodeString(&config_body, "cache-maxmemory-samples");
EncodeNumber(&config_body, g_pika_conf->cache_maxmemory_samples());
}

if (pstd::stringmatch(pattern.data(), "cache-lfu-decay-time", 1)) {
elements += 2;
EncodeString(&config_body, "cache-lfu-decay-time");
EncodeNumber(&config_body, g_pika_conf->cache_lfu_decay_time());
}

if (pstd::stringmatch(pattern.data(), "acl-pubsub-default", 1) != 0) {
elements += 2;
EncodeString(&config_body, "acl-pubsub-default");
Expand Down Expand Up @@ -2121,6 +2176,11 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) {
"arena-block-size",
"throttle-bytes-per-second",
"max-rsync-parallel-num",
"cache-model",
"cache-type",
"zset-cache-start-direction",
"zset-cache-field-num-per-key",
"cache-lfu-decay-time",
});
res_.AppendStringVector(replyVt);
return;
Expand Down Expand Up @@ -3070,17 +3130,17 @@ void CacheCmd::DoInitial() {
return;
}
if (!strcasecmp(argv_[1].data(), "clear")) {
if (!strcasecmp(argv_[2].data(), "db")) {
if (argv_.size() == 3 && !strcasecmp(argv_[2].data(), "db")) {
condition_ = kCLEAR_DB;
} else if (!strcasecmp(argv_[2].data(), "hitratio")) {
} else if (argv_.size() == 3 && !strcasecmp(argv_[2].data(), "hitratio")) {
condition_ = kCLEAR_HITRATIO;
} else {
res_.SetRes(CmdRes::kErrOther, "Unknown cache subcommand or wrong # of args.");
}
} else if (!strcasecmp(argv_[1].data(), "del")) {
} else if (argv_.size() >= 3 && !strcasecmp(argv_[1].data(), "del")) {
condition_ = kDEL_KEYS;
keys_.assign(argv_.begin() + 2, argv_.end());
} else if (!strcasecmp(argv_[1].data(), "randomkey")) {
} else if (argv_.size() == 2 && !strcasecmp(argv_[1].data(), "randomkey")) {
condition_ = kRANDOM_KEY;
} else {
res_.SetRes(CmdRes::kErrOther, "Unknown cache subcommand or wrong # of args.");
Expand Down
24 changes: 12 additions & 12 deletions src/pika_bit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ void BitSetCmd::DoThroughDB() {

void BitSetCmd::DoUpdateCache() {
if (s_.ok()) {
std::string CachePrefixKeyB = PCacheKeyPrefixB + key_;
db_->cache()->SetBitIfKeyExist(CachePrefixKeyB, bit_offset_, on_);
std::string CachePrefixKeyK = PCacheKeyPrefixK + key_;
db_->cache()->SetBitIfKeyExist(CachePrefixKeyK, bit_offset_, on_);
}
}

Expand Down Expand Up @@ -94,8 +94,8 @@ void BitGetCmd::Do() {

void BitGetCmd::ReadCache() {
int64_t bit_val = 0;
std::string CachePrefixKeyB = PCacheKeyPrefixB + key_;
auto s = db_->cache()->GetBit(CachePrefixKeyB, bit_offset_, &bit_val);
std::string CachePrefixKeyK = PCacheKeyPrefixK + key_;
auto s = db_->cache()->GetBit(CachePrefixKeyK, bit_offset_, &bit_val);
if (s.ok()) {
res_.AppendInteger(bit_val);
} else if (s.IsNotFound()) {
Expand Down Expand Up @@ -159,11 +159,11 @@ void BitCountCmd::ReadCache() {
int64_t start = static_cast<long>(start_offset_);
int64_t end = static_cast<long>(end_offset_);
rocksdb::Status s;
std::string CachePrefixKeyB = PCacheKeyPrefixB + key_;
std::string CachePrefixKeyK = PCacheKeyPrefixK + key_;
if (count_all_) {
s = db_->cache()->BitCount(CachePrefixKeyB, start, end, &count, 0);
s = db_->cache()->BitCount(CachePrefixKeyK, start, end, &count, 0);
} else {
s = db_->cache()->BitCount(CachePrefixKeyB, start, end, &count, 1);
s = db_->cache()->BitCount(CachePrefixKeyK, start, end, &count, 1);
}

if (s.ok()) {
Expand Down Expand Up @@ -249,13 +249,13 @@ void BitPosCmd::ReadCache() {
int64_t bit = static_cast<long>(bit_val_);
int64_t start = static_cast<long>(start_offset_);
int64_t end = static_cast<long>(end_offset_);\
std::string CachePrefixKeyB = PCacheKeyPrefixB + key_;
std::string CachePrefixKeyK = PCacheKeyPrefixK + key_;
if (pos_all_) {
s = db_->cache()->BitPos(CachePrefixKeyB, bit, &pos);
s = db_->cache()->BitPos(CachePrefixKeyK, bit, &pos);
} else if (!pos_all_ && !endoffset_set_) {
s = db_->cache()->BitPos(CachePrefixKeyB, bit, start, &pos);
s = db_->cache()->BitPos(CachePrefixKeyK, bit, start, &pos);
} else if (!pos_all_ && endoffset_set_) {
s = db_->cache()->BitPos(CachePrefixKeyB, bit, start, end, &pos);
s = db_->cache()->BitPos(CachePrefixKeyK, bit, start, end, &pos);
}
if (s.ok()) {
res_.AppendInteger(pos);
Expand Down Expand Up @@ -329,7 +329,7 @@ void BitOpCmd::DoThroughDB() {
void BitOpCmd::DoUpdateCache() {
if (s_.ok()) {
std::vector<std::string> v;
v.emplace_back(PCacheKeyPrefixB + dest_key_);
v.emplace_back(PCacheKeyPrefixK + dest_key_);
db_->cache()->Del(v);
}
}
Expand Down
16 changes: 8 additions & 8 deletions src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ void InitCmdTable(CmdTable* cmd_table) {
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameHGet, std::move(hgetptr)));
////HGetallCmd
std::unique_ptr<Cmd> hgetallptr =
std::make_unique<HGetallCmd>(kCmdNameHGetall, 2, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsSlow);
std::make_unique<HGetallCmd>(kCmdNameHGetall, 2, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsSlow | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameHGetall, std::move(hgetallptr)));
////HExistsCmd
std::unique_ptr<Cmd> hexistsptr =
Expand Down Expand Up @@ -441,7 +441,7 @@ void InitCmdTable(CmdTable* cmd_table) {
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameHStrlen, std::move(hstrlenptr)));
////HValsCmd
std::unique_ptr<Cmd> hvalsptr =
std::make_unique<HValsCmd>(kCmdNameHVals, 2, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsSlow);
std::make_unique<HValsCmd>(kCmdNameHVals, 2, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsSlow | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameHVals, std::move(hvalsptr)));
////HScanCmd
std::unique_ptr<Cmd> hscanptr = std::make_unique<HScanCmd>(
Expand Down Expand Up @@ -672,29 +672,29 @@ void InitCmdTable(CmdTable* cmd_table) {
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameSMove, std::move(smoveptr)));
////SRandmemberCmd
std::unique_ptr<Cmd> srandmemberptr =
std::make_unique<SRandmemberCmd>(kCmdNameSRandmember, -2, kCmdFlagsRead | kCmdFlagsSet|kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdFlagsUpdateCache | kCmdFlagsSlow);
std::make_unique<SRandmemberCmd>(kCmdNameSRandmember, -2, kCmdFlagsRead | kCmdFlagsSet | kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdFlagsUpdateCache | kCmdFlagsSlow);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameSRandmember, std::move(srandmemberptr)));

// BitMap
////bitsetCmd
std::unique_ptr<Cmd> bitsetptr =
std::make_unique<BitSetCmd>(kCmdNameBitSet, 4, kCmdFlagsWrite | kCmdFlagsBit | kCmdFlagsSlow);
std::make_unique<BitSetCmd>(kCmdNameBitSet, 4, kCmdFlagsWrite | kCmdFlagsBit | kCmdFlagsSlow | kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameBitSet, std::move(bitsetptr)));
////bitgetCmd
std::unique_ptr<Cmd> bitgetptr =
std::make_unique<BitGetCmd>(kCmdNameBitGet, 3, kCmdFlagsRead | kCmdFlagsBit | kCmdFlagsSlow);
std::make_unique<BitGetCmd>(kCmdNameBitGet, 3, kCmdFlagsRead | kCmdFlagsBit | kCmdFlagsSlow | kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdFlagsUpdateCache);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameBitGet, std::move(bitgetptr)));
////bitcountCmd
std::unique_ptr<Cmd> bitcountptr =
std::make_unique<BitCountCmd>(kCmdNameBitCount, -2, kCmdFlagsRead | kCmdFlagsBit | kCmdFlagsSlow);
std::make_unique<BitCountCmd>(kCmdNameBitCount, -2, kCmdFlagsRead | kCmdFlagsBit | kCmdFlagsSlow | kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdFlagsUpdateCache);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameBitCount, std::move(bitcountptr)));
////bitposCmd
std::unique_ptr<Cmd> bitposptr =
std::make_unique<BitPosCmd>(kCmdNameBitPos, -3, kCmdFlagsRead | kCmdFlagsBit | kCmdFlagsSlow);
std::make_unique<BitPosCmd>(kCmdNameBitPos, -3, kCmdFlagsRead | kCmdFlagsBit | kCmdFlagsSlow | kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdFlagsUpdateCache);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameBitPos, std::move(bitposptr)));
////bitopCmd
std::unique_ptr<Cmd> bitopptr =
std::make_unique<BitOpCmd>(kCmdNameBitOp, -3, kCmdFlagsWrite | kCmdFlagsBit | kCmdFlagsSlow);
std::make_unique<BitOpCmd>(kCmdNameBitOp, -3, kCmdFlagsWrite | kCmdFlagsBit | kCmdFlagsSlow | kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameBitOp, std::move(bitopptr)));

// HyperLogLog
Expand Down
15 changes: 10 additions & 5 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,6 @@ int PikaConf::Load() {
}

GetConfInt("default-slot-num", &default_slot_num_);
if (default_slot_num_ <= 0) {
LOG(FATAL) << "config default-slot-num error,"
<< " it should greater than zero, the actual is: " << default_slot_num_;
}
GetConfStr("dump-path", &bgsave_path_);
bgsave_path_ = bgsave_path_.empty() ? "./dump/" : bgsave_path_;
if (bgsave_path_[bgsave_path_.length() - 1] != '/') {
Expand Down Expand Up @@ -621,7 +617,7 @@ void PikaConf::SetCacheType(const std::string& value) {

int PikaConf::ConfigRewrite() {
// std::string userblacklist = suser_blacklist();

std::string scachetype = scache_type();
std::lock_guard l(rwlock_);
// Only set value for config item that can be config set.
SetConfInt("timeout", timeout_);
Expand Down Expand Up @@ -649,6 +645,7 @@ int PikaConf::ConfigRewrite() {
SetConfStr("compact-cron", compact_cron_);
SetConfStr("compact-interval", compact_interval_);
SetConfStr("disable_auto_compactions", disable_auto_compactions_ ? "true" : "false");
SetConfStr("cache-type", scachetype);
SetConfInt64("least-free-disk-resume-size", least_free_disk_to_resume_);
SetConfInt64("manually-resume-interval", resume_check_interval_);
SetConfDouble("min-check-resume-ratio", min_check_resume_ratio_);
Expand All @@ -669,6 +666,14 @@ int PikaConf::ConfigRewrite() {
SetConfInt64("slotmigrate", slotmigrate_);
// slaveof config item is special
SetConfStr("slaveof", slaveof_);
// cache config
SetConfStr("share-block-cache", share_block_cache_ ? "yes" : "no");
SetConfInt("block-size", block_size_);
SetConfInt("block-cache", block_cache_);
SetConfStr("cache-index-and-filter-blocks", cache_index_and_filter_blocks_ ? "yes" : "no");
SetConfInt("cache-model", cache_model_);
SetConfInt("zset_cache_start_pos", zset_cache_start_pos_);
SetConfInt("zset_cache_field_num_per_key", zset_cache_field_num_per_key_);

if (!diff_commands_.empty()) {
std::vector<pstd::BaseConf::Rep::ConfItem> filtered_items;
Expand Down
2 changes: 1 addition & 1 deletion src/pika_kv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,7 @@ void PsetexCmd::DoThroughDB() {
void PsetexCmd::DoUpdateCache() {
if (s_.ok()) {
std::string CachePrefixKeyK = PCacheKeyPrefixK + key_;
db_->cache()->WriteKVToCache(CachePrefixKeyK, value_, static_cast<int32_t>(usec_ / 1000));
db_->cache()->Setxx(CachePrefixKeyK, value_, static_cast<int32_t>(usec_ / 1000));
}
}

Expand Down
8 changes: 2 additions & 6 deletions src/pika_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -471,9 +471,7 @@ void LPushxCmd::DoThroughDB() {
void LPushxCmd::DoUpdateCache() {
if (s_.ok()) {
std::string CachePrefixKeyL = PCacheKeyPrefixL + key_;
std::vector<std::string> values;
values.push_back(value_);
db_->cache()->LPushx(CachePrefixKeyL, values);
db_->cache()->LPushx(CachePrefixKeyL, values_);
}
}

Expand Down Expand Up @@ -898,8 +896,6 @@ void RPushxCmd::DoThroughDB() {
void RPushxCmd::DoUpdateCache() {
if (s_.ok()) {
std::string CachePrefixKeyL = PCacheKeyPrefixL + key_;
std::vector<std::string> values;
values.push_back(value_);
db_->cache()->RPushx(CachePrefixKeyL, values);
db_->cache()->RPushx(CachePrefixKeyL, values_);
}
}
9 changes: 5 additions & 4 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1742,6 +1742,8 @@ void PikaServer::ResetCacheAsync(uint32_t cache_num, std::shared_ptr<DB> db, cac
}

void PikaServer::ClearCacheDbAsync(std::shared_ptr<DB> db) {
// disable cache temporarily, and restore it after cache cleared
g_pika_conf->SetCacheDisableFlag();
if (PIKA_CACHE_STATUS_OK != db->cache()->CacheStatus()) {
LOG(WARNING) << "can not clear cache in status: " << db->cache()->CacheStatus();
return;
Expand Down Expand Up @@ -1784,8 +1786,9 @@ void PikaServer::DoCacheBGTask(void* arg) {
LOG(WARNING) << "invalid cache task type: " << pCacheTaskArg->task_type;
break;
}

db->cache()->SetCacheStatus(PIKA_CACHE_STATUS_OK);
if (pCacheTaskArg->reenable_cache && pCacheTaskArg->conf) {
if (pCacheTaskArg->reenable_cache) {
pCacheTaskArg->conf->UnsetCacheDisableFlag();
}
}
Expand All @@ -1806,10 +1809,8 @@ void PikaServer::ClearHitRatio(std::shared_ptr<DB> db) {
}

void PikaServer::OnCacheStartPosChanged(int zset_cache_start_pos, std::shared_ptr<DB> db) {
// disable cache temporarily, and restore it after cache cleared
g_pika_conf->SetCacheDisableFlag();
ResetCacheConfig(db);
ClearCacheDbAsyncV2(db);
ClearCacheDbAsync(db);
}

void PikaServer::ClearCacheDbAsyncV2(std::shared_ptr<DB> db) {
Expand Down
2 changes: 2 additions & 0 deletions tests/integration/hash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pika_integration

import (
"context"
"sort"
"time"

. "github.com/bsm/ginkgo/v2"
Expand Down Expand Up @@ -305,6 +306,7 @@ var _ = Describe("Hash Commands", func() {
var slice []string
err = client.HVals(ctx, "hash121").ScanSlice(&slice)
Expect(err).NotTo(HaveOccurred())
sort.Strings(slice)
Expect(slice).To(Equal([]string{"hello1", "hello2"}))
})

Expand Down
Loading

0 comments on commit c7373ba

Please sign in to comment.