Skip to content

Commit

Permalink
fix lock
Browse files Browse the repository at this point in the history
  • Loading branch information
wuxianrong committed Feb 1, 2024
2 parents a89a391 + d49365e commit 9194869
Show file tree
Hide file tree
Showing 17 changed files with 125 additions and 12 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ Pika is a high-performance, large-capacity, multi-tenant, data-persistent elasti

When Redis's in-memory usage exceeds 16GiB, it faces problems such as limited memory capacity, single-threaded blocking, long startup recovery time, high memory hardware costs, easily filled buffers, and high switching costs when one master and multiple replicas fail. The emergence of Pika is not to replace Redis but to complement it. Pika strives to completely comply with the Redis protocol, inherit Redis's convenient operation and maintenance design, and solve the bottleneck problem of Redis running out of memory capacity once the data volume becomes huge by using persistent storage. Additionally, Pika can support master-slave mode using the slaveof command, and it also supports full and incremental data synchronization.

Pika can be deployed in a single-machine master-slave mode (slaveof) or in a [Codis](https://github.com/OpenAtomFoundation/pika/tree/unstable/cluster) cluster mode, allowing for simple scaling and shrinking. Migration from Redis to Pika can be smoothly executed by [tools](https://github.com/OpenAtomFoundation/pika/tree/unstable/tools).
Pika can be deployed in a single-machine master-slave mode (slaveof) or in a [Codis](https://github.com/OpenAtomFoundation/pika/tree/unstable/codis) cluster mode, allowing for simple scaling and shrinking. Migration from Redis to Pika can be smoothly executed by [tools](https://github.com/OpenAtomFoundation/pika/tree/unstable/tools).

## Pika Features

Expand Down
2 changes: 1 addition & 1 deletion README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Pika 是一个以 RocksDB 为存储引擎的的大容量、高性能、多租户

Redis 的内存使用量超过一定阈值【如 16GiB 】时,会面临内存容量有限、单线程阻塞、启动恢复时间长、内存硬件成本贵、缓冲区容易写满、一主多从故障时切换代价大等问题。Pika 的出现并不是为了替代 Redis, 而是 Redis 补充。Pika 力求在完全兼容Redis 协议、继承 Redis 便捷运维设计的前提下,通过持久化存储的方式解决了 Redis 一旦存储数据量巨大就会出现内存容量不足的瓶颈问题,并且可以像 Redis 一样,支持使用 slaveof 命令实现主从模式,还支持数据的全量同步和增量同步。

还可以通过 twemproxy or [Codis](https://github.com/OpenAtomFoundation/pika/tree/unstable/cluster) 以静态数据分片方式实现 Pika 集群。
还可以通过 twemproxy or [Codis](https://github.com/OpenAtomFoundation/pika/tree/unstable/codis) 以静态数据分片方式实现 Pika 集群。

## Pika特性

Expand Down
3 changes: 3 additions & 0 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -474,8 +474,11 @@ struct UnblockTaskArgs {
: key(std::move(key_)), db(db_), dispatchThread(dispatchThread_) {}
};

class PikaClientConn;

class Cmd : public std::enable_shared_from_this<Cmd> {
public:
friend class PikaClientConn;
enum CmdStage { kNone, kBinlogStage, kExecuteStage };
struct HintKeys {
HintKeys() = default;
Expand Down
3 changes: 3 additions & 0 deletions include/pika_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ class DB : public std::enable_shared_from_this<DB>, public pstd::noncopyable {
void Compact(const storage::DataType& type);
void CompactRange(const storage::DataType& type, const std::string& start, const std::string& end);

void SetCompactRangeOptions(const bool is_canceled);

std::shared_ptr<pstd::lock::LockMgr> LockMgr();
/*
* Cache used
Expand Down Expand Up @@ -162,6 +164,7 @@ class DB : public std::enable_shared_from_this<DB>, public pstd::noncopyable {
std::atomic<bool> binlog_io_error_;
std::shared_mutex dbs_rw_;
// class may be shared, using shared_ptr would be a better choice
std::shared_ptr<pstd::lock::LockMgr> lock_mgr_;
std::shared_ptr<storage::Storage> storage_;
std::shared_ptr<PikaCache> cache_;
/*
Expand Down
5 changes: 5 additions & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,11 @@ class PikaServer : public pstd::noncopyable {
void ProcessCronTask();
double HitRatio();

/*
* disable compact
*/
void DisableCompact();

/*
* lastsave used
*/
Expand Down
2 changes: 1 addition & 1 deletion src/cache/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ project (cache)
aux_source_directory(./src DIR_SRCS)
include_directories(include)
add_library(cache STATIC ${DIR_SRCS})
add_dependencies(cache net protobuf glog gflags ${LIBUNWIND_NAME})
add_dependencies(cache net protobuf glog gflags rediscache ${LIBUNWIND_NAME})

target_link_libraries(cache
PUBLIC ${GTEST_LIBRARY}
Expand Down
8 changes: 8 additions & 0 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -823,7 +823,9 @@ void ShutdownCmd::DoInitial() {
// no return
void ShutdownCmd::Do() {
DLOG(WARNING) << "handle \'shutdown\'";
db_->DBUnlock();
g_pika_server->Exit();
db_->DBLockShared();
res_.SetRes(CmdRes::kNone);
}

Expand Down Expand Up @@ -1345,9 +1347,11 @@ void InfoCmd::InfoData(std::string& info) {
}
background_errors.clear();
memtable_usage = table_reader_usage = 0;
db_item.second->DBLockShared();
db_item.second->storage()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_CUR_SIZE_ALL_MEM_TABLES, &memtable_usage);
db_item.second->storage()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_ESTIMATE_TABLE_READER_MEM, &table_reader_usage);
db_item.second->storage()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_BACKGROUND_ERRORS, &background_errors);
db_item.second->DBUnlockShared();
total_memtable_usage += memtable_usage;
total_table_reader_usage += table_reader_usage;
for (const auto& item : background_errors) {
Expand Down Expand Up @@ -1381,7 +1385,9 @@ void InfoCmd::InfoRocksDB(std::string& info) {
continue;
}
std::string rocksdb_info;
db_item.second->DBLockShared();
db_item.second->storage()->GetRocksDBInfo(rocksdb_info);
db_item.second->DBUnlockShared();
tmp_stream << rocksdb_info;
}
info.append(tmp_stream.str());
Expand Down Expand Up @@ -3070,7 +3076,9 @@ void DiskRecoveryCmd::Do() {
}
db_item.second->SetBinlogIoErrorrelieve();
background_errors_.clear();
db_item.second->DBLockShared();
db_item.second->storage()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_BACKGROUND_ERRORS, &background_errors_);
db_item.second->DBUnlockShared();
for (const auto &item: background_errors_) {
if (item.second != 0) {
rocksdb::Status s = db_item.second->storage()->GetDBByType(item.first)->Resume();
Expand Down
18 changes: 18 additions & 0 deletions src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "include/pika_cmd_table_manager.h"
#include "include/pika_command.h"
#include "include/pika_conf.h"
#include "include/pika_define.h"
#include "include/pika_rm.h"
#include "include/pika_server.h"
#include "net/src/dispatch_thread.h"
Expand Down Expand Up @@ -164,6 +165,23 @@ std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st
c_ptr->res().SetRes(CmdRes::kErrOther, "Server in read-only");
return c_ptr;
}
} else if (c_ptr->is_read() && c_ptr->flag_ == 0) {
const auto& server_guard = std::lock_guard(g_pika_server->GetDBLock());
int role = 0;
auto status = g_pika_rm->CheckDBRole(current_db_, &role);
if (!status.ok()) {
c_ptr->res().SetRes(CmdRes::kErrOther, "Internal ERROR");
return c_ptr;
} else if ((role & PIKA_ROLE_SLAVE) == PIKA_ROLE_SLAVE) {
const auto& slave_db = g_pika_rm->GetSyncSlaveDBByName(DBInfo(current_db_));
if (!slave_db) {
c_ptr->res().SetRes(CmdRes::kErrOther, "Internal ERROR");
return c_ptr;
} else if (slave_db->State() != ReplState::kConnected) {
c_ptr->res().SetRes(CmdRes::kErrOther, "Full sync not completed");
return c_ptr;
}
}
}

// Process Command
Expand Down
8 changes: 8 additions & 0 deletions src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -888,6 +888,14 @@ void Cmd::InternalProcessCommand(const HintKeys& hint_keys) {
}

void Cmd::DoCommand(const HintKeys& hint_keys) {
if (!IsSuspend()) {
db_->DBLockShared();
}
DEFER {
if (!IsSuspend()) {
db_->DBUnlockShared();
}
};
if (IsNeedCacheDo()
&& PIKA_CACHE_NONE != g_pika_conf->cache_model()
&& db_->cache()->CacheStatus() == PIKA_CACHE_STATUS_OK) {
Expand Down
9 changes: 9 additions & 0 deletions src/pika_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ DB::DB(std::string db_name, const std::string& db_path,
rocksdb::Status s = storage_->Open(g_pika_server->storage_options(), db_path_);
pstd::CreatePath(db_path_);
pstd::CreatePath(log_path_);
lock_mgr_ = std::make_shared<pstd::lock::LockMgr>(1000, 0, std::make_shared<pstd::lock::MutexFactoryImpl>());
binlog_io_error_.store(false);
opened_ = s.ok();
assert(storage_);
Expand Down Expand Up @@ -69,6 +70,7 @@ void DB::BgSaveDB() {
void DB::SetBinlogIoError() { return binlog_io_error_.store(true); }
void DB::SetBinlogIoErrorrelieve() { return binlog_io_error_.store(false); }
bool DB::IsBinlogIoError() { return binlog_io_error_.load(); }
std::shared_ptr<pstd::lock::LockMgr> DB::LockMgr() { return lock_mgr_; }
std::shared_ptr<PikaCache> DB::cache() const { return cache_; }
std::shared_ptr<storage::Storage> DB::storage() const { return storage_; }

Expand Down Expand Up @@ -186,6 +188,13 @@ void DB::InitKeyScan() {
key_scan_info_.duration = -1; // duration -1 mean the task in processing
}

void DB::SetCompactRangeOptions(const bool is_canceled) {
if (!opened_) {
return;
}
storage_->SetCompactRangeOptions(is_canceled);
}

DisplayCacheInfo DB::GetCacheInfo() {
std::lock_guard l(key_info_protector_);
return cache_info_;
Expand Down
8 changes: 7 additions & 1 deletion src/pika_repl_bgworker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,10 @@ void PikaReplBgWorker::HandleBGWorkerWriteDB(void* arg) {
if (g_pika_conf->slowlog_slower_than() >= 0) {
start_us = pstd::NowMicros();
}
// Add read lock for no suspend command
if (!c_ptr->IsSuspend()) {
c_ptr->GetDB()->DBLockShared();
}
if (c_ptr->IsNeedCacheDo()
&& PIKA_CACHE_NONE != g_pika_conf->cache_model()
&& c_ptr->GetDB()->cache()->CacheStatus() == PIKA_CACHE_STATUS_OK) {
Expand All @@ -229,7 +233,9 @@ void PikaReplBgWorker::HandleBGWorkerWriteDB(void* arg) {
} else {
c_ptr->Do();
}

if (!c_ptr->IsSuspend()) {
c_ptr->GetDB()->DBUnlockShared();
}
if (g_pika_conf->slowlog_slower_than() >= 0) {
auto start_time = static_cast<int32_t>(start_us / 1000000);
auto duration = static_cast<int64_t>(pstd::NowMicros() - start_us);
Expand Down
28 changes: 28 additions & 0 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ void PikaServer::Start() {
}

void PikaServer::Exit() {
g_pika_server->DisableCompact();
exit_mutex_.unlock();
exit_ = true;
}
Expand Down Expand Up @@ -351,7 +352,9 @@ bool PikaServer::IsKeyScaning() {
bool PikaServer::IsCompacting() {
std::shared_lock db_rwl(dbs_rw_);
for (const auto& db_item : dbs_) {
db_item.second->DBLockShared();
std::string task_type = db_item.second->storage()->GetCurrentTaskType();
db_item.second->DBUnlockShared();
if (strcasecmp(task_type.data(), "no") != 0) {
return true;
}
Expand Down Expand Up @@ -448,21 +451,27 @@ void PikaServer::PrepareDBTrySync() {
void PikaServer::DBSetMaxCacheStatisticKeys(uint32_t max_cache_statistic_keys) {
std::shared_lock rwl(dbs_rw_);
for (const auto& db_item : dbs_) {
db_item.second->DBLockShared();
db_item.second->storage()->SetMaxCacheStatisticKeys(max_cache_statistic_keys);
db_item.second->DBUnlockShared();
}
}

void PikaServer::DBSetSmallCompactionThreshold(uint32_t small_compaction_threshold) {
std::shared_lock rwl(dbs_rw_);
for (const auto& db_item : dbs_) {
db_item.second->DBLockShared();
db_item.second->storage()->SetSmallCompactionThreshold(small_compaction_threshold);
db_item.second->DBUnlockShared();
}
}

void PikaServer::DBSetSmallCompactionDurationThreshold(uint32_t small_compaction_duration_threshold) {
std::shared_lock rwl(dbs_rw_);
for (const auto& db_item : dbs_) {
db_item.second->DBLockShared();
db_item.second->storage()->SetSmallCompactionDurationThreshold(small_compaction_duration_threshold);
db_item.second->DBUnlockShared();
}
}

Expand Down Expand Up @@ -1664,6 +1673,25 @@ void PikaServer::CheckPubsubClientKill(const std::string& userName, const std::v
});
}

void PikaServer::DisableCompact() {
/* disable auto compactions */
std::unordered_map<std::string, std::string> options_map{{"disable_auto_compactions", "true"}};
storage::Status s = g_pika_server->RewriteStorageOptions(storage::OptionType::kColumnFamily, options_map);
if (!s.ok()) {
LOG(ERROR) << "-ERR Set storage::OptionType::kColumnFamily disable_auto_compactions error: " + s.ToString() + "\r\n";
return;
}
g_pika_conf->SetDisableAutoCompaction("true");

/* cancel in-progress manual compactions */
std::shared_lock rwl(dbs_rw_);
for (const auto& db_item : dbs_) {
db_item.second->DBLock();
db_item.second->SetCompactRangeOptions(true);
db_item.second->DBUnlock();
}
}

void DoBgslotscleanup(void* arg) {
auto p = static_cast<PikaServer*>(arg);
PikaServer::BGSlotsCleanup cleanup = p->bgslots_cleanup();
Expand Down
19 changes: 11 additions & 8 deletions src/pika_transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -146,21 +146,24 @@ void ExecCmd::Lock() {
g_pika_rm->DBLock();
}

std::for_each(r_lock_dbs_.begin(), r_lock_dbs_.end(), [this](auto& need_lock_slot) {
if (lock_db_keys_.count(need_lock_slot) != 0) {
pstd::lock::MultiRecordLock record_lock(need_lock_slot->LockMgr());
record_lock.Lock(lock_db_keys_[need_lock_slot]);
std::for_each(r_lock_dbs_.begin(), r_lock_dbs_.end(), [this](auto& need_lock_db) {
if (lock_db_keys_.count(need_lock_db) != 0) {
pstd::lock::MultiRecordLock record_lock(need_lock_db->LockMgr());
record_lock.Lock(lock_db_keys_[need_lock_db]);
}
need_lock_db->DBLockShared();
});
}

void ExecCmd::Unlock() {
std::for_each(r_lock_dbs_.begin(), r_lock_dbs_.end(), [this](auto& need_lock_slot) {
if (lock_db_keys_.count(need_lock_slot) != 0) {
pstd::lock::MultiRecordLock record_lock(need_lock_slot->LockMgr());
record_lock.Unlock(lock_db_keys_[need_lock_slot]);
std::for_each(r_lock_dbs_.begin(), r_lock_dbs_.end(), [this](auto& need_lock_db) {
if (lock_db_keys_.count(need_lock_db) != 0) {
pstd::lock::MultiRecordLock record_lock(need_lock_db->LockMgr());
record_lock.Unlock(lock_db_keys_[need_lock_db]);
}
need_lock_db->DBUnlockShared();
});

if (is_lock_rm_dbs_) {
g_pika_rm->DBUnlock();
}
Expand Down
1 change: 1 addition & 0 deletions src/storage/include/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -1055,6 +1055,7 @@ class Storage {

Status SetOptions(const OptionType& option_type, const std::string& db_type,
const std::unordered_map<std::string, std::string>& options);
void SetCompactRangeOptions(const bool is_canceled);
Status EnableDymayticOptions(const OptionType& option_type,
const std::string& db_type, const std::unordered_map<std::string, std::string>& options);
Status EnableAutoCompaction(const OptionType& option_type,
Expand Down
12 changes: 12 additions & 0 deletions src/storage/src/redis.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ Redis::~Redis() {
delete handle;
}
delete db_;

if (default_compact_range_options_.canceled) {
delete default_compact_range_options_.canceled;
}
}

Status Redis::GetScanStartPoint(const Slice& key, const Slice& pattern, int64_t cursor, std::string* start_point) {
Expand Down Expand Up @@ -187,4 +191,12 @@ void Redis::SetWriteWalOptions(const bool is_wal_disable) {
default_write_options_.disableWAL = is_wal_disable;
}

void Redis::SetCompactRangeOptions(const bool is_canceled) {
if (!default_compact_range_options_.canceled) {
default_compact_range_options_.canceled = new std::atomic<bool>(is_canceled);
} else {
default_compact_range_options_.canceled->store(is_canceled);
}
}

} // namespace storage
1 change: 1 addition & 0 deletions src/storage/src/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class Redis {

Status SetOptions(const OptionType& option_type, const std::unordered_map<std::string, std::string>& options);
void SetWriteWalOptions(const bool is_wal_disable);
void SetCompactRangeOptions(const bool is_canceled);

// Common Commands
virtual Status Open(const StorageOptions& storage_options, const std::string& db_path) = 0;
Expand Down
8 changes: 8 additions & 0 deletions src/storage/src/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1808,6 +1808,14 @@ Status Storage::SetOptions(const OptionType& option_type, const std::string& db_
return s;
}

void Storage::SetCompactRangeOptions(const bool is_canceled) {
strings_db_->SetCompactRangeOptions(is_canceled);
hashes_db_->SetCompactRangeOptions(is_canceled);
lists_db_->SetCompactRangeOptions(is_canceled);
sets_db_->SetCompactRangeOptions(is_canceled);
zsets_db_->SetCompactRangeOptions(is_canceled);
}

Status Storage::EnableDymayticOptions(const OptionType& option_type,
const std::string& db_type, const std::unordered_map<std::string, std::string>& options) {
Status s;
Expand Down

0 comments on commit 9194869

Please sign in to comment.