Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: delete reduntant lock in storage #2372

Merged
merged 4 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions include/pika_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,6 @@ class DB : public std::enable_shared_from_this<DB>, public pstd::noncopyable {
void SetCompactRangeOptions(const bool is_canceled);

std::shared_ptr<pstd::lock::LockMgr> LockMgr();
void DbRWLockWriter();
void DbRWLockReader();
void DbRWUnLock();
/*
* Cache used
*/
Expand Down Expand Up @@ -164,7 +161,6 @@ class DB : public std::enable_shared_from_this<DB>, public pstd::noncopyable {
std::string log_path_;
std::string bgsave_sub_path_;
pstd::Mutex key_info_protector_;
std::shared_mutex db_rwlock_;
std::atomic<bool> binlog_io_error_;
std::shared_mutex dbs_rw_;
// class may be shared, using shared_ptr would be a better choice
Expand Down
3 changes: 0 additions & 3 deletions include/pika_repl_server_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@ class PikaReplServerConn : public net::PbConn {
const InnerMessage::InnerRequest::TrySync& try_sync_request,
const std::shared_ptr<net::PbConn>& conn,
InnerMessage::InnerResponse::TrySync* try_sync_response);
static void BuildConsensusMeta(const bool& reject, const std::vector<LogOffset>& hints, const uint32_t& term,
InnerMessage::InnerResponse* response);

static void HandleDBSyncRequest(void* arg);
static void HandleBinlogSyncRequest(void* arg);
static void HandleRemoveSlaveNodeRequest(void* arg);
Expand Down
16 changes: 8 additions & 8 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -814,9 +814,9 @@ void ShutdownCmd::DoInitial() {
// no return
void ShutdownCmd::Do() {
DLOG(WARNING) << "handle \'shutdown\'";
db_->DbRWUnLock();
db_->DBUnlockShared();
g_pika_server->Exit();
db_->DbRWLockReader();
db_->DBLockShared();
res_.SetRes(CmdRes::kNone);
}

Expand Down Expand Up @@ -1338,11 +1338,11 @@ void InfoCmd::InfoData(std::string& info) {
}
background_errors.clear();
memtable_usage = table_reader_usage = 0;
db_item.second->DbRWLockReader();
db_item.second->DBLockShared();
db_item.second->storage()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_CUR_SIZE_ALL_MEM_TABLES, &memtable_usage);
db_item.second->storage()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_ESTIMATE_TABLE_READER_MEM, &table_reader_usage);
db_item.second->storage()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_BACKGROUND_ERRORS, &background_errors);
db_item.second->DbRWUnLock();
db_item.second->DBUnlockShared();
total_memtable_usage += memtable_usage;
total_table_reader_usage += table_reader_usage;
for (const auto& item : background_errors) {
Expand Down Expand Up @@ -1376,9 +1376,9 @@ void InfoCmd::InfoRocksDB(std::string& info) {
continue;
}
std::string rocksdb_info;
db_item.second->DbRWLockReader();
db_item.second->DBLockShared();
db_item.second->storage()->GetRocksDBInfo(rocksdb_info);
db_item.second->DbRWUnLock();
db_item.second->DBUnlockShared();
tmp_stream << rocksdb_info;
}
info.append(tmp_stream.str());
Expand Down Expand Up @@ -3101,9 +3101,9 @@ void DiskRecoveryCmd::Do() {
}
db_item.second->SetBinlogIoErrorrelieve();
background_errors_.clear();
db_item.second->DbRWLockReader();
db_item.second->DBLockShared();
db_item.second->storage()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_BACKGROUND_ERRORS, &background_errors_);
db_item.second->DbRWUnLock();
db_item.second->DBUnlockShared();
for (const auto &item: background_errors_) {
if (item.second != 0) {
rocksdb::Status s = db_item.second->storage()->GetDBByIndex(item.first)->Resume();
Expand Down
4 changes: 2 additions & 2 deletions src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -889,11 +889,11 @@ void Cmd::InternalProcessCommand(const HintKeys& hint_keys) {

void Cmd::DoCommand(const HintKeys& hint_keys) {
if (!IsSuspend()) {
db_->DbRWLockReader();
db_->DBLockShared();
}
DEFER {
if (!IsSuspend()) {
db_->DbRWUnLock();
db_->DBUnlockShared();
}
};
if (IsNeedCacheDo()
Expand Down
12 changes: 4 additions & 8 deletions src/pika_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@ void DB::SetBinlogIoError() { return binlog_io_error_.store(true); }
void DB::SetBinlogIoErrorrelieve() { return binlog_io_error_.store(false); }
bool DB::IsBinlogIoError() { return binlog_io_error_.load(); }
std::shared_ptr<pstd::lock::LockMgr> DB::LockMgr() { return lock_mgr_; }
void DB::DbRWLockReader() { db_rwlock_.lock_shared(); }
void DB::DbRWUnLock() { db_rwlock_.unlock(); }
std::shared_ptr<PikaCache> DB::cache() const { return cache_; }
std::shared_ptr<storage::Storage> DB::storage() const { return storage_; }

Expand Down Expand Up @@ -197,8 +195,6 @@ void DB::SetCompactRangeOptions(const bool is_canceled) {
storage_->SetCompactRangeOptions(is_canceled);
}

void DB::DbRWLockWriter() { db_rwlock_.lock(); }

DisplayCacheInfo DB::GetCacheInfo() {
std::lock_guard l(key_info_protector_);
return cache_info_;
Expand Down Expand Up @@ -360,7 +356,7 @@ bool DB::InitBgsaveEngine() {
}

{
std::lock_guard lock(db_rwlock_);
std::lock_guard lock(dbs_rw_);
LogOffset bgsave_offset;
// term, index are 0
db->Logger()->GetProducerStatus(&(bgsave_offset.b_offset.filenum), &(bgsave_offset.b_offset.offset));
Expand Down Expand Up @@ -548,7 +544,7 @@ bool DB::ChangeDb(const std::string& new_path) {
tmp_path += "_bak";
pstd::DeleteDirIfExist(tmp_path);

std::lock_guard l(db_rwlock_);
std::lock_guard l(dbs_rw_);
LOG(INFO) << "DB: " << db_name_ << ", Prepare change db from: " << tmp_path;
storage_.reset();

Expand Down Expand Up @@ -580,7 +576,7 @@ void DB::ClearBgsave() {
}

bool DB::FlushSubDB(const std::string& db_name) {
std::lock_guard rwl(db_rwlock_);
std::lock_guard rwl(dbs_rw_);
return FlushSubDBWithoutLock(db_name);
}

Expand Down Expand Up @@ -634,7 +630,7 @@ void DB::ResetDisplayCacheInfo(int status) {
}

bool DB::FlushDB() {
std::lock_guard rwl(db_rwlock_);
std::lock_guard rwl(dbs_rw_);
std::lock_guard l(bgsave_protector_);
return FlushDBWithoutLock();
}
4 changes: 2 additions & 2 deletions src/pika_repl_bgworker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ void PikaReplBgWorker::HandleBGWorkerWriteDB(void* arg) {
pstd::lock::MultiRecordLock record_lock(c_ptr->GetDB()->LockMgr());
record_lock.Lock(c_ptr->current_key());
if (!c_ptr->IsSuspend()) {
c_ptr->GetDB()->DbRWLockReader();
c_ptr->GetDB()->DBLockShared();
}
if (c_ptr->IsNeedCacheDo()
&& PIKA_CACHE_NONE != g_pika_conf->cache_model()
Expand All @@ -236,7 +236,7 @@ void PikaReplBgWorker::HandleBGWorkerWriteDB(void* arg) {
c_ptr->Do();
}
if (!c_ptr->IsSuspend()) {
c_ptr->GetDB()->DbRWUnLock();
c_ptr->GetDB()->DBUnlockShared();
}
record_lock.Unlock(c_ptr->current_key());
if (g_pika_conf->slowlog_slower_than() >= 0) {
Expand Down
17 changes: 0 additions & 17 deletions src/pika_repl_server_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -216,23 +216,6 @@ bool PikaReplServerConn::TrySyncOffsetCheck(const std::shared_ptr<SyncMasterDB>&
return true;
}

void PikaReplServerConn::BuildConsensusMeta(const bool& reject, const std::vector<LogOffset>& hints,
const uint32_t& term, InnerMessage::InnerResponse* response) {
InnerMessage::ConsensusMeta* consensus_meta = response->mutable_consensus_meta();
consensus_meta->set_term(term);
consensus_meta->set_reject(reject);
if (!reject) {
return;
}
for (const auto& hint : hints) {
InnerMessage::BinlogOffset* offset = consensus_meta->add_hint();
offset->set_filenum(hint.b_offset.filenum);
offset->set_offset(hint.b_offset.offset);
offset->set_term(hint.l_offset.term);
offset->set_index(hint.l_offset.index);
}
}

void PikaReplServerConn::HandleDBSyncRequest(void* arg) {
std::unique_ptr<ReplServerTaskArg> task_arg(static_cast<ReplServerTaskArg*>(arg));
const std::shared_ptr<InnerMessage::InnerRequest> req = task_arg->req;
Expand Down
22 changes: 10 additions & 12 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -347,9 +347,9 @@ bool PikaServer::IsKeyScaning() {
bool PikaServer::IsCompacting() {
std::shared_lock db_rwl(dbs_rw_);
for (const auto& db_item : dbs_) {
db_item.second->DbRWLockReader();
db_item.second->DBLockShared();
std::string task_type = db_item.second->storage()->GetCurrentTaskType();
db_item.second->DbRWUnLock();
db_item.second->DBUnlockShared();
if (strcasecmp(task_type.data(), "no") != 0) {
return true;
}
Expand Down Expand Up @@ -446,27 +446,27 @@ void PikaServer::PrepareDBTrySync() {
void PikaServer::DBSetMaxCacheStatisticKeys(uint32_t max_cache_statistic_keys) {
std::shared_lock rwl(dbs_rw_);
for (const auto& db_item : dbs_) {
db_item.second->DbRWLockReader();
db_item.second->DBLockShared();
db_item.second->storage()->SetMaxCacheStatisticKeys(max_cache_statistic_keys);
db_item.second->DbRWUnLock();
db_item.second->DBUnlockShared();
}
}

void PikaServer::DBSetSmallCompactionThreshold(uint32_t small_compaction_threshold) {
std::shared_lock rwl(dbs_rw_);
for (const auto& db_item : dbs_) {
db_item.second->DbRWLockReader();
db_item.second->DBLockShared();
db_item.second->storage()->SetSmallCompactionThreshold(small_compaction_threshold);
db_item.second->DbRWUnLock();
db_item.second->DBUnlockShared();
}
}

void PikaServer::DBSetSmallCompactionDurationThreshold(uint32_t small_compaction_duration_threshold) {
std::shared_lock rwl(dbs_rw_);
for (const auto& db_item : dbs_) {
db_item.second->DbRWLockReader();
db_item.second->DBLockShared();
db_item.second->storage()->SetSmallCompactionDurationThreshold(small_compaction_duration_threshold);
db_item.second->DbRWUnLock();
db_item.second->DBUnlockShared();
}
}

Expand Down Expand Up @@ -1386,9 +1386,7 @@ storage::Status PikaServer::RewriteStorageOptions(const storage::OptionType& opt
storage::Status s;
std::shared_lock db_rwl(dbs_rw_);
for (const auto& db_item : dbs_) {
db_item.second->DbRWLockWriter();
s = db_item.second->storage()->SetOptions(option_type, storage::ALL_DB, options_map);
db_item.second->DbRWUnLock();
if (!s.ok()) {
return s;
}
Expand Down Expand Up @@ -1567,9 +1565,9 @@ void PikaServer::DisableCompact() {
/* cancel in-progress manual compactions */
std::shared_lock rwl(dbs_rw_);
for (const auto& db_item : dbs_) {
db_item.second->DbRWLockWriter();
db_item.second->DBLock();
db_item.second->SetCompactRangeOptions(true);
db_item.second->DbRWUnLock();
db_item.second->DBUnlock();
}
}

Expand Down
20 changes: 10 additions & 10 deletions src/pika_transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,22 +145,22 @@ void ExecCmd::Lock() {
g_pika_rm->DBLock();
}

std::for_each(r_lock_dbs_.begin(), r_lock_dbs_.end(), [this](auto& need_lock_slot) {
if (lock_db_keys_.count(need_lock_slot) != 0) {
pstd::lock::MultiRecordLock record_lock(need_lock_slot->LockMgr());
record_lock.Lock(lock_db_keys_[need_lock_slot]);
std::for_each(r_lock_dbs_.begin(), r_lock_dbs_.end(), [this](auto& need_lock_db) {
if (lock_db_keys_.count(need_lock_db) != 0) {
pstd::lock::MultiRecordLock record_lock(need_lock_db->LockMgr());
record_lock.Lock(lock_db_keys_[need_lock_db]);
}
need_lock_slot->DbRWLockReader();
need_lock_db->DBLockShared();
});
}

void ExecCmd::Unlock() {
std::for_each(r_lock_dbs_.begin(), r_lock_dbs_.end(), [this](auto& need_lock_slot) {
if (lock_db_keys_.count(need_lock_slot) != 0) {
pstd::lock::MultiRecordLock record_lock(need_lock_slot->LockMgr());
record_lock.Unlock(lock_db_keys_[need_lock_slot]);
std::for_each(r_lock_dbs_.begin(), r_lock_dbs_.end(), [this](auto& need_lock_db) {
if (lock_db_keys_.count(need_lock_db) != 0) {
pstd::lock::MultiRecordLock record_lock(need_lock_db->LockMgr());
record_lock.Unlock(lock_db_keys_[need_lock_db]);
}
need_lock_slot->DbRWUnLock();
need_lock_db->DBUnlockShared();
});
if (is_lock_rm_dbs_) {
g_pika_rm->DBUnlock();
Expand Down
14 changes: 0 additions & 14 deletions src/storage/src/redis_hashes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ Status Redis::HDel(const Slice& key, const std::vector<std::string>& fields, int
std::string meta_value;
int32_t del_cnt = 0;
uint64_t version = 0;
ScopeRecordLock l(lock_mgr_, key);
ScopeSnapshot ss(db_, &snapshot);
read_options.snapshot = snapshot;

Expand Down Expand Up @@ -281,7 +280,6 @@ Status Redis::HGetallWithTTL(const Slice& key, std::vector<FieldValue>* fvs, int
Status Redis::HIncrby(const Slice& key, const Slice& field, int64_t value, int64_t* ret) {
*ret = 0;
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

uint64_t version = 0;
uint32_t statistic = 0;
Expand Down Expand Up @@ -359,7 +357,6 @@ Status Redis::HIncrby(const Slice& key, const Slice& field, int64_t value, int64
Status Redis::HIncrbyfloat(const Slice& key, const Slice& field, const Slice& by, std::string* new_value) {
new_value->clear();
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

uint64_t version = 0;
uint32_t statistic = 0;
Expand Down Expand Up @@ -553,7 +550,6 @@ Status Redis::HMSet(const Slice& key, const std::vector<FieldValue>& fvs) {
}

rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

uint64_t version = 0;
std::string meta_value;
Expand Down Expand Up @@ -617,7 +613,6 @@ Status Redis::HMSet(const Slice& key, const std::vector<FieldValue>& fvs) {

Status Redis::HSet(const Slice& key, const Slice& field, const Slice& value, int32_t* res) {
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

uint64_t version = 0;
uint32_t statistic = 0;
Expand Down Expand Up @@ -682,7 +677,6 @@ Status Redis::HSet(const Slice& key, const Slice& field, const Slice& value, int

Status Redis::HSetnx(const Slice& key, const Slice& field, const Slice& value, int32_t* ret) {
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

uint64_t version = 0;
std::string meta_value;
Expand Down Expand Up @@ -1030,8 +1024,6 @@ Status Redis::PKHRScanRange(const Slice& key, const Slice& field_start, const st

Status Redis::HashesExpire(const Slice& key, int64_t ttl) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);

BaseMetaKey base_meta_key(key);
Status s = db_->Get(default_read_options_, handles_[kHashesMetaCF], base_meta_key.Encode(), &meta_value);
if (s.ok()) {
Expand All @@ -1055,8 +1047,6 @@ Status Redis::HashesExpire(const Slice& key, int64_t ttl) {

Status Redis::HashesDel(const Slice& key) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);

BaseMetaKey base_meta_key(key);
Status s = db_->Get(default_read_options_, handles_[kHashesMetaCF], base_meta_key.Encode(), &meta_value);
if (s.ok()) {
Expand All @@ -1077,8 +1067,6 @@ Status Redis::HashesDel(const Slice& key) {

Status Redis::HashesExpireat(const Slice& key, int64_t timestamp) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);

BaseMetaKey base_meta_key(key);
Status s = db_->Get(default_read_options_, handles_[kHashesMetaCF], base_meta_key.Encode(), &meta_value);
if (s.ok()) {
Expand All @@ -1101,8 +1089,6 @@ Status Redis::HashesExpireat(const Slice& key, int64_t timestamp) {

Status Redis::HashesPersist(const Slice& key) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);

BaseMetaKey base_meta_key(key);
Status s = db_->Get(default_read_options_, handles_[kHashesMetaCF], base_meta_key.Encode(), &meta_value);
if (s.ok()) {
Expand Down
Loading
Loading