Skip to content
This repository has been archived by the owner on Dec 1, 2022. It is now read-only.

Commit

Permalink
V2.5.0 rebase master (#534)
Browse files Browse the repository at this point in the history
* fix task in recovered balance plan is still marked as FAILED (#528)

* fix task in recovered balance plan is still marked as FAILED

* fix bugs

* Fix db_dump print int vid (#533)

* fix db_dump print int vid

* fix --vids

* fix partId

* use new common

Co-authored-by: Doodle <[email protected]>
Co-authored-by: laura-ding <[email protected]>
  • Loading branch information
3 people authored Aug 2, 2021
1 parent 0dfa699 commit 137d24e
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 36 deletions.
4 changes: 4 additions & 0 deletions src/meta/processors/admin/AdminClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ folly::Future<Status> AdminClient::transLeader(GraphSpaceID spaceId,
if (it == peers.end()) {
return Status::PartNotFound();
}
if (peers.size() == 1 && peers.front() == leader) {
// if there is only one replica, skip transfer leader phase
return Status::OK();
}
auto target = dst;
if (dst == kRandomPeer) {
for (auto& p : peers) {
Expand Down
7 changes: 2 additions & 5 deletions src/meta/processors/admin/Balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ nebula::cpp2::ErrorCode Balancer::recovery() {
return recRet;
}
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
// save the balance plan again because FAILED tasks would be marked as IN_PROGRESS again
return plan_->saveInStore();
}

nebula::cpp2::ErrorCode
Expand Down Expand Up @@ -303,10 +304,6 @@ Balancer::genTasks(GraphSpaceID spaceId,
}
}

if (confirmedHostParts.size() < 2) {
LOG(INFO) << "Too few hosts, no need for balance!";
return nebula::cpp2::ErrorCode::E_NO_VALID_HOST;
}
// 2. Make all hosts in confirmedHostParts balanced
if (balanceParts(plan_->id_, spaceId, confirmedHostParts, totalParts, tasks)) {
return tasks;
Expand Down
4 changes: 2 additions & 2 deletions src/meta/processors/partsMan/ListHostsProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ nebula::cpp2::ErrorCode ListHostsProcessor::fillLeaders() {
}
auto it = std::find(activeHosts.begin(), activeHosts.end(), host);
if (it == activeHosts.end()) {
LOG(INFO) << "skip inactive host: " << host;
VLOG(1) << "skip inactive host: " << host;
continue; // skip inactive host
}

Expand All @@ -209,7 +209,7 @@ nebula::cpp2::ErrorCode ListHostsProcessor::fillLeaders() {
});

if (hostIt == hostItems_.end()) {
LOG(INFO) << "skip inactive host";
VLOG(1) << "skip inactive host";
continue;
}

Expand Down
2 changes: 1 addition & 1 deletion src/storage/StorageFlags.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ DEFINE_int32(waiting_catch_up_retry_times, 30, "retry times when waiting for cat
DEFINE_int32(waiting_catch_up_interval_in_secs, 30,
"interval between two requests for catching up state");

DEFINE_int32(waiting_new_leader_retry_times, 30, "retry times when waiting for catching up data");
DEFINE_int32(waiting_new_leader_retry_times, 5, "retry times when waiting for new leader");

DEFINE_int32(waiting_new_leader_interval_in_secs, 5,
"interval between two requests for catching up state");
Expand Down
7 changes: 1 addition & 6 deletions src/storage/mutate/AddEdgesAtomicProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,8 @@ void AddEdgesAtomicProcessor::processByChain(const cpp2::AddEdgesRequest& req) {
for (auto& part : *req.parts_ref()) {
auto localPart = part.first;
for (auto& edge : part.second) {
auto stPartId = env_->metaClient_->partId(spaceId_,
auto remotePart = env_->metaClient_->partId(spaceId_,
(*(*edge.key_ref()).dst_ref()).getStr());
if (!stPartId.ok()) {
failedPart[localPart] = nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND;
break;
}
auto remotePart = stPartId.value();
ChainId cid{localPart, remotePart};
if (FLAGS_trace_toss) {
auto& ekey = *edge.key_ref();
Expand Down
4 changes: 1 addition & 3 deletions src/storage/test/TossEnvironment.h
Original file line number Diff line number Diff line change
Expand Up @@ -545,9 +545,7 @@ struct TossEnvironment {

int32_t getPartId(const std::string& src) {
// auto stPart = mClient_->partId(spaceId_, edgeKey.src.getStr());
auto stPart = mClient_->partId(spaceId_, src);
LOG_IF(FATAL, !stPart.ok()) << "mClient_->partId failed";
return stPart.value();
return mClient_->partId(spaceId_, src);
}

/**
Expand Down
13 changes: 3 additions & 10 deletions src/storage/transaction/TransactionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,11 +231,7 @@ TransactionManager::updateEdgeAtomic(size_t vIdLen,
PartitionID partId,
const cpp2::EdgeKey& edgeKey,
GetBatchFunc batchGetter) {
auto stRemotePart = env_->metaClient_->partId(spaceId, (*edgeKey.dst_ref()).getStr());
if (!stRemotePart.ok()) {
return folly::makeFuture(nebula::cpp2::ErrorCode::E_UNKNOWN);
}
auto remotePart = stRemotePart.value();
auto remotePart = env_->metaClient_->partId(spaceId, (*edgeKey.dst_ref()).getStr());
auto localKey = TransactionUtils::edgeKey(vIdLen, partId, edgeKey);

std::vector<KV> data{std::make_pair(localKey, "")};
Expand Down Expand Up @@ -265,11 +261,8 @@ TransactionManager::resumeTransaction(size_t vIdLen,
auto c = folly::makePromiseContract<nebula::cpp2::ErrorCode>();

auto dst = NebulaKeyUtils::getDstId(vIdLen, localKey);
auto stRemotePartId = env_->metaClient_->partId(spaceId, dst.str());
if (!stRemotePartId.ok()) {
return nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND;
}
auto remoteKey = TransactionUtils::reverseRawKey(vIdLen, stRemotePartId.value(), localKey);
auto remotePartId = env_->metaClient_->partId(spaceId, dst.str());
auto remoteKey = TransactionUtils::reverseRawKey(vIdLen, remotePartId, localKey);

LOG_IF(INFO, FLAGS_trace_toss) << "try to get remote key=" << folly::hexlify(remoteKey)
<< ", according to lock=" << folly::hexlify(lockKey);
Expand Down
42 changes: 33 additions & 9 deletions src/tools/db-dump/DbDumper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ Status DbDumper::initSpace() {
}
spaceVidLen_ = spaceVidLen.value();

auto vidTypeStatus = metaClient_->getSpaceVidType(spaceId_);
if (!vidTypeStatus) {
return vidTypeStatus.status();
}
spaceVidType_ = std::move(vidTypeStatus).value();

auto partNum = metaClient_->partsNum(spaceId_);
if (!partNum.ok()) {
return Status::Error("Get partition number from '%s' failed.", FLAGS_space_name.c_str());
Expand All @@ -94,7 +100,15 @@ Status DbDumper::initParams() {
std::vector<std::string> tags, edges;
try {
folly::splitTo<PartitionID>(',', FLAGS_parts, std::inserter(parts_, parts_.begin()), true);
folly::splitTo<VertexID>(',', FLAGS_vids, std::inserter(vids_, vids_.begin()), true);
if (spaceVidType_ == meta::cpp2::PropertyType::INT64) {
std::vector<int64_t> intVids;
folly::splitTo<int64_t>(',', FLAGS_vids, std::inserter(intVids, intVids.begin()), true);
for (auto vid : intVids) {
vids_.emplace(std::string(reinterpret_cast<const char*>(&vid), 8));
}
} else {
folly::splitTo<VertexID>(',', FLAGS_vids, std::inserter(vids_, vids_.begin()), true);
}
folly::splitTo<std::string>(',', FLAGS_tags, std::inserter(tags, tags.begin()), true);
folly::splitTo<std::string>(',', FLAGS_edges, std::inserter(edges, edges.begin()), true);
} catch (const std::exception& e) {
Expand Down Expand Up @@ -209,7 +223,7 @@ void DbDumper::run() {
if (!isValidVidLen(vid)) {
continue;
}
auto partId = std::hash<VertexID>()(vid) % partNum_ + 1;
auto partId = metaClient_->partId(partNum_, vid);
auto prefix = NebulaKeyUtils::vertexPrefix(spaceVidLen_, partId, vid);
seek(prefix);
}
Expand All @@ -221,7 +235,7 @@ void DbDumper::run() {
if (!isValidVidLen(vid)) {
continue;
}
auto partId = std::hash<VertexID>()(vid) % partNum_ + 1;
auto partId = metaClient_->partId(partNum_, vid);
for (auto edgeType : edgeTypes_) {
auto prefix = NebulaKeyUtils::edgePrefix(spaceVidLen_, partId, vid, edgeType);
seek(prefix);
Expand All @@ -235,7 +249,7 @@ void DbDumper::run() {
if (!isValidVidLen(vid)) {
continue;
}
auto partId = std::hash<VertexID>()(vid) % partNum_ + 1;
auto partId = metaClient_->partId(partNum_, vid);
for (auto tagId : tagIds_) {
auto prefix = NebulaKeyUtils::vertexPrefix(spaceVidLen_, partId, vid, tagId);
seek(prefix);
Expand All @@ -249,7 +263,7 @@ void DbDumper::run() {
if (!isValidVidLen(vid)) {
continue;
}
auto partId = std::hash<VertexID>()(vid) % partNum_ + 1;
auto partId = metaClient_->partId(partNum_, vid);
for (auto edgeType : edgeTypes_) {
auto prefix = NebulaKeyUtils::edgePrefix(spaceVidLen_, partId, vid, edgeType);
seek(prefix);
Expand All @@ -260,7 +274,7 @@ void DbDumper::run() {
if (!isValidVidLen(vid)) {
continue;
}
auto partId = std::hash<VertexID>()(vid) % partNum_ + 1;
auto partId = metaClient_->partId(partNum_, vid);
for (auto tagId : tagIds_) {
auto prefix = NebulaKeyUtils::vertexPrefix(spaceVidLen_, partId, vid, tagId);
seek(prefix);
Expand Down Expand Up @@ -515,16 +529,16 @@ void DbDumper::iterates(kvstore::RocksPrefixIter* it) {

inline void DbDumper::printTagKey(const folly::StringPiece& key) {
auto part = NebulaKeyUtils::getPart(key);
auto vid = NebulaKeyUtils::getVertexId(spaceVidLen_, key);
auto vid = getVertexId(NebulaKeyUtils::getVertexId(spaceVidLen_, key));
auto tagId = NebulaKeyUtils::getTagId(spaceVidLen_, key);
std::cout << "[vertex] key: " << part << ", " << vid << ", " << getTagName(tagId);
}

inline void DbDumper::printEdgeKey(const folly::StringPiece& key) {
auto part = NebulaKeyUtils::getPart(key);
auto edgeType = NebulaKeyUtils::getEdgeType(spaceVidLen_, key);
auto src = NebulaKeyUtils::getSrcId(spaceVidLen_, key);
auto dst = NebulaKeyUtils::getDstId(spaceVidLen_, key);
auto src = getVertexId(NebulaKeyUtils::getSrcId(spaceVidLen_, key));
auto dst = getVertexId(NebulaKeyUtils::getDstId(spaceVidLen_, key));
auto rank = NebulaKeyUtils::getRank(spaceVidLen_, key);
std::cout << "[edge] key: " << part << ", " << src << ", " << getEdgeName(edgeType) << ", "
<< rank << ", " << dst;
Expand Down Expand Up @@ -569,5 +583,15 @@ std::string DbDumper::getEdgeName(const EdgeType edgeType) {
return name.value();
}
}

Value DbDumper::getVertexId(const folly::StringPiece &vidStr) {
if (spaceVidType_ == meta::cpp2::PropertyType::INT64) {
int64_t val;
memcpy(reinterpret_cast<void*>(&val), vidStr.begin(), sizeof(int64_t));
return val;
} else {
return vidStr.str();
}
}
} // namespace storage
} // namespace nebula
3 changes: 3 additions & 0 deletions src/tools/db-dump/DbDumper.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,16 @@ class DbDumper {

bool isValidVidLen(VertexID vid);

Value getVertexId(const folly::StringPiece &vidStr);

private:
std::unique_ptr<rocksdb::DB> db_;
rocksdb::Options options_;
std::unique_ptr<meta::MetaClient> metaClient_;
std::unique_ptr<meta::ServerBasedSchemaManager> schemaMng_;
GraphSpaceID spaceId_;
int32_t spaceVidLen_;
meta::cpp2::PropertyType spaceVidType_;
int32_t partNum_;
std::unordered_set<PartitionID> parts_;
std::unordered_set<VertexID> vids_;
Expand Down

0 comments on commit 137d24e

Please sign in to comment.