From 061edba70357aee1c8f8e1e7ef11ae011c153adf Mon Sep 17 00:00:00 2001 From: chejinge <945997690@qq.com> Date: Fri, 2 Aug 2024 09:58:33 +0800 Subject: [PATCH 1/3] fix:delete logs (#2840) Co-authored-by: chejinge --- src/pika_cache_load_thread.cc | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/pika_cache_load_thread.cc b/src/pika_cache_load_thread.cc index 5e4a050706..b2205a7d49 100644 --- a/src/pika_cache_load_thread.cc +++ b/src/pika_cache_load_thread.cc @@ -69,8 +69,6 @@ bool PikaCacheLoadThread::LoadHash(std::string& key, const std::shared_ptr& int32_t len = 0; db->storage()->HLen(key, &len); if (0 >= len || CACHE_VALUE_ITEM_MAX_SIZE < len) { - LOG(WARNING) << "can not load key, because item size:" << len - << " beyond max item size:" << CACHE_VALUE_ITEM_MAX_SIZE; return false; } @@ -205,8 +203,6 @@ void *PikaCacheLoadThread::ThreadMain() { for (auto & load_key : load_keys) { if (LoadKey(std::get<0>(load_key), std::get<1>(load_key), std::get<2>(load_key))) { ++async_load_keys_num_; - } else { - LOG(WARNING) << "PikaCacheLoadThread::ThreadMain LoadKey: " << std::get<1>(load_key) << " failed !!!"; } std::unique_lock lm(loadkeys_map_mutex_); From 1e71ff0b36fa797ce219cec13d357a34034acaa5 Mon Sep 17 00:00:00 2001 From: QlQl <2458371920@qq.com> Date: Fri, 2 Aug 2024 15:35:10 +0800 Subject: [PATCH 2/3] fix: execute hincrby cmd more than one times after delete a field which is existing (#2836) * fix hincrby cmd * add test for hincrby cmd --- src/storage/src/redis_hashes.cc | 3 ++- tests/integration/hash_test.go | 21 +++++++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/src/storage/src/redis_hashes.cc b/src/storage/src/redis_hashes.cc index 03a3c1c9b8..9193dddd1c 100644 --- a/src/storage/src/redis_hashes.cc +++ b/src/storage/src/redis_hashes.cc @@ -308,7 +308,8 @@ Status Redis::HIncrby(const Slice& key, const Slice& field, int64_t value, int64 batch.Put(handles_[kMetaCF], base_meta_key.Encode(), meta_value); HashesDataKey hashes_data_key(key, version, field); Int64ToStr(value_buf, 32, value); - batch.Put(handles_[kHashesDataCF], hashes_data_key.Encode(), value_buf); + BaseDataValue internal_value(value_buf); + batch.Put(handles_[kHashesDataCF], hashes_data_key.Encode(), internal_value.Encode()); *ret = value; } else { version = parsed_hashes_meta_value.Version(); diff --git a/tests/integration/hash_test.go b/tests/integration/hash_test.go index cf9448f75e..0ee0dccf1b 100644 --- a/tests/integration/hash_test.go +++ b/tests/integration/hash_test.go @@ -140,6 +140,27 @@ var _ = Describe("Hash Commands", func() { Expect(hIncrBy.Val()).To(Equal(int64(-5))) }) + It("should HIncrBy against wrong metadata", func() { + hSet := client.HSet(ctx, "hash", "key", "5") + Expect(hSet.Err()).NotTo(HaveOccurred()) + + hIncrBy := client.HIncrBy(ctx, "hash", "key", 1) + Expect(hIncrBy.Err()).NotTo(HaveOccurred()) + Expect(hIncrBy.Val()).To(Equal(int64(6))) + + hDel := client.HDel(ctx, "hash", "key") + Expect(hDel.Err()).NotTo(HaveOccurred()) + Expect(hDel.Val()).To(Equal(int64(1))) + + hIncrBy = client.HIncrBy(ctx, "hash", "key", 1) + Expect(hIncrBy.Err()).NotTo(HaveOccurred()) + Expect(hIncrBy.Val()).To(Equal(int64(1))) + + hIncrBy = client.HIncrBy(ctx, "hash", "key", 2) + Expect(hIncrBy.Err()).NotTo(HaveOccurred()) + Expect(hIncrBy.Val()).To(Equal(int64(3))) + }) + It("should HIncrByFloat", func() { hSet := client.HSet(ctx, "hash", "field", "10.50") Expect(hSet.Err()).NotTo(HaveOccurred()) From a2acd88fbb7c8d1d1d3a5a3320c5e6c4b8f8dcc7 Mon Sep 17 00:00:00 2001 From: cheniujh <41671101+cheniujh@users.noreply.github.com> Date: Sat, 3 Aug 2024 22:26:55 +0800 Subject: [PATCH 3/3] fix: add switch for RTC cache read (#2841) * 1 add a switch for RTC feature 2 avoid unnecessary cache read if rtc is already cache missed * revised --- conf/pika.conf | 7 ++++++- include/pika_client_conn.h | 7 ++++--- include/pika_command.h | 4 ++++ include/pika_conf.h | 2 ++ src/pika_client_conn.cc | 24 +++++++++++++----------- src/pika_command.cc | 10 +++++++--- src/pika_conf.cc | 5 +++++ 7 files changed, 41 insertions(+), 18 deletions(-) diff --git a/conf/pika.conf b/conf/pika.conf index 496d974174..60772e4c29 100644 --- a/conf/pika.conf +++ b/conf/pika.conf @@ -18,11 +18,16 @@ rocksdb-periodic-second : 86400 * 3; # Master's run-id # master-run-id : -# The number of threads for running Pika. +# The number of Net-worker threads in Pika. # It's not recommended to set this value exceeds # the number of CPU cores on the deployment server. thread-num : 1 +# use Net worker thread to read redis Cache for [Get, HGet] command, +# which can significantly improve QPS and reduce latency when cache hit rate is high +# default value is "yes", set it to "no" if you wanna disable it +rtc-cache-read : yes + # Size of the thread pool, The threads within this pool # are dedicated to handling user requests. thread-pool-size : 12 diff --git a/include/pika_client_conn.h b/include/pika_client_conn.h index 30a371f6cf..5b912592ab 100644 --- a/include/pika_client_conn.h +++ b/include/pika_client_conn.h @@ -59,6 +59,7 @@ class PikaClientConn : public net::RedisConn { std::shared_ptr resp_ptr; LogOffset offset; std::string db_name; + bool cache_miss_in_rtc_; }; struct TxnStateBitMask { @@ -78,7 +79,7 @@ class PikaClientConn : public net::RedisConn { void ProcessRedisCmds(const std::vector& argvs, bool async, std::string* response) override; bool ReadCmdInCache(const net::RedisCmdArgsType& argv, const std::string& opt); - void BatchExecRedisCmd(const std::vector& argvs); + void BatchExecRedisCmd(const std::vector& argvs, bool cache_miss_in_rtc); int DealMessage(const net::RedisCmdArgsType& argv, std::string* response) override { return 0; } static void DoBackgroundTask(void* arg); @@ -136,12 +137,12 @@ class PikaClientConn : public net::RedisConn { std::shared_ptr user_; std::shared_ptr DoCmd(const PikaCmdArgsType& argv, const std::string& opt, - const std::shared_ptr& resp_ptr); + const std::shared_ptr& resp_ptr, bool cache_miss_in_rtc); void ProcessSlowlog(const PikaCmdArgsType& argv, uint64_t do_duration); void ProcessMonitor(const PikaCmdArgsType& argv); - void ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr& resp_ptr); + void ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr& resp_ptr, bool cache_miss_in_rtc); void TryWriteResp(); }; diff --git a/include/pika_command.h b/include/pika_command.h index de06c332c8..c9e924eb8c 100644 --- a/include/pika_command.h +++ b/include/pika_command.h @@ -575,6 +575,9 @@ class Cmd : public std::enable_shared_from_this { uint32_t GetCmdId() const { return cmdId_; }; bool CheckArg(uint64_t num) const; + bool IsCacheMissedInRtc() const; + void SetCacheMissedInRtc(bool value); + protected: // enable copy, used default copy // Cmd(const Cmd&); @@ -603,6 +606,7 @@ class Cmd : public std::enable_shared_from_this { uint64_t do_duration_ = 0; uint32_t cmdId_ = 0; uint32_t aclCategory_ = 0; + bool cache_missed_in_rtc_{false}; private: virtual void DoInitial() = 0; diff --git a/include/pika_conf.h b/include/pika_conf.h index d85b7550dd..6638622ed2 100644 --- a/include/pika_conf.h +++ b/include/pika_conf.h @@ -468,6 +468,7 @@ class PikaConf : public pstd::BaseConf { // Immutable config items, we don't use lock. bool daemonize() { return daemonize_; } + bool rtc_cache_read_enabled() { return rtc_cache_read_enabled_; } std::string pidfile() { return pidfile_; } int binlog_file_size() { return binlog_file_size_; } std::vector compression_per_level(); @@ -930,6 +931,7 @@ class PikaConf : public pstd::BaseConf { int level0_file_num_compaction_trigger_ = 4; int64_t max_client_response_size_ = 0; bool daemonize_ = false; + bool rtc_cache_read_enabled_ = false; int timeout_ = 0; std::string server_id_; std::string run_id_; diff --git a/src/pika_client_conn.cc b/src/pika_client_conn.cc index 7834c057ef..85e740de1a 100644 --- a/src/pika_client_conn.cc +++ b/src/pika_client_conn.cc @@ -36,7 +36,7 @@ PikaClientConn::PikaClientConn(int fd, const std::string& ip_port, net::Thread* } std::shared_ptr PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const std::string& opt, - const std::shared_ptr& resp_ptr) { + const std::shared_ptr& resp_ptr, bool cache_miss_in_rtc) { // Get command info std::shared_ptr c_ptr = g_pika_cmd_table_manager->GetCmd(opt); if (!c_ptr) { @@ -47,6 +47,7 @@ std::shared_ptr PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st } return tmp_ptr; } + c_ptr->SetCacheMissedInRtc(cache_miss_in_rtc); c_ptr->SetConn(shared_from_this()); c_ptr->SetResp(resp_ptr); @@ -273,6 +274,7 @@ void PikaClientConn::ProcessRedisCmds(const std::vector& time_stat_->Reset(); if (async) { auto arg = new BgTaskArg(); + arg->cache_miss_in_rtc_ = false; arg->redis_cmds = argvs; time_stat_->enqueue_ts_ = time_stat_->before_queue_ts_ = pstd::NowMicros(); arg->conn_ptr = std::dynamic_pointer_cast(shared_from_this()); @@ -288,7 +290,8 @@ void PikaClientConn::ProcessRedisCmds(const std::vector& bool is_admin_cmd = g_pika_conf->is_admin_cmd(opt); //we don't intercept pipeline batch (argvs.size() > 1) - if (argvs.size() == 1 && IsInterceptedByRTC(opt) && + if (g_pika_conf->rtc_cache_read_enabled() && + argvs.size() == 1 && IsInterceptedByRTC(opt) && PIKA_CACHE_NONE != g_pika_conf->cache_mode() && !IsInTxn()) { // read in cache @@ -296,13 +299,14 @@ void PikaClientConn::ProcessRedisCmds(const std::vector& delete arg; return; } + arg->cache_miss_in_rtc_ = true; time_stat_->before_queue_ts_ = pstd::NowMicros(); } g_pika_server->ScheduleClientPool(&DoBackgroundTask, arg, is_slow_cmd, is_admin_cmd); return; } - BatchExecRedisCmd(argvs); + BatchExecRedisCmd(argvs, false); } void PikaClientConn::DoBackgroundTask(void* arg) { @@ -320,15 +324,15 @@ void PikaClientConn::DoBackgroundTask(void* arg) { } } - conn_ptr->BatchExecRedisCmd(bg_arg->redis_cmds); + conn_ptr->BatchExecRedisCmd(bg_arg->redis_cmds, bg_arg->cache_miss_in_rtc_); } -void PikaClientConn::BatchExecRedisCmd(const std::vector& argvs) { +void PikaClientConn::BatchExecRedisCmd(const std::vector& argvs, bool cache_miss_in_rtc) { resp_num.store(static_cast(argvs.size())); for (const auto& argv : argvs) { std::shared_ptr resp_ptr = std::make_shared(); resp_array.push_back(resp_ptr); - ExecRedisCmd(argv, resp_ptr); + ExecRedisCmd(argv, resp_ptr, cache_miss_in_rtc); } time_stat_->process_done_ts_ = pstd::NowMicros(); TryWriteResp(); @@ -363,9 +367,6 @@ bool PikaClientConn::ReadCmdInCache(const net::RedisCmdArgsType& argv, const std return false; } //only read command(Get, HGet) will reach here, no need of record lock - if (c_ptr->db_->cache()->CacheStatus() != PIKA_CACHE_STATUS_OK) { - return false; - } bool read_status = c_ptr->DoReadCommandInCache(); auto cmdstat_map = g_pika_cmd_table_manager->GetCommandStatMap(); resp_num--; @@ -508,7 +509,8 @@ void PikaClientConn::ExitTxn() { } } -void PikaClientConn::ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr& resp_ptr) { +void PikaClientConn::ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr& resp_ptr, + bool cache_miss_in_rtc) { // get opt std::string opt = argv[0]; pstd::StringToLower(opt); @@ -519,7 +521,7 @@ void PikaClientConn::ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr cmd_ptr = DoCmd(argv, opt, resp_ptr); + std::shared_ptr cmd_ptr = DoCmd(argv, opt, resp_ptr, cache_miss_in_rtc); *resp_ptr = std::move(cmd_ptr->res().message()); resp_num--; } diff --git a/src/pika_command.cc b/src/pika_command.cc index b92f75dd22..bab8dd93f6 100644 --- a/src/pika_command.cc +++ b/src/pika_command.cc @@ -828,7 +828,7 @@ Cmd* GetCmdFromDB(const std::string& opt, const CmdTable& cmd_table) { bool Cmd::CheckArg(uint64_t num) const { return !((arity_ > 0 && num != arity_) || (arity_ < 0 && num < -arity_)); } Cmd::Cmd(std::string name, int arity, uint32_t flag, uint32_t aclCategory) - : name_(std::move(name)), arity_(arity), flag_(flag), aclCategory_(aclCategory) { + : name_(std::move(name)), arity_(arity), flag_(flag), aclCategory_(aclCategory), cache_missed_in_rtc_(false) { } void Cmd::Initial(const PikaCmdArgsType& argv, const std::string& db_name) { @@ -891,10 +891,12 @@ void Cmd::DoCommand(const HintKeys& hint_keys) { if (IsNeedCacheDo() && PIKA_CACHE_NONE != g_pika_conf->cache_mode() && db_->cache()->CacheStatus() == PIKA_CACHE_STATUS_OK) { - if (IsNeedReadCache()) { + if (!cache_missed_in_rtc_ + && IsNeedReadCache()) { ReadCache(); } - if (is_read() && res().CacheMiss()) { + if (is_read() + && (res().CacheMiss() || cache_missed_in_rtc_)) { pstd::lock::MultiScopeRecordLock record_lock(db_->LockMgr(), current_key()); DoThroughDB(); if (IsNeedUpdateCache()) { @@ -1064,3 +1066,5 @@ void Cmd::SetResp(const std::shared_ptr& resp) { resp_ = resp; } std::shared_ptr Cmd::GetResp() { return resp_.lock(); } void Cmd::SetStage(CmdStage stage) { stage_ = stage; } +bool Cmd::IsCacheMissedInRtc() const { return cache_missed_in_rtc_; } +void Cmd::SetCacheMissedInRtc(bool value) { cache_missed_in_rtc_ = value; } diff --git a/src/pika_conf.cc b/src/pika_conf.cc index 913f669880..741168be94 100644 --- a/src/pika_conf.cc +++ b/src/pika_conf.cc @@ -541,6 +541,11 @@ int PikaConf::Load() { GetConfStr("daemonize", &dmz); daemonize_ = dmz == "yes"; + // read redis cache in Net worker threads + std::string rtc_enabled; + GetConfStr("rtc-cache-read", &rtc_enabled); + rtc_cache_read_enabled_ = rtc_enabled != "no"; + // binlog std::string wb; GetConfStr("write-binlog", &wb);