diff --git a/src/kvstore/CMakeLists.txt b/src/kvstore/CMakeLists.txt
index 549be980087..748eb8c42f4 100644
--- a/src/kvstore/CMakeLists.txt
+++ b/src/kvstore/CMakeLists.txt
@@ -6,7 +6,6 @@ nebula_add_library(
     PartManager.cpp
     NebulaStore.cpp
    RocksEngineConfig.cpp
-    LogEncoder.cpp
     NebulaSnapshotManager.cpp
     RateLimiter.cpp
     plugins/elasticsearch/ESListener.cpp
diff --git a/src/kvstore/Common.h b/src/kvstore/Common.h
index c97e6c878a7..0ebeefb5e2c 100644
--- a/src/kvstore/Common.h
+++ b/src/kvstore/Common.h
@@ -270,6 +270,16 @@ inline rocksdb::Slice toSlice(const folly::StringPiece& str) {
 using KVMap = std::unordered_map<std::string, std::string>;
 using KVArrayIterator = std::vector<KV>::const_iterator;
 
+class MergeableAtomicOpResult {
+ public:
+  nebula::cpp2::ErrorCode code;
+  std::string batch;  // batched result, like before.
+  std::list<std::string> readSet;
+  std::list<std::string> writeSet;
+};
+
+using MergeableAtomicOp = folly::Function<MergeableAtomicOpResult(void)>;
+
 }  // namespace kvstore
 }  // namespace nebula
diff --git a/src/kvstore/KVStore.h b/src/kvstore/KVStore.h
index edf9c1fbe4c..c1d42074009 100644
--- a/src/kvstore/KVStore.h
+++ b/src/kvstore/KVStore.h
@@ -312,7 +312,7 @@ class KVStore {
    */
   virtual void asyncAtomicOp(GraphSpaceID spaceId,
                              PartitionID partId,
-                             raftex::AtomicOp op,
+                             MergeableAtomicOp op,
                              KVCallback cb) = 0;
 
   /**
diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp
index 04e6a911428..1e7e7180478 100644
--- a/src/kvstore/NebulaStore.cpp
+++ b/src/kvstore/NebulaStore.cpp
@@ -918,7 +918,7 @@ void NebulaStore::asyncRemoveRange(GraphSpaceID spaceId,
 
 void NebulaStore::asyncAtomicOp(GraphSpaceID spaceId,
                                 PartitionID partId,
-                                raftex::AtomicOp op,
+                                MergeableAtomicOp op,
                                 KVCallback cb) {
   auto ret = part(spaceId, partId);
   if (!ok(ret)) {
diff --git a/src/kvstore/NebulaStore.h b/src/kvstore/NebulaStore.h
index 89c12d424cb..7a5bff80f1e 100644
--- a/src/kvstore/NebulaStore.h
+++ b/src/kvstore/NebulaStore.h
@@ -415,7 +415,7 @@ class NebulaStore : public KVStore, public Handler {
    */
   void asyncAtomicOp(GraphSpaceID spaceId,
                      PartitionID partId,
-                     raftex::AtomicOp op,
+                     MergeableAtomicOp op,
                      KVCallback cb) override;
 
   /**
diff --git a/src/kvstore/Part.cpp b/src/kvstore/Part.cpp
index dbf765aa8cd..04d9145297f 100644
--- a/src/kvstore/Part.cpp
+++ b/src/kvstore/Part.cpp
@@ -115,7 +115,7 @@ void Part::sync(KVCallback cb) {
       [callback = std::move(cb)](nebula::cpp2::ErrorCode code) mutable { callback(code); });
 }
 
-void Part::asyncAtomicOp(raftex::AtomicOp op, KVCallback cb) {
+void Part::asyncAtomicOp(MergeableAtomicOp op, KVCallback cb) {
   atomicOpAsync(std::move(op))
       .thenValue(
           [callback = std::move(cb)](nebula::cpp2::ErrorCode code) mutable { callback(code); });
diff --git a/src/kvstore/Part.h b/src/kvstore/Part.h
index d19138ce189..7db8586dfea 100644
--- a/src/kvstore/Part.h
+++ b/src/kvstore/Part.h
@@ -124,7 +124,7 @@ class Part : public raftex::RaftPart {
    * @param op Atomic operation
    * @param cb Callback when has a result
    */
-  void asyncAtomicOp(raftex::AtomicOp op, KVCallback cb);
+  void asyncAtomicOp(MergeableAtomicOp op, KVCallback cb);
 
   /**
    * @brief Add a raft learner asynchronously by adding raft log
diff --git a/src/kvstore/raftex/CMakeLists.txt b/src/kvstore/raftex/CMakeLists.txt
index 6e2910c869c..07e1a3afc54 100644
--- a/src/kvstore/raftex/CMakeLists.txt
+++ b/src/kvstore/raftex/CMakeLists.txt
@@ -5,6 +5,7 @@ nebula_add_library(
     RaftexService.cpp
     Host.cpp
     SnapshotManager.cpp
+    ../LogEncoder.cpp
 )
 
 nebula_add_subdirectory(test)
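[Editor's note] The `MergeableAtomicOpResult` / `MergeableAtomicOp` pair added to `Common.h` above is the heart of this change: an atomic op now reports an error code plus the keys it read and wrote, instead of just an optional encoded log, so the raft layer can decide whether the op can be merged into the batch currently being replicated. A minimal sketch of an op written against these types (the helper name `makePutOp` is hypothetical; a real op would encode its mutation via `LogEncoder`):

```cpp
#include <string>
#include <utility>

#include "kvstore/Common.h"

// Build a trivially mergeable op that writes a single key. The readSet/writeSet
// are what the raft layer inspects when it checks for merge conflicts.
nebula::kvstore::MergeableAtomicOp makePutOp(std::string key, std::string encodedLog) {
  return [key = std::move(key), log = std::move(encodedLog)]() mutable {
    nebula::kvstore::MergeableAtomicOpResult result;
    result.code = nebula::cpp2::ErrorCode::SUCCEEDED;
    result.batch = std::move(log);   // the encoded log to replicate, as before
    result.writeSet.push_back(key);  // declare the write so a later op reading
                                     // `key` cannot merge into the same batch
    return result;
  };
}
```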
diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp
index e2605971c0e..ecc0bc60265 100644
--- a/src/kvstore/raftex/RaftPart.cpp
+++ b/src/kvstore/raftex/RaftPart.cpp
@@ -50,92 +50,81 @@ using nebula::wal::FileBasedWalPolicy;
 
 using OpProcessor = folly::Function<std::optional<std::string>(AtomicOp op)>;
 
+/**
+ * @brief code to describe whether a log can be merged with others
+ * NO_MERGE: can't merge with any other
+ * MERGE_NEXT: can't merge with previous logs, can merge with next. (has to be head)
+ * MERGE_PREV: can merge with previous, can't merge any more. (has to be tail)
+ * MERGE_BOTH: can merge with any other
+ *
+ * Normal / heartbeat will always be MERGE_BOTH
+ * Command will always be MERGE_PREV
+ * ATOMIC_OP can be either MERGE_NEXT or MERGE_BOTH,
+ * depending on whether it reads a key in the write set.
+ * Any unknown log type will be judged as NO_MERGE.
+ */
+enum class MergeAbleCode {
+  NO_MERGE = 0,
+  MERGE_NEXT = 1,
+  MERGE_PREV = 2,
+  MERGE_BOTH = 3,
+};
+
+/**
+ * @brief this is an iterator that deals with the memory lock.
+ */
 class AppendLogsIterator final : public LogIterator {
  public:
-  AppendLogsIterator(LogID firstLogId, TermID termId, RaftPart::LogCache logs, OpProcessor opCB)
-      : firstLogId_(firstLogId),
-        termId_(termId),
-        logId_(firstLogId),
-        logs_(std::move(logs)),
-        opCB_(std::move(opCB)) {
-    leadByAtomicOp_ = processAtomicOp();
-    valid_ = idx_ < logs_.size();
-    hasNonAtomicOpLogs_ = !leadByAtomicOp_ && valid_;
-    if (valid_) {
-      currLogType_ = lastLogType_ = logType();
-    }
-  }
-
+  AppendLogsIterator(LogID firstLogId, TermID termId, RaftPart::LogCache logs)
+      : firstLogId_(firstLogId), termId_(termId), logId_(firstLogId), logs_(std::move(logs)) {}
 
   AppendLogsIterator(const AppendLogsIterator&) = delete;
   AppendLogsIterator(AppendLogsIterator&&) = default;
 
   AppendLogsIterator& operator=(const AppendLogsIterator&) = delete;
   AppendLogsIterator& operator=(AppendLogsIterator&&) = default;
 
-  bool leadByAtomicOp() const {
-    return leadByAtomicOp_;
-  }
-
-  bool hasNonAtomicOpLogs() const {
-    return hasNonAtomicOpLogs_;
-  }
-
-  LogID firstLogId() const {
-    return firstLogId_;
-  }
-
-  LogID lastLogId() const {
-    return firstLogId_ + logs_.size() - 1;
-  }
-
-  // Return true if the current log is a AtomicOp, otherwise return false
-  bool processAtomicOp() {
-    while (idx_ < logs_.size()) {
-      auto& tup = logs_.at(idx_);
-      auto logType = std::get<1>(tup);
-      if (logType != LogType::ATOMIC_OP) {
-        // Not a AtomicOp
-        return false;
+  ~AppendLogsIterator() {
+    if (!logs_.empty()) {
+      size_t notFulfilledPromise = 0;
+      for (auto& log : logs_) {
+        auto& promiseRef = std::get<4>(log);
+        if (!promiseRef.isFulfilled()) {
+          ++notFulfilledPromise;
+        }
       }
-
-      // Process AtomicOp log
-      CHECK(!!opCB_);
-      opResult_ = opCB_(std::move(std::get<3>(tup)));
-      if (opResult_.has_value()) {
-        // AtomicOp Succeeded
-        return true;
-      } else {
-        // AtomicOp failed, move to the next log, but do not increment the
-        // logId_
-        ++idx_;
+      if (notFulfilledPromise > 0) {
+        LOG(FATAL) << "notFulfilledPromise == " << notFulfilledPromise;
       }
     }
+  }
 
-    // Reached the end
-    return false;
+  void commit(nebula::cpp2::ErrorCode code = nebula::cpp2::ErrorCode::SUCCEEDED) {
+    for (auto it = logs_.begin(); it != logs_.end(); ++it) {
+      auto& promiseRef = std::get<4>(*it);
+      if (!promiseRef.isFulfilled()) {
+        DCHECK(!promiseRef.isFulfilled());
+        promiseRef.setValue(code);
+      }
+    }
   }
 
   LogIterator& operator++() override {
     ++idx_;
     ++logId_;
-    if (idx_ < logs_.size()) {
-      currLogType_ = logType();
-      valid_ = currLogType_ != LogType::ATOMIC_OP;
-      if (valid_) {
-        hasNonAtomicOpLogs_ = true;
-      }
-      valid_ = valid_ && lastLogType_ != LogType::COMMAND;
-      lastLogType_ = currLogType_;
-    } else {
-      valid_ = false;
-    }
     return *this;
   }
 
-  // The iterator becomes invalid when exhausting the logs
-  // **OR** running into a AtomicOp log
   bool valid() const override {
-    return valid_;
+    return idx_ < logs_.size();
+  }
+
+  bool empty() const {
+    return logs_.empty();
+  }
+
+  LogID firstLogId() const {
+    return firstLogId_;
   }
 
   LogID logId() const override {
@@ -153,31 +142,7 @@ class AppendLogsIterator final : public LogIterator {
   }
 
   folly::StringPiece logMsg() const override {
-    DCHECK(valid());
-    if (currLogType_ == LogType::ATOMIC_OP) {
-      CHECK(opResult_.has_value());
-      return opResult_.value();
-    } else {
-      return std::get<2>(logs_.at(idx_));
-    }
-  }
-
-  // Return true when there is no more log left for processing
-  bool empty() const {
-    return idx_ >= logs_.size();
-  }
-
-  // Resume the iterator so that we can continue to process the remaining logs
-  void resume() {
-    CHECK(!valid_);
-    if (!empty()) {
-      leadByAtomicOp_ = processAtomicOp();
-      valid_ = idx_ < logs_.size();
-      hasNonAtomicOpLogs_ = !leadByAtomicOp_ && valid_;
-      if (valid_) {
-        currLogType_ = lastLogType_ = logType();
-      }
-    }
+    return std::get<2>(logs_.at(idx_));
   }
 
   LogType logType() const {
@@ -186,17 +151,176 @@ class AppendLogsIterator final : public LogIterator {
 
  private:
   size_t idx_{0};
-  bool leadByAtomicOp_{false};
-  bool hasNonAtomicOpLogs_{false};
-  bool valid_{true};
-  LogType lastLogType_{LogType::NORMAL};
-  LogType currLogType_{LogType::NORMAL};
-  std::optional<std::string> opResult_;
   LogID firstLogId_;
   TermID termId_;
   LogID logId_;
   RaftPart::LogCache logs_;
-  OpProcessor opCB_;
+};
+
+class AppendLogsIteratorFactory {
+ public:
+  AppendLogsIteratorFactory() = default;
+
+  static void make(RaftPart::LogCache& cacheLogs, RaftPart::LogCache& sendLogs) {
+    DCHECK(sendLogs.empty());
+    std::unordered_set<std::string> memLock;
+    std::list<std::pair<std::string, std::string>> ranges;
+    for (auto& log : cacheLogs) {
+      auto code = mergeAble(log, memLock, ranges);
+      if (code == MergeAbleCode::MERGE_BOTH) {
+        sendLogs.emplace_back();
+        std::swap(cacheLogs.front(), sendLogs.back());
+        cacheLogs.pop_front();
+        continue;
+      } else if (code == MergeAbleCode::MERGE_PREV) {
+        sendLogs.emplace_back();
+        std::swap(cacheLogs.front(), sendLogs.back());
+        cacheLogs.pop_front();
+        break;
+      } else if (code == MergeAbleCode::NO_MERGE) {
+        // if we meet a failed atomicOp, we can just skip it.
+        cacheLogs.pop_front();
+        continue;
+      } else {  // MERGE_NEXT
+        break;
+      }
+    }
+  }
+
+  /**
+   * @brief check if an incoming log can be merged with previous logs
+   *
+   * @param logWrapper
+   */
+  static MergeAbleCode mergeAble(RaftPart::LogCacheItem& logWrapper,
+                                 std::unordered_set<std::string>& memLock,
+                                 std::list<std::pair<std::string, std::string>>& ranges) {
+    // log type:
+    switch (std::get<1>(logWrapper)) {
+      case LogType::NORMAL: {
+        std::vector<folly::StringPiece> updateSet;
+        auto& log = std::get<2>(logWrapper);
+        if (log.empty()) {
+          return MergeAbleCode::MERGE_BOTH;
+        }
+        decode(log, updateSet, ranges);
+        for (auto& key : updateSet) {
+          memLock.insert(key.str());
+        }
+        return MergeAbleCode::MERGE_BOTH;
+      }
+      case LogType::COMMAND: {
+        return MergeAbleCode::MERGE_PREV;
+      }
+      case LogType::ATOMIC_OP: {
+        auto& atomOp = std::get<3>(logWrapper);
+        auto [code, result, read, write] = atomOp();
+        if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
+          DLOG(INFO) << "===> OOPs, atomOp failed!!!, code = "
+                     << apache::thrift::util::enumNameSafe(code);
+          auto& promiseRef = std::get<4>(logWrapper);
+          if (!promiseRef.isFulfilled()) {
+            DCHECK(!promiseRef.isFulfilled());
+            promiseRef.setValue(code);
+          }
+          return MergeAbleCode::NO_MERGE;
+        }
+        std::get<2>(logWrapper) = std::move(result);
+        /**
+         * @brief We accept the same read/write key within one log,
+         * but reject it across different logs.
+         */
+        for (auto& key : read) {
+          auto cit = memLock.find(key);
+          // read after write is not acceptable.
+          if (cit != memLock.end()) {
+            return MergeAbleCode::MERGE_NEXT;
+          }
+
+          // if we try to read a key that falls in any removed range
+          for (auto& it : ranges) {
+            auto* begin = it.first.c_str();
+            auto* end = it.second.c_str();
+            auto* pKey = key.c_str();
+            if ((std::strcmp(begin, pKey) <= 0) && (std::strcmp(pKey, end) <= 0)) {
+              return MergeAbleCode::MERGE_NEXT;
+            }
+          }
+        }
+
+        for (auto& key : write) {
+          // it doesn't matter if the insert fails
+          // (on a write conflict, the last write wins)
+          memLock.insert(key);
+        }
+        return MergeAbleCode::MERGE_BOTH;
+      }
+      default:
+        LOG(ERROR) << "should not get here";
+    }
+    return MergeAbleCode::NO_MERGE;
+  }
+
+  static void decode(const std::string& log,
+                     std::vector<folly::StringPiece>& updateSet,
+                     std::list<std::pair<std::string, std::string>>& ranges) {
+    switch (log[sizeof(int64_t)]) {
+      case nebula::kvstore::OP_PUT: {
+        auto pieces = nebula::kvstore::decodeMultiValues(log);
+        updateSet.push_back(pieces[0]);
+        break;
+      }
+      case nebula::kvstore::OP_MULTI_PUT: {
+        auto kvs = nebula::kvstore::decodeMultiValues(log);
+        // Make sure the number of values is even
+        DCHECK_EQ((kvs.size() + 1) / 2, kvs.size() / 2);
+        for (size_t i = 0; i < kvs.size(); i += 2) {
+          updateSet.push_back(kvs[i]);
+        }
+        break;
+      }
+      case nebula::kvstore::OP_REMOVE: {
+        auto key = nebula::kvstore::decodeSingleValue(log);
+        updateSet.push_back(key);
+        break;
+      }
+      case nebula::kvstore::OP_MULTI_REMOVE: {
+        auto keys = nebula::kvstore::decodeMultiValues(log);
+        for (auto k : keys) {
+          updateSet.push_back(k);
+        }
+        break;
+      }
+      case nebula::kvstore::OP_REMOVE_RANGE: {
+        auto range = nebula::kvstore::decodeMultiValues(log);
+        auto item = std::make_pair(range[0].str(), range[1].str());
+        ranges.emplace_back(std::move(item));
+        break;
+      }
+      case nebula::kvstore::OP_BATCH_WRITE: {
+        auto data = nebula::kvstore::decodeBatchValue(log);
+        for (auto& op : data) {
+          if (op.first == nebula::kvstore::BatchLogType::OP_BATCH_PUT) {
+            updateSet.push_back(op.second.first);
+          } else if (op.first == nebula::kvstore::BatchLogType::OP_BATCH_REMOVE) {
+            updateSet.push_back(op.second.first);
+          } else if (op.first == nebula::kvstore::BatchLogType::OP_BATCH_REMOVE_RANGE) {
+            auto begin = op.second.first;
+            auto end = op.second.second;
+            ranges.emplace_back(std::make_pair(begin, end));
+          }
+        }
+        break;
+      }
+      case nebula::kvstore::OP_ADD_PEER:
+      case nebula::kvstore::OP_ADD_LEARNER:
+      case nebula::kvstore::OP_TRANS_LEADER:
+      case nebula::kvstore::OP_REMOVE_PEER: {
+        break;
+      }
+      default: {
+        VLOG(3) << "Unknown operation: " << static_cast<int32_t>(log[0]);
+      }
+    }
+  }
 };
 
 /********************************************************
@@ -247,7 +371,6 @@ RaftPart::RaftPart(
         return this->preProcessLog(logId, logTermId, logClusterId, log);
       },
       diskMan);
-  logs_.reserve(FLAGS_max_batch_size);
   CHECK(!!executor_) << idStr_ << "Should not be nullptr";
 }
 
@@ -623,7 +746,7 @@ folly::Future<nebula::cpp2::ErrorCode> RaftPart::appendAsync(ClusterID source, s
   return appendLogAsync(source, LogType::NORMAL, std::move(log));
 }
 
-folly::Future<nebula::cpp2::ErrorCode> RaftPart::atomicOpAsync(AtomicOp op) {
+folly::Future<nebula::cpp2::ErrorCode> RaftPart::atomicOpAsync(kvstore::MergeableAtomicOp op) {
   return appendLogAsync(clusterId_, LogType::ATOMIC_OP, "", std::move(op));
 }
 
@@ -634,7 +757,7 @@ folly::Future<nebula::cpp2::ErrorCode> RaftPart::sendCommandAsync(std::string lo
 folly::Future<nebula::cpp2::ErrorCode> RaftPart::appendLogAsync(ClusterID source,
                                                                 LogType logType,
                                                                 std::string log,
-                                                                AtomicOp op) {
+                                                                kvstore::MergeableAtomicOp op) {
   if (blocking_) {
     // No need to block heartbeats and empty log.
     if ((logType == LogType::NORMAL && !log.empty()) || logType == LogType::ATOMIC_OP) {
@@ -648,7 +771,7 @@ folly::Future<nebula::cpp2::ErrorCode> RaftPart::appendLogAsync(ClusterID source
   if (bufferOverFlow_) {
     VLOG_EVERY_N(2, 1000)
         << idStr_ << "The appendLog buffer is full. Please slow down the log appending rate."
-        << "replicatingLogs_ :" << replicatingLogs_;
+        << "replicatingLogs_ :" << std::boolalpha << replicatingLogs_;
     return nebula::cpp2::ErrorCode::E_RAFT_BUFFER_OVERFLOW;
   }
   {
@@ -659,7 +782,7 @@ folly::Future<nebula::cpp2::ErrorCode> RaftPart::appendLogAsync(ClusterID source
     if (logs_.size() >= FLAGS_max_batch_size) {
       // Buffer is full
       VLOG(2) << idStr_ << "The appendLog buffer is full. Please slow down the log appending rate."
-              << "replicatingLogs_ :" << replicatingLogs_;
+              << "replicatingLogs_ :" << std::boolalpha << replicatingLogs_;
       bufferOverFlow_ = true;
       return nebula::cpp2::ErrorCode::E_RAFT_BUFFER_OVERFLOW;
     }
@@ -668,26 +791,14 @@ folly::Future<nebula::cpp2::ErrorCode> RaftPart::appendLogAsync(ClusterID source
 
     // Append new logs to the buffer
     DCHECK_GE(source, 0);
-    logs_.emplace_back(source, logType, std::move(log), std::move(op));
-    switch (logType) {
-      case LogType::ATOMIC_OP:
-        retFuture = cachingPromise_.getSingleFuture();
-        break;
-      case LogType::COMMAND:
-        retFuture = cachingPromise_.getAndRollSharedFuture();
-        break;
-      case LogType::NORMAL:
-        retFuture = cachingPromise_.getSharedFuture();
-        break;
-    }
+    folly::Promise<nebula::cpp2::ErrorCode> promise;
+    retFuture = promise.getFuture();
+    logs_.emplace_back(source, logType, std::move(log), std::move(op), std::move(promise));
 
     bool expected = false;
     if (replicatingLogs_.compare_exchange_strong(expected, true)) {
       // We need to send logs to all followers
       VLOG(4) << idStr_ << "Preparing to send AppendLog request";
-      sendingPromise_ = std::move(cachingPromise_);
-      cachingPromise_.reset();
-      std::swap(swappedOutLogs, logs_);
       bufferOverFlow_ = false;
     } else {
       VLOG(4) << idStr_ << "Another AppendLogs request is ongoing, just return";
@@ -710,26 +821,22 @@ folly::Future<nebula::cpp2::ErrorCode> RaftPart::appendLogAsync(ClusterID source
   if (!checkAppendLogResult(res)) {
     // Mosy likely failed because the partition is not leader
     VLOG_EVERY_N(2, 1000) << idStr_ << "Cannot append logs, clean the buffer";
-    return res;
+    return nebula::cpp2::ErrorCode::E_LEADER_CHANGED;
   }
 
   // Replicate buffered logs to all followers
   // Replication will happen on a separate thread and will block
   // until majority accept the logs, the leadership changes, or
   // the partition stops
-  VLOG(4) << idStr_ << "Calling appendLogsInternal()";
-  AppendLogsIterator it(
-      firstId,
-      termId,
-      std::move(swappedOutLogs),
-      [this](AtomicOp opCB) -> std::optional<std::string> {
-        CHECK(opCB != nullptr);
-        auto opRet = opCB();
-        if (!opRet.has_value()) {
-          // Failed
-          sendingPromise_.setOneSingleValue(nebula::cpp2::ErrorCode::E_RAFT_ATOMIC_OP_FAILED);
-        }
-        return opRet;
-      });
+  {
+    std::lock_guard<std::mutex> lck(logsLock_);
+    AppendLogsIteratorFactory::make(logs_, sendingLogs_);
+    bufferOverFlow_ = false;
+    if (sendingLogs_.empty()) {
+      replicatingLogs_ = false;
+      return retFuture;
+    }
+  }
+  AppendLogsIterator it(firstId, termId, std::move(sendingLogs_));
 
   appendLogsInternal(std::move(it), termId);
 
   return retFuture;
@@ -741,14 +848,6 @@ void RaftPart::appendLogsInternal(AppendLogsIterator iter, TermID termId) {
   TermID prevLogTerm = 0;
   LogID committed = 0;
   LogID lastId = 0;
-  if (iter.valid()) {
-    VLOG(4) << idStr_ << "Ready to append logs from id " << iter.logId() << " (Current term is "
-            << currTerm << ")";
-  } else {
-    VLOG(4) << idStr_ << "Only happened when Atomic op failed";
-    replicatingLogs_ = false;
-    return;
-  }
   nebula::cpp2::ErrorCode res = nebula::cpp2::ErrorCode::SUCCEEDED;
   do {
     std::lock_guard<std::mutex> g(raftLock_);
@@ -778,6 +877,7 @@ void RaftPart::appendLogsInternal(AppendLogsIterator iter, TermID termId) {
   } while (false);
 
   if (!checkAppendLogResult(res)) {
+    iter.commit(nebula::cpp2::ErrorCode::E_LEADER_CHANGED);
     return;
   }
   // Step 2: Replicate to followers
@@ -810,6 +910,7 @@ void RaftPart::replicateLogs(folly::EventBase* eb,
 
   if (!checkAppendLogResult(res)) {
     VLOG(3) << idStr_ << "replicateLogs failed because of not leader or term changed";
+    iter.commit(nebula::cpp2::ErrorCode::E_LEADER_CHANGED);
     return;
   }
 
@@ -907,6 +1008,7 @@ void RaftPart::processAppendLogResponses(const AppendLogResponses& resps,
     }
   }
   if (!checkAppendLogResult(res)) {
+    iter.commit(nebula::cpp2::ErrorCode::E_LEADER_CHANGED);
     return;
   }
 
@@ -914,7 +1016,6 @@ void RaftPart::processAppendLogResponses(const AppendLogResponses& resps,
     // Majority have succeeded
     VLOG(4) << idStr_ << numSucceeded << " hosts have accepted the logs";
 
-    LogID firstLogId = 0;
     do {
       std::lock_guard<std::mutex> g(raftLock_);
       res = canAppendLogs(currTerm);
@@ -931,6 +1032,7 @@ void RaftPart::processAppendLogResponses(const AppendLogResponses& resps,
       VLOG(3) << idStr_
              << "processAppendLogResponses failed because of not leader "
                 "or term changed";
+      iter.commit(nebula::cpp2::ErrorCode::E_LEADER_CHANGED);
      return;
     }
 
@@ -951,7 +1053,6 @@ void RaftPart::processAppendLogResponses(const AppendLogResponses& resps,
       CHECK_EQ(lastLogId, lastCommitId);
       committedLogId_ = lastCommitId;
       committedLogTerm_ = lastCommitTerm;
-      firstLogId = lastLogId_ + 1;
       lastMsgAcceptedCostMs_ = lastMsgSentDur_.elapsedInMSec();
       lastMsgAcceptedTime_ = time::WallClock::fastNowInMilliSec();
       if (!commitInThisTerm_) {
@@ -966,56 +1067,32 @@ void RaftPart::processAppendLogResponses(const AppendLogResponses& resps,
               << lastLogId;
     }
 
-    // Step 4: Fulfill the promise
-    if (iter.hasNonAtomicOpLogs()) {
-      sendingPromise_.setOneSharedValue(nebula::cpp2::ErrorCode::SUCCEEDED);
-    }
-    if (iter.leadByAtomicOp()) {
-      sendingPromise_.setOneSingleValue(nebula::cpp2::ErrorCode::SUCCEEDED);
-    }
-    // Step 5: Check whether need to continue
-    // the log replication
+    // at this moment, we are confident the logs have been successfully replicated
+    LogID firstId = 0;
     {
       std::lock_guard<std::mutex> lck(logsLock_);
       CHECK(replicatingLogs_);
-      // Continue to process the original AppendLogsIterator if necessary
-      iter.resume();
-      // If no more valid logs to be replicated in iter, create a new one if we
-      // have new log
-      if (iter.empty()) {
-        VLOG(4) << idStr_ << "logs size " << logs_.size();
-        if (logs_.size() > 0) {
-          // continue to replicate the logs
-          sendingPromise_ = std::move(cachingPromise_);
-          cachingPromise_.reset();
-          iter = AppendLogsIterator(firstLogId,
-                                    currTerm,
-                                    std::move(logs_),
-                                    [this](AtomicOp op) -> std::optional<std::string> {
-                                      auto opRet = op();
-                                      if (!opRet.has_value()) {
-                                        // Failed
-                                        sendingPromise_.setOneSingleValue(
-                                            nebula::cpp2::ErrorCode::E_RAFT_ATOMIC_OP_FAILED);
-                                      }
-                                      return opRet;
-                                    });
-          logs_.clear();
-          bufferOverFlow_ = false;
-        }
-        // Reset replicatingLogs_ one of the following is true:
-        // 1. old iter is empty && logs_.size() == 0
-        // 2. old iter is empty && logs_.size() > 0, but all logs in new iter is
-        //    atomic op,
-        //    and all of them failed, which would make iter is empty again
-        if (iter.empty()) {
+      iter.commit();
+      if (logs_.empty()) {
+        // no new logs came in during log replication
+        replicatingLogs_ = false;
+        VLOG(4) << idStr_ << "No more log to be replicated";
+        return;
+      } else {
+        // some new logs came in during replication;
+        // we need to send them as well
+        AppendLogsIteratorFactory::make(logs_, sendingLogs_);
+        bufferOverFlow_ = false;
+        if (sendingLogs_.empty()) {
          replicatingLogs_ = false;
-          VLOG(4) << idStr_ << "No more log to be replicated";
          return;
        }
+        firstId = lastLogId_ + 1;
      }
    }
-    this->appendLogsInternal(std::move(iter), currTerm);
+    AppendLogsIterator it(firstId, currTerm, std::move(sendingLogs_));
+    this->appendLogsInternal(std::move(it), currTerm);
+    return;
   } else {
     // Not enough hosts accepted the log, re-try
     VLOG_EVERY_N(2, 1000) << idStr_ << "Only " << numSucceeded
@@ -2029,11 +2106,19 @@ bool RaftPart::checkAppendLogResult(nebula::cpp2::ErrorCode res) {
   if (res != nebula::cpp2::ErrorCode::SUCCEEDED) {
     {
       std::lock_guard<std::mutex> lck(logsLock_);
+      auto setPromiseForLogs = [&](LogCache& logs) {
+        for (auto& log : logs) {
+          auto& promiseRef = std::get<4>(log);
+          if (!promiseRef.isFulfilled()) {
+            promiseRef.setValue(res);
+          }
+        }
+      };
+      setPromiseForLogs(logs_);
+      setPromiseForLogs(sendingLogs_);
       logs_.clear();
-      cachingPromise_.setValue(res);
-      cachingPromise_.reset();
+      sendingLogs_.clear();
       bufferOverFlow_ = false;
-      sendingPromise_.setValue(res);
       replicatingLogs_ = false;
     }
     return false;
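[Editor's note] The batching rule implemented by `AppendLogsIteratorFactory::make` above can be summarized with a small standalone sketch (simplified types, not the patch's code): drain logs from the head of the cache while they remain merge-compatible, let a COMMAND close the batch as its tail, stop before a conflicting atomic op so it heads the next batch, and drop failed atomic ops, whose promises were already fulfilled inside `mergeAble`:

```cpp
#include <deque>
#include <utility>

enum class MergeCode { NO_MERGE, MERGE_NEXT, MERGE_PREV, MERGE_BOTH };

// Simplified stand-in for AppendLogsIteratorFactory::make; `classify`
// plays the role of mergeAble().
template <typename Item, typename Classify>
void drainBatch(std::deque<Item>& cache, std::deque<Item>& batch, Classify classify) {
  while (!cache.empty()) {
    switch (classify(cache.front())) {
      case MergeCode::MERGE_BOTH:   // NORMAL logs, non-conflicting atomic ops
        batch.push_back(std::move(cache.front()));
        cache.pop_front();
        break;
      case MergeCode::MERGE_PREV:   // COMMAND: joins the batch but must be its tail
        batch.push_back(std::move(cache.front()));
        cache.pop_front();
        return;
      case MergeCode::NO_MERGE:     // failed atomic op: drop it and keep going
        cache.pop_front();
        break;
      case MergeCode::MERGE_NEXT:   // conflicting atomic op: must head the next batch
        return;
    }
  }
}
```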
diff --git a/src/kvstore/raftex/RaftPart.h b/src/kvstore/raftex/RaftPart.h
index 96be8fb58a6..83b097e454b 100644
--- a/src/kvstore/raftex/RaftPart.h
+++ b/src/kvstore/raftex/RaftPart.h
@@ -65,10 +65,10 @@ class AppendLogsIterator;
  * should be applied atomically. You could implement CAS, READ-MODIFY-WRITE
  * operations though it.
  *
  */
-using AtomicOp = folly::Function<std::optional<std::string>(void)>;
-
+using AtomicOp = folly::Function<std::optional<std::string>(void)>;
 class RaftPart : public std::enable_shared_from_this<RaftPart> {
   friend class AppendLogsIterator;
+  friend class AppendLogsIteratorFactory;
   friend class Host;
   friend class SnapshotManager;
   FRIEND_TEST(MemberChangeTest, AddRemovePeerTest);
@@ -273,7 +273,7 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
    * @param op Atomic operation, will output a log if succeed
    * @return folly::Future<nebula::cpp2::ErrorCode>
    */
-  folly::Future<nebula::cpp2::ErrorCode> atomicOpAsync(AtomicOp op);
+  folly::Future<nebula::cpp2::ErrorCode> atomicOpAsync(kvstore::MergeableAtomicOp op);
 
   /**
    * @brief Send a log of COMMAND type
@@ -565,8 +565,13 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
   using AppendLogResponses = std::vector<std::pair<size_t, cpp2::AppendLogResponse>>;
   using HeartbeatResponses = std::vector<std::pair<size_t, cpp2::HeartbeatResponse>>;
 
-  // <source, logType, log, atomic op>
-  using LogCache = std::vector<std::tuple<ClusterID, LogType, std::string, AtomicOp>>;
+  // <source, logType, log, atomic op, promise>
+  using LogCacheItem = std::tuple<ClusterID,
+                                  LogType,
+                                  std::string,
+                                  kvstore::MergeableAtomicOp,
+                                  folly::Promise<nebula::cpp2::ErrorCode>>;
+  using LogCache = std::deque<LogCacheItem>;
 
   /****************************************************
    *
@@ -714,7 +719,7 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
   folly::Future<nebula::cpp2::ErrorCode> appendLogAsync(ClusterID source,
                                                         LogType logType,
                                                         std::string log,
-                                                        AtomicOp cb = nullptr);
+                                                        kvstore::MergeableAtomicOp cb = nullptr);
 
   /**
    * @brief Append the logs in iterator
@@ -787,116 +792,6 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
   void updateQuorum();
 
  protected:
-  template <typename ValueType>
-  class PromiseSet final {
-   public:
-    PromiseSet() = default;
-    PromiseSet(const PromiseSet&) = delete;
-    PromiseSet(PromiseSet&&) = default;
-
-    ~PromiseSet() = default;
-
-    PromiseSet& operator=(const PromiseSet&) = delete;
-    PromiseSet& operator=(PromiseSet&& right) = default;
-
-    /**
-     * @brief Clean all promises
-     */
-    void reset() {
-      sharedPromises_.clear();
-      singlePromises_.clear();
-      rollSharedPromise_ = true;
-    }
-
-    /**
-     * @brief Used for NORMAL raft log
-     *
-     * @return folly::Future<ValueType>
-     */
-    folly::Future<ValueType> getSharedFuture() {
-      if (rollSharedPromise_) {
-        sharedPromises_.emplace_back();
-        rollSharedPromise_ = false;
-      }
-
-      return sharedPromises_.back().getFuture();
-    }
-
-    /**
-     * @brief Used for ATOMIC_OP raft log
-     *
-     * @return folly::Future<ValueType>
-     */
-    folly::Future<ValueType> getSingleFuture() {
-      singlePromises_.emplace_back();
-      rollSharedPromise_ = true;
-
-      return singlePromises_.back().getFuture();
-    }
-
-    /**
-     * @brief Used for COMMAND raft log
-     *
-     * @return folly::Future<ValueType>
-     */
-    folly::Future<ValueType> getAndRollSharedFuture() {
-      if (rollSharedPromise_) {
-        sharedPromises_.emplace_back();
-      }
-      rollSharedPromise_ = true;
-      return sharedPromises_.back().getFuture();
-    }
-
-    /**
-     * @brief Set shared promise
-     *
-     * @tparam VT
-     * @param val
-     */
-    template <class VT>
-    void setOneSharedValue(VT&& val) {
-      CHECK(!sharedPromises_.empty());
-      sharedPromises_.front().setValue(std::forward<VT>(val));
-      sharedPromises_.pop_front();
-    }
-
-    /**
-     * @brief Set single promise
-     *
-     * @tparam VT
-     * @param val
-     */
-    template <class VT>
-    void setOneSingleValue(VT&& val) {
-      CHECK(!singlePromises_.empty());
-      singlePromises_.front().setValue(std::forward<VT>(val));
-      singlePromises_.pop_front();
-    }
-
-    /**
-     * @brief Set all promises to result, usually a failed result
-     *
-     * @param val
-     */
-    void setValue(ValueType val) {
-      for (auto& p : sharedPromises_) {
-        p.setValue(val);
-      }
-      for (auto& p : singlePromises_) {
-        p.setValue(val);
-      }
-    }
-
-   private:
-    // Whether the last future was returned from a shared promise
-    bool rollSharedPromise_{true};
-
-    // Promises shared by continuous non atomic op logs
-    std::list<folly::Promise<ValueType>> sharedPromises_;
-    // A list of promises for atomic op logs
-    std::list<folly::Promise<ValueType>> singlePromises_;
-  };
-
   const std::string idStr_;
 
   const ClusterID clusterId_;
@@ -914,14 +809,12 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
   mutable std::mutex logsLock_;
   std::atomic_bool replicatingLogs_{false};
   std::atomic_bool bufferOverFlow_{false};
-  PromiseSet<nebula::cpp2::ErrorCode> cachingPromise_;
   LogCache logs_;
+  LogCache sendingLogs_;
 
   // Partition level lock to synchronize the access of the partition
   mutable std::mutex raftLock_;
 
-  PromiseSet<nebula::cpp2::ErrorCode> sendingPromise_;
-
   Status status_;
   Role role_;
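[Editor's note] With `PromiseSet` gone, every queued log now carries its own `folly::Promise` as the fifth element of `LogCacheItem`, and `AppendLogsIterator::commit` fulfills whatever is still pending, regardless of log type. A stripped-down sketch of that pattern (plain `int` result code instead of `nebula::cpp2::ErrorCode`):

```cpp
#include <deque>
#include <string>
#include <utility>

#include <folly/futures/Future.h>
#include <folly/futures/Promise.h>

using Entry = std::pair<std::string, folly::Promise<int>>;

// Queue a log and hand its caller a future tied to that one log.
folly::Future<int> enqueue(std::deque<Entry>& cache, std::string log) {
  folly::Promise<int> promise;
  auto future = promise.getFuture();
  cache.emplace_back(std::move(log), std::move(promise));
  return future;
}

// Fulfill every pending promise with a single result code,
// mirroring AppendLogsIterator::commit() above.
void commitAll(std::deque<Entry>& cache, int code) {
  for (auto& [log, promise] : cache) {
    if (!promise.isFulfilled()) {
      promise.setValue(code);
    }
  }
  cache.clear();
}
```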
Message"); }); - auto fut = leader_->atomicOpAsync([]() { return test::compareAndSet("FCAS Log Message"); }); + auto fut = leader_->atomicOpAsync([] { + kvstore::MergeableAtomicOpResult ret; + ret.code = ::nebula::cpp2::ErrorCode::E_RPC_FAILURE; + return ret; + }); + fut.wait(); LOG(INFO) << "<===== Finish appending logs"; @@ -113,7 +146,12 @@ TEST_F(LogCASTest, AllValidCAS) { LOG(INFO) << "=====> Start appending logs"; std::vector msgs; for (int i = 1; i <= 10; ++i) { - auto fut = leader_->atomicOpAsync([]() { return test::compareAndSet("TTest CAS Log"); }); + auto fut = leader_->atomicOpAsync([] { + kvstore::MergeableAtomicOpResult ret; + ret.code = ::nebula::cpp2::ErrorCode::SUCCEEDED; + ret.batch = "Test CAS Log"; + return ret; + }); msgs.emplace_back("Test CAS Log"); if (i == 10) { fut.wait(); @@ -129,7 +167,14 @@ TEST_F(LogCASTest, AllInvalidCAS) { LOG(INFO) << "=====> Start appending logs"; std::vector msgs; for (int i = 1; i <= 10; ++i) { - auto fut = leader_->atomicOpAsync([]() { return test::compareAndSet("FCAS Log"); }); + // auto fut = leader_->atomicOpAsync([]() { return test::compareAndSet("FCAS Log"); }); + + auto fut = leader_->atomicOpAsync([] { + kvstore::MergeableAtomicOpResult ret; + ret.code = ::nebula::cpp2::ErrorCode::E_RPC_FAILURE; + return ret; + }); + if (i == 10) { fut.wait(); } @@ -157,8 +202,15 @@ TEST_F(LogCASTest, OnlyOneCasSucceed) { } else { log = "FCAS Log " + std::to_string(i); } - auto fut = leader_->atomicOpAsync( - [log = std::move(log)]() mutable { return test::compareAndSet(log); }); + auto fut = leader_->atomicOpAsync([=] { + kvstore::MergeableAtomicOpResult ret; + ret.code = ::nebula::cpp2::ErrorCode::E_RPC_FAILURE; + if (i == 1) { + ret.code = ::nebula::cpp2::ErrorCode::SUCCEEDED; + ret.batch = msgs.back(); + } + return ret; + }); if (i == 10) { fut.wait(); } @@ -187,8 +239,15 @@ TEST_F(LogCASTest, ZipCasTest) { } else { log = "FCAS Log " + std::to_string(i); } - auto fut = leader_->atomicOpAsync( - [log = std::move(log)]() mutable { return test::compareAndSet(log); }); + auto fut = leader_->atomicOpAsync([=] { + kvstore::MergeableAtomicOpResult ret; + ret.code = ::nebula::cpp2::ErrorCode::E_RPC_FAILURE; + if (i % 2) { + ret.code = ::nebula::cpp2::ErrorCode::SUCCEEDED; + ret.batch = msgs.back(); + } + return ret; + }); if (i == 10) { fut.wait(); } @@ -209,7 +268,13 @@ TEST_F(LogCASTest, EmptyTest) { { LOG(INFO) << "return empty string for atomic operation!"; folly::Baton<> baton; - leader_->atomicOpAsync([log = std::move(log)]() mutable { return std::string(""); }) + leader_ + ->atomicOpAsync([]() mutable { + kvstore::MergeableAtomicOpResult ret; + ret.code = ::nebula::cpp2::ErrorCode::SUCCEEDED; + // ret.batch = log; + return ret; + }) .thenValue([&baton](nebula::cpp2::ErrorCode res) { ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, res); baton.post(); @@ -219,7 +284,14 @@ TEST_F(LogCASTest, EmptyTest) { { LOG(INFO) << "return none string for atomic operation!"; folly::Baton<> baton; - leader_->atomicOpAsync([log = std::move(log)]() mutable { return std::nullopt; }) + // leader_->atomicOpAsync([log = std::move(log)]() mutable { return folly::none; }) + leader_ + ->atomicOpAsync([]() mutable { + kvstore::MergeableAtomicOpResult ret; + ret.code = ::nebula::cpp2::ErrorCode::E_RAFT_ATOMIC_OP_FAILED; + // ret.batch = log; + return ret; + }) .thenValue([&baton](nebula::cpp2::ErrorCode res) { ASSERT_EQ(nebula::cpp2::ErrorCode::E_RAFT_ATOMIC_OP_FAILED, res); baton.post(); diff --git a/src/kvstore/raftex/test/LogCommandTest.cpp 
b/src/kvstore/raftex/test/LogCommandTest.cpp index c1d3a7c6982..7023cc523bf 100644 --- a/src/kvstore/raftex/test/LogCommandTest.cpp +++ b/src/kvstore/raftex/test/LogCommandTest.cpp @@ -48,7 +48,8 @@ TEST_F(LogCommandTest, CommandInMiddle) { leader_->sendCommandAsync("Command Log Message"); msgs.emplace_back("Command Log Message"); - appendLogs(6, 9, leader_, msgs, true); + bool waitLastLog = true; + appendLogs(6, 9, leader_, msgs, waitLastLog); LOG(INFO) << "<===== Finish appending logs"; ASSERT_EQ(3, leader_->commitTimes_); @@ -97,16 +98,43 @@ TEST_F(LogCommandTest, MixedLogs) { leader_->sendCommandAsync("Command log 1"); msgs.emplace_back("Command log 1"); - leader_->atomicOpAsync([]() { return test::compareAndSet("TCAS Log Message 2"); }); + kvstore::MergeableAtomicOp op1 = []() { + kvstore::MergeableAtomicOpResult ret; + ret.code = ::nebula::cpp2::ErrorCode::SUCCEEDED; + auto optStr = test::compareAndSet("TCAS Log Message 2"); + if (optStr) { + ret.batch = *optStr; + } + return ret; + }; + leader_->atomicOpAsync(std::move(op1)); msgs.emplace_back("CAS Log Message 2"); leader_->appendAsync(0, "Normal log Message 3"); msgs.emplace_back("Normal log Message 3"); - leader_->atomicOpAsync([]() { return test::compareAndSet("TCAS Log Message 4"); }); + kvstore::MergeableAtomicOp op2 = []() { + kvstore::MergeableAtomicOpResult ret; + ret.code = ::nebula::cpp2::ErrorCode::SUCCEEDED; + auto optStr = test::compareAndSet("TCAS Log Message 4"); + if (optStr) { + ret.batch = *optStr; + } + return ret; + }; + leader_->atomicOpAsync(std::move(op2)); msgs.emplace_back("CAS Log Message 4"); - leader_->atomicOpAsync([]() { return test::compareAndSet("TCAS Log Message 5"); }); + kvstore::MergeableAtomicOp op3 = []() { + kvstore::MergeableAtomicOpResult ret; + ret.code = ::nebula::cpp2::ErrorCode::SUCCEEDED; + auto optStr = test::compareAndSet("TCAS Log Message 5"); + if (optStr) { + ret.batch = *optStr; + } + return ret; + }; + leader_->atomicOpAsync(std::move(op3)); msgs.emplace_back("CAS Log Message 5"); leader_->sendCommandAsync("Command log 6"); @@ -118,7 +146,16 @@ TEST_F(LogCommandTest, MixedLogs) { leader_->appendAsync(0, "Normal log Message 8"); msgs.emplace_back("Normal log Message 8"); - leader_->atomicOpAsync([]() { return test::compareAndSet("FCAS Log Message"); }); + kvstore::MergeableAtomicOp op4 = []() { + kvstore::MergeableAtomicOpResult ret; + ret.code = ::nebula::cpp2::ErrorCode::SUCCEEDED; + auto optStr = test::compareAndSet("FCAS Log Message"); + if (optStr) { + ret.batch = *optStr; + } + return ret; + }; + leader_->atomicOpAsync(std::move(op4)); leader_->sendCommandAsync("Command log 9"); msgs.emplace_back("Command log 9"); @@ -126,12 +163,23 @@ TEST_F(LogCommandTest, MixedLogs) { auto f = leader_->appendAsync(0, "Normal log Message 10"); msgs.emplace_back("Normal log Message 10"); - leader_->atomicOpAsync([]() { return test::compareAndSet("FCAS Log Message"); }); + kvstore::MergeableAtomicOp op5 = []() { + kvstore::MergeableAtomicOpResult ret; + ret.code = ::nebula::cpp2::ErrorCode::SUCCEEDED; + auto optStr = test::compareAndSet("FCAS Log Message"); + if (optStr) { + ret.batch = *optStr; + } + return ret; + }; + leader_->atomicOpAsync(std::move(op5)); + // leader_->atomicOpAsync([]() { return test::compareAndSet("FCAS Log Message"); }); f.wait(); LOG(INFO) << "<===== Finish appending logs"; - ASSERT_EQ(8, leader_->commitTimes_); + // previous is 8 + ASSERT_EQ(5, leader_->commitTimes_); checkConsensus(copies_, 0, 9, msgs); } diff --git a/src/kvstore/raftex/test/RaftexTestBase.cpp 
b/src/kvstore/raftex/test/RaftexTestBase.cpp index ca59ccaeefb..1c426d2bd4a 100644 --- a/src/kvstore/raftex/test/RaftexTestBase.cpp +++ b/src/kvstore/raftex/test/RaftexTestBase.cpp @@ -265,6 +265,7 @@ void appendLogs(int start, // Append 100 logs LOG(INFO) << "=====> Start appending logs from index " << start << " to " << end; for (int i = start; i <= end; ++i) { + LOG(INFO) << "=====> i = " << i; msgs.emplace_back(folly::stringPrintf("Test Log Message %03d", i)); auto fut = leader->appendAsync(0, msgs.back()); if (i == end && waitLastLog) { @@ -278,6 +279,7 @@ bool checkConsensus(std::vector>& copies, size_t start, size_t end, std::vector& msgs) { + LOG(INFO) << "checkConsensus()"; int32_t count = 0; for (; count < 3; count++) { bool consensus = true; @@ -287,10 +289,14 @@ bool checkConsensus(std::vector>& copies, // Check every copy for (auto& c : copies) { if (c != nullptr && c->isRunning()) { + LOG(INFO) << "====> checkConsensus(), msgs.size() " << msgs.size() << ", c->getNumLogs() " + << c->getNumLogs(); if (msgs.size() != c->getNumLogs() || !checkLog(c, start, end, msgs)) { consensus = false; break; } + } else { + LOG(INFO) << "====> checkConsensus(), c == nullptr || !c->isRunning()"; } } if (consensus == true) { diff --git a/src/kvstore/raftex/test/TestShard.cpp b/src/kvstore/raftex/test/TestShard.cpp index b7009423271..c6c6327e603 100644 --- a/src/kvstore/raftex/test/TestShard.cpp +++ b/src/kvstore/raftex/test/TestShard.cpp @@ -169,6 +169,7 @@ std::tuple TestShard::commitLogs( std::unique_ptr iter, bool wait, bool needLock) { UNUSED(wait); LogID lastId = kNoCommitLogId; + // LOG(INFO) << "TestShard::commitLogs() lastId = " << lastId; TermID lastTerm = kNoCommitLogTerm; int32_t commitLogsNum = 0; while (iter->valid()) { @@ -206,10 +207,13 @@ std::tuple TestShard::commitLogs( } VLOG(2) << "TestShard: " << idStr_ << "Committed log " << " up to " << lastId; + LOG(INFO) << "TestShard: " << idStr_ << "Committed log " + << " up to " << lastId; if (lastId > -1) { lastCommittedLogId_ = lastId; } if (commitLogsNum > 0) { + // LOG(INFO) << "====>> commitTimes_++"; commitTimes_++; } return {nebula::cpp2::ErrorCode::SUCCEEDED, lastId, lastTerm}; diff --git a/src/kvstore/test/NebulaStoreTest.cpp b/src/kvstore/test/NebulaStoreTest.cpp index 08fbddd9243..9d195f6e422 100644 --- a/src/kvstore/test/NebulaStoreTest.cpp +++ b/src/kvstore/test/NebulaStoreTest.cpp @@ -632,7 +632,7 @@ TEST(NebulaStoreTest, TransLeaderTest) { part->asyncTransferLeader(targetAddr, [&](nebula::cpp2::ErrorCode) { baton.post(); }); baton.wait(); } - sleep(FLAGS_raft_heartbeat_interval_secs); + sleep(FLAGS_raft_heartbeat_interval_secs * 2); { nebula::meta::ActiveHostsMan::AllLeaders leaderIds; ASSERT_EQ(3, stores[0]->allLeader(leaderIds)); @@ -652,7 +652,7 @@ TEST(NebulaStoreTest, TransLeaderTest) { part->asyncTransferLeader(targetAddr, [&](nebula::cpp2::ErrorCode) { baton.post(); }); baton.wait(); } - sleep(FLAGS_raft_heartbeat_interval_secs); + sleep(FLAGS_raft_heartbeat_interval_secs * 2); for (int i = 0; i < replicas; i++) { nebula::meta::ActiveHostsMan::AllLeaders leaderIds; ASSERT_EQ(1UL, stores[i]->allLeader(leaderIds)); @@ -962,7 +962,9 @@ TEST(NebulaStoreTest, ReadSnapshotTest) { // put kv { std::vector> expected, result; - auto atomic = [&]() -> std::string { + + auto atomic = [&] { + kvstore::MergeableAtomicOpResult ret; std::unique_ptr batchHolder = std::make_unique(); for (auto i = 0; i < 20; i++) { auto key = folly::stringPrintf("key_%d", i); @@ -970,7 +972,9 @@ TEST(NebulaStoreTest, ReadSnapshotTest) { 
batchHolder->put(key.data(), val.data()); expected.emplace_back(std::move(key), std::move(val)); } - return encodeBatchValue(batchHolder->getBatch()); + ret.code = nebula::cpp2::ErrorCode::SUCCEEDED; + ret.batch = encodeBatchValue(batchHolder->getBatch()); + return ret; }; folly::Baton baton; @@ -1034,7 +1038,9 @@ TEST(NebulaStoreTest, AtomicOpBatchTest) { // put kv { std::vector> expected, result; - auto atomic = [&]() -> std::string { + + auto atomic = [&] { + kvstore::MergeableAtomicOpResult ret; std::unique_ptr batchHolder = std::make_unique(); for (auto i = 0; i < 20; i++) { auto key = folly::stringPrintf("key_%d", i); @@ -1042,7 +1048,9 @@ TEST(NebulaStoreTest, AtomicOpBatchTest) { batchHolder->put(key.data(), val.data()); expected.emplace_back(std::move(key), std::move(val)); } - return encodeBatchValue(batchHolder->getBatch()); + ret.code = nebula::cpp2::ErrorCode::SUCCEEDED; + ret.batch = encodeBatchValue(batchHolder->getBatch()); + return ret; }; folly::Baton baton; @@ -1066,7 +1074,9 @@ TEST(NebulaStoreTest, AtomicOpBatchTest) { // put and remove { std::vector> expected, result; - auto atomic = [&]() -> std::string { + + auto atomic = [&] { + kvstore::MergeableAtomicOpResult ret; std::unique_ptr batchHolder = std::make_unique(); for (auto i = 0; i < 20; i++) { auto key = folly::stringPrintf("key_%d", i); @@ -1079,9 +1089,10 @@ TEST(NebulaStoreTest, AtomicOpBatchTest) { for (auto i = 0; i < 20; i = i + 5) { batchHolder->remove(folly::stringPrintf("key_%d", i)); } - return encodeBatchValue(batchHolder->getBatch()); + ret.code = nebula::cpp2::ErrorCode::SUCCEEDED; + ret.batch = encodeBatchValue(batchHolder->getBatch()); + return ret; }; - folly::Baton baton; auto callback = [&](nebula::cpp2::ErrorCode code) { EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); diff --git a/src/storage/mutate/AddEdgesProcessor.cpp b/src/storage/mutate/AddEdgesProcessor.cpp index 8a68b25ac18..ca739fdc5af 100644 --- a/src/storage/mutate/AddEdgesProcessor.cpp +++ b/src/storage/mutate/AddEdgesProcessor.cpp @@ -152,36 +152,18 @@ void AddEdgesProcessor::doProcess(const cpp2::AddEdgesRequest& req) { void AddEdgesProcessor::doProcessWithIndex(const cpp2::AddEdgesRequest& req) { const auto& partEdges = req.get_parts(); const auto& propNames = req.get_prop_names(); - for (auto& part : partEdges) { - IndexCountWrapper wrapper(env_); - std::unique_ptr batchHolder = std::make_unique(); + for (const auto& part : partEdges) { auto partId = part.first; - const auto& newEdges = part.second; - std::vector dummyLock; - dummyLock.reserve(newEdges.size()); + const auto& edges = part.second; + // cache edgeKey + std::unordered_set visited; + visited.reserve(edges.size()); + std::vector kvs; + kvs.reserve(edges.size()); auto code = nebula::cpp2::ErrorCode::SUCCEEDED; - - deleteDupEdge(const_cast&>(newEdges)); - for (auto& newEdge : newEdges) { - auto edgeKey = *newEdge.key_ref(); - auto l = std::make_tuple(spaceId_, - partId, - edgeKey.src_ref()->getStr(), - *edgeKey.edge_type_ref(), - *edgeKey.ranking_ref(), - edgeKey.dst_ref()->getStr()); - if (std::find(dummyLock.begin(), dummyLock.end(), l) == dummyLock.end()) { - if (!env_->edgesML_->try_lock(l)) { - LOG(ERROR) << folly::sformat("edge locked : src {}, type {}, rank {}, dst {}", - edgeKey.src_ref()->getStr(), - *edgeKey.edge_type_ref(), - *edgeKey.ranking_ref(), - edgeKey.dst_ref()->getStr()); - code = nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR; - break; - } - dummyLock.emplace_back(std::move(l)); - } + deleteDupEdge(const_cast&>(edges)); + for (const auto& edge 
: edges) { + auto edgeKey = *edge.key_ref(); VLOG(3) << "PartitionID: " << partId << ", VertexID: " << *edgeKey.src_ref() << ", EdgeType: " << *edgeKey.edge_type_ref() << ", EdgeRanking: " << *edgeKey.ranking_ref() @@ -191,17 +173,12 @@ void AddEdgesProcessor::doProcessWithIndex(const cpp2::AddEdgesRequest& req) { spaceVidLen_, edgeKey.src_ref()->getStr(), edgeKey.dst_ref()->getStr())) { LOG(ERROR) << "Space " << spaceId_ << " vertex length invalid, " << "space vid len: " << spaceVidLen_ << ", edge srcVid: " << *edgeKey.src_ref() - << ", dstVid: " << *edgeKey.dst_ref(); + << ", dstVid: " << *edgeKey.dst_ref() << ", ifNotExists_: " << std::boolalpha + << ifNotExists_; code = nebula::cpp2::ErrorCode::E_INVALID_VID; break; } - auto key = NebulaKeyUtils::edgeKey(spaceVidLen_, - partId, - edgeKey.src_ref()->getStr(), - *edgeKey.edge_type_ref(), - *edgeKey.ranking_ref(), - edgeKey.dst_ref()->getStr()); auto schema = env_->schemaMan_->getEdgeSchema(spaceId_, std::abs(*edgeKey.edge_type_ref())); if (!schema) { LOG(ERROR) << "Space " << spaceId_ << ", Edge " << *edgeKey.edge_type_ref() << " invalid"; @@ -209,239 +186,159 @@ void AddEdgesProcessor::doProcessWithIndex(const cpp2::AddEdgesRequest& req) { break; } - auto props = newEdge.get_props(); - WriteResult wRet; - auto retEnc = encodeRowVal(schema.get(), propNames, props, wRet); - if (!retEnc.ok()) { - LOG(ERROR) << retEnc.status(); - code = writeResultTo(wRet, true); - break; - } - if (*edgeKey.edge_type_ref() > 0) { - std::string oldVal; - RowReaderWrapper nReader; - RowReaderWrapper oReader; - if (!ignoreExistedIndex_) { - auto obsIdx = findOldValue(partId, key); - if (nebula::ok(obsIdx)) { - // already exists in kvstore - if (ifNotExists_ && !nebula::value(obsIdx).empty()) { - continue; - } - if (!nebula::value(obsIdx).empty()) { - oldVal = std::move(value(obsIdx)); - oReader = RowReaderWrapper::getEdgePropReader( - env_->schemaMan_, spaceId_, *edgeKey.edge_type_ref(), oldVal); - } - } else { - code = nebula::error(obsIdx); - break; - } - } - if (!retEnc.value().empty()) { - nReader = RowReaderWrapper::getEdgePropReader( - env_->schemaMan_, spaceId_, *edgeKey.edge_type_ref(), retEnc.value()); - } - for (auto& index : indexes_) { - if (*edgeKey.edge_type_ref() == index->get_schema_id().get_edge_type()) { - /* - * step 1 , Delete old version index if exists. - */ - if (oReader != nullptr) { - auto ois = indexKeys(partId, oReader.get(), key, index, schema.get()); - if (!ois.empty()) { - // Check the index is building for the specified partition or not. - auto indexState = env_->getIndexState(spaceId_, partId); - if (env_->checkRebuilding(indexState)) { - auto delOpKey = OperationKeyUtils::deleteOperationKey(partId); - for (auto& oi : ois) { - batchHolder->put(std::string(delOpKey), std::move(oi)); - } - } else if (env_->checkIndexLocked(indexState)) { - LOG(ERROR) << "The index has been locked: " << index->get_index_name(); - code = nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR; - break; - } else { - for (auto& oi : ois) { - batchHolder->remove(std::move(oi)); - } - } - } - } - /* - * step 2 , Insert new edge index - */ - if (nReader != nullptr) { - auto niks = indexKeys(partId, nReader.get(), key, index, schema.get()); - if (!niks.empty()) { - auto v = CommonUtils::ttlValue(schema.get(), nReader.get()); - auto niv = v.ok() ? IndexKeyUtils::indexVal(std::move(v).value()) : ""; - // Check the index is building for the specified partition or not. 
- auto indexState = env_->getIndexState(spaceId_, partId); - if (env_->checkRebuilding(indexState)) { - for (auto& nik : niks) { - auto opKey = OperationKeyUtils::modifyOperationKey(partId, std::move(nik)); - batchHolder->put(std::move(opKey), std::string(niv)); - } - } else if (env_->checkIndexLocked(indexState)) { - LOG(ERROR) << "The index has been locked: " << index->get_index_name(); - code = nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR; - break; - } else { - for (auto& nik : niks) { - batchHolder->put(std::move(nik), std::string(niv)); - } - } - } - } - } - } + auto key = NebulaKeyUtils::edgeKey(spaceVidLen_, + partId, + edgeKey.src_ref()->getStr(), + *edgeKey.edge_type_ref(), + *edgeKey.ranking_ref(), + (*edgeKey.dst_ref()).getStr()); + if (ifNotExists_ && !visited.emplace(key).second) { + LOG(INFO) << "skip " << edgeKey.src_ref()->getStr(); + continue; } - if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { + + // collect values + WriteResult writeResult; + const auto& props = edge.get_props(); + auto encode = encodeRowVal(schema.get(), propNames, props, writeResult); + if (!encode.ok()) { + LOG(ERROR) << encode.status(); + code = writeResultTo(writeResult, true); break; } - batchHolder->put(std::move(key), std::move(retEnc.value())); - stats::StatsManager::addValue(kNumEdgesInserted); + kvs.emplace_back(std::move(key), std::move(encode.value())); } + + auto atomicOp = + [partId, data = std::move(kvs), this]() mutable -> kvstore::MergeableAtomicOpResult { + return addEdgesWithIndex(partId, std::move(data)); + }; + + auto cb = [partId, this](nebula::cpp2::ErrorCode ec) { handleAsync(spaceId_, partId, ec); }; + if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { - env_->edgesML_->unlockBatch(dummyLock); handleAsync(spaceId_, partId, code); - continue; - } - if (consistOp_) { - (*consistOp_)(*batchHolder, nullptr); + } else { + env_->kvstore_->asyncAtomicOp(spaceId_, partId, std::move(atomicOp), std::move(cb)); } - auto batch = encodeBatchValue(batchHolder->getBatch()); - DCHECK(!batch.empty()); - nebula::MemoryLockGuard lg(env_->edgesML_.get(), std::move(dummyLock), false, false); - env_->kvstore_->asyncAppendBatch(spaceId_, - partId, - std::move(batch), - [l = std::move(lg), icw = std::move(wrapper), partId, this]( - nebula::cpp2::ErrorCode retCode) { - UNUSED(l); - UNUSED(icw); - handleAsync(spaceId_, partId, retCode); - }); } } -ErrorOr AddEdgesProcessor::addEdges( - PartitionID partId, const std::vector& edges) { +kvstore::MergeableAtomicOpResult AddEdgesProcessor::addEdgesWithIndex( + PartitionID partId, std::vector&& data) { + kvstore::MergeableAtomicOpResult ret; + ret.code = nebula::cpp2::ErrorCode::E_RAFT_ATOMIC_OP_FAILED; IndexCountWrapper wrapper(env_); std::unique_ptr batchHolder = std::make_unique(); - - /* - * Define the map newEdges to avoid inserting duplicate edge. - * This map means : - * map , - * -- edge_unique_key is only used as the unique key , for example: - * insert below edges in the same request: - * kv(part1_src1_edgeType1_rank1_dst1 , v1) - * kv(part1_src1_edgeType1_rank1_dst1 , v2) - * kv(part1_src1_edgeType1_rank1_dst1 , v3) - * kv(part1_src1_edgeType1_rank1_dst1 , v4) - * - * Ultimately, kv(part1_src1_edgeType1_rank1_dst1 , v4) . It's just what I need. 
- */ - std::unordered_map newEdges; - std::for_each( - edges.begin(), edges.end(), [&newEdges](const auto& e) { newEdges[e.first] = e.second; }); - - for (auto& e : newEdges) { - std::string val; - RowReaderWrapper oReader; - RowReaderWrapper nReader; - auto edgeType = NebulaKeyUtils::getEdgeType(spaceVidLen_, e.first); + for (auto& [key, value] : data) { + auto edgeType = NebulaKeyUtils::getEdgeType(spaceVidLen_, key); + RowReaderWrapper oldReader; + RowReaderWrapper newReader = + RowReaderWrapper::getEdgePropReader(env_->schemaMan_, spaceId_, std::abs(edgeType), value); auto schema = env_->schemaMan_->getEdgeSchema(spaceId_, std::abs(edgeType)); if (!schema) { - LOG(ERROR) << "Space " << spaceId_ << ", Edge " << edgeType << " invalid"; - return nebula::cpp2::ErrorCode::E_EDGE_NOT_FOUND; + return ret; } - for (auto& index : indexes_) { - if (edgeType == index->get_schema_id().get_edge_type()) { - /* - * step 1 , Delete old version index if exists. - */ - if (!ignoreExistedIndex_ && val.empty()) { - auto obsIdx = findOldValue(partId, e.first); - if (!nebula::ok(obsIdx)) { - return nebula::error(obsIdx); - } - val = std::move(nebula::value(obsIdx)); - if (!val.empty()) { - oReader = - RowReaderWrapper::getEdgePropReader(env_->schemaMan_, spaceId_, edgeType, val); - if (oReader == nullptr) { - LOG(ERROR) << "Bad format row"; - return nebula::cpp2::ErrorCode::E_INVALID_DATA; - } + + // only out-edge need to handle index + if (edgeType > 0) { + std::string oldVal; + if (!ignoreExistedIndex_) { + // read the old key value and initialize row reader if exists + auto result = findOldValue(partId, key); + if (nebula::ok(result)) { + if (ifNotExists_ && !nebula::value(result).empty()) { + continue; + } else if (!nebula::value(result).empty()) { + oldVal = std::move(nebula::value(result)); + oldReader = + RowReaderWrapper::getEdgePropReader(env_->schemaMan_, spaceId_, edgeType, oldVal); + ret.readSet.emplace_back(key); } + } else { + // read old value failed + return ret; } - - if (!val.empty()) { - auto ois = indexKeys(partId, oReader.get(), e.first, index, schema.get()); - if (!ois.empty()) { - // Check the index is building for the specified partition or not. - auto indexState = env_->getIndexState(spaceId_, partId); - if (env_->checkRebuilding(indexState)) { - auto deleteOpKey = OperationKeyUtils::deleteOperationKey(partId); - for (auto& oi : ois) { - batchHolder->put(std::string(deleteOpKey), std::move(oi)); - } - } else if (env_->checkIndexLocked(indexState)) { - LOG(ERROR) << "The index has been locked: " << index->get_index_name(); - return nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR; - } else { - for (auto& oi : ois) { - batchHolder->remove(std::move(oi)); + } + for (const auto& index : indexes_) { + if (edgeType == index->get_schema_id().get_edge_type()) { + // step 1, Delete old version index if exists. + if (oldReader != nullptr) { + auto oldIndexKeys = indexKeys(partId, oldReader.get(), key, index, nullptr); + if (!oldIndexKeys.empty()) { + ret.writeSet.insert(ret.writeSet.end(), oldIndexKeys.begin(), oldIndexKeys.end()); + // Check the index is building for the specified partition or + // not. 
+ auto indexState = env_->getIndexState(spaceId_, partId); + if (env_->checkRebuilding(indexState)) { + auto delOpKey = OperationKeyUtils::deleteOperationKey(partId); + for (auto& idxKey : oldIndexKeys) { + ret.writeSet.push_back(idxKey); + batchHolder->put(std::string(delOpKey), std::move(idxKey)); + } + } else if (env_->checkIndexLocked(indexState)) { + return ret; + } else { + for (auto& idxKey : oldIndexKeys) { + ret.writeSet.push_back(idxKey); + batchHolder->remove(std::move(idxKey)); + } } } } - } - /* - * step 2 , Insert new edge index - */ - if (nReader == nullptr) { - nReader = - RowReaderWrapper::getEdgePropReader(env_->schemaMan_, spaceId_, edgeType, e.second); - if (nReader == nullptr) { - LOG(ERROR) << "Bad format row"; - return nebula::cpp2::ErrorCode::E_INVALID_DATA; - } - } - - auto niks = indexKeys(partId, nReader.get(), e.first, index, schema.get()); - if (!niks.empty()) { - auto v = CommonUtils::ttlValue(schema.get(), nReader.get()); - auto niv = v.ok() ? IndexKeyUtils::indexVal(std::move(v).value()) : ""; - // Check the index is building for the specified partition or not. - auto indexState = env_->getIndexState(spaceId_, partId); - if (env_->checkRebuilding(indexState)) { - for (auto& nik : niks) { - auto modifyOpKey = OperationKeyUtils::modifyOperationKey(partId, std::move(nik)); - batchHolder->put(std::move(modifyOpKey), std::string(niv)); - } - } else if (env_->checkIndexLocked(indexState)) { - LOG(ERROR) << "The index has been locked: " << index->get_index_name(); - return nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR; - } else { - for (auto& nik : niks) { - batchHolder->put(std::move(nik), std::string(niv)); + // step 2, Insert new edge index + if (newReader != nullptr) { + auto newIndexKeys = indexKeys(partId, newReader.get(), key, index, nullptr); + if (!newIndexKeys.empty()) { + // check if index has ttl field, write it to index value if exists + auto field = CommonUtils::ttlValue(schema.get(), newReader.get()); + auto indexVal = field.ok() ? IndexKeyUtils::indexVal(std::move(field).value()) : ""; + auto indexState = env_->getIndexState(spaceId_, partId); + if (env_->checkRebuilding(indexState)) { + for (auto& idxKey : newIndexKeys) { + auto opKey = OperationKeyUtils::modifyOperationKey(partId, idxKey); + ret.writeSet.push_back(opKey); + batchHolder->put(std::move(opKey), std::string(indexVal)); + } + } else if (env_->checkIndexLocked(indexState)) { + // return folly::Optional(); + return ret; + } else { + for (auto& idxKey : newIndexKeys) { + ret.writeSet.push_back(idxKey); + batchHolder->put(std::move(idxKey), std::string(indexVal)); + } + } } } } } } - /* - * step 3 , Insert new edge data - */ - auto key = e.first; - auto prop = e.second; - batchHolder->put(std::move(key), std::move(prop)); + // step 3, Insert new edge data + ret.writeSet.push_back(key); + // for why use a copy not move here: + // previously, we use atomicOp(a kind of raft log, raft send this log in sync) + // this import an implicit constraint + // that all atomicOp will execute only once + // (because all atomicOp may fail or succeed, won't retry) + // but in mergeable mode of atomic: + // an atomicOp may fail because of conflict + // then it will retry after the prev batch commit + // this mean now atomicOp may execute twice + // (won't be more than twice) + // but if we move the key out, + // then the second run will core. 
diff --git a/src/storage/mutate/AddEdgesProcessor.h b/src/storage/mutate/AddEdgesProcessor.h
index 0afa646a5c0..9a8f8ef649e 100644
--- a/src/storage/mutate/AddEdgesProcessor.h
+++ b/src/storage/mutate/AddEdgesProcessor.h
@@ -9,6 +9,7 @@
 #include "common/base/Base.h"
 #include "common/stats/StatsManager.h"
 #include "kvstore/LogEncoder.h"
+#include "kvstore/raftex/RaftPart.h"
 #include "storage/BaseProcessor.h"
 #include "storage/StorageFlags.h"
@@ -37,8 +38,8 @@ class AddEdgesProcessor : public BaseProcessor<cpp2::ExecResponse> {
   AddEdgesProcessor(StorageEnv* env, const ProcessorCounters* counters)
       : BaseProcessor<cpp2::ExecResponse>(env, counters) {}

-  ErrorOr<nebula::cpp2::ErrorCode, std::string> addEdges(PartitionID partId,
-                                                         const std::vector<kvstore::KV>& edges);
+  kvstore::MergeableAtomicOpResult addEdgesWithIndex(PartitionID partId,
+                                                     std::vector<kvstore::KV>&& data);

   ErrorOr<nebula::cpp2::ErrorCode, std::string> findOldValue(PartitionID partId,
                                                              const folly::StringPiece& rawKey);
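The edges call site itself is not shown in this diff; presumably it mirrors the vertices dispatch that follows, packaging `addEdgesWithIndex` into a deferred op for `asyncAtomicOp`. A self-contained sketch of that shape, with simplified stand-in types (`KV`, `asyncAtomicOp`, and the integer error code here are placeholders, not the real signatures):

```cpp
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Minimal stand-ins for kvstore::KV and kvstore::MergeableAtomicOpResult.
using KV = std::pair<std::string, std::string>;
struct MergeableAtomicOpResult {
  int code = 0;
  std::string batch;
  std::vector<std::string> readSet, writeSet;
};
using MergeableAtomicOp = std::function<MergeableAtomicOpResult()>;
using KVCallback = std::function<void(int)>;

// The kvstore entry point, reduced to a free function for the sketch.
void asyncAtomicOp(MergeableAtomicOp op, KVCallback cb) {
  cb(op().code);  // a real store queues the op on the raft path
}

MergeableAtomicOpResult addEdgesWithIndex(int partId, std::vector<KV>&& data) {
  (void)partId;
  MergeableAtomicOpResult ret;
  ret.batch = std::to_string(data.size());  // placeholder for the encoded batch
  return ret;
}

int main() {
  std::vector<KV> kvs = {{"edgeKey", "edgeVal"}};
  int partId = 1;
  // Package the index computation as a deferred op, as the vertices path
  // below does; the processor no longer writes the batch itself.
  auto atomicOp = [partId, data = std::move(kvs)]() mutable {
    return addEdgesWithIndex(partId, std::move(data));
  };
  asyncAtomicOp(std::move(atomicOp), [](int code) { (void)code; /* handleAsync */ });
  return 0;
}
```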
diff --git a/src/storage/mutate/AddVerticesProcessor.cpp b/src/storage/mutate/AddVerticesProcessor.cpp
index 83fcab649a3..dc0f3a97e31 100644
--- a/src/storage/mutate/AddVerticesProcessor.cpp
+++ b/src/storage/mutate/AddVerticesProcessor.cpp
@@ -137,18 +137,17 @@ void AddVerticesProcessor::doProcess(const cpp2::AddVerticesRequest& req) {
 void AddVerticesProcessor::doProcessWithIndex(const cpp2::AddVerticesRequest& req) {
   const auto& partVertices = req.get_parts();
   const auto& propNamesMap = req.get_prop_names();
-  for (auto& part : partVertices) {
-    IndexCountWrapper wrapper(env_);
-    std::unique_ptr<kvstore::BatchHolder> batchHolder = std::make_unique<kvstore::BatchHolder>();
+  auto batchHolder = std::make_unique<kvstore::BatchHolder>();
+
+  for (const auto& part : partVertices) {
     auto partId = part.first;
     const auto& vertices = part.second;
-    std::vector<std::tuple<GraphSpaceID, PartitionID, TagID, VertexID>> dummyLock;
-    dummyLock.reserve(vertices.size());
-    auto code = nebula::cpp2::ErrorCode::SUCCEEDED;
+    std::vector<kvstore::KV> kvs;
+    kvs.reserve(vertices.size());

-    // cache tagKey
+    auto code = nebula::cpp2::ErrorCode::SUCCEEDED;
     deleteDupVid(const_cast<std::vector<cpp2::NewVertex>&>(vertices));
-    for (auto& vertex : vertices) {
+    for (const auto& vertex : vertices) {
       auto vid = vertex.get_id().getStr();
       const auto& newTags = vertex.get_tags();
@@ -158,18 +157,10 @@ void AddVerticesProcessor::doProcessWithIndex(const cpp2::AddVerticesRequest& re
         code = nebula::cpp2::ErrorCode::E_INVALID_VID;
         break;
       }
+      batchHolder->put(NebulaKeyUtils::vertexKey(spaceVidLen_, partId, vid), "");

-      for (auto& newTag : newTags) {
+      for (const auto& newTag : newTags) {
         auto tagId = newTag.get_tag_id();
-        auto l = std::make_tuple(spaceId_, partId, tagId, vid);
-        if (std::find(dummyLock.begin(), dummyLock.end(), l) == dummyLock.end()) {
-          if (!env_->verticesML_->try_lock(l)) {
-            LOG(ERROR) << folly::sformat("The vertex locked : tag {}, vid {}", tagId, vid);
-            code = nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR;
-            break;
-          }
-          dummyLock.emplace_back(std::move(l));
-        }
         VLOG(3) << "PartitionID: " << partId << ", VertexID: " << vid << ", TagID: " << tagId;
         auto schema = env_->schemaMan_->getTagSchema(spaceId_, tagId);
@@ -180,134 +171,137 @@ void AddVerticesProcessor::doProcessWithIndex(const cpp2::AddVerticesRequest& re
         auto key = NebulaKeyUtils::tagKey(spaceVidLen_, partId, vid, tagId);
-        auto props = newTag.get_props();
+        // collect the encoded values
+        const auto& props = newTag.get_props();
         auto iter = propNamesMap.find(tagId);
         std::vector<std::string> propNames;
         if (iter != propNamesMap.end()) {
           propNames = iter->second;
         }
-        RowReaderWrapper nReader;
-        RowReaderWrapper oReader;
-        std::string oldVal;
-        if (!ignoreExistedIndex_) {
-          auto obsIdx = findOldValue(partId, vid, tagId);
-          if (nebula::ok(obsIdx)) {
-            if (ifNotExists_ && !nebula::value(obsIdx).empty()) {
-              continue;
-            }
-            if (!nebula::value(obsIdx).empty()) {
-              oldVal = std::move(value(obsIdx));
-              oReader =
-                  RowReaderWrapper::getTagPropReader(env_->schemaMan_, spaceId_, tagId, oldVal);
-            }
-          } else {
-            code = nebula::error(obsIdx);
-            break;
-          }
-        }
-
-        WriteResult wRet;
-        auto retEnc = encodeRowVal(schema.get(), propNames, props, wRet);
-        if (!retEnc.ok()) {
-          LOG(ERROR) << retEnc.status();
-          code = writeResultTo(wRet, false);
+        WriteResult writeResult;
+        auto encode = encodeRowVal(schema.get(), propNames, props, writeResult);
+        if (!encode.ok()) {
+          LOG(ERROR) << encode.status();
+          code = writeResultTo(writeResult, false);
           break;
         }
+        kvs.emplace_back(std::string(key), std::string(encode.value()));
+      }
+    }
+
+    if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
+      handleAsync(spaceId_, partId, code);
+    } else {
+      auto atomicOp = [&, partId, data = std::move(kvs)]() mutable {
+        return addVerticesWithIndex(partId, std::move(data), std::move(batchHolder));
+      };
+
+      auto cb = [partId, this](nebula::cpp2::ErrorCode ec) { handleAsync(spaceId_, partId, ec); };
+      env_->kvstore_->asyncAtomicOp(spaceId_, partId, std::move(atomicOp), std::move(cb));
+    }
+  }
+}

-        if (!retEnc.value().empty()) {
-          nReader =
-              RowReaderWrapper::getTagPropReader(env_->schemaMan_, spaceId_, tagId, retEnc.value());
+kvstore::MergeableAtomicOpResult AddVerticesProcessor::addVerticesWithIndex(
+    PartitionID partId,
+    std::vector<kvstore::KV>&& data,
+    std::unique_ptr<kvstore::BatchHolder>&& batchHolder) {
+  kvstore::MergeableAtomicOpResult ret;
+  ret.code = nebula::cpp2::ErrorCode::E_RAFT_ATOMIC_OP_FAILED;
+  IndexCountWrapper wrapper(env_);
+  for (auto& [key, value] : data) {
+    auto vId = NebulaKeyUtils::getVertexId(spaceVidLen_, key);
+    auto tagId = NebulaKeyUtils::getTagId(spaceVidLen_, key);
+    RowReaderWrapper oldReader;
+    RowReaderWrapper newReader =
+        RowReaderWrapper::getTagPropReader(env_->schemaMan_, spaceId_, tagId, value);
+    auto schema = env_->schemaMan_->getTagSchema(spaceId_, tagId);
+    if (!schema) {
+      ret.code = nebula::cpp2::ErrorCode::E_TAG_NOT_FOUND;
+      return ret;
+    }
+    std::string oldVal;
+    if (!ignoreExistedIndex_) {
+      // read the old value and initialize the row reader if it exists
+      auto result = findOldValue(partId, vId.str(), tagId);
+      if (nebula::ok(result)) {
+        if (ifNotExists_ && !nebula::value(result).empty()) {
+          continue;
+        } else if (!nebula::value(result).empty()) {
+          oldVal = std::move(nebula::value(result));
+          oldReader = RowReaderWrapper::getTagPropReader(env_->schemaMan_, spaceId_, tagId, oldVal);
+          ret.readSet.emplace_back(key);
         }
-        for (auto& index : indexes_) {
-          if (tagId == index->get_schema_id().get_tag_id()) {
-            auto indexFields = index->get_fields();
-            /*
-             * step 1 , Delete old version index if exists.
-             */
-            if (oReader != nullptr) {
-              auto ois = indexKeys(partId, vid, oReader.get(), index, schema.get());
-              if (!ois.empty()) {
-                // Check the index is building for the specified partition or not
-                auto indexState = env_->getIndexState(spaceId_, partId);
-                if (env_->checkRebuilding(indexState)) {
-                  auto delOpKey = OperationKeyUtils::deleteOperationKey(partId);
-                  for (auto& oi : ois) {
-                    batchHolder->put(std::string(delOpKey), std::move(oi));
-                  }
-                } else if (env_->checkIndexLocked(indexState)) {
-                  LOG(ERROR) << "The index has been locked: " << index->get_index_name();
-                  code = nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR;
-                  break;
-                } else {
-                  for (auto& oi : ois) {
-                    batchHolder->remove(std::move(oi));
-                  }
-                }
-              }
-            }
+      } else {
+        // reading the old value failed
+        return ret;
+      }
+    }
+    for (const auto& index : indexes_) {
+      if (tagId == index->get_schema_id().get_tag_id()) {
+        // step 1, delete the old version of the index if it exists
+        if (oldReader != nullptr) {
+          auto oldIndexKeys = indexKeys(partId, vId.str(), oldReader.get(), index, schema.get());
+          if (!oldIndexKeys.empty()) {
+            // Check whether the index is building for the specified partition.
+            auto indexState = env_->getIndexState(spaceId_, partId);
+            if (env_->checkRebuilding(indexState)) {
+              auto delOpKey = OperationKeyUtils::deleteOperationKey(partId);
+              for (auto& idxKey : oldIndexKeys) {
+                ret.writeSet.emplace_back(std::string(delOpKey));
+                batchHolder->put(std::string(delOpKey), std::move(idxKey));
+              }
+            } else if (env_->checkIndexLocked(indexState)) {
+              LOG(ERROR) << "The index has been locked: " << index->get_index_name();
+              ret.code = nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR;
+              return ret;
+            } else {
+              for (auto& idxKey : oldIndexKeys) {
+                ret.writeSet.emplace_back(std::string(idxKey));
+                batchHolder->remove(std::move(idxKey));
+              }
+            }
+          }
+        }

-            /*
-             * step 2 , Insert new vertex index
-             */
-            if (nReader != nullptr) {
-              auto niks = indexKeys(partId, vid, nReader.get(), index, schema.get());
-              if (!niks.empty()) {
-                auto v = CommonUtils::ttlValue(schema.get(), nReader.get());
-                auto niv = v.ok() ? IndexKeyUtils::indexVal(std::move(v).value()) : "";
-                // Check the index is building for the specified partition or
-                // not.
-                auto indexState = env_->getIndexState(spaceId_, partId);
-                if (env_->checkRebuilding(indexState)) {
-                  for (auto& nik : niks) {
-                    auto opKey = OperationKeyUtils::modifyOperationKey(partId, nik);
-                    batchHolder->put(std::move(opKey), std::string(niv));
-                  }
-                } else if (env_->checkIndexLocked(indexState)) {
-                  LOG(ERROR) << "The index has been locked: " << index->get_index_name();
-                  code = nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR;
-                  break;
-                } else {
-                  for (auto& nik : niks) {
-                    batchHolder->put(std::move(nik), std::string(niv));
-                  }
-                }
-              }
-            }
+        // step 2, insert the new vertex index
+        if (newReader != nullptr) {
+          auto newIndexKeys = indexKeys(partId, vId.str(), newReader.get(), index, schema.get());
+          if (!newIndexKeys.empty()) {
+            // if the index schema has a ttl field, write it into the index value
+            auto field = CommonUtils::ttlValue(schema.get(), newReader.get());
+            auto indexVal = field.ok() ? IndexKeyUtils::indexVal(std::move(field).value()) : "";
+            auto indexState = env_->getIndexState(spaceId_, partId);
+            if (env_->checkRebuilding(indexState)) {
+              for (auto& idxKey : newIndexKeys) {
+                auto opKey = OperationKeyUtils::modifyOperationKey(partId, idxKey);
+                ret.writeSet.emplace_back(std::string(opKey));
+                batchHolder->put(std::move(opKey), std::string(indexVal));
+              }
+            } else if (env_->checkIndexLocked(indexState)) {
+              LOG(ERROR) << "The index has been locked: " << index->get_index_name();
+              ret.code = nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR;
+              return ret;
+            } else {
+              for (auto& idxKey : newIndexKeys) {
+                ret.writeSet.emplace_back(std::string(idxKey));
+                batchHolder->put(std::move(idxKey), std::string(indexVal));
+              }
+            }
+          }
+        }
+      }
+    }
-          } // for index data
-          if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
-            break;
-          }
-          /*
-           * step 3 , Insert new vertex data
-           */
-          batchHolder->put(std::move(key), std::move(retEnc.value()));
-          stats::StatsManager::addValue(kNumVerticesInserted);
-        }
-        if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
-          break;
-        }
-      }
-      if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
-        env_->verticesML_->unlockBatch(dummyLock);
-        handleAsync(spaceId_, partId, code);
-        continue;
-      }
-      auto batch = encodeBatchValue(batchHolder->getBatch());
-      DCHECK(!batch.empty());
-      nebula::MemoryLockGuard lg(env_->verticesML_.get(), std::move(dummyLock), false, false);
-      env_->kvstore_->asyncAppendBatch(spaceId_,
-                                       partId,
-                                       std::move(batch),
-                                       [l = std::move(lg), icw = std::move(wrapper), partId, this](
-                                           nebula::cpp2::ErrorCode retCode) {
-                                         UNUSED(l);
-                                         UNUSED(icw);
-                                         handleAsync(spaceId_, partId, retCode);
-                                       });
+    // step 3, insert the new vertex data
+    ret.writeSet.emplace_back(key);
+    batchHolder->put(std::string(key), std::string(value));
   }
-}  // namespace storage
+  ret.batch = encodeBatchValue(batchHolder->getBatch());
+  ret.code = nebula::cpp2::ErrorCode::SUCCEEDED;
+  return ret;
+}

 ErrorOr<nebula::cpp2::ErrorCode, std::string> AddVerticesProcessor::findOldValue(
     PartitionID partId, const VertexID& vId, TagID tagId) {
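The `readSet`/`writeSet` lists filled in above presumably exist so the raft layer can decide whether a retried op may share a batch with logs already queued: an op that read a key some earlier, still-pending op writes must wait for that op to commit. A minimal sketch of such an intersection test, assuming plain string keys (this mirrors the idea only, not the actual RaftPart merging code):

```cpp
#include <algorithm>
#include <list>
#include <string>

// Returns true when a later op read a key that an earlier, still-pending op
// writes; such ops cannot share a batch, and the later one must wait for the
// earlier one to commit before it is (re)executed.
bool readWriteConflict(const std::list<std::string>& pendingWrites,
                       const std::list<std::string>& readSet) {
  for (const auto& key : readSet) {
    if (std::find(pendingWrites.begin(), pendingWrites.end(), key) !=
        pendingWrites.end()) {
      return true;
    }
  }
  return false;
}
```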
diff --git a/src/storage/mutate/AddVerticesProcessor.h b/src/storage/mutate/AddVerticesProcessor.h
index 71725c9b5be..851039fb396 100644
--- a/src/storage/mutate/AddVerticesProcessor.h
+++ b/src/storage/mutate/AddVerticesProcessor.h
@@ -46,6 +46,11 @@ class AddVerticesProcessor : public BaseProcessor<cpp2::ExecResponse> {

   void deleteDupVid(std::vector<cpp2::NewVertex>& vertices);

+  kvstore::MergeableAtomicOpResult addVerticesWithIndex(
+      PartitionID partId,
+      std::vector<kvstore::KV>&& data,
+      std::unique_ptr<kvstore::BatchHolder>&& batchHolder);
+
 private:
  GraphSpaceID spaceId_;
  std::vector<std::shared_ptr<nebula::meta::cpp2::IndexItem>> indexes_;
diff --git a/src/storage/test/IndexTestUtil.h b/src/storage/test/IndexTestUtil.h
index 87d2dbec3ef..9730fc3d514 100644
--- a/src/storage/test/IndexTestUtil.h
+++ b/src/storage/test/IndexTestUtil.h
@@ -236,10 +236,14 @@
     }
   }

-  void asyncAtomicOp(GraphSpaceID,
-                     PartitionID,
-                     raftex::AtomicOp,
-                     ::nebula::kvstore::KVCallback) override {
+  void asyncAtomicOp(GraphSpaceID spaceId,
+                     PartitionID partId,
+                     ::nebula::kvstore::MergeableAtomicOp op,
+                     ::nebula::kvstore::KVCallback cb) override {
+    UNUSED(spaceId);
+    UNUSED(partId);
+    UNUSED(cb);
+    UNUSED(op);
     LOG(FATAL) << "Unexpect";
   }
   void asyncAppendBatch(GraphSpaceID,
@@ -336,6 +340,7 @@ class MockIndexNode : public IndexNode {
   }
   std::unique_ptr<IndexNode> copy() override {
     LOG(FATAL) << "Unexpect";
+    return nullptr;
   }
   std::function nextFunc;
   std::function<::nebula::cpp2::ErrorCode(PartitionID)> executeFunc;
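`MockKVStore` above still aborts in `asyncAtomicOp`, so it cannot drive the new path in tests. If a test double that executes the op inline were wanted, it could look like the member override below; the inline execution and the comments are assumptions of this sketch, not part of this change:

```cpp
// Sketch of a test-double override that runs the op instead of aborting.
// MergeableAtomicOp is the folly::Function alias from Common.h.
void asyncAtomicOp(GraphSpaceID spaceId,
                   PartitionID partId,
                   ::nebula::kvstore::MergeableAtomicOp op,
                   ::nebula::kvstore::KVCallback cb) override {
  UNUSED(spaceId);
  UNUSED(partId);
  // Execute synchronously; no raft, no merging. A real Part would also
  // apply result.batch and consult result.readSet / result.writeSet
  // before batching the op with others.
  auto result = op();
  cb(result.code);
}
```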
diff --git a/src/storage/test/RebuildIndexTest.cpp b/src/storage/test/RebuildIndexTest.cpp
index e2b9737545e..1f2d507194b 100644
--- a/src/storage/test/RebuildIndexTest.cpp
+++ b/src/storage/test/RebuildIndexTest.cpp
@@ -27,10 +27,6 @@ class RebuildIndexTest : public ::testing::Test {
 protected:
  static void SetUpTestCase() {
    LOG(INFO) << "SetUp RebuildIndexTest TestCase";
-    rootPath_ = std::make_unique<fs::TempDir>("/tmp/RebuildIndexTest.XXXXXX");
-    cluster_ = std::make_unique<mock::MockCluster>();
-    cluster_->initStorageKV(rootPath_->path());
-    env_ = cluster_->storageEnv_.get();
    manager_ = AdminTaskManager::instance();
    manager_->init();
  }
@@ -38,13 +34,19 @@
  static void TearDownTestCase() {
    LOG(INFO) << "TearDown RebuildIndexTest TestCase";
    manager_->shutdown();
-    cluster_.reset();
-    rootPath_.reset();
  }

-  void SetUp() override {}
+  void SetUp() override {
+    rootPath_ = std::make_unique<fs::TempDir>("/tmp/RebuildIndexTest.XXXXXX");
+    cluster_ = std::make_unique<mock::MockCluster>();
+    cluster_->initStorageKV(rootPath_->path());
+    env_ = cluster_->storageEnv_.get();
+  }

-  void TearDown() override {}
+  void TearDown() override {
+    cluster_.reset();
+    rootPath_.reset();
+  }

  static StorageEnv* env_;
  static AdminTaskManager* manager_;
@@ -91,7 +93,7 @@ TEST_F(RebuildIndexTest, RebuildTagIndexCheckALLData) {
   // Wait for the task finished
   do {
-    usleep(50);
+    std::this_thread::sleep_for(std::chrono::milliseconds(50));
   } while (!manager_->isFinished(context.jobId_, context.taskId_));
   // Check the data count
@@ -177,7 +179,7 @@ TEST_F(RebuildIndexTest, RebuildEdgeIndexCheckALLData) {
   // Wait for the task finished
   do {
-    usleep(50);
+    std::this_thread::sleep_for(std::chrono::milliseconds(50));
   } while (!manager_->isFinished(context.jobId_, context.taskId_));
   // Check the data count
@@ -277,7 +279,7 @@ TEST_F(RebuildIndexTest, RebuildTagIndexWithDelete) {
   // Wait for the task finished
   do {
-    usleep(50);
+    std::this_thread::sleep_for(std::chrono::milliseconds(50));
   } while (!manager_->isFinished(context.jobId_, context.taskId_));
   LOG(INFO) << "Check rebuild tag index...";
@@ -337,7 +339,7 @@ TEST_F(RebuildIndexTest, RebuildTagIndexWithAppend) {
   // Wait for the task finished
   do {
-    usleep(50);
+    std::this_thread::sleep_for(std::chrono::milliseconds(50));
   } while (!manager_->isFinished(context.jobId_, context.taskId_));
   LOG(INFO) << "Check rebuild tag index...";
@@ -349,7 +351,11 @@
   RebuildIndexTest::env_->rebuildIndexGuard_->clear();
   writer->stop();
-  sleep(1);
+
+  for (int i = 1; i <= 5; ++i) {
+    LOG(INFO) << "sleep for " << i << "s";
+    sleep(1);
+  }
 }

 TEST_F(RebuildIndexTest, RebuildTagIndex) {
@@ -381,7 +387,7 @@
   // Wait for the task finished
   do {
-    usleep(50);
+    std::this_thread::sleep_for(std::chrono::milliseconds(50));
   } while (!manager_->isFinished(context.jobId_, context.taskId_));
   // Check the result
@@ -393,7 +400,10 @@
  }
   RebuildIndexTest::env_->rebuildIndexGuard_->clear();
-  sleep(1);
+  for (int i = 1; i <= 5; ++i) {
+    LOG(INFO) << "sleep for " << i << "s";
+    sleep(1);
+  }
 }

 TEST_F(RebuildIndexTest, RebuildEdgeIndexWithDelete) {
@@ -438,7 +448,7 @@
   // Wait for the task finished
   do {
-    usleep(50);
+    std::this_thread::sleep_for(std::chrono::milliseconds(50));
   } while (!manager_->isFinished(context.jobId_, context.taskId_));
   // Check the result
@@ -454,7 +465,10 @@
   RebuildIndexTest::env_->rebuildIndexGuard_->clear();
   writer->stop();
-  sleep(1);
+  for (int i = 1; i <= 5; ++i) {
+    LOG(INFO) << "sleep for " << i << "s";
+    sleep(1);
+  }
 }

 TEST_F(RebuildIndexTest, RebuildEdgeIndexWithAppend) {
@@ -497,7 +512,7 @@
   // Wait for the task finished
   do {
-    usleep(50);
+    std::this_thread::sleep_for(std::chrono::milliseconds(50));
   } while (!manager_->isFinished(context.jobId_, context.taskId_));
   // Check the result
@@ -511,6 +526,10 @@
   RebuildIndexTest::env_->rebuildIndexGuard_->clear();
   writer->stop();
   sleep(1);
+  for (int i = 1; i <= 5; ++i) {
+    LOG(INFO) << "sleep for " << i << "s";
+    sleep(1);
+  }
 }

 TEST_F(RebuildIndexTest, RebuildEdgeIndex) {
@@ -541,7 +560,7 @@
   do {
-    usleep(50);
+    std::this_thread::sleep_for(std::chrono::milliseconds(50));
   } while (!manager_->isFinished(context.jobId_, context.taskId_));
   // Check the result
@@ -551,6 +570,10 @@
     auto code = RebuildIndexTest::env_->kvstore_->get(1, key.first, key.second, &value);
     EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code);
   }
+  for (int i = 1; i <= 5; ++i) {
+    LOG(INFO) << "sleep for " << i << "s";
+    sleep(1);
+  }
 }

 }  // namespace storage
diff --git a/src/storage/test/StatsTaskTest.cpp b/src/storage/test/StatsTaskTest.cpp
index 803a04b4d08..34fe20f1257 100644
--- a/src/storage/test/StatsTaskTest.cpp
+++ b/src/storage/test/StatsTaskTest.cpp
@@ -264,7 +264,7 @@ TEST_F(StatsTaskTest, StatsTagAndEdgeData) {
       ASSERT_EQ(0, edge.second);
     }
   }
-  ASSERT_EQ(81, *statsItem.space_vertices_ref());
+  EXPECT_EQ(81, *statsItem.space_vertices_ref());
   ASSERT_EQ(167, *statsItem.space_edges_ref());
 }
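A final note on the tests: each do/while above polls `manager_->isFinished(...)` in a bare loop, which spins forever if a task never finishes. A bounded wait keeps such hangs visible as failures; the helper below is a suggestion only, not part of this change (`waitUntil` is a hypothetical name):

```cpp
#include <chrono>
#include <functional>
#include <thread>

// Poll `done` every 50ms until it returns true or `timeout` elapses.
// Returns false on timeout so the test can fail with a clear assertion
// instead of hanging.
inline bool waitUntil(const std::function<bool()>& done,
                      std::chrono::milliseconds timeout = std::chrono::seconds(30)) {
  auto deadline = std::chrono::steady_clock::now() + timeout;
  while (!done()) {
    if (std::chrono::steady_clock::now() >= deadline) {
      return false;
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
  }
  return true;
}

// Usage in the tests above (hypothetical):
//   ASSERT_TRUE(waitUntil(
//       [&] { return manager_->isFinished(context.jobId_, context.taskId_); }));
```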