Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Co-authored-by: wuxianrong <[email protected]>
  • Loading branch information
Mixficsol and wuxianrong authored Mar 7, 2024
1 parent 68bda71 commit e4bc855
Show file tree
Hide file tree
Showing 14 changed files with 36 additions and 165 deletions.
4 changes: 0 additions & 4 deletions include/pika_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,6 @@ class DB : public std::enable_shared_from_this<DB>, public pstd::noncopyable {
void SetCompactRangeOptions(const bool is_canceled);

std::shared_ptr<pstd::lock::LockMgr> LockMgr();
void DbRWLockWriter();
void DbRWLockReader();
void DbRWUnLock();
/*
* Cache used
*/
Expand Down Expand Up @@ -164,7 +161,6 @@ class DB : public std::enable_shared_from_this<DB>, public pstd::noncopyable {
std::string log_path_;
std::string bgsave_sub_path_;
pstd::Mutex key_info_protector_;
std::shared_mutex db_rwlock_;
std::atomic<bool> binlog_io_error_;
std::shared_mutex dbs_rw_;
// class may be shared, using shared_ptr would be a better choice
Expand Down
3 changes: 0 additions & 3 deletions include/pika_repl_server_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@ class PikaReplServerConn : public net::PbConn {
const InnerMessage::InnerRequest::TrySync& try_sync_request,
const std::shared_ptr<net::PbConn>& conn,
InnerMessage::InnerResponse::TrySync* try_sync_response);
static void BuildConsensusMeta(const bool& reject, const std::vector<LogOffset>& hints, const uint32_t& term,
InnerMessage::InnerResponse* response);

static void HandleDBSyncRequest(void* arg);
static void HandleBinlogSyncRequest(void* arg);
static void HandleRemoveSlaveNodeRequest(void* arg);
Expand Down
16 changes: 8 additions & 8 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -814,9 +814,9 @@ void ShutdownCmd::DoInitial() {
// no return
void ShutdownCmd::Do() {
DLOG(WARNING) << "handle \'shutdown\'";
db_->DbRWUnLock();
db_->DBUnlockShared();
g_pika_server->Exit();
db_->DbRWLockReader();
db_->DBLockShared();
res_.SetRes(CmdRes::kNone);
}

Expand Down Expand Up @@ -1338,11 +1338,11 @@ void InfoCmd::InfoData(std::string& info) {
}
background_errors.clear();
memtable_usage = table_reader_usage = 0;
db_item.second->DbRWLockReader();
db_item.second->DBLockShared();
db_item.second->storage()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_CUR_SIZE_ALL_MEM_TABLES, &memtable_usage);
db_item.second->storage()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_ESTIMATE_TABLE_READER_MEM, &table_reader_usage);
db_item.second->storage()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_BACKGROUND_ERRORS, &background_errors);
db_item.second->DbRWUnLock();
db_item.second->DBUnlockShared();
total_memtable_usage += memtable_usage;
total_table_reader_usage += table_reader_usage;
for (const auto& item : background_errors) {
Expand Down Expand Up @@ -1376,9 +1376,9 @@ void InfoCmd::InfoRocksDB(std::string& info) {
continue;
}
std::string rocksdb_info;
db_item.second->DbRWLockReader();
db_item.second->DBLockShared();
db_item.second->storage()->GetRocksDBInfo(rocksdb_info);
db_item.second->DbRWUnLock();
db_item.second->DBUnlockShared();
tmp_stream << rocksdb_info;
}
info.append(tmp_stream.str());
Expand Down Expand Up @@ -3101,9 +3101,9 @@ void DiskRecoveryCmd::Do() {
}
db_item.second->SetBinlogIoErrorrelieve();
background_errors_.clear();
db_item.second->DbRWLockReader();
db_item.second->DBLockShared();
db_item.second->storage()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_BACKGROUND_ERRORS, &background_errors_);
db_item.second->DbRWUnLock();
db_item.second->DBUnlockShared();
for (const auto &item: background_errors_) {
if (item.second != 0) {
rocksdb::Status s = db_item.second->storage()->GetDBByIndex(item.first)->Resume();
Expand Down
4 changes: 2 additions & 2 deletions src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -889,11 +889,11 @@ void Cmd::InternalProcessCommand(const HintKeys& hint_keys) {

void Cmd::DoCommand(const HintKeys& hint_keys) {
if (!IsSuspend()) {
db_->DbRWLockReader();
db_->DBLockShared();
}
DEFER {
if (!IsSuspend()) {
db_->DbRWUnLock();
db_->DBUnlockShared();
}
};
if (IsNeedCacheDo()
Expand Down
12 changes: 4 additions & 8 deletions src/pika_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@ void DB::SetBinlogIoError() { return binlog_io_error_.store(true); }
void DB::SetBinlogIoErrorrelieve() { return binlog_io_error_.store(false); }
bool DB::IsBinlogIoError() { return binlog_io_error_.load(); }
std::shared_ptr<pstd::lock::LockMgr> DB::LockMgr() { return lock_mgr_; }
void DB::DbRWLockReader() { db_rwlock_.lock_shared(); }
void DB::DbRWUnLock() { db_rwlock_.unlock(); }
std::shared_ptr<PikaCache> DB::cache() const { return cache_; }
std::shared_ptr<storage::Storage> DB::storage() const { return storage_; }

Expand Down Expand Up @@ -197,8 +195,6 @@ void DB::SetCompactRangeOptions(const bool is_canceled) {
storage_->SetCompactRangeOptions(is_canceled);
}

void DB::DbRWLockWriter() { db_rwlock_.lock(); }

DisplayCacheInfo DB::GetCacheInfo() {
std::lock_guard l(key_info_protector_);
return cache_info_;
Expand Down Expand Up @@ -360,7 +356,7 @@ bool DB::InitBgsaveEngine() {
}

{
std::lock_guard lock(db_rwlock_);
std::lock_guard lock(dbs_rw_);
LogOffset bgsave_offset;
// term, index are 0
db->Logger()->GetProducerStatus(&(bgsave_offset.b_offset.filenum), &(bgsave_offset.b_offset.offset));
Expand Down Expand Up @@ -548,7 +544,7 @@ bool DB::ChangeDb(const std::string& new_path) {
tmp_path += "_bak";
pstd::DeleteDirIfExist(tmp_path);

std::lock_guard l(db_rwlock_);
std::lock_guard l(dbs_rw_);
LOG(INFO) << "DB: " << db_name_ << ", Prepare change db from: " << tmp_path;
storage_.reset();

Expand Down Expand Up @@ -580,7 +576,7 @@ void DB::ClearBgsave() {
}

bool DB::FlushSubDB(const std::string& db_name) {
std::lock_guard rwl(db_rwlock_);
std::lock_guard rwl(dbs_rw_);
return FlushSubDBWithoutLock(db_name);
}

Expand Down Expand Up @@ -634,7 +630,7 @@ void DB::ResetDisplayCacheInfo(int status) {
}

bool DB::FlushDB() {
std::lock_guard rwl(db_rwlock_);
std::lock_guard rwl(dbs_rw_);
std::lock_guard l(bgsave_protector_);
return FlushDBWithoutLock();
}
4 changes: 2 additions & 2 deletions src/pika_repl_bgworker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ void PikaReplBgWorker::HandleBGWorkerWriteDB(void* arg) {
pstd::lock::MultiRecordLock record_lock(c_ptr->GetDB()->LockMgr());
record_lock.Lock(c_ptr->current_key());
if (!c_ptr->IsSuspend()) {
c_ptr->GetDB()->DbRWLockReader();
c_ptr->GetDB()->DBLockShared();
}
if (c_ptr->IsNeedCacheDo()
&& PIKA_CACHE_NONE != g_pika_conf->cache_model()
Expand All @@ -236,7 +236,7 @@ void PikaReplBgWorker::HandleBGWorkerWriteDB(void* arg) {
c_ptr->Do();
}
if (!c_ptr->IsSuspend()) {
c_ptr->GetDB()->DbRWUnLock();
c_ptr->GetDB()->DBUnlockShared();
}
record_lock.Unlock(c_ptr->current_key());
if (g_pika_conf->slowlog_slower_than() >= 0) {
Expand Down
17 changes: 0 additions & 17 deletions src/pika_repl_server_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -216,23 +216,6 @@ bool PikaReplServerConn::TrySyncOffsetCheck(const std::shared_ptr<SyncMasterDB>&
return true;
}

void PikaReplServerConn::BuildConsensusMeta(const bool& reject, const std::vector<LogOffset>& hints,
const uint32_t& term, InnerMessage::InnerResponse* response) {
InnerMessage::ConsensusMeta* consensus_meta = response->mutable_consensus_meta();
consensus_meta->set_term(term);
consensus_meta->set_reject(reject);
if (!reject) {
return;
}
for (const auto& hint : hints) {
InnerMessage::BinlogOffset* offset = consensus_meta->add_hint();
offset->set_filenum(hint.b_offset.filenum);
offset->set_offset(hint.b_offset.offset);
offset->set_term(hint.l_offset.term);
offset->set_index(hint.l_offset.index);
}
}

void PikaReplServerConn::HandleDBSyncRequest(void* arg) {
std::unique_ptr<ReplServerTaskArg> task_arg(static_cast<ReplServerTaskArg*>(arg));
const std::shared_ptr<InnerMessage::InnerRequest> req = task_arg->req;
Expand Down
22 changes: 10 additions & 12 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -347,9 +347,9 @@ bool PikaServer::IsKeyScaning() {
bool PikaServer::IsCompacting() {
std::shared_lock db_rwl(dbs_rw_);
for (const auto& db_item : dbs_) {
db_item.second->DbRWLockReader();
db_item.second->DBLockShared();
std::string task_type = db_item.second->storage()->GetCurrentTaskType();
db_item.second->DbRWUnLock();
db_item.second->DBUnlockShared();
if (strcasecmp(task_type.data(), "no") != 0) {
return true;
}
Expand Down Expand Up @@ -446,27 +446,27 @@ void PikaServer::PrepareDBTrySync() {
void PikaServer::DBSetMaxCacheStatisticKeys(uint32_t max_cache_statistic_keys) {
std::shared_lock rwl(dbs_rw_);
for (const auto& db_item : dbs_) {
db_item.second->DbRWLockReader();
db_item.second->DBLockShared();
db_item.second->storage()->SetMaxCacheStatisticKeys(max_cache_statistic_keys);
db_item.second->DbRWUnLock();
db_item.second->DBUnlockShared();
}
}

void PikaServer::DBSetSmallCompactionThreshold(uint32_t small_compaction_threshold) {
std::shared_lock rwl(dbs_rw_);
for (const auto& db_item : dbs_) {
db_item.second->DbRWLockReader();
db_item.second->DBLockShared();
db_item.second->storage()->SetSmallCompactionThreshold(small_compaction_threshold);
db_item.second->DbRWUnLock();
db_item.second->DBUnlockShared();
}
}

void PikaServer::DBSetSmallCompactionDurationThreshold(uint32_t small_compaction_duration_threshold) {
std::shared_lock rwl(dbs_rw_);
for (const auto& db_item : dbs_) {
db_item.second->DbRWLockReader();
db_item.second->DBLockShared();
db_item.second->storage()->SetSmallCompactionDurationThreshold(small_compaction_duration_threshold);
db_item.second->DbRWUnLock();
db_item.second->DBUnlockShared();
}
}

Expand Down Expand Up @@ -1386,9 +1386,7 @@ storage::Status PikaServer::RewriteStorageOptions(const storage::OptionType& opt
storage::Status s;
std::shared_lock db_rwl(dbs_rw_);
for (const auto& db_item : dbs_) {
db_item.second->DbRWLockWriter();
s = db_item.second->storage()->SetOptions(option_type, storage::ALL_DB, options_map);
db_item.second->DbRWUnLock();
if (!s.ok()) {
return s;
}
Expand Down Expand Up @@ -1567,9 +1565,9 @@ void PikaServer::DisableCompact() {
/* cancel in-progress manual compactions */
std::shared_lock rwl(dbs_rw_);
for (const auto& db_item : dbs_) {
db_item.second->DbRWLockWriter();
db_item.second->DBLock();
db_item.second->SetCompactRangeOptions(true);
db_item.second->DbRWUnLock();
db_item.second->DBUnlock();
}
}

Expand Down
20 changes: 10 additions & 10 deletions src/pika_transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,22 +145,22 @@ void ExecCmd::Lock() {
g_pika_rm->DBLock();
}

std::for_each(r_lock_dbs_.begin(), r_lock_dbs_.end(), [this](auto& need_lock_slot) {
if (lock_db_keys_.count(need_lock_slot) != 0) {
pstd::lock::MultiRecordLock record_lock(need_lock_slot->LockMgr());
record_lock.Lock(lock_db_keys_[need_lock_slot]);
std::for_each(r_lock_dbs_.begin(), r_lock_dbs_.end(), [this](auto& need_lock_db) {
if (lock_db_keys_.count(need_lock_db) != 0) {
pstd::lock::MultiRecordLock record_lock(need_lock_db->LockMgr());
record_lock.Lock(lock_db_keys_[need_lock_db]);
}
need_lock_slot->DbRWLockReader();
need_lock_db->DBLockShared();
});
}

void ExecCmd::Unlock() {
std::for_each(r_lock_dbs_.begin(), r_lock_dbs_.end(), [this](auto& need_lock_slot) {
if (lock_db_keys_.count(need_lock_slot) != 0) {
pstd::lock::MultiRecordLock record_lock(need_lock_slot->LockMgr());
record_lock.Unlock(lock_db_keys_[need_lock_slot]);
std::for_each(r_lock_dbs_.begin(), r_lock_dbs_.end(), [this](auto& need_lock_db) {
if (lock_db_keys_.count(need_lock_db) != 0) {
pstd::lock::MultiRecordLock record_lock(need_lock_db->LockMgr());
record_lock.Unlock(lock_db_keys_[need_lock_db]);
}
need_lock_slot->DbRWUnLock();
need_lock_db->DBUnlockShared();
});
if (is_lock_rm_dbs_) {
g_pika_rm->DBUnlock();
Expand Down
14 changes: 0 additions & 14 deletions src/storage/src/redis_hashes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ Status Redis::HDel(const Slice& key, const std::vector<std::string>& fields, int
std::string meta_value;
int32_t del_cnt = 0;
uint64_t version = 0;
ScopeRecordLock l(lock_mgr_, key);
ScopeSnapshot ss(db_, &snapshot);
read_options.snapshot = snapshot;

Expand Down Expand Up @@ -281,7 +280,6 @@ Status Redis::HGetallWithTTL(const Slice& key, std::vector<FieldValue>* fvs, int
Status Redis::HIncrby(const Slice& key, const Slice& field, int64_t value, int64_t* ret) {
*ret = 0;
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

uint64_t version = 0;
uint32_t statistic = 0;
Expand Down Expand Up @@ -359,7 +357,6 @@ Status Redis::HIncrby(const Slice& key, const Slice& field, int64_t value, int64
Status Redis::HIncrbyfloat(const Slice& key, const Slice& field, const Slice& by, std::string* new_value) {
new_value->clear();
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

uint64_t version = 0;
uint32_t statistic = 0;
Expand Down Expand Up @@ -553,7 +550,6 @@ Status Redis::HMSet(const Slice& key, const std::vector<FieldValue>& fvs) {
}

rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

uint64_t version = 0;
std::string meta_value;
Expand Down Expand Up @@ -617,7 +613,6 @@ Status Redis::HMSet(const Slice& key, const std::vector<FieldValue>& fvs) {

Status Redis::HSet(const Slice& key, const Slice& field, const Slice& value, int32_t* res) {
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

uint64_t version = 0;
uint32_t statistic = 0;
Expand Down Expand Up @@ -682,7 +677,6 @@ Status Redis::HSet(const Slice& key, const Slice& field, const Slice& value, int

Status Redis::HSetnx(const Slice& key, const Slice& field, const Slice& value, int32_t* ret) {
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

uint64_t version = 0;
std::string meta_value;
Expand Down Expand Up @@ -1030,8 +1024,6 @@ Status Redis::PKHRScanRange(const Slice& key, const Slice& field_start, const st

Status Redis::HashesExpire(const Slice& key, int64_t ttl) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);

BaseMetaKey base_meta_key(key);
Status s = db_->Get(default_read_options_, handles_[kHashesMetaCF], base_meta_key.Encode(), &meta_value);
if (s.ok()) {
Expand All @@ -1055,8 +1047,6 @@ Status Redis::HashesExpire(const Slice& key, int64_t ttl) {

Status Redis::HashesDel(const Slice& key) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);

BaseMetaKey base_meta_key(key);
Status s = db_->Get(default_read_options_, handles_[kHashesMetaCF], base_meta_key.Encode(), &meta_value);
if (s.ok()) {
Expand All @@ -1077,8 +1067,6 @@ Status Redis::HashesDel(const Slice& key) {

Status Redis::HashesExpireat(const Slice& key, int64_t timestamp) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);

BaseMetaKey base_meta_key(key);
Status s = db_->Get(default_read_options_, handles_[kHashesMetaCF], base_meta_key.Encode(), &meta_value);
if (s.ok()) {
Expand All @@ -1101,8 +1089,6 @@ Status Redis::HashesExpireat(const Slice& key, int64_t timestamp) {

Status Redis::HashesPersist(const Slice& key) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);

BaseMetaKey base_meta_key(key);
Status s = db_->Get(default_read_options_, handles_[kHashesMetaCF], base_meta_key.Encode(), &meta_value);
if (s.ok()) {
Expand Down
Loading

0 comments on commit e4bc855

Please sign in to comment.