Skip to content

Commit

Permalink
fix lock
Browse files Browse the repository at this point in the history
  • Loading branch information
wuxianrong committed Feb 1, 2024
2 parents a89a391 + d49365e commit 321885c
Show file tree
Hide file tree
Showing 14 changed files with 83 additions and 4 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ Pika is a high-performance, large-capacity, multi-tenant, data-persistent elasti

When Redis's in-memory usage exceeds 16GiB, it faces problems such as limited memory capacity, single-threaded blocking, long startup recovery time, high memory hardware costs, easily filled buffers, and high switching costs when one master and multiple replicas fail. The emergence of Pika is not to replace Redis but to complement it. Pika strives to completely comply with the Redis protocol, inherit Redis's convenient operation and maintenance design, and solve the bottleneck problem of Redis running out of memory capacity once the data volume becomes huge by using persistent storage. Additionally, Pika can support master-slave mode using the slaveof command, and it also supports full and incremental data synchronization.

Pika can be deployed in a single-machine master-slave mode (slaveof) or in a [Codis](https://github.com/OpenAtomFoundation/pika/tree/unstable/cluster) cluster mode, allowing for simple scaling and shrinking. Migration from Redis to Pika can be smoothly executed by [tools](https://github.com/OpenAtomFoundation/pika/tree/unstable/tools).
Pika can be deployed in a single-machine master-slave mode (slaveof) or in a [Codis](https://github.com/OpenAtomFoundation/pika/tree/unstable/codis) cluster mode, allowing for simple scaling and shrinking. Migration from Redis to Pika can be smoothly executed by [tools](https://github.com/OpenAtomFoundation/pika/tree/unstable/tools).

## Pika Features

Expand Down
2 changes: 1 addition & 1 deletion README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Pika 是一个以 RocksDB 为存储引擎的的大容量、高性能、多租户

Redis 的内存使用量超过一定阈值【如 16GiB 】时,会面临内存容量有限、单线程阻塞、启动恢复时间长、内存硬件成本贵、缓冲区容易写满、一主多从故障时切换代价大等问题。Pika 的出现并不是为了替代 Redis, 而是 Redis 补充。Pika 力求在完全兼容Redis 协议、继承 Redis 便捷运维设计的前提下,通过持久化存储的方式解决了 Redis 一旦存储数据量巨大就会出现内存容量不足的瓶颈问题,并且可以像 Redis 一样,支持使用 slaveof 命令实现主从模式,还支持数据的全量同步和增量同步。

还可以通过 twemproxy or [Codis](https://github.com/OpenAtomFoundation/pika/tree/unstable/cluster) 以静态数据分片方式实现 Pika 集群。
还可以通过 twemproxy or [Codis](https://github.com/OpenAtomFoundation/pika/tree/unstable/codis) 以静态数据分片方式实现 Pika 集群。

## Pika特性

Expand Down
3 changes: 3 additions & 0 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -474,8 +474,11 @@ struct UnblockTaskArgs {
: key(std::move(key_)), db(db_), dispatchThread(dispatchThread_) {}
};

class PikaClientConn;

class Cmd : public std::enable_shared_from_this<Cmd> {
public:
friend class PikaClientConn;
enum CmdStage { kNone, kBinlogStage, kExecuteStage };
struct HintKeys {
HintKeys() = default;
Expand Down
4 changes: 3 additions & 1 deletion include/pika_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class DB : public std::enable_shared_from_this<DB>, public pstd::noncopyable {
std::shared_ptr<PikaCache> cache() const;
std::shared_mutex& GetDBLock() {
return dbs_rw_;
}
s}
void DBLock() {
dbs_rw_.lock();
}
Expand All @@ -126,6 +126,8 @@ class DB : public std::enable_shared_from_this<DB>, public pstd::noncopyable {
void Compact(const storage::DataType& type);
void CompactRange(const storage::DataType& type, const std::string& start, const std::string& end);

void SetCompactRangeOptions(const bool is_canceled);

std::shared_ptr<pstd::lock::LockMgr> LockMgr();
/*
* Cache used
Expand Down
5 changes: 5 additions & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,11 @@ class PikaServer : public pstd::noncopyable {
void ProcessCronTask();
double HitRatio();

/*
* disable compact
*/
void DisableCompact();

/*
* lastsave used
*/
Expand Down
2 changes: 1 addition & 1 deletion src/cache/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ project (cache)
aux_source_directory(./src DIR_SRCS)
include_directories(include)
add_library(cache STATIC ${DIR_SRCS})
add_dependencies(cache net protobuf glog gflags ${LIBUNWIND_NAME})
add_dependencies(cache net protobuf glog gflags rediscache ${LIBUNWIND_NAME})

target_link_libraries(cache
PUBLIC ${GTEST_LIBRARY}
Expand Down
2 changes: 2 additions & 0 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -823,7 +823,9 @@ void ShutdownCmd::DoInitial() {
// no return
void ShutdownCmd::Do() {
DLOG(WARNING) << "handle \'shutdown\'";
db_->DbRWUnLock();
g_pika_server->Exit();
db_->DbRWLockReader();
res_.SetRes(CmdRes::kNone);
}

Expand Down
18 changes: 18 additions & 0 deletions src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "include/pika_cmd_table_manager.h"
#include "include/pika_command.h"
#include "include/pika_conf.h"
#include "include/pika_define.h"
#include "include/pika_rm.h"
#include "include/pika_server.h"
#include "net/src/dispatch_thread.h"
Expand Down Expand Up @@ -164,6 +165,23 @@ std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st
c_ptr->res().SetRes(CmdRes::kErrOther, "Server in read-only");
return c_ptr;
}
} else if (c_ptr->is_read() && c_ptr->flag_ == 0) {
const auto& server_guard = std::lock_guard(g_pika_server->GetDBLock());
int role = 0;
auto status = g_pika_rm->CheckDBRole(current_db_, &role);
if (!status.ok()) {
c_ptr->res().SetRes(CmdRes::kErrOther, "Internal ERROR");
return c_ptr;
} else if ((role & PIKA_ROLE_SLAVE) == PIKA_ROLE_SLAVE) {
const auto& slave_db = g_pika_rm->GetSyncSlaveDBByName(DBInfo(current_db_));
if (!slave_db) {
c_ptr->res().SetRes(CmdRes::kErrOther, "Internal ERROR");
return c_ptr;
} else if (slave_db->State() != ReplState::kConnected) {
c_ptr->res().SetRes(CmdRes::kErrOther, "Full sync not completed");
return c_ptr;
}
}
}

// Process Command
Expand Down
7 changes: 7 additions & 0 deletions src/pika_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,13 @@ void DB::InitKeyScan() {
key_scan_info_.duration = -1; // duration -1 mean the task in processing
}

void DB::SetCompactRangeOptions(const bool is_canceled) {
if (!opened_) {
return;
}
storage_->SetCompactRangeOptions(is_canceled);
}

DisplayCacheInfo DB::GetCacheInfo() {
std::lock_guard l(key_info_protector_);
return cache_info_;
Expand Down
20 changes: 20 additions & 0 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ void PikaServer::Start() {
}

void PikaServer::Exit() {
g_pika_server->DisableCompact();
exit_mutex_.unlock();
exit_ = true;
}
Expand Down Expand Up @@ -1664,6 +1665,25 @@ void PikaServer::CheckPubsubClientKill(const std::string& userName, const std::v
});
}

void PikaServer::DisableCompact() {
/* disable auto compactions */
std::unordered_map<std::string, std::string> options_map{{"disable_auto_compactions", "true"}};
storage::Status s = g_pika_server->RewriteStorageOptions(storage::OptionType::kColumnFamily, options_map);
if (!s.ok()) {
LOG(ERROR) << "-ERR Set storage::OptionType::kColumnFamily disable_auto_compactions error: " + s.ToString() + "\r\n";
return;
}
g_pika_conf->SetDisableAutoCompaction("true");

/* cancel in-progress manual compactions */
std::shared_lock rwl(dbs_rw_);
for (const auto& db_item : dbs_) {
db_item.second->DbRWLockWriter();
db_item.second->SetCompactRangeOptions(true);
db_item.second->DbRWUnLock();
}
}

void DoBgslotscleanup(void* arg) {
auto p = static_cast<PikaServer*>(arg);
PikaServer::BGSlotsCleanup cleanup = p->bgslots_cleanup();
Expand Down
1 change: 1 addition & 0 deletions src/storage/include/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -1055,6 +1055,7 @@ class Storage {

Status SetOptions(const OptionType& option_type, const std::string& db_type,
const std::unordered_map<std::string, std::string>& options);
void SetCompactRangeOptions(const bool is_canceled);
Status EnableDymayticOptions(const OptionType& option_type,
const std::string& db_type, const std::unordered_map<std::string, std::string>& options);
Status EnableAutoCompaction(const OptionType& option_type,
Expand Down
12 changes: 12 additions & 0 deletions src/storage/src/redis.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ Redis::~Redis() {
delete handle;
}
delete db_;

if (default_compact_range_options_.canceled) {
delete default_compact_range_options_.canceled;
}
}

Status Redis::GetScanStartPoint(const Slice& key, const Slice& pattern, int64_t cursor, std::string* start_point) {
Expand Down Expand Up @@ -187,4 +191,12 @@ void Redis::SetWriteWalOptions(const bool is_wal_disable) {
default_write_options_.disableWAL = is_wal_disable;
}

void Redis::SetCompactRangeOptions(const bool is_canceled) {
if (!default_compact_range_options_.canceled) {
default_compact_range_options_.canceled = new std::atomic<bool>(is_canceled);
} else {
default_compact_range_options_.canceled->store(is_canceled);
}
}

} // namespace storage
1 change: 1 addition & 0 deletions src/storage/src/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class Redis {

Status SetOptions(const OptionType& option_type, const std::unordered_map<std::string, std::string>& options);
void SetWriteWalOptions(const bool is_wal_disable);
void SetCompactRangeOptions(const bool is_canceled);

// Common Commands
virtual Status Open(const StorageOptions& storage_options, const std::string& db_path) = 0;
Expand Down
8 changes: 8 additions & 0 deletions src/storage/src/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1808,6 +1808,14 @@ Status Storage::SetOptions(const OptionType& option_type, const std::string& db_
return s;
}

void Storage::SetCompactRangeOptions(const bool is_canceled) {
strings_db_->SetCompactRangeOptions(is_canceled);
hashes_db_->SetCompactRangeOptions(is_canceled);
lists_db_->SetCompactRangeOptions(is_canceled);
sets_db_->SetCompactRangeOptions(is_canceled);
zsets_db_->SetCompactRangeOptions(is_canceled);
}

Status Storage::EnableDymayticOptions(const OptionType& option_type,
const std::string& db_type, const std::unordered_map<std::string, std::string>& options) {
Status s;
Expand Down

0 comments on commit 321885c

Please sign in to comment.