diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp index 07f5ceabb43..0cdc2a750b9 100644 --- a/src/kvstore/raftex/RaftPart.cpp +++ b/src/kvstore/raftex/RaftPart.cpp @@ -88,6 +88,7 @@ class AppendLogsIterator final : public LogIterator { for (auto it = logs_.begin(); it != logs_.end(); ++it) { auto& promiseRef = std::get<4>(*it); if (!promiseRef.isFulfilled()) { + DCHECK(!promiseRef.isFulfilled()); promiseRef.setValue(code); } } @@ -145,9 +146,12 @@ class AppendLogsIterator final : public LogIterator { class AppendLogsIteratorFactory { public: AppendLogsIteratorFactory() = default; - void make(RaftPart::LogCache& cacheLogs, RaftPart::LogCache& sendLogs) { + static void make(RaftPart::LogCache& cacheLogs, RaftPart::LogCache& sendLogs) { + DCHECK(sendLogs.empty()); + std::unordered_set memLock; + std::list> ranges; for (auto& log : cacheLogs) { - auto code = mergeAble(log); + auto code = mergeAble(log, memLock, ranges); if (code == MergeAbleCode::MERGE_BOTH) { sendLogs.emplace_back(); std::swap(cacheLogs.front(), sendLogs.back()); @@ -158,7 +162,11 @@ class AppendLogsIteratorFactory { std::swap(cacheLogs.front(), sendLogs.back()); cacheLogs.pop_front(); break; - } else { // NONE, or MERGE_NEXT + } else if (code == MergeAbleCode::NO_MERGE) { + // if we meet some failed atomicOp, we can just skip it. + cacheLogs.pop_front(); + continue; + } else { // MERGE_NEXT break; } } @@ -169,7 +177,9 @@ class AppendLogsIteratorFactory { * * @param logWrapper */ - MergeAbleCode mergeAble(RaftPart::LogCacheItem& logWrapper) { + static MergeAbleCode mergeAble(RaftPart::LogCacheItem& logWrapper, + std::unordered_set& memLock, + std::list>& ranges) { // log type: switch (std::get<1>(logWrapper)) { case LogType::NORMAL: { @@ -178,9 +188,9 @@ class AppendLogsIteratorFactory { if (log.empty()) { return MergeAbleCode::MERGE_BOTH; } - decode(log, updateSet, ranges_); + decode(log, updateSet, ranges); for (auto& key : updateSet) { - memLock_.insert(key.str()); + memLock.insert(key.str()); } return MergeAbleCode::MERGE_BOTH; } @@ -197,7 +207,7 @@ class AppendLogsIteratorFactory { if (!promiseRef.isFulfilled()) { promiseRef.setValue(code); } - return MergeAbleCode::MERGE_BOTH; + return MergeAbleCode::NO_MERGE; } std::get<2>(logWrapper) = std::move(result); /** @@ -205,15 +215,15 @@ class AppendLogsIteratorFactory { * but reject if same in different logs. */ for (auto& key : read) { - auto cit = memLock_.find(key); + auto cit = memLock.find(key); // read after write is not acceptable. - if (cit != memLock_.end()) { + if (cit != memLock.end()) { return MergeAbleCode::MERGE_NEXT; } // if we try to read a key, in any range - // for (auto& [begin, end] : ranges_) { - for (auto& it : ranges_) { + // for (auto& [begin, end] : ranges) { + for (auto& it : ranges) { auto* begin = it.first.c_str(); auto* end = it.second.c_str(); auto* pKey = key.c_str(); @@ -225,7 +235,7 @@ class AppendLogsIteratorFactory { for (auto& key : write) { // it doesn't matter if insert failed. (if write conflict, last write win) - memLock_.insert(key); + memLock.insert(key); } return MergeAbleCode::MERGE_BOTH; } @@ -235,9 +245,9 @@ class AppendLogsIteratorFactory { return MergeAbleCode::NO_MERGE; } - void decode(const std::string& log, - std::vector& updateSet, - std::list>& ranges) { + static void decode(const std::string& log, + std::vector& updateSet, + std::list>& ranges) { switch (log[sizeof(int64_t)]) { case nebula::kvstore::OP_PUT: { auto pieces = nebula::kvstore::decodeMultiValues(log); @@ -299,8 +309,8 @@ class AppendLogsIteratorFactory { } private: - std::unordered_set memLock_; - std::list> ranges_; + // std::unordered_set memLock; + // std::list> ranges; }; /******************************************************** @@ -798,8 +808,10 @@ folly::Future RaftPart::appendLogAsync(ClusterID source // the partition stops { std::lock_guard lck(logsLock_); - AppendLogsIteratorFactory fact; - fact.make(logs_, sendingLogs_); + AppendLogsIteratorFactory::make(logs_, sendingLogs_); + if (sendingLogs_.empty()) { + return retFuture; + } } AppendLogsIterator it(firstId, termId, std::move(sendingLogs_)); appendLogsInternal(std::move(it), termId); @@ -858,7 +870,6 @@ void RaftPart::replicateLogs(folly::EventBase* eb, LogID committedId, TermID prevLogTerm, LogID prevLogId) { - LOG(INFO) << "RaftPart::replicateLogs()"; using namespace folly; // NOLINT since the fancy overload of | operator decltype(hosts_) hosts; @@ -1025,12 +1036,27 @@ void RaftPart::processAppendLogResponses(const AppendLogResponses& resps, } // at this monment, we have confidence logs should be succeeded replicated - // sendingPromise_.setOneSharedValue(nebula::cpp2::ErrorCode::SUCCEEDED); { std::lock_guard lck(logsLock_); CHECK(replicatingLogs_); iter.commit(); - replicatingLogs_ = false; + if (logs_.empty()) { + // no incoming during log replication + replicatingLogs_ = false; + VLOG(4) << idStr_ << "No more log to be replicated"; + return; + } else { + // we have some new coming logs during replication + // need to send them also + AppendLogsIteratorFactory::make(logs_, sendingLogs_); + if (sendingLogs_.empty()) { + return; + } + auto firstId = lastLogId_ + 1; + AppendLogsIterator it(firstId, currTerm, std::move(sendingLogs_)); + this->appendLogsInternal(std::move(it), currTerm); + } + return; } } else { @@ -1998,11 +2024,19 @@ bool RaftPart::checkAppendLogResult(nebula::cpp2::ErrorCode res) { if (res != nebula::cpp2::ErrorCode::SUCCEEDED) { { std::lock_guard lck(logsLock_); + auto setPromiseForLogs = [&](LogCache& logs) { + for (auto& log : logs) { + auto& promiseRef = std::get<4>(log); + if (!promiseRef.isFulfilled()) { + promiseRef.setValue(res); + } + } + }; + setPromiseForLogs(logs_); + setPromiseForLogs(sendingLogs_); logs_.clear(); - // cachingPromise_.setValue(res); - // cachingPromise_.reset(); + sendingLogs_.clear(); bufferOverFlow_ = false; - // sendingPromise_.setValue(res); replicatingLogs_ = false; } return false;