Skip to content
This repository has been archived by the owner on Dec 1, 2022. It is now read-only.

Raft heartbeat process in event base #438

Merged
merged 8 commits into from
Jun 24, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/kvstore/KVEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ class KVEngine {

virtual std::unique_ptr<WriteBatch> startBatchWrite() = 0;

virtual nebula::cpp2::ErrorCode
commitBatchWrite(std::unique_ptr<WriteBatch> batch,
bool disableWAL = true,
bool sync = false) = 0;
// Commits the given write batch to the engine and returns the resulting error code.
// disableWAL, sync and wait are per-write options. In RocksEngine they are forwarded to
// rocksdb::WriteOptions as disableWAL, sync and no_slowdown (= !wait) respectively.
// With wait == false, RocksEngine returns E_WRITE_STALLED when the write status is Incomplete.
virtual nebula::cpp2::ErrorCode commitBatchWrite(std::unique_ptr<WriteBatch> batch,
bool disableWAL,
bool sync,
bool wait) = 0;

// Read a single key
virtual nebula::cpp2::ErrorCode get(const std::string& key, std::string* value) = 0;
Expand Down
4 changes: 2 additions & 2 deletions src/kvstore/Listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ bool Listener::preProcessLog(LogID logId,
return true;
}

bool Listener::commitLogs(std::unique_ptr<LogIterator> iter) {
cpp2::ErrorCode Listener::commitLogs(std::unique_ptr<LogIterator> iter, bool) {
LogID lastId = -1;
while (iter->valid()) {
lastId = iter->logId();
Expand All @@ -122,7 +122,7 @@ bool Listener::commitLogs(std::unique_ptr<LogIterator> iter) {
if (lastId > 0) {
leaderCommitId_ = lastId;
}
return true;
return cpp2::ErrorCode::SUCCEEDED;
}

void Listener::doApply() {
Expand Down
4 changes: 2 additions & 2 deletions src/kvstore/Listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ derived class.

// For a listener, we just return success directly. Another background thread triggers the
// actual apply work, does it in a worker thread, and updates lastApplyLogId_.
bool commitLogs(std::unique_ptr<LogIterator> iter)
cpp2::ErrorCode commitLogs(std::unique_ptr<LogIterator> iter, bool)

// For most listeners, just returning true is enough. However, if a listener needs to be aware
// of membership changes, some WAL log types need to be pre-processed; that can be done here.
Expand Down Expand Up @@ -154,7 +154,7 @@ class Listener : public raftex::RaftPart {

// For a listener, we just return success directly. Another background thread triggers the
// actual apply work, does it in a worker thread, and updates lastApplyLogId_.
bool commitLogs(std::unique_ptr<LogIterator>) override;
cpp2::ErrorCode commitLogs(std::unique_ptr<LogIterator>, bool) override;

// For most listeners, just returning true is enough. However, if a listener needs to be aware
// of membership changes, some WAL log types need to be pre-processed; that can be done here.
Expand Down
18 changes: 10 additions & 8 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -533,15 +533,17 @@ void NebulaStore::checkRemoteListeners(GraphSpaceID spaceId,
void NebulaStore::updateSpaceOption(GraphSpaceID spaceId,
const std::unordered_map<std::string, std::string>& options,
bool isDbOption) {
if (isDbOption) {
for (const auto& kv : options) {
setDBOption(spaceId, kv.first, kv.second);
}
} else {
for (const auto& kv : options) {
setOption(spaceId, kv.first, kv.second);
storeWorker_->addTask([this, spaceId, opts = options, isDbOption] {
if (isDbOption) {
for (const auto& kv : opts) {
setDBOption(spaceId, kv.first, kv.second);
}
} else {
for (const auto& kv : opts) {
setOption(spaceId, kv.first, kv.second);
}
}
}
});
}

void NebulaStore::removeSpaceDir(const std::string& dir) {
Expand Down
54 changes: 35 additions & 19 deletions src/kvstore/Part.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ void Part::onDiscoverNewLeader(HostAddr nLeader) {
}
}

bool Part::commitLogs(std::unique_ptr<LogIterator> iter) {
cpp2::ErrorCode Part::commitLogs(std::unique_ptr<LogIterator> iter, bool wait) {
auto batch = engine_->startBatchWrite();
LogID lastId = -1;
TermID lastTerm = -1;
Expand All @@ -214,9 +214,10 @@ bool Part::commitLogs(std::unique_ptr<LogIterator> iter) {
case OP_PUT: {
auto pieces = decodeMultiValues(log);
DCHECK_EQ(2, pieces.size());
if (batch->put(pieces[0], pieces[1]) != nebula::cpp2::ErrorCode::SUCCEEDED) {
auto code = batch->put(pieces[0], pieces[1]);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << idStr_ << "Failed to call WriteBatch::put()";
return false;
return code;
}
break;
}
Expand All @@ -225,37 +226,41 @@ bool Part::commitLogs(std::unique_ptr<LogIterator> iter) {
// Make the number of values are an even number
DCHECK_EQ((kvs.size() + 1) / 2, kvs.size() / 2);
for (size_t i = 0; i < kvs.size(); i += 2) {
if (batch->put(kvs[i], kvs[i + 1]) != nebula::cpp2::ErrorCode::SUCCEEDED) {
auto code = batch->put(kvs[i], kvs[i + 1]);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << idStr_ << "Failed to call WriteBatch::put()";
return false;
return code;
}
}
break;
}
case OP_REMOVE: {
auto key = decodeSingleValue(log);
if (batch->remove(key) != nebula::cpp2::ErrorCode::SUCCEEDED) {
auto code = batch->remove(key);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << idStr_ << "Failed to call WriteBatch::remove()";
return false;
return code;
}
break;
}
case OP_MULTI_REMOVE: {
auto keys = decodeMultiValues(log);
for (auto k : keys) {
if (batch->remove(k) != nebula::cpp2::ErrorCode::SUCCEEDED) {
auto code = batch->remove(k);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << idStr_ << "Failed to call WriteBatch::remove()";
return false;
return code;
}
}
break;
}
case OP_REMOVE_RANGE: {
auto range = decodeMultiValues(log);
DCHECK_EQ(2, range.size());
if (batch->removeRange(range[0], range[1]) != nebula::cpp2::ErrorCode::SUCCEEDED) {
auto code = batch->removeRange(range[0], range[1]);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << idStr_ << "Failed to call WriteBatch::removeRange()";
return false;
return code;
}
break;
}
Expand All @@ -272,7 +277,7 @@ bool Part::commitLogs(std::unique_ptr<LogIterator> iter) {
}
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << idStr_ << "Failed to call WriteBatch";
return false;
return code;
}
}
break;
Expand Down Expand Up @@ -314,14 +319,14 @@ bool Part::commitLogs(std::unique_ptr<LogIterator> iter) {
}

if (lastId >= 0) {
if (putCommitMsg(batch.get(), lastId, lastTerm) != nebula::cpp2::ErrorCode::SUCCEEDED) {
auto code = putCommitMsg(batch.get(), lastId, lastTerm);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << idStr_ << "Commit msg failed";
return false;
return code;
}
}
return engine_->commitBatchWrite(std::move(batch),
FLAGS_rocksdb_disable_wal,
FLAGS_rocksdb_wal_sync) == nebula::cpp2::ErrorCode::SUCCEEDED;
return engine_->commitBatchWrite(
std::move(batch), FLAGS_rocksdb_disable_wal, FLAGS_rocksdb_wal_sync, wait);
}

std::pair<int64_t, int64_t> Part::commitSnapshot(const std::vector<std::string>& rows,
Expand All @@ -348,7 +353,9 @@ std::pair<int64_t, int64_t> Part::commitSnapshot(const std::vector<std::string>&
}
}
// For snapshot, we open the rocksdb's wal to avoid loss data if crash.
if (nebula::cpp2::ErrorCode::SUCCEEDED != engine_->commitBatchWrite(std::move(batch), false)) {
auto code = engine_->commitBatchWrite(
std::move(batch), FLAGS_rocksdb_disable_wal, FLAGS_rocksdb_wal_sync, true);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << idStr_ << "Put failed in commit";
return std::make_pair(0, 0);
}
Expand Down Expand Up @@ -433,6 +440,16 @@ bool Part::preProcessLog(LogID logId,
return true;
}

// Removes the system commit key of this part (partId_) from the engine.
// A failed removal is only logged as a warning; nothing is reported to the caller.
void Part::cleanup() {
  LOG(INFO) << idStr_ << "Clean rocksdb commit key";

  auto commitKey = NebulaKeyUtils::systemCommitKey(partId_);
  const auto status = engine_->remove(commitKey);
  if (status == nebula::cpp2::ErrorCode::SUCCEEDED) {
    return;
  }

  // Best effort: report the numeric error code and carry on.
  LOG(WARNING) << idStr_ << "Remove the committedLogId failed, error "
               << static_cast<int32_t>(status);
}

// TODO(pandasheep) unify raft errorcode
nebula::cpp2::ErrorCode Part::toResultCode(raftex::AppendLogResult res) {
switch (res) {
Expand All @@ -445,7 +462,6 @@ nebula::cpp2::ErrorCode Part::toResultCode(raftex::AppendLogResult res) {
case raftex::AppendLogResult::E_ATOMIC_OP_FAILURE:
return nebula::cpp2::ErrorCode::E_ATOMIC_OP_FAILED;
case raftex::AppendLogResult::E_BUFFER_OVERFLOW:
LOG_EVERY_N(ERROR, 100) << idStr_ << "RaftPart buffer is full";
return nebula::cpp2::ErrorCode::E_CONSENSUS_ERROR;
default:
LOG(ERROR) << idStr_ << "Consensus error "
Expand Down
12 changes: 2 additions & 10 deletions src/kvstore/Part.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class Part : public raftex::RaftPart {

void onDiscoverNewLeader(HostAddr nLeader) override;

bool commitLogs(std::unique_ptr<LogIterator> iter) override;
cpp2::ErrorCode commitLogs(std::unique_ptr<LogIterator> iter, bool wait) override;

bool preProcessLog(LogID logId,
TermID termId,
Expand All @@ -110,15 +110,7 @@ class Part : public raftex::RaftPart {
nebula::cpp2::ErrorCode
putCommitMsg(WriteBatch* batch, LogID committedLogId, TermID committedLogTerm);

void cleanup() override {
LOG(INFO) << idStr_ << "Clean rocksdb commit key";
auto res = engine_->remove(NebulaKeyUtils::systemCommitKey(partId_));
if (res != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(WARNING) << idStr_ << "Remove the committedLogId failed, error "
<< static_cast<int32_t>(res);
}
return;
}
void cleanup() override;

nebula::cpp2::ErrorCode toResultCode(raftex::AppendLogResult res);

Expand Down
6 changes: 5 additions & 1 deletion src/kvstore/RocksEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,14 +137,18 @@ std::unique_ptr<WriteBatch> RocksEngine::startBatchWrite() {
nebula::cpp2::ErrorCode
RocksEngine::commitBatchWrite(std::unique_ptr<WriteBatch> batch,
bool disableWAL,
bool sync) {
bool sync,
bool wait) {
rocksdb::WriteOptions options;
options.disableWAL = disableWAL;
options.sync = sync;
options.no_slowdown = !wait;
auto* b = static_cast<RocksWriteBatch*>(batch.get());
rocksdb::Status status = db_->Write(options, b->data());
if (status.ok()) {
return nebula::cpp2::ErrorCode::SUCCEEDED;
} else if (!wait && status.IsIncomplete()) {
return nebula::cpp2::ErrorCode::E_WRITE_STALLED;
}
LOG(ERROR) << "Write into rocksdb failed because of " << status.ToString();
return nebula::cpp2::ErrorCode::E_UNKNOWN;
Expand Down
3 changes: 2 additions & 1 deletion src/kvstore/RocksEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ class RocksEngine : public KVEngine {
nebula::cpp2::ErrorCode
commitBatchWrite(std::unique_ptr<WriteBatch> batch,
bool disableWAL,
bool sync) override;
bool sync,
bool wait) override;

/*********************
* Data retrieval
Expand Down
Loading