Skip to content

Commit

Permalink
Cherry pick v3.1.0 (0422-0423)
Browse files Browse the repository at this point in the history
Co-authored-by: Sophie <[email protected]>

Co-authored-by: [email protected] <[email protected]>
  • Loading branch information
Sophie-Xie and liuyu85cn authored Apr 23, 2022
1 parent d0ad877 commit 33fd35e
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 13 deletions.
24 changes: 15 additions & 9 deletions src/storage/mutate/AddVerticesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,15 @@ void AddVerticesProcessor::doProcess(const cpp2::AddVerticesRequest& req) {
void AddVerticesProcessor::doProcessWithIndex(const cpp2::AddVerticesRequest& req) {
const auto& partVertices = req.get_parts();
const auto& propNamesMap = req.get_prop_names();
auto batchHolder = std::make_unique<kvstore::BatchHolder>();

for (const auto& part : partVertices) {
auto partId = part.first;
const auto& vertices = part.second;
std::vector<kvstore::KV> kvs;
kvs.reserve(vertices.size());
std::vector<kvstore::KV> tags;
tags.reserve(vertices.size());

std::vector<std::string> verticeData;
verticeData.reserve(vertices.size());

auto code = nebula::cpp2::ErrorCode::SUCCEEDED;
deleteDupVid(const_cast<std::vector<cpp2::NewVertex>&>(vertices));
Expand All @@ -158,7 +160,7 @@ void AddVerticesProcessor::doProcessWithIndex(const cpp2::AddVerticesRequest& re
break;
}

batchHolder->put(NebulaKeyUtils::vertexKey(spaceVidLen_, partId, vid), "");
verticeData.emplace_back(NebulaKeyUtils::vertexKey(spaceVidLen_, partId, vid));
for (const auto& newTag : newTags) {
auto tagId = newTag.get_tag_id();
VLOG(3) << "PartitionID: " << partId << ", VertexID: " << vid << ", TagID: " << tagId;
Expand Down Expand Up @@ -186,15 +188,15 @@ void AddVerticesProcessor::doProcessWithIndex(const cpp2::AddVerticesRequest& re
code = writeResultTo(writeResult, false);
break;
}
kvs.emplace_back(std::string(key), std::string(encode.value()));
tags.emplace_back(std::string(key), std::string(encode.value()));
}
}

if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
handleAsync(spaceId_, partId, code);
} else {
auto atomicOp = [&, partId, data = std::move(kvs)]() mutable {
return addVerticesWithIndex(partId, std::move(data), std::move(batchHolder));
auto atomicOp = [=, tags = std::move(tags), vertices = std::move(verticeData)]() mutable {
return addVerticesWithIndex(partId, tags, vertices);
};

auto cb = [partId, this](nebula::cpp2::ErrorCode ec) { handleAsync(spaceId_, partId, ec); };
Expand All @@ -205,11 +207,15 @@ void AddVerticesProcessor::doProcessWithIndex(const cpp2::AddVerticesRequest& re

kvstore::MergeableAtomicOpResult AddVerticesProcessor::addVerticesWithIndex(
PartitionID partId,
std::vector<kvstore::KV>&& data,
std::unique_ptr<kvstore::BatchHolder>&& batchHolder) {
const std::vector<kvstore::KV>& data,
const std::vector<std::string>& vertices) {
kvstore::MergeableAtomicOpResult ret;
ret.code = nebula::cpp2::ErrorCode::E_RAFT_ATOMIC_OP_FAILED;
IndexCountWrapper wrapper(env_);
auto batchHolder = std::make_unique<kvstore::BatchHolder>();
for (auto& vertice : vertices) {
batchHolder->put(std::string(vertice), "");
}
for (auto& [key, value] : data) {
auto vId = NebulaKeyUtils::getVertexId(spaceVidLen_, key);
auto tagId = NebulaKeyUtils::getTagId(spaceVidLen_, key);
Expand Down
7 changes: 3 additions & 4 deletions src/storage/mutate/AddVerticesProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,9 @@ class AddVerticesProcessor : public BaseProcessor<cpp2::ExecResponse> {

void deleteDupVid(std::vector<cpp2::NewVertex>& vertices);

kvstore::MergeableAtomicOpResult addVerticesWithIndex(
PartitionID partId,
std::vector<kvstore::KV>&& data,
std::unique_ptr<kvstore::BatchHolder>&& batchHolder);
kvstore::MergeableAtomicOpResult addVerticesWithIndex(PartitionID partId,
const std::vector<kvstore::KV>& data,
const std::vector<std::string>& vertices);

private:
GraphSpaceID spaceId_;
Expand Down

0 comments on commit 33fd35e

Please sign in to comment.