address some comments
liuyu85cn committed Mar 21, 2022
1 parent 9c8da06 commit a53b365
Showing 1 changed file with 59 additions and 25 deletions.
84 changes: 59 additions & 25 deletions src/kvstore/raftex/RaftPart.cpp
@@ -88,6 +88,7 @@ class AppendLogsIterator final : public LogIterator {
for (auto it = logs_.begin(); it != logs_.end(); ++it) {
auto& promiseRef = std::get<4>(*it);
if (!promiseRef.isFulfilled()) {
DCHECK(!promiseRef.isFulfilled());
promiseRef.setValue(code);
}
}
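A side note on the promise guard above: folly's Promise::setValue() throws PromiseAlreadySatisfied if called on a promise that was already fulfilled, so the isFulfilled() check makes the fulfillment idempotent. A minimal standalone sketch of the same pattern (illustrative, not code from this diff):

    #include <folly/futures/Promise.h>
    #include <vector>

    // Sketch: fulfill every pending promise exactly once with the same code.
    void fulfillPending(std::vector<folly::Promise<int>>& promises, int code) {
      for (auto& p : promises) {
        if (!p.isFulfilled()) {  // setValue() on a fulfilled promise would throw
          p.setValue(code);
        }
      }
    }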
@@ -145,9 +146,12 @@ class AppendLogsIterator final : public LogIterator {
class AppendLogsIteratorFactory {
public:
AppendLogsIteratorFactory() = default;
void make(RaftPart::LogCache& cacheLogs, RaftPart::LogCache& sendLogs) {
static void make(RaftPart::LogCache& cacheLogs, RaftPart::LogCache& sendLogs) {
DCHECK(sendLogs.empty());
std::unordered_set<std::string> memLock;
std::list<std::pair<std::string, std::string>> ranges;
for (auto& log : cacheLogs) {
auto code = mergeAble(log);
auto code = mergeAble(log, memLock, ranges);
if (code == MergeAbleCode::MERGE_BOTH) {
sendLogs.emplace_back();
std::swap(cacheLogs.front(), sendLogs.back());
@@ -158,7 +162,11 @@ class AppendLogsIteratorFactory {
std::swap(cacheLogs.front(), sendLogs.back());
cacheLogs.pop_front();
break;
} else { // NONE, or MERGE_NEXT
} else if (code == MergeAbleCode::NO_MERGE) {
// if we meet a failed atomicOp, we can just skip it
cacheLogs.pop_front();
continue;
} else { // MERGE_NEXT
break;
}
}
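In other words, make() scans the cached logs from the front and stops at the first log that cannot join the current batch. A simplified, self-contained sketch of that control flow (MergeAbleCode values per the snippet above; the queue and classifier are illustrative stand-ins for the real RaftPart types, and the "merge itself, then stop" branch is omitted):

    #include <deque>
    #include <functional>

    enum class MergeAbleCode { MERGE_BOTH, NO_MERGE, MERGE_NEXT };

    // Sketch: drain the head of the cache into the send batch until a log
    // has to wait for the next round.
    template <typename Log>
    void makeBatch(std::deque<Log>& cache, std::deque<Log>& send,
                   const std::function<MergeAbleCode(Log&)>& classify) {
      while (!cache.empty()) {
        switch (classify(cache.front())) {
          case MergeAbleCode::MERGE_BOTH:  // joins the batch, keep scanning
            send.push_back(std::move(cache.front()));
            cache.pop_front();
            break;
          case MergeAbleCode::NO_MERGE:    // e.g. a failed atomicOp: drop it
            cache.pop_front();
            break;
          case MergeAbleCode::MERGE_NEXT:  // conflicts with this batch: stop
            return;
        }
      }
    }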
@@ -169,7 +177,9 @@ class AppendLogsIteratorFactory {
*
* @param logWrapper
*/
MergeAbleCode mergeAble(RaftPart::LogCacheItem& logWrapper) {
static MergeAbleCode mergeAble(RaftPart::LogCacheItem& logWrapper,
std::unordered_set<std::string>& memLock,
std::list<std::pair<std::string, std::string>>& ranges) {
// log type:
switch (std::get<1>(logWrapper)) {
case LogType::NORMAL: {
@@ -178,9 +188,9 @@ class AppendLogsIteratorFactory {
if (log.empty()) {
return MergeAbleCode::MERGE_BOTH;
}
decode(log, updateSet, ranges_);
decode(log, updateSet, ranges);
for (auto& key : updateSet) {
memLock_.insert(key.str());
memLock.insert(key.str());
}
return MergeAbleCode::MERGE_BOTH;
}
@@ -197,23 +207,23 @@ class AppendLogsIteratorFactory {
if (!promiseRef.isFulfilled()) {
promiseRef.setValue(code);
}
return MergeAbleCode::MERGE_BOTH;
return MergeAbleCode::NO_MERGE;
}
std::get<2>(logWrapper) = std::move(result);
/**
* @brief We accept the same read/write key within a single log,
* but reject it when the same key appears across different logs.
*/
for (auto& key : read) {
auto cit = memLock_.find(key);
auto cit = memLock.find(key);
// read after write is not acceptable.
if (cit != memLock_.end()) {
if (cit != memLock.end()) {
return MergeAbleCode::MERGE_NEXT;
}

// if we try to read a key that lies in any recorded range
// for (auto& [begin, end] : ranges_) {
for (auto& it : ranges_) {
// for (auto& [begin, end] : ranges) {
for (auto& it : ranges) {
auto* begin = it.first.c_str();
auto* end = it.second.c_str();
auto* pKey = key.c_str();
@@ -225,7 +235,7 @@ class AppendLogsIteratorFactory {

for (auto& key : write) {
// it doesn't matter if the insert fails (on a write conflict, the last write wins)
memLock_.insert(key);
memLock.insert(key);
}
return MergeAbleCode::MERGE_BOTH;
}
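The conflict test above reduces to: a key read by an atomicOp must not have been written, and must not fall inside any range touched, earlier in the same batch. A condensed standalone sketch of that check (container choices mirror the snippet; the function name is made up for illustration):

    #include <list>
    #include <string>
    #include <unordered_set>
    #include <vector>

    // Sketch: true if any read key conflicts with earlier writes in the batch.
    bool readConflicts(const std::vector<std::string>& reads,
                       const std::unordered_set<std::string>& memLock,
                       const std::list<std::pair<std::string, std::string>>& ranges) {
      for (const auto& key : reads) {
        if (memLock.count(key) != 0) {
          return true;  // read-after-write inside the batch
        }
        for (const auto& range : ranges) {
          if (key >= range.first && key < range.second) {
            return true;  // the key lies inside a range touched by the batch
          }
        }
      }
      return false;
    }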
@@ -235,9 +245,9 @@ class AppendLogsIteratorFactory {
return MergeAbleCode::NO_MERGE;
}

void decode(const std::string& log,
std::vector<folly::StringPiece>& updateSet,
std::list<std::pair<std::string, std::string>>& ranges) {
static void decode(const std::string& log,
std::vector<folly::StringPiece>& updateSet,
std::list<std::pair<std::string, std::string>>& ranges) {
switch (log[sizeof(int64_t)]) {
case nebula::kvstore::OP_PUT: {
auto pieces = nebula::kvstore::decodeMultiValues(log);
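For orientation, the switch above implies the encoded log layout: a fixed sizeof(int64_t) header, then a one-byte operation tag, then the op-specific payload. A tiny hedged sketch of that assumption (the bounds check is added for illustration; the real op constants live in nebula::kvstore):

    #include <cstdint>
    #include <string>

    // Sketch of the layout decode() relies on:
    //   [ 8-byte header ][ op byte ][ op-specific encoded payload ... ]
    inline char logOpType(const std::string& log) {
      // the real code should already guarantee this invariant
      if (log.size() <= sizeof(int64_t)) {
        return '\0';  // malformed or empty log entry
      }
      return log[sizeof(int64_t)];
    }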
@@ -299,8 +309,8 @@ class AppendLogsIteratorFactory {
}

private:
std::unordered_set<std::string> memLock_;
std::list<std::pair<std::string, std::string>> ranges_;
// std::unordered_set<std::string> memLock;
// std::list<std::pair<std::string, std::string>> ranges;
};

/********************************************************
@@ -798,8 +808,10 @@ folly::Future<nebula::cpp2::ErrorCode> RaftPart::appendLogAsync(ClusterID source
// the partition stops
{
std::lock_guard<std::mutex> lck(logsLock_);
AppendLogsIteratorFactory fact;
fact.make(logs_, sendingLogs_);
AppendLogsIteratorFactory::make(logs_, sendingLogs_);
if (sendingLogs_.empty()) {
return retFuture;
}
}
AppendLogsIterator it(firstId, termId, std::move(sendingLogs_));
appendLogsInternal(std::move(it), termId);
@@ -858,7 +870,6 @@ void RaftPart::replicateLogs(folly::EventBase* eb,
LogID committedId,
TermID prevLogTerm,
LogID prevLogId) {
LOG(INFO) << "RaftPart::replicateLogs()";
using namespace folly; // NOLINT since the fancy overload of | operator

decltype(hosts_) hosts;
@@ -1025,12 +1036,27 @@ void RaftPart::processAppendLogResponses(const AppendLogResponses& resps,
}

// at this moment, we are confident the logs have been successfully replicated
// sendingPromise_.setOneSharedValue(nebula::cpp2::ErrorCode::SUCCEEDED);
{
std::lock_guard<std::mutex> lck(logsLock_);
CHECK(replicatingLogs_);
iter.commit();
replicatingLogs_ = false;
if (logs_.empty()) {
// no logs arrived during replication
replicatingLogs_ = false;
VLOG(4) << idStr_ << "No more log to be replicated";
return;
} else {
// some new logs arrived during replication,
// so we need to send them as well
AppendLogsIteratorFactory::make(logs_, sendingLogs_);
if (sendingLogs_.empty()) {
return;
}
auto firstId = lastLogId_ + 1;
AppendLogsIterator it(firstId, currTerm, std::move(sendingLogs_));
this->appendLogsInternal(std::move(it), currTerm);
}

return;
}
} else {
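The new tail of processAppendLogResponses() turns replication into a loop: once a batch commits, any logs cached in the meantime are batched and sent immediately instead of waiting for the next appendLogAsync() call. A simplified sketch of that hand-off (types and names are illustrative, not the real members; the real code builds the batch via AppendLogsIteratorFactory rather than a plain swap):

    #include <deque>
    #include <mutex>

    // Sketch: after a batch commits, chain the next batch if logs queued up
    // while we were replicating.
    template <typename Log, typename SendFn>
    void onBatchCommitted(std::deque<Log>& cache, std::deque<Log>& sending,
                          std::mutex& logsLock, SendFn sendBatch) {
      std::lock_guard<std::mutex> g(logsLock);
      if (cache.empty()) {
        return;                       // nothing arrived: replication goes idle
      }
      sending.swap(cache);            // simplified: real code filters the batch
      sendBatch(std::move(sending));  // kick off the follow-up replication
    }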
@@ -1998,11 +2024,19 @@ bool RaftPart::checkAppendLogResult(nebula::cpp2::ErrorCode res) {
if (res != nebula::cpp2::ErrorCode::SUCCEEDED) {
{
std::lock_guard<std::mutex> lck(logsLock_);
auto setPromiseForLogs = [&](LogCache& logs) {
for (auto& log : logs) {
auto& promiseRef = std::get<4>(log);
if (!promiseRef.isFulfilled()) {
promiseRef.setValue(res);
}
}
};
setPromiseForLogs(logs_);
setPromiseForLogs(sendingLogs_);
logs_.clear();
// cachingPromise_.setValue(res);
// cachingPromise_.reset();
sendingLogs_.clear();
bufferOverFlow_ = false;
// sendingPromise_.setValue(res);
replicatingLogs_ = false;
}
return false;
