From 31ccad939b978dc16c793c89963a3a1a62182a35 Mon Sep 17 00:00:00 2001 From: Doodle <13706157+critical27@users.noreply.github.com> Date: Fri, 2 Apr 2021 18:58:22 +0800 Subject: [PATCH 1/7] heartbeat refactor --- src/kvstore/raftex/Host.cpp | 71 +++++++- src/kvstore/raftex/Host.h | 12 ++ src/kvstore/raftex/RaftPart.cpp | 169 ++++++++++++++++-- src/kvstore/raftex/RaftPart.h | 21 ++- src/kvstore/raftex/RaftexService.cpp | 15 ++ src/kvstore/raftex/RaftexService.h | 4 + .../raftex/test/LeaderTransferTest.cpp | 18 +- 7 files changed, 272 insertions(+), 38 deletions(-) diff --git a/src/kvstore/raftex/Host.cpp b/src/kvstore/raftex/Host.cpp index 98560fcce..8a0c2df90 100644 --- a/src/kvstore/raftex/Host.cpp +++ b/src/kvstore/raftex/Host.cpp @@ -151,11 +151,6 @@ folly::Future Host::appendLogs( << ", lastLogIdSent = " << lastLogIdSent_ << ", lastLogTermSent = " << lastLogTermSent_; } - if (prevLogTerm < lastLogTermSent_ || prevLogId < lastLogIdSent_) { - LOG(INFO) << idStr_ << "We have sended this log, so go on from id " << lastLogIdSent_ - << ", term " << lastLogTermSent_ << "; current prev log id " << prevLogId - << ", current prev log term " << prevLogTerm; - } logTermToSend_ = term; logIdToSend_ = logId; committedLogId_ = committedLogId; @@ -506,6 +501,72 @@ folly::Future Host::sendAppendLogRequest( return client->future_appendLog(*req); } +folly::Future Host::sendHeartbeat(folly::EventBase* eb, + TermID term, + LogID latestLogId, + LogID commitLogId, + TermID lastLogTerm, + LogID lastLogId) { + auto req = std::make_shared(); + req->set_space(part_->spaceId()); + req->set_part(part_->partitionId()); + req->set_current_term(term); + req->set_last_log_id(latestLogId); + req->set_committed_log_id(commitLogId); + req->set_leader_addr(part_->address().host); + req->set_leader_port(part_->address().port); + req->set_last_log_term_sent(lastLogTerm); + req->set_last_log_id_sent(lastLogId); + folly::Promise promise; + sendHeartbeatRequest(eb, std::move(req)) + .via(eb) + .then([self = shared_from_this(), pro = std::move(promise)] + (folly::Try&& t) mutable { + VLOG(3) << self->idStr_ << "heartbeat call got response"; + if (t.hasException()) { + cpp2::HeartbeatResponse resp; + resp.set_error_code(cpp2::ErrorCode::E_EXCEPTION); + pro.setValue(std::move(resp)); + return; + } else { + pro.setValue(std::move(t.value())); + } + }); + return promise.getFuture(); +} + +folly::Future Host::sendHeartbeatRequest( + folly::EventBase* eb, + std::shared_ptr req) { + VLOG(2) << idStr_ << "Entering Host::sendHeartbeatRequest()"; + + { + std::lock_guard g(lock_); + auto res = checkStatus(); + if (res != cpp2::ErrorCode::SUCCEEDED) { + LOG(WARNING) << idStr_ + << "The Host is not in a proper status, do not send"; + cpp2::HeartbeatResponse resp; + resp.set_error_code(res); + return resp; + } + } + + if (FLAGS_trace_raft) { + LOG(INFO) << idStr_ + << "Sending heartbeat space " << req->get_space() + << ", part " << req->get_part() + << ", current term " << req->get_current_term() + << ", last_log_id " << req->get_last_log_id() + << ", committed_id " << req->get_committed_log_id() + << ", last_log_term_sent " << req->get_last_log_term_sent() + << ", last_log_id_sent " << req->get_last_log_id_sent(); + } + // Get client connection + auto client = part_->clientMan_->client(addr_, eb, false, FLAGS_raft_rpc_timeout_ms); + return client->future_heartbeat(*req); +} + bool Host::noRequest() const { CHECK(!lock_.try_lock()); static auto emptyTup = std::make_tuple(0, 0, 0); diff --git a/src/kvstore/raftex/Host.h b/src/kvstore/raftex/Host.h index 51a2a45ff..87ebc569a 100644 --- a/src/kvstore/raftex/Host.h +++ b/src/kvstore/raftex/Host.h @@ -88,6 +88,14 @@ class Host final : public std::enable_shared_from_this { TermID lastLogTermSent, // The last log term being sent LogID lastLogIdSent); // The last log id being sent + folly::Future sendHeartbeat( + folly::EventBase* eb, + TermID term, + LogID latestLogId, + LogID commitLogId, + TermID lastLogTerm, + LogID lastLogId); + const HostAddr& address() const { return addr_; } @@ -103,6 +111,10 @@ class Host final : public std::enable_shared_from_this { folly::EventBase* eb, std::shared_ptr req); + folly::Future sendHeartbeatRequest( + folly::EventBase* eb, + std::shared_ptr req); + std::shared_ptr prepareAppendLogRequest(); bool noRequest() const; diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp index ba23f6dfa..10986dfaa 100644 --- a/src/kvstore/raftex/RaftPart.cpp +++ b/src/kvstore/raftex/RaftPart.cpp @@ -945,6 +945,9 @@ void RaftPart::processAppendLogResponses( if (commitLogs(std::move(walIt))) { committedLogId_ = lastLogId; firstLogId = lastLogId_ + 1; + lastMsgAcceptedCostMs_ = lastMsgSentDur_.elapsedInMSec(); + lastMsgAcceptedTime_ = time::WallClock::fastNowInMilliSec(); + commitInThisTerm_ = true; } else { LOG(FATAL) << idStr_ << "Failed to commit logs"; } @@ -1029,10 +1032,7 @@ void RaftPart::processAppendLogResponses( bool RaftPart::needToSendHeartbeat() { std::lock_guard g(raftLock_); - return status_ == Status::RUNNING && - role_ == Role::LEADER && - time::WallClock::fastNowInMilliSec() - lastMsgAcceptedTime_ >= - FLAGS_raft_heartbeat_interval_secs * 1000 * 2 / 5; + return status_ == Status::RUNNING && role_ == Role::LEADER; } @@ -1262,8 +1262,9 @@ bool RaftPart::leaderElection() { }); lastMsgAcceptedTime_ = 0; } + weight_ = 1; + commitInThisTerm_ = false; } - weight_ = 1; sendHeartbeat(); return true; } @@ -1531,7 +1532,7 @@ void RaftPart::processAppendLogRequest( return; } // Check leadership - cpp2::ErrorCode err = verifyLeader(req); + cpp2::ErrorCode err = verifyLeader(req); if (err != cpp2::ErrorCode::SUCCEEDED) { // Wrong leadership VLOG(2) << idStr_ << "Will not follow the leader"; @@ -1680,35 +1681,45 @@ void RaftPart::processAppendLogRequest( return; } - if (req.get_committed_log_id() > committedLogId_) { + LogID lastLogIdCanCommit = std::min(lastLogId_, req.get_committed_log_id()); + if (lastLogIdCanCommit > committedLogId_) { // Commit some logs // We can only commit logs from firstId to min(lastLogId_, leader's commit log id), // follower can't always commit to leader's commit id because of lack of log - LogID lastLogIdCanCommit = std::min(lastLogId_, req.get_committed_log_id()); - CHECK_LE(committedLogId_ + 1, lastLogIdCanCommit); - if (commitLogs(wal_->iterator(committedLogId_ + 1, lastLogIdCanCommit))) { + auto code = commitLogs(wal_->iterator(committedLogId_ + 1, lastLogIdCanCommit), false); + if (code == nebula::cpp2::ErrorCode::SUCCEEDED) { VLOG(1) << idStr_ << "Follower succeeded committing log " << committedLogId_ + 1 << " to " << lastLogIdCanCommit; committedLogId_ = lastLogIdCanCommit; resp.set_committed_log_id(lastLogIdCanCommit); + resp.set_error_code(cpp2::ErrorCode::SUCCEEDED); + } else if (code == nebula::cpp2::ErrorCode::E_WRITE_STALLED) { + VLOG(1) << idStr_ << "Follower delay committing log " + << committedLogId_ + 1 << " to " + << lastLogIdCanCommit; + // Even if log is not applied to state machine, still regard as succeded: + // 1. As a follower, upcoming request will try to commit them + // 2. If it is elected as leader later, it will try to commit them as well + resp.set_committed_log_id(committedLogId_); + resp.set_error_code(cpp2::ErrorCode::SUCCEEDED); } else { LOG(ERROR) << idStr_ << "Failed to commit log " << committedLogId_ + 1 << " to " << req.get_committed_log_id(); resp.set_error_code(cpp2::ErrorCode::E_WAL_FAIL); - return; } + } else { + resp.set_error_code(cpp2::ErrorCode::SUCCEEDED); } // Reset the timeout timer again in case wal and commit takes longer time than expected lastMsgRecvDur_.reset(); - resp.set_error_code(cpp2::ErrorCode::SUCCEEDED); } -cpp2::ErrorCode RaftPart::verifyLeader( - const cpp2::AppendLogRequest& req) { +template +cpp2::ErrorCode RaftPart::verifyLeader(const REQ& req) { CHECK(!raftLock_.try_lock()); auto candidate = HostAddr(req.get_leader_addr(), req.get_leader_port()); auto code = checkPeer(candidate); @@ -1792,6 +1803,64 @@ cpp2::ErrorCode RaftPart::verifyLeader( return cpp2::ErrorCode::SUCCEEDED; } +void RaftPart::processHeartbeatRequest( + const cpp2::HeartbeatRequest& req, + cpp2::HeartbeatResponse& resp) { + if (FLAGS_trace_raft) { + LOG(INFO) << idStr_ + << "Received heartbeat " + << ": GraphSpaceId = " << req.get_space() + << ", partition = " << req.get_part() + << ", leaderIp = " << req.get_leader_addr() + << ", leaderPort = " << req.get_leader_port() + << ", current_term = " << req.get_current_term() + << ", lastLogId = " << req.get_last_log_id() + << ", committedLogId = " << req.get_committed_log_id() + << ", lastLogIdSent = " << req.get_last_log_id_sent() + << ", lastLogTermSent = " << req.get_last_log_term_sent() + << ", local lastLogId = " << lastLogId_ + << ", local lastLogTerm = " << lastLogTerm_ + << ", local committedLogId = " << committedLogId_ + << ", local current term = " << term_; + } + std::lock_guard g(raftLock_); + + resp.set_current_term(term_); + resp.set_leader_addr(leader_.host); + resp.set_leader_port(leader_.port); + resp.set_committed_log_id(committedLogId_); + resp.set_last_log_id(lastLogId_ < committedLogId_ ? committedLogId_ : lastLogId_); + resp.set_last_log_term(lastLogTerm_); + + // Check status + if (UNLIKELY(status_ == Status::STOPPED)) { + VLOG(2) << idStr_ + << "The part has been stopped, skip the request"; + resp.set_error_code(cpp2::ErrorCode::E_BAD_STATE); + return; + } + if (UNLIKELY(status_ == Status::STARTING)) { + VLOG(2) << idStr_ << "The partition is still starting"; + resp.set_error_code(cpp2::ErrorCode::E_NOT_READY); + return; + } + // Check leadership + cpp2::ErrorCode err = verifyLeader(req); + if (err != cpp2::ErrorCode::SUCCEEDED) { + // Wrong leadership + VLOG(2) << idStr_ << "Will not follow the leader"; + resp.set_error_code(err); + return; + } + + // Reset the timeout timer + lastMsgRecvDur_.reset(); + + // As for heartbeat, return ok after verifyLeader + resp.set_error_code(cpp2::ErrorCode::SUCCEEDED); + return; +} + void RaftPart::processSendSnapshotRequest(const cpp2::SendSnapshotRequest& req, cpp2::SendSnapshotResponse& resp) { VLOG(1) << idStr_ << "Receive snapshot, total rows " << req.get_rows().size() @@ -1863,10 +1932,73 @@ void RaftPart::processSendSnapshotRequest(const cpp2::SendSnapshotRequest& req, return; } -folly::Future RaftPart::sendHeartbeat() { +void RaftPart::sendHeartbeat() { + // If leader has not commit any logs in this term, it must commit all logs in previous term, + // so heartbeat is send by appending one empty log. + if (!replicatingLogs_.load(std::memory_order_acquire)) { + folly::via(executor_.get(), [this] { + std::string log = ""; + appendLogAsync(clusterId_, LogType::NORMAL, std::move(log)); + }); + } + + using namespace folly; // NOLINT since the fancy overload of | operator VLOG(2) << idStr_ << "Send heartbeat"; - std::string log = ""; - return appendLogAsync(clusterId_, LogType::NORMAL, std::move(log)); + TermID currTerm = 0; + LogID latestLogId = 0; + LogID commitLogId = 0; + TermID prevLogTerm = 0; + LogID prevLogId = 0; + size_t replica = 0; + decltype(hosts_) hosts; + { + std::lock_guard g(raftLock_); + currTerm = term_; + latestLogId = wal_->lastLogId(); + commitLogId = committedLogId_; + prevLogTerm = lastLogTerm_; + prevLogId = lastLogId_; + replica = quorum_; + hosts = hosts_; + } + auto eb = ioThreadPool_->getEventBase(); + auto startMs = time::WallClock::fastNowInMilliSec(); + collectNSucceeded( + gen::from(hosts) + | gen::map([self = shared_from_this(), eb, currTerm, latestLogId, commitLogId, + prevLogId, prevLogTerm] (std::shared_ptr hostPtr) { + VLOG(2) << self->idStr_ << "Send heartbeat to " << hostPtr->idStr(); + return via(eb, [=]() -> Future { + return hostPtr->sendHeartbeat( + eb, currTerm, latestLogId, commitLogId, prevLogTerm, prevLogId); + }); + }) + | gen::as(), + // Number of succeeded required + hosts.size(), + // Result evaluator + [hosts] (size_t index, cpp2::HeartbeatResponse& resp) { + return resp.get_error_code() == cpp2::ErrorCode::SUCCEEDED + && !hosts[index]->isLearner(); + }) + .then([replica, hosts = std::move(hosts), startMs, this] + (folly::Try&& resps) { + CHECK(!resps.hasException()); + size_t numSucceeded = 0; + for (auto& resp : *resps) { + if (!hosts[resp.first]->isLearner() + && resp.second.get_error_code() == cpp2::ErrorCode::SUCCEEDED) { + ++numSucceeded; + } + } + if (numSucceeded >= replica) { + VLOG(2) << idStr_ << "Heartbeat is accepted by quorum"; + std::lock_guard g(raftLock_); + auto now = time::WallClock::fastNowInMilliSec(); + lastMsgAcceptedCostMs_ = now - startMs; + lastMsgAcceptedTime_ = now; + } + }); } std::vector> RaftPart::followers() const { @@ -1998,6 +2130,9 @@ bool RaftPart::leaseValid() { if (hosts_.empty()) { return true; } + if (!commitInThisTerm_) { + return false; + } // When majority has accepted a log, leader obtains a lease which last for heartbeat. // However, we need to take off the net io time. On the left side of the inequality is // the time duration since last time leader send a log (the log has been accepted as well) diff --git a/src/kvstore/raftex/RaftPart.h b/src/kvstore/raftex/RaftPart.h index 377ed0827..fd1affdad 100644 --- a/src/kvstore/raftex/RaftPart.h +++ b/src/kvstore/raftex/RaftPart.h @@ -224,6 +224,10 @@ class RaftPart : public std::enable_shared_from_this { const cpp2::SendSnapshotRequest& req, cpp2::SendSnapshotResponse& resp); + void processHeartbeatRequest( + const cpp2::HeartbeatRequest& req, + cpp2::HeartbeatResponse& resp); + bool leaseValid(); bool needToCleanWal(); @@ -317,12 +321,10 @@ class RaftPart : public std::enable_shared_from_this { private: // A list of // idx -- the index of the peer - // resp -- AskForVoteResponse + // resp -- coresponding response of peer[index] using ElectionResponses = std::vector>; - // A list of - // idx -- the index of the peer - // resp -- AppendLogResponse using AppendLogResponses = std::vector>; + using HeartbeatResponses = std::vector>; // using LogCache = std::vector< @@ -339,13 +341,15 @@ class RaftPart : public std::enable_shared_from_this { ***************************************************/ const char* roleStr(Role role) const; - cpp2::ErrorCode verifyLeader(const cpp2::AppendLogRequest& req); + template + cpp2::ErrorCode verifyLeader(const REQ& req); /***************************************************************** - * Asynchronously send a heartbeat (An empty log entry) + * + * Asynchronously send a heartbeat * ****************************************************************/ - folly::Future sendHeartbeat(); + void sendHeartbeat(); /**************************************************** * @@ -566,6 +570,9 @@ class RaftPart : public std::enable_shared_from_this { std::atomic_bool inElection_{false}; // Speed up first election when I don't know who is leader bool isBlindFollower_{true}; + // Check leader has commit log in this term (accepted by majority is not enough), + // leader is not allowed to service until it is true. + bool commitInThisTerm_{false}; // Write-ahead Log std::shared_ptr wal_; diff --git a/src/kvstore/raftex/RaftexService.cpp b/src/kvstore/raftex/RaftexService.cpp index 2cbf1e296..e25f2d884 100644 --- a/src/kvstore/raftex/RaftexService.cpp +++ b/src/kvstore/raftex/RaftexService.cpp @@ -228,6 +228,21 @@ void RaftexService::sendSnapshot( part->processSendSnapshotRequest(req, resp); } +void RaftexService::async_eb_heartbeat( + std::unique_ptr> callback, + const cpp2::HeartbeatRequest& req) { + cpp2::HeartbeatResponse resp; + auto part = findPart(req.get_space(), req.get_part()); + if (!part) { + // Not found + resp.set_error_code(cpp2::ErrorCode::E_UNKNOWN_PART); + callback->result(resp); + return; + } + part->processHeartbeatRequest(req, resp); + callback->result(resp); +} + } // namespace raftex } // namespace nebula diff --git a/src/kvstore/raftex/RaftexService.h b/src/kvstore/raftex/RaftexService.h index aa7606db7..88b98cec3 100644 --- a/src/kvstore/raftex/RaftexService.h +++ b/src/kvstore/raftex/RaftexService.h @@ -49,6 +49,10 @@ class RaftexService : public cpp2::RaftexServiceSvIf { cpp2::SendSnapshotResponse& resp, const cpp2::SendSnapshotRequest& req) override; + void async_eb_heartbeat( + std::unique_ptr> callback, + const cpp2::HeartbeatRequest& req) override; + void addPartition(std::shared_ptr part); void removePartition(std::shared_ptr part); diff --git a/src/kvstore/raftex/test/LeaderTransferTest.cpp b/src/kvstore/raftex/test/LeaderTransferTest.cpp index 5674d0f4e..83cf2635d 100644 --- a/src/kvstore/raftex/test/LeaderTransferTest.cpp +++ b/src/kvstore/raftex/test/LeaderTransferTest.cpp @@ -15,14 +15,14 @@ #include "kvstore/raftex/test/RaftexTestBase.h" #include "kvstore/raftex/test/TestShard.h" -DECLARE_uint32(heartbeat_interval); - +DECLARE_uint32(raft_heartbeat_interval_secs); namespace nebula { namespace raftex { TEST(LeaderTransferTest, SimpleTest) { + FLAGS_raft_heartbeat_interval_secs = 1; fs::TempDir walRoot("/tmp/leader_transfer_test.simple_test.XXXXXX"); std::shared_ptr workers; std::vector wals; @@ -69,6 +69,8 @@ TEST(LeaderTransferTest, ChangeLeaderServalTimesTest) { // Check all hosts agree on the same leader auto nLeaderIndex = checkLeadership(copies, leader); int32_t times = 0; + int32_t logIndex = 0; + std::vector msgs; while (++times <= 10) { auto leaderIndex = nLeaderIndex; nLeaderIndex = (nLeaderIndex + 1) % 3; @@ -80,14 +82,12 @@ TEST(LeaderTransferTest, ChangeLeaderServalTimesTest) { f.wait(); waitUntilLeaderElected(copies, leader); checkLeadership(copies, nLeaderIndex, leader); + LOG(INFO) << "=====> Start appending logs"; + appendLogs(logIndex, logIndex, leader, msgs, true); + checkConsensus(copies, 0, logIndex, msgs); + LOG(INFO) << "<===== Finish appending logs"; + logIndex++; } - - // Append 100 logs - LOG(INFO) << "=====> Start appending logs"; - std::vector msgs; - appendLogs(0, 99, leader, msgs); - LOG(INFO) << "<===== Finish appending logs"; - checkConsensus(copies, 0, 99, msgs); finishRaft(services, copies, workers, leader); } From c7b37757ce4e11ecba78d47b41bb272b381bba21 Mon Sep 17 00:00:00 2001 From: Doodle <13706157+critical27@users.noreply.github.com> Date: Fri, 15 Jan 2021 19:11:29 +0800 Subject: [PATCH 2/7] don not hold raft lock when commit --- src/kvstore/raftex/RaftPart.cpp | 31 ++++++++++++++++++++----------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp index 10986dfaa..ee1523559 100644 --- a/src/kvstore/raftex/RaftPart.cpp +++ b/src/kvstore/raftex/RaftPart.cpp @@ -420,7 +420,7 @@ void RaftPart::preProcessTransLeader(const HostAddr& target) { } void RaftPart::commitTransLeader(const HostAddr& target) { - CHECK(!raftLock_.try_lock()); + bool needToUnlock = raftLock_.try_lock(); LOG(INFO) << idStr_ << "Commit transfer leader to " << target; switch (role_) { case Role::LEADER: { @@ -449,6 +449,9 @@ void RaftPart::commitTransLeader(const HostAddr& target) { break; } } + if (needToUnlock) { + raftLock_.unlock(); + } } void RaftPart::updateQuorum() { @@ -577,7 +580,12 @@ void RaftPart::preProcessRemovePeer(const HostAddr& peer) { } void RaftPart::commitRemovePeer(const HostAddr& peer) { - CHECK(!raftLock_.try_lock()); + bool needToUnlock = raftLock_.try_lock(); + SCOPE_EXIT { + if (needToUnlock) { + raftLock_.unlock(); + } + }; if (role_ == Role::FOLLOWER || role_ == Role::LEARNER) { LOG(INFO) << idStr_ << "I am " << roleStr(role_) << ", skip remove peer in commit"; @@ -938,11 +946,19 @@ void RaftPart::processAppendLogResponses( } lastLogId_ = lastLogId; lastLogTerm_ = currTerm; + } while (false); + + if (!checkAppendLogResult(res)) { + LOG(ERROR) << idStr_ << "processAppendLogResponses failed!"; + return; + } + { auto walIt = wal_->iterator(committedId + 1, lastLogId); SlowOpTracker tracker; // Step 3: Commit the batch if (commitLogs(std::move(walIt))) { + std::lock_guard g(raftLock_); committedLogId_ = lastLogId; firstLogId = lastLogId_ + 1; lastMsgAcceptedCostMs_ = lastMsgSentDur_.elapsedInMSec(); @@ -957,15 +973,8 @@ void RaftPart::processAppendLogResponses( } VLOG(2) << idStr_ << "Leader succeeded in committing the logs " << committedId + 1 << " to " << lastLogId; - - lastMsgAcceptedCostMs_ = lastMsgSentDur_.elapsedInMSec(); - lastMsgAcceptedTime_ = time::WallClock::fastNowInMilliSec(); - } while (false); - - if (!checkAppendLogResult(res)) { - LOG(ERROR) << idStr_ << "processAppendLogResponses failed!"; - return; } + // Step 4: Fulfill the promise if (iter.hasNonAtomicOpLogs()) { sendingPromise_.setOneSharedValue(AppendLogResult::SUCCEEDED); @@ -2041,7 +2050,7 @@ bool RaftPart::checkAppendLogResult(AppendLogResult res) { } sendingPromise_.setValue(res); replicatingLogs_ = false; - return false;; + return false; } return true; } From 0a2a0962618b526c670a2a1eb2cb2b688584e276 Mon Sep 17 00:00:00 2001 From: Doodle <13706157+critical27@users.noreply.github.com> Date: Fri, 2 Apr 2021 16:35:03 +0800 Subject: [PATCH 3/7] follower delay commit if write stall --- src/kvstore/KVEngine.h | 8 ++-- src/kvstore/Listener.cpp | 4 +- src/kvstore/Listener.h | 4 +- src/kvstore/Part.cpp | 53 ++++++++++++++++++--------- src/kvstore/Part.h | 12 +----- src/kvstore/RocksEngine.cpp | 6 ++- src/kvstore/RocksEngine.h | 3 +- src/kvstore/raftex/RaftPart.cpp | 2 +- src/kvstore/raftex/RaftPart.h | 3 +- src/kvstore/raftex/test/TestShard.cpp | 4 +- src/kvstore/raftex/test/TestShard.h | 2 +- 11 files changed, 58 insertions(+), 43 deletions(-) diff --git a/src/kvstore/KVEngine.h b/src/kvstore/KVEngine.h index 433b6d8c1..832b18df5 100644 --- a/src/kvstore/KVEngine.h +++ b/src/kvstore/KVEngine.h @@ -47,10 +47,10 @@ class KVEngine { virtual std::unique_ptr startBatchWrite() = 0; - virtual nebula::cpp2::ErrorCode - commitBatchWrite(std::unique_ptr batch, - bool disableWAL = true, - bool sync = false) = 0; + virtual nebula::cpp2::ErrorCode commitBatchWrite(std::unique_ptr batch, + bool disableWAL, + bool sync, + bool wait) = 0; // Read a single key virtual nebula::cpp2::ErrorCode get(const std::string& key, std::string* value) = 0; diff --git a/src/kvstore/Listener.cpp b/src/kvstore/Listener.cpp index 2b384ff69..2855ab20f 100644 --- a/src/kvstore/Listener.cpp +++ b/src/kvstore/Listener.cpp @@ -113,7 +113,7 @@ bool Listener::preProcessLog(LogID logId, return true; } -bool Listener::commitLogs(std::unique_ptr iter) { +cpp2::ErrorCode Listener::commitLogs(std::unique_ptr iter, bool) { LogID lastId = -1; while (iter->valid()) { lastId = iter->logId(); @@ -122,7 +122,7 @@ bool Listener::commitLogs(std::unique_ptr iter) { if (lastId > 0) { leaderCommitId_ = lastId; } - return true; + return cpp2::ErrorCode::SUCCEEDED; } void Listener::doApply() { diff --git a/src/kvstore/Listener.h b/src/kvstore/Listener.h index 8a5e37d20..32cd4740d 100644 --- a/src/kvstore/Listener.h +++ b/src/kvstore/Listener.h @@ -52,7 +52,7 @@ derived class. // For listener, we just return true directly. Another background thread trigger the actual // apply work, and do it in worker thread, and update lastApplyLogId_ - bool commitLogs(std::unique_ptr iter) + cpp2::Errorcode commitLogs(std::unique_ptr iter, bool) // For most of the listeners, just return true is enough. However, if listener need to be aware // of membership change, some log type of wal need to be pre-processed, could do it here. @@ -154,7 +154,7 @@ class Listener : public raftex::RaftPart { // For listener, we just return true directly. Another background thread trigger the actual // apply work, and do it in worker thread, and update lastApplyLogId_ - bool commitLogs(std::unique_ptr) override; + cpp2::ErrorCode commitLogs(std::unique_ptr, bool) override; // For most of the listeners, just return true is enough. However, if listener need to be aware // of membership change, some log type of wal need to be pre-processed, could do it here. diff --git a/src/kvstore/Part.cpp b/src/kvstore/Part.cpp index 704b31b34..7d1671f6b 100644 --- a/src/kvstore/Part.cpp +++ b/src/kvstore/Part.cpp @@ -195,7 +195,7 @@ void Part::onDiscoverNewLeader(HostAddr nLeader) { } } -bool Part::commitLogs(std::unique_ptr iter) { +cpp2::ErrorCode Part::commitLogs(std::unique_ptr iter, bool wait) { auto batch = engine_->startBatchWrite(); LogID lastId = -1; TermID lastTerm = -1; @@ -214,9 +214,10 @@ bool Part::commitLogs(std::unique_ptr iter) { case OP_PUT: { auto pieces = decodeMultiValues(log); DCHECK_EQ(2, pieces.size()); - if (batch->put(pieces[0], pieces[1]) != nebula::cpp2::ErrorCode::SUCCEEDED) { + auto code = batch->put(pieces[0], pieces[1]); + if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { LOG(ERROR) << idStr_ << "Failed to call WriteBatch::put()"; - return false; + return code; } break; } @@ -225,27 +226,30 @@ bool Part::commitLogs(std::unique_ptr iter) { // Make the number of values are an even number DCHECK_EQ((kvs.size() + 1) / 2, kvs.size() / 2); for (size_t i = 0; i < kvs.size(); i += 2) { - if (batch->put(kvs[i], kvs[i + 1]) != nebula::cpp2::ErrorCode::SUCCEEDED) { + auto code = batch->put(kvs[i], kvs[i + 1]); + if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { LOG(ERROR) << idStr_ << "Failed to call WriteBatch::put()"; - return false; + return code; } } break; } case OP_REMOVE: { auto key = decodeSingleValue(log); - if (batch->remove(key) != nebula::cpp2::ErrorCode::SUCCEEDED) { + auto code = batch->remove(key); + if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { LOG(ERROR) << idStr_ << "Failed to call WriteBatch::remove()"; - return false; + return code; } break; } case OP_MULTI_REMOVE: { auto keys = decodeMultiValues(log); for (auto k : keys) { - if (batch->remove(k) != nebula::cpp2::ErrorCode::SUCCEEDED) { + auto code = batch->remove(k); + if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { LOG(ERROR) << idStr_ << "Failed to call WriteBatch::remove()"; - return false; + return code; } } break; @@ -253,9 +257,10 @@ bool Part::commitLogs(std::unique_ptr iter) { case OP_REMOVE_RANGE: { auto range = decodeMultiValues(log); DCHECK_EQ(2, range.size()); - if (batch->removeRange(range[0], range[1]) != nebula::cpp2::ErrorCode::SUCCEEDED) { + auto code = batch->removeRange(range[0], range[1]); + if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { LOG(ERROR) << idStr_ << "Failed to call WriteBatch::removeRange()"; - return false; + return code; } break; } @@ -272,7 +277,7 @@ bool Part::commitLogs(std::unique_ptr iter) { } if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { LOG(ERROR) << idStr_ << "Failed to call WriteBatch"; - return false; + return code; } } break; @@ -314,14 +319,14 @@ bool Part::commitLogs(std::unique_ptr iter) { } if (lastId >= 0) { - if (putCommitMsg(batch.get(), lastId, lastTerm) != nebula::cpp2::ErrorCode::SUCCEEDED) { + auto code = putCommitMsg(batch.get(), lastId, lastTerm); + if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { LOG(ERROR) << idStr_ << "Commit msg failed"; - return false; + return code; } } - return engine_->commitBatchWrite(std::move(batch), - FLAGS_rocksdb_disable_wal, - FLAGS_rocksdb_wal_sync) == nebula::cpp2::ErrorCode::SUCCEEDED; + return engine_->commitBatchWrite( + std::move(batch), FLAGS_rocksdb_disable_wal, FLAGS_rocksdb_wal_sync, wait); } std::pair Part::commitSnapshot(const std::vector& rows, @@ -348,7 +353,9 @@ std::pair Part::commitSnapshot(const std::vector& } } // For snapshot, we open the rocksdb's wal to avoid loss data if crash. - if (nebula::cpp2::ErrorCode::SUCCEEDED != engine_->commitBatchWrite(std::move(batch), false)) { + auto code = engine_->commitBatchWrite( + std::move(batch), FLAGS_rocksdb_disable_wal, FLAGS_rocksdb_wal_sync, true); + if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { LOG(ERROR) << idStr_ << "Put failed in commit"; return std::make_pair(0, 0); } @@ -433,6 +440,16 @@ bool Part::preProcessLog(LogID logId, return true; } +void Part::cleanup() { + LOG(INFO) << idStr_ << "Clean rocksdb commit key"; + auto res = engine_->remove(NebulaKeyUtils::systemCommitKey(partId_)); + if (res != nebula::cpp2::ErrorCode::SUCCEEDED) { + LOG(WARNING) << idStr_ << "Remove the committedLogId failed, error " + << static_cast(res); + } + return; +} + // TODO(pandasheep) unify raft errorcode nebula::cpp2::ErrorCode Part::toResultCode(raftex::AppendLogResult res) { switch (res) { diff --git a/src/kvstore/Part.h b/src/kvstore/Part.h index 21816fc86..a0ec25944 100644 --- a/src/kvstore/Part.h +++ b/src/kvstore/Part.h @@ -95,7 +95,7 @@ class Part : public raftex::RaftPart { void onDiscoverNewLeader(HostAddr nLeader) override; - bool commitLogs(std::unique_ptr iter) override; + cpp2::ErrorCode commitLogs(std::unique_ptr iter, bool wait) override; bool preProcessLog(LogID logId, TermID termId, @@ -110,15 +110,7 @@ class Part : public raftex::RaftPart { nebula::cpp2::ErrorCode putCommitMsg(WriteBatch* batch, LogID committedLogId, TermID committedLogTerm); - void cleanup() override { - LOG(INFO) << idStr_ << "Clean rocksdb commit key"; - auto res = engine_->remove(NebulaKeyUtils::systemCommitKey(partId_)); - if (res != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(WARNING) << idStr_ << "Remove the committedLogId failed, error " - << static_cast(res); - } - return; - } + void cleanup() override; nebula::cpp2::ErrorCode toResultCode(raftex::AppendLogResult res); diff --git a/src/kvstore/RocksEngine.cpp b/src/kvstore/RocksEngine.cpp index f96cbd219..e35b6c299 100644 --- a/src/kvstore/RocksEngine.cpp +++ b/src/kvstore/RocksEngine.cpp @@ -137,14 +137,18 @@ std::unique_ptr RocksEngine::startBatchWrite() { nebula::cpp2::ErrorCode RocksEngine::commitBatchWrite(std::unique_ptr batch, bool disableWAL, - bool sync) { + bool sync, + bool wait) { rocksdb::WriteOptions options; options.disableWAL = disableWAL; options.sync = sync; + options.no_slowdown = !wait; auto* b = static_cast(batch.get()); rocksdb::Status status = db_->Write(options, b->data()); if (status.ok()) { return nebula::cpp2::ErrorCode::SUCCEEDED; + } else if (!wait && status.IsIncomplete()) { + return nebula::cpp2::ErrorCode::E_WRITE_STALLED; } LOG(ERROR) << "Write into rocksdb failed because of " << status.ToString(); return nebula::cpp2::ErrorCode::E_UNKNOWN; diff --git a/src/kvstore/RocksEngine.h b/src/kvstore/RocksEngine.h index 3bb497ec1..31d420aee 100644 --- a/src/kvstore/RocksEngine.h +++ b/src/kvstore/RocksEngine.h @@ -115,7 +115,8 @@ class RocksEngine : public KVEngine { nebula::cpp2::ErrorCode commitBatchWrite(std::unique_ptr batch, bool disableWAL, - bool sync) override; + bool sync, + bool wait) override; /********************* * Data retrieval diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp index ee1523559..f3ba7db6a 100644 --- a/src/kvstore/raftex/RaftPart.cpp +++ b/src/kvstore/raftex/RaftPart.cpp @@ -957,7 +957,7 @@ void RaftPart::processAppendLogResponses( auto walIt = wal_->iterator(committedId + 1, lastLogId); SlowOpTracker tracker; // Step 3: Commit the batch - if (commitLogs(std::move(walIt))) { + if (commitLogs(std::move(walIt), true) == nebula::cpp2::ErrorCode::SUCCEEDED) { std::lock_guard g(raftLock_); committedLogId_ = lastLogId; firstLogId = lastLogId_ + 1; diff --git a/src/kvstore/raftex/RaftPart.h b/src/kvstore/raftex/RaftPart.h index fd1affdad..081274a90 100644 --- a/src/kvstore/raftex/RaftPart.h +++ b/src/kvstore/raftex/RaftPart.h @@ -12,6 +12,7 @@ #include "common/interface/gen-cpp2/RaftexServiceAsyncClient.h" #include "common/time/Duration.h" #include "common/thread/GenericThreadPool.h" +#include "kvstore/Common.h" #include "kvstore/raftex/SnapshotManager.h" #include "kvstore/DiskManager.h" #include @@ -298,7 +299,7 @@ class RaftPart : public std::enable_shared_from_this { // The inherited classes need to implement this method to commit // a batch of log messages - virtual bool commitLogs(std::unique_ptr iter) = 0; + virtual nebula::cpp2::ErrorCode commitLogs(std::unique_ptr iter, bool wait) = 0; virtual bool preProcessLog(LogID logId, TermID termId, diff --git a/src/kvstore/raftex/test/TestShard.cpp b/src/kvstore/raftex/test/TestShard.cpp index 5056deb01..9c8ec8633 100644 --- a/src/kvstore/raftex/test/TestShard.cpp +++ b/src/kvstore/raftex/test/TestShard.cpp @@ -165,7 +165,7 @@ void TestShard::onElected(TermID term) { } -bool TestShard::commitLogs(std::unique_ptr iter) { +nebula::cpp2::ErrorCode TestShard::commitLogs(std::unique_ptr iter, bool) { LogID firstId = -1; LogID lastId = -1; int32_t commitLogsNum = 0; @@ -211,7 +211,7 @@ bool TestShard::commitLogs(std::unique_ptr iter) { if (commitLogsNum > 0) { commitTimes_++; } - return true; + return nebula::cpp2::ErrorCode::SUCCEEDED; } std::pair TestShard::commitSnapshot(const std::vector& data, diff --git a/src/kvstore/raftex/test/TestShard.h b/src/kvstore/raftex/test/TestShard.h index bb484efea..21ec6e106 100644 --- a/src/kvstore/raftex/test/TestShard.h +++ b/src/kvstore/raftex/test/TestShard.h @@ -80,7 +80,7 @@ class TestShard : public RaftPart { void onElected(TermID term) override; void onDiscoverNewLeader(HostAddr) override {} - bool commitLogs(std::unique_ptr iter) override; + nebula::cpp2::ErrorCode commitLogs(std::unique_ptr iter, bool wait) override; bool preProcessLog(LogID, TermID, From e3102f4f3ece5c0eef7361ccdf882ff475512476 Mon Sep 17 00:00:00 2001 From: Doodle <13706157+critical27@users.noreply.github.com> Date: Tue, 13 Apr 2021 13:31:34 +0800 Subject: [PATCH 4/7] set rocksdb option in store worker --- src/kvstore/NebulaStore.cpp | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp index 7e4eeac1d..68cde6edf 100644 --- a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -533,15 +533,17 @@ void NebulaStore::checkRemoteListeners(GraphSpaceID spaceId, void NebulaStore::updateSpaceOption(GraphSpaceID spaceId, const std::unordered_map& options, bool isDbOption) { - if (isDbOption) { - for (const auto& kv : options) { - setDBOption(spaceId, kv.first, kv.second); - } - } else { - for (const auto& kv : options) { - setOption(spaceId, kv.first, kv.second); + storeWorker_->addTask([this, spaceId, opts = options, isDbOption] { + if (isDbOption) { + for (const auto& kv : opts) { + setDBOption(spaceId, kv.first, kv.second); + } + } else { + for (const auto& kv : opts) { + setOption(spaceId, kv.first, kv.second); + } } - } + }); } void NebulaStore::removeSpaceDir(const std::string& dir) { From 0243f066bbe469dec7a489a38009c0dc861878ac Mon Sep 17 00:00:00 2001 From: Doodle <13706157+critical27@users.noreply.github.com> Date: Tue, 13 Apr 2021 19:32:09 +0800 Subject: [PATCH 5/7] sort out trace_raft log --- src/kvstore/Part.cpp | 1 - src/kvstore/raftex/Host.cpp | 68 +++++++++++++------------------ src/kvstore/raftex/RaftPart.cpp | 72 +++++++++++++++------------------ 3 files changed, 61 insertions(+), 80 deletions(-) diff --git a/src/kvstore/Part.cpp b/src/kvstore/Part.cpp index 7d1671f6b..0619a29b3 100644 --- a/src/kvstore/Part.cpp +++ b/src/kvstore/Part.cpp @@ -462,7 +462,6 @@ nebula::cpp2::ErrorCode Part::toResultCode(raftex::AppendLogResult res) { case raftex::AppendLogResult::E_ATOMIC_OP_FAILURE: return nebula::cpp2::ErrorCode::E_ATOMIC_OP_FAILED; case raftex::AppendLogResult::E_BUFFER_OVERFLOW: - LOG_EVERY_N(ERROR, 100) << idStr_ << "RaftPart buffer is full"; return nebula::cpp2::ErrorCode::E_CONSENSUS_ERROR; default: LOG(ERROR) << idStr_ << "Consensus error " diff --git a/src/kvstore/raftex/Host.cpp b/src/kvstore/raftex/Host.cpp index 8a0c2df90..9195a16ed 100644 --- a/src/kvstore/raftex/Host.cpp +++ b/src/kvstore/raftex/Host.cpp @@ -96,15 +96,6 @@ folly::Future Host::appendLogs( LogID prevLogId) { VLOG(3) << idStr_ << "Entering Host::appendLogs()"; - if (FLAGS_trace_raft) { - LOG(INFO) << idStr_ - << "Append logs to the host [term = " << term - << ", logId = " << logId - << ", committedLogId = " << committedLogId - << ", lastLogTermSent = " << prevLogTerm - << ", lastLogIdSent = " << prevLogId - << "]"; - } auto ret = folly::Future::makeEmpty(); std::shared_ptr req; { @@ -198,17 +189,15 @@ void Host::appendLogsInternal(folly::EventBase* eb, } cpp2::AppendLogResponse resp = std::move(t).value(); - if (FLAGS_trace_raft) { - LOG(INFO) - << self->idStr_ << "AppendLogResponse " - << "code " << apache::thrift::util::enumNameSafe(resp.get_error_code()) - << ", currTerm " << resp.get_current_term() - << ", lastLogId " << resp.get_last_log_id() - << ", lastLogTerm " << resp.get_last_log_term() - << ", commitLogId " << resp.get_committed_log_id() - << ", lastLogIdSent_ " << self->lastLogIdSent_ - << ", lastLogTermSent_ " << self->lastLogTermSent_; - } + LOG_IF(INFO, FLAGS_trace_raft) + << self->idStr_ << "AppendLogResponse " + << "code " << apache::thrift::util::enumNameSafe(resp.get_error_code()) + << ", currTerm " << resp.get_current_term() + << ", lastLogId " << resp.get_last_log_id() + << ", lastLogTerm " << resp.get_last_log_term() + << ", commitLogId " << resp.get_committed_log_id() + << ", lastLogIdSent_ " << self->lastLogIdSent_ + << ", lastLogTermSent_ " << self->lastLogTermSent_; switch (resp.get_error_code()) { case cpp2::ErrorCode::SUCCEEDED: { VLOG(2) << self->idStr_ @@ -226,7 +215,7 @@ void Host::appendLogsInternal(folly::EventBase* eb, r.set_error_code(res); self->setResponse(r); } else if (self->lastLogIdSent_ >= resp.get_last_log_id()) { - VLOG(1) << self->idStr_ + VLOG(2) << self->idStr_ << "We send nothing in the last request" << ", so we don't send the same logs again"; self->followerCommittedLogId_ = resp.get_committed_log_id(); @@ -291,7 +280,7 @@ void Host::appendLogsInternal(folly::EventBase* eb, r.set_error_code(res); self->setResponse(r); } else if (self->lastLogIdSent_ == resp.get_last_log_id()) { - VLOG(1) << self->idStr_ + VLOG(2) << self->idStr_ << "We send nothing in the last request" << ", so we don't send the same logs again"; self->lastLogIdSent_ = resp.get_last_log_id(); @@ -359,7 +348,7 @@ void Host::appendLogsInternal(folly::EventBase* eb, r.set_error_code(res); self->setResponse(r); } else if (self->logIdToSend_ <= resp.get_last_log_id()) { - VLOG(1) << self->idStr_ + VLOG(2) << self->idStr_ << "It means the request has been received by follower"; self->lastLogIdSent_ = self->logIdToSend_ - 1; self->lastLogTermSent_ = resp.get_last_log_term(); @@ -489,13 +478,14 @@ folly::Future Host::sendAppendLogRequest( } } - VLOG(1) << idStr_ << "Sending request space " << req->get_space() - << ", part " << req->get_part() - << ", current term " << req->get_current_term() - << ", last_log_id " << req->get_last_log_id() - << ", committed_id " << req->get_committed_log_id() - << ", last_log_term_sent" << req->get_last_log_term_sent() - << ", last_log_id_sent " << req->get_last_log_id_sent(); + LOG_IF(INFO, FLAGS_trace_raft) << idStr_ + << "Sending appendLog: space " << req->get_space() + << ", part " << req->get_part() + << ", current term " << req->get_current_term() + << ", last_log_id " << req->get_last_log_id() + << ", committed_id " << req->get_committed_log_id() + << ", last_log_term_sent" << req->get_last_log_term_sent() + << ", last_log_id_sent " << req->get_last_log_id_sent(); // Get client connection auto client = part_->clientMan_->client(addr_, eb, false, FLAGS_raft_rpc_timeout_ms); return client->future_appendLog(*req); @@ -552,16 +542,14 @@ folly::Future Host::sendHeartbeatRequest( } } - if (FLAGS_trace_raft) { - LOG(INFO) << idStr_ - << "Sending heartbeat space " << req->get_space() - << ", part " << req->get_part() - << ", current term " << req->get_current_term() - << ", last_log_id " << req->get_last_log_id() - << ", committed_id " << req->get_committed_log_id() - << ", last_log_term_sent " << req->get_last_log_term_sent() - << ", last_log_id_sent " << req->get_last_log_id_sent(); - } + LOG_IF(INFO, FLAGS_trace_raft) << idStr_ + << "Sending heartbeat: space " << req->get_space() + << ", part " << req->get_part() + << ", current term " << req->get_current_term() + << ", last_log_id " << req->get_last_log_id() + << ", committed_id " << req->get_committed_log_id() + << ", last_log_term_sent " << req->get_last_log_term_sent() + << ", last_log_id_sent " << req->get_last_log_id_sent(); // Get client connection auto client = part_->clientMan_->client(addr_, eb, false, FLAGS_raft_rpc_timeout_ms); return client->future_heartbeat(*req); diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp index f3ba7db6a..20c110280 100644 --- a/src/kvstore/raftex/RaftPart.cpp +++ b/src/kvstore/raftex/RaftPart.cpp @@ -1497,28 +1497,24 @@ void RaftPart::processAskForVoteRequest( void RaftPart::processAppendLogRequest( const cpp2::AppendLogRequest& req, cpp2::AppendLogResponse& resp) { - if (FLAGS_trace_raft) { - LOG(INFO) << idStr_ - << "Received logAppend " - << ": GraphSpaceId = " << req.get_space() - << ", partition = " << req.get_part() - << ", leaderIp = " << req.get_leader_addr() - << ", leaderPort = " << req.get_leader_port() - << ", current_term = " << req.get_current_term() - << ", lastLogId = " << req.get_last_log_id() - << ", committedLogId = " << req.get_committed_log_id() - << ", lastLogIdSent = " << req.get_last_log_id_sent() - << ", lastLogTermSent = " << req.get_last_log_term_sent() - << folly::stringPrintf( - ", num_logs = %ld, logTerm = %ld", - req.get_log_str_list().size(), - req.get_log_term()) - << ", sendingSnapshot = " << req.get_sending_snapshot() - << ", local lastLogId = " << lastLogId_ - << ", local lastLogTerm = " << lastLogTerm_ - << ", local committedLogId = " << committedLogId_ - << ", local current term = " << term_; - } + LOG_IF(INFO, FLAGS_trace_raft) << idStr_ + << "Received logAppend" + << ": GraphSpaceId = " << req.get_space() + << ", partition = " << req.get_part() + << ", leaderIp = " << req.get_leader_addr() + << ", leaderPort = " << req.get_leader_port() + << ", current_term = " << req.get_current_term() + << ", lastLogId = " << req.get_last_log_id() + << ", committedLogId = " << req.get_committed_log_id() + << ", lastLogIdSent = " << req.get_last_log_id_sent() + << ", lastLogTermSent = " << req.get_last_log_term_sent() + << ", num_logs = " << req.get_log_str_list().size() + << ", logTerm = " << req.get_log_term() + << ", sendingSnapshot = " << req.get_sending_snapshot() + << ", local lastLogId = " << lastLogId_ + << ", local lastLogTerm = " << lastLogTerm_ + << ", local committedLogId = " << committedLogId_ + << ", local current term = " << term_; std::lock_guard g(raftLock_); resp.set_current_term(term_); @@ -1815,23 +1811,21 @@ cpp2::ErrorCode RaftPart::verifyLeader(const REQ& req) { void RaftPart::processHeartbeatRequest( const cpp2::HeartbeatRequest& req, cpp2::HeartbeatResponse& resp) { - if (FLAGS_trace_raft) { - LOG(INFO) << idStr_ - << "Received heartbeat " - << ": GraphSpaceId = " << req.get_space() - << ", partition = " << req.get_part() - << ", leaderIp = " << req.get_leader_addr() - << ", leaderPort = " << req.get_leader_port() - << ", current_term = " << req.get_current_term() - << ", lastLogId = " << req.get_last_log_id() - << ", committedLogId = " << req.get_committed_log_id() - << ", lastLogIdSent = " << req.get_last_log_id_sent() - << ", lastLogTermSent = " << req.get_last_log_term_sent() - << ", local lastLogId = " << lastLogId_ - << ", local lastLogTerm = " << lastLogTerm_ - << ", local committedLogId = " << committedLogId_ - << ", local current term = " << term_; - } + LOG_IF(INFO, FLAGS_trace_raft) << idStr_ + << "Received heartbeat" + << ": GraphSpaceId = " << req.get_space() + << ", partition = " << req.get_part() + << ", leaderIp = " << req.get_leader_addr() + << ", leaderPort = " << req.get_leader_port() + << ", current_term = " << req.get_current_term() + << ", lastLogId = " << req.get_last_log_id() + << ", committedLogId = " << req.get_committed_log_id() + << ", lastLogIdSent = " << req.get_last_log_id_sent() + << ", lastLogTermSent = " << req.get_last_log_term_sent() + << ", local lastLogId = " << lastLogId_ + << ", local lastLogTerm = " << lastLogTerm_ + << ", local committedLogId = " << committedLogId_ + << ", local current term = " << term_; std::lock_guard g(raftLock_); resp.set_current_term(term_); From b89e773e49eafbf404641fe3fef606afc3b364bb Mon Sep 17 00:00:00 2001 From: Doodle <13706157+critical27@users.noreply.github.com> Date: Wed, 14 Apr 2021 15:56:02 +0800 Subject: [PATCH 6/7] unify canAppendLogs --- src/kvstore/raftex/RaftPart.cpp | 84 +++++++++------------------------ src/kvstore/raftex/RaftPart.h | 4 ++ 2 files changed, 27 insertions(+), 61 deletions(-) diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp index 20c110280..c1856afad 100644 --- a/src/kvstore/raftex/RaftPart.cpp +++ b/src/kvstore/raftex/RaftPart.cpp @@ -356,23 +356,28 @@ void RaftPart::stop() { AppendLogResult RaftPart::canAppendLogs() { - CHECK(!raftLock_.try_lock()); - if (status_ == Status::STARTING) { - LOG(ERROR) << idStr_ << "The partition is still starting"; - return AppendLogResult::E_NOT_READY; - } - if (status_ == Status::STOPPED) { - LOG(ERROR) << idStr_ << "The partition is stopped"; + DCHECK(!raftLock_.try_lock()); + if (UNLIKELY(status_ != Status::RUNNING)) { + LOG(ERROR) << idStr_ << "The partition is not running"; return AppendLogResult::E_STOPPED; } - if (role_ != Role::LEADER) { - LOG_EVERY_N(ERROR, 100) << idStr_ << "The partition is not a leader"; + if (UNLIKELY(role_ != Role::LEADER)) { + LOG_EVERY_N(WARNING, 1000) << idStr_ << "The partition is not a leader"; return AppendLogResult::E_NOT_A_LEADER; } - return AppendLogResult::SUCCEEDED; } +AppendLogResult RaftPart::canAppendLogs(TermID termId) { + DCHECK(!raftLock_.try_lock()); + if (UNLIKELY(term_ != termId)) { + VLOG(2) << idStr_ << "Term has been updated, origin " + << termId << ", new " << term_; + return AppendLogResult::E_TERM_OUT_OF_DATE; + } + return canAppendLogs(); +} + void RaftPart::addLearner(const HostAddr& addr) { CHECK(!raftLock_.try_lock()); if (addr == addr_) { @@ -694,7 +699,7 @@ folly::Future RaftPart::appendLogAsync(ClusterID source, if (!checkAppendLogResult(res)) { // Mosy likely failed because the parttion is not leader - LOG_EVERY_N(ERROR, 100) << idStr_ << "Cannot append logs, clean the buffer"; + LOG_EVERY_N(WARNING, 1000) << idStr_ << "Cannot append logs, clean the buffer"; return res; } // Replicate buffered logs to all followers @@ -738,23 +743,8 @@ void RaftPart::appendLogsInternal(AppendLogsIterator iter, TermID termId) { AppendLogResult res = AppendLogResult::SUCCEEDED; do { std::lock_guard g(raftLock_); - if (status_ != Status::RUNNING) { - // The partition is not running - VLOG(2) << idStr_ << "The partition is stopped"; - res = AppendLogResult::E_STOPPED; - break; - } - - if (role_ != Role::LEADER) { - // Is not a leader any more - VLOG(2) << idStr_ << "The leader has changed"; - res = AppendLogResult::E_NOT_A_LEADER; - break; - } - if (term_ != termId) { - VLOG(2) << idStr_ << "Term has been updated, origin " - << termId << ", new " << term_; - res = AppendLogResult::E_TERM_OUT_OF_DATE; + res = canAppendLogs(termId); + if (res != AppendLogResult::SUCCEEDED) { break; } currTerm = term_; @@ -807,28 +797,11 @@ void RaftPart::replicateLogs(folly::EventBase* eb, AppendLogResult res = AppendLogResult::SUCCEEDED; do { std::lock_guard g(raftLock_); - - if (status_ != Status::RUNNING) { - // The partition is not running - VLOG(2) << idStr_ << "The partition is stopped"; - res = AppendLogResult::E_STOPPED; - break; - } - - if (role_ != Role::LEADER) { - // Is not a leader any more - VLOG(2) << idStr_ << "The leader has changed"; - res = AppendLogResult::E_NOT_A_LEADER; + res = canAppendLogs(currTerm); + if (res != AppendLogResult::SUCCEEDED) { break; } - hosts = hosts_; - - if (term_ != currTerm) { - VLOG(2) << idStr_ << "Term has been updated, previous " - << currTerm << ", current " << term_; - currTerm = term_; - } } while (false); if (!checkAppendLogResult(res)) { @@ -929,19 +902,8 @@ void RaftPart::processAppendLogResponses( AppendLogResult res = AppendLogResult::SUCCEEDED; do { std::lock_guard g(raftLock_); - if (status_ != Status::RUNNING) { - LOG(INFO) << idStr_ << "The partition is stopped"; - res = AppendLogResult::E_STOPPED; - break; - } - if (role_ != Role::LEADER) { - LOG(INFO) << idStr_ << "The leader has changed"; - res = AppendLogResult::E_NOT_A_LEADER; - break; - } - if (currTerm != term_) { - LOG(INFO) << idStr_ << "The leader has changed, ABA problem."; - res = AppendLogResult::E_TERM_OUT_OF_DATE; + res = canAppendLogs(currTerm); + if (res != AppendLogResult::SUCCEEDED) { break; } lastLogId_ = lastLogId; @@ -1725,7 +1687,7 @@ void RaftPart::processAppendLogRequest( template cpp2::ErrorCode RaftPart::verifyLeader(const REQ& req) { - CHECK(!raftLock_.try_lock()); + DCHECK(!raftLock_.try_lock()); auto candidate = HostAddr(req.get_leader_addr(), req.get_leader_port()); auto code = checkPeer(candidate); if (code != cpp2::ErrorCode::SUCCEEDED) { diff --git a/src/kvstore/raftex/RaftPart.h b/src/kvstore/raftex/RaftPart.h index 081274a90..54a68f747 100644 --- a/src/kvstore/raftex/RaftPart.h +++ b/src/kvstore/raftex/RaftPart.h @@ -387,6 +387,10 @@ class RaftPart : public std::enable_shared_from_this { // Pre-condition: The caller needs to hold the raftLock_ AppendLogResult canAppendLogs(); + // Also check if term has changed + // Pre-condition: The caller needs to hold the raftLock_ + AppendLogResult canAppendLogs(TermID currTerm); + folly::Future appendLogAsync(ClusterID source, LogType logType, std::string log, From 5951ddbe8df5ecbf39878c33c57582c84f83b9cd Mon Sep 17 00:00:00 2001 From: Doodle <13706157+critical27@users.noreply.github.com> Date: Wed, 23 Jun 2021 11:58:44 +0800 Subject: [PATCH 7/7] address @liuyu85cn's comments --- src/kvstore/raftex/Host.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/kvstore/raftex/Host.cpp b/src/kvstore/raftex/Host.cpp index 9195a16ed..cd1612560 100644 --- a/src/kvstore/raftex/Host.cpp +++ b/src/kvstore/raftex/Host.cpp @@ -508,6 +508,7 @@ folly::Future Host::sendHeartbeat(folly::EventBase* eb, req->set_last_log_term_sent(lastLogTerm); req->set_last_log_id_sent(lastLogId); folly::Promise promise; + auto future = promise.getFuture(); sendHeartbeatRequest(eb, std::move(req)) .via(eb) .then([self = shared_from_this(), pro = std::move(promise)] @@ -522,7 +523,7 @@ folly::Future Host::sendHeartbeat(folly::EventBase* eb, pro.setValue(std::move(t.value())); } }); - return promise.getFuture(); + return future; } folly::Future Host::sendHeartbeatRequest(