Skip to content

Commit

Permalink
fix issue 4274 (#4627)
Browse files Browse the repository at this point in the history
* fix issue 4274

* fix bug

Co-authored-by: Doodle <[email protected]>
  • Loading branch information
cangfengzhs and critical27 authored Sep 14, 2022
1 parent a7a0185 commit a54050c
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 22 deletions.
9 changes: 7 additions & 2 deletions src/kvstore/Part.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "common/utils/Utils.h"
#include "kvstore/LogEncoder.h"
#include "kvstore/RocksEngineConfig.h"
#include "kvstore/stats/KVStats.h"

DEFINE_int32(cluster_id, 0, "A unique id for each cluster");

Expand Down Expand Up @@ -214,7 +215,9 @@ std::tuple<nebula::cpp2::ErrorCode, LogID, TermID> Part::commitLogs(
std::unique_ptr<LogIterator> iter, bool wait, bool needLock) {
// We should apply any membership change which happens before start time. Because when we start
// up, the peers comes from meta, has already contains all previous changes.
SCOPED_TIMER(&execTime_);
SCOPED_TIMER([](uint64_t elapsedTime) {
stats::StatsManager::addValue(kCommitLogLatencyUs, elapsedTime);
});
auto batch = engine_->startBatchWrite();
LogID lastId = kNoCommitLogId;
TermID lastTerm = kNoCommitLogTerm;
Expand Down Expand Up @@ -364,7 +367,9 @@ std::tuple<nebula::cpp2::ErrorCode, int64_t, int64_t> Part::commitSnapshot(
LogID committedLogId,
TermID committedLogTerm,
bool finished) {
SCOPED_TIMER(&execTime_);
SCOPED_TIMER([](uint64_t elapsedTime) {
stats::StatsManager::addValue(kCommitSnapshotLatencyUs, elapsedTime);
});
auto batch = engine_->startBatchWrite();
int64_t count = 0;
int64_t size = 0;
Expand Down
12 changes: 5 additions & 7 deletions src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -892,7 +892,8 @@ void RaftPart::appendLogsInternal(AppendLogsIterator iter, TermID termId) {
committed = committedLogId_;
// Step 1: Write WAL
{
SCOPED_TIMER(&execTime_);
SCOPED_TIMER(
[](uint64_t execTime) { stats::StatsManager::addValue(kAppendWalLatencyUs, execTime); });
if (!wal_->appendLogs(iter)) {
VLOG_EVERY_N(2, 1000) << idStr_ << "Failed to write into WAL";
res = nebula::cpp2::ErrorCode::E_RAFT_WAL_FAIL;
Expand All @@ -901,7 +902,6 @@ void RaftPart::appendLogsInternal(AppendLogsIterator iter, TermID termId) {
break;
}
}
stats::StatsManager::addValue(kAppendWalLatencyUs, execTime_);
lastId = wal_->lastLogId();
VLOG(4) << idStr_ << "Succeeded writing logs [" << iter.firstLogId() << ", " << lastId
<< "] to WAL";
Expand Down Expand Up @@ -1079,7 +1079,6 @@ void RaftPart::processAppendLogResponses(const AppendLogResponses& resps,
*/
auto [code, lastCommitId, lastCommitTerm] = commitLogs(std::move(walIt), true, true);
if (code == nebula::cpp2::ErrorCode::SUCCEEDED) {
stats::StatsManager::addValue(kCommitLogLatencyUs, execTime_);
std::lock_guard<std::mutex> g(raftLock_);
CHECK_EQ(lastLogId, lastCommitId);
committedLogId_ = lastCommitId;
Expand Down Expand Up @@ -1760,10 +1759,11 @@ void RaftPart::processAppendLogRequest(const cpp2::AppendLogRequest& req,
if (hasLogsToAppend) {
bool result = false;
{
SCOPED_TIMER(&execTime_);
SCOPED_TIMER([](uint64_t execTime) {
stats::StatsManager::addValue(kAppendWalLatencyUs, execTime);
});
result = wal_->appendLogs(logIter);
}
stats::StatsManager::addValue(kAppendWalLatencyUs, execTime_);
if (result) {
CHECK_EQ(lastId, wal_->lastLogId());
lastLogId_ = wal_->lastLogId();
Expand Down Expand Up @@ -1792,7 +1792,6 @@ void RaftPart::processAppendLogRequest(const cpp2::AppendLogRequest& req,
// raftLock_ has been acquired, so the third parameter is false as well.
auto [code, lastCommitId, lastCommitTerm] = commitLogs(std::move(walIt), false, false);
if (code == nebula::cpp2::ErrorCode::SUCCEEDED) {
stats::StatsManager::addValue(kCommitLogLatencyUs, execTime_);
VLOG(4) << idStr_ << "Follower succeeded committing log " << committedLogId_ + 1 << " to "
<< lastLogIdCanCommit;
CHECK_EQ(lastLogIdCanCommit, lastCommitId);
Expand Down Expand Up @@ -2009,7 +2008,6 @@ void RaftPart::processSendSnapshotRequest(const cpp2::SendSnapshotRequest& req,
resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_PERSIST_SNAPSHOT_FAILED;
return;
}
stats::StatsManager::addValue(kCommitSnapshotLatencyUs, execTime_);
lastTotalCount_ += std::get<1>(ret);
lastTotalSize_ += std::get<2>(ret);
if (lastTotalCount_ != req.get_total_count() || lastTotalSize_ != req.get_total_size()) {
Expand Down
12 changes: 0 additions & 12 deletions src/kvstore/raftex/RaftPart.h
Original file line number Diff line number Diff line change
Expand Up @@ -398,15 +398,6 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
*/
void reset();

/**
* @brief Execution time of some operation, for statistics
*
* @return uint64_t Time in us
*/
uint64_t execTime() const {
return execTime_;
}

protected:
/**
* @brief Construct a new RaftPart
Expand Down Expand Up @@ -898,9 +889,6 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
int64_t startTimeMs_ = 0;

std::atomic<bool> blocking_{false};

// For stats info
uint64_t execTime_{0};
};

} // namespace raftex
Expand Down
2 changes: 1 addition & 1 deletion src/storage/admin/AdminProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class TransLeaderProcessor : public BaseProcessor<cpp2::AdminExecResp> {
onFinished();
return;
} else if (code == nebula::cpp2::ErrorCode::SUCCEEDED) {
stats::StatsManager::addValue(kTransferLeaderLatencyUs, part->execTime());
// stats::StatsManager::addValue(kTransferLeaderLatencyUs, part->execTime());
// To avoid dead lock, we use another ioThreadPool to check the
// leader information.
folly::via(folly::getGlobalIOExecutor().get(), [this, part, spaceId, partId] {
Expand Down

0 comments on commit a54050c

Please sign in to comment.