Skip to content

Commit

Permalink
Merge branch 'master' into http2
Browse files Browse the repository at this point in the history
  • Loading branch information
jackwener authored Jan 25, 2022
2 parents 8ef01e1 + be4b679 commit aa2326e
Show file tree
Hide file tree
Showing 212 changed files with 2,473 additions and 1,777 deletions.
6 changes: 6 additions & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#Require an approved review in PRs including files with a designated code owner.
/conf/ @vesoft-inc/tech-committee-reviewers
/src/kvstore/raftex/ @critical27 @sherman-the-tank
/cmake/ @sherman-the-tank @yixinglu
*.thrift @vesoft-inc/tech-committee-reviewers
*.yy @CPWstatic
2 changes: 1 addition & 1 deletion cmake/ThriftGenerate.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ add_custom_command(
--gen "js:node:"
--gen "csharp"
--gen "java:hashcode"
--gen "go:thrift_import=github.com/facebook/fbthrift/thrift/lib/go/thrift,package_prefix=github.com/vesoft-inc/nebula-go/v2/,use_context"
--gen "go:thrift_import=github.com/facebook/fbthrift/thrift/lib/go/thrift,package_prefix=github.com/vesoft-inc/nebula-go/v3/,use_context"
-o "." "${file_path}/${file_name}.thrift"
COMMAND
mkdir -p "./gen-rust/${file_name}"
Expand Down
212 changes: 12 additions & 200 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,8 @@ bool MetaClient::loadData() {
GraphSpaceID spaceId = spaceInfo.first;
std::shared_ptr<SpaceInfoCache> info = spaceInfo.second;
std::shared_ptr<SpaceInfoCache> infoDeepCopy = std::make_shared<SpaceInfoCache>(*info);
infoDeepCopy->tagSchemas_ = buildTagSchemas(infoDeepCopy->tagItemVec_, &infoDeepCopy->pool_);
infoDeepCopy->edgeSchemas_ = buildEdgeSchemas(infoDeepCopy->edgeItemVec_, &infoDeepCopy->pool_);
infoDeepCopy->tagSchemas_ = buildTagSchemas(infoDeepCopy->tagItemVec_);
infoDeepCopy->edgeSchemas_ = buildEdgeSchemas(infoDeepCopy->edgeItemVec_);
infoDeepCopy->tagIndexes_ = buildIndexes(infoDeepCopy->tagIndexItemVec_);
infoDeepCopy->edgeIndexes_ = buildIndexes(infoDeepCopy->edgeIndexItemVec_);
newMetaData->localCache_[spaceId] = infoDeepCopy;
Expand Down Expand Up @@ -395,14 +395,14 @@ bool MetaClient::loadData() {
return true;
}

TagSchemas MetaClient::buildTagSchemas(std::vector<cpp2::TagItem> tagItemVec, ObjectPool* pool) {
TagSchemas MetaClient::buildTagSchemas(std::vector<cpp2::TagItem> tagItemVec) {
TagSchemas tagSchemas;
TagID lastTagId = -1;
for (auto& tagIt : tagItemVec) {
// meta will return the different version from new to old
auto schema = std::make_shared<NebulaSchemaProvider>(tagIt.get_version());
for (const auto& colIt : tagIt.get_schema().get_columns()) {
addSchemaField(schema.get(), colIt, pool);
addSchemaField(schema.get(), colIt);
}
// handle schema property
schema->setProp(tagIt.get_schema().get_schema_prop());
Expand All @@ -416,16 +416,15 @@ TagSchemas MetaClient::buildTagSchemas(std::vector<cpp2::TagItem> tagItemVec, Ob
return tagSchemas;
}

EdgeSchemas MetaClient::buildEdgeSchemas(std::vector<cpp2::EdgeItem> edgeItemVec,
ObjectPool* pool) {
EdgeSchemas MetaClient::buildEdgeSchemas(std::vector<cpp2::EdgeItem> edgeItemVec) {
EdgeSchemas edgeSchemas;
std::unordered_set<std::pair<GraphSpaceID, EdgeType>> edges;
EdgeType lastEdgeType = -1;
for (auto& edgeIt : edgeItemVec) {
// meta will return the different version from new to old
auto schema = std::make_shared<NebulaSchemaProvider>(edgeIt.get_version());
for (const auto& col : edgeIt.get_schema().get_columns()) {
MetaClient::addSchemaField(schema.get(), col, pool);
MetaClient::addSchemaField(schema.get(), col);
}
// handle shcem property
schema->setProp(edgeIt.get_schema().get_schema_prop());
Expand All @@ -439,32 +438,19 @@ EdgeSchemas MetaClient::buildEdgeSchemas(std::vector<cpp2::EdgeItem> edgeItemVec
return edgeSchemas;
}

void MetaClient::addSchemaField(NebulaSchemaProvider* schema,
const cpp2::ColumnDef& col,
ObjectPool* pool) {
void MetaClient::addSchemaField(NebulaSchemaProvider* schema, const cpp2::ColumnDef& col) {
bool hasDef = col.default_value_ref().has_value();
auto& colType = col.get_type();
size_t len = colType.type_length_ref().has_value() ? *colType.get_type_length() : 0;
cpp2::GeoShape geoShape =
colType.geo_shape_ref().has_value() ? *colType.get_geo_shape() : cpp2::GeoShape::ANY;
bool nullable = col.nullable_ref().has_value() ? *col.get_nullable() : false;
Expression* defaultValueExpr = nullptr;
std::string encoded;
if (hasDef) {
auto encoded = *col.get_default_value();
defaultValueExpr = Expression::decode(pool, folly::StringPiece(encoded.data(), encoded.size()));

if (defaultValueExpr == nullptr) {
LOG(ERROR) << "Wrong expr default value for column name: " << col.get_name();
hasDef = false;
}
encoded = *col.get_default_value();
}

schema->addField(col.get_name(),
colType.get_type(),
len,
nullable,
hasDef ? defaultValueExpr : nullptr,
geoShape);
schema->addField(col.get_name(), colType.get_type(), len, nullable, encoded, geoShape);
}

bool MetaClient::loadSchemas(GraphSpaceID spaceId,
Expand Down Expand Up @@ -492,9 +478,9 @@ bool MetaClient::loadSchemas(GraphSpaceID spaceId,
auto edgeItemVec = edgeRet.value();
allEdgeMap[spaceId] = {};
spaceInfoCache->tagItemVec_ = tagItemVec;
spaceInfoCache->tagSchemas_ = buildTagSchemas(tagItemVec, &spaceInfoCache->pool_);
spaceInfoCache->tagSchemas_ = buildTagSchemas(tagItemVec);
spaceInfoCache->edgeItemVec_ = edgeItemVec;
spaceInfoCache->edgeSchemas_ = buildEdgeSchemas(edgeItemVec, &spaceInfoCache->pool_);
spaceInfoCache->edgeSchemas_ = buildEdgeSchemas(edgeItemVec);

for (auto& tagIt : tagItemVec) {
tagNameIdMap.emplace(std::make_pair(spaceId, tagIt.get_tag_name()), tagIt.get_tag_id());
Expand Down Expand Up @@ -864,8 +850,6 @@ Status MetaClient::handleResponse(const RESP& resp) {
return Status::Error("Zone is empty!");
case nebula::cpp2::ErrorCode::E_STORE_FAILURE:
return Status::Error("Store failure!");
case nebula::cpp2::ErrorCode::E_STORE_SEGMENT_ILLEGAL:
return Status::Error("Store segment illegal!");
case nebula::cpp2::ErrorCode::E_BAD_BALANCE_PLAN:
return Status::Error("Bad balance plan!");
case nebula::cpp2::ErrorCode::E_BALANCED:
Expand Down Expand Up @@ -1419,134 +1403,6 @@ StatusOr<std::vector<std::string>> MetaClient::getAllEdgeFromCache(const GraphSp
return it->second;
}

folly::Future<StatusOr<bool>> MetaClient::multiPut(
std::string segment, std::vector<std::pair<std::string, std::string>> pairs) {
if (!nebula::meta::checkSegment(segment) || pairs.empty()) {
return Status::Error("arguments invalid!");
}

cpp2::MultiPutReq req;
std::vector<nebula::KeyValue> data;
data.reserve(pairs.size());

for (auto& element : pairs) {
data.emplace_back(std::move(element));
}
req.segment_ref() = std::move(segment);
req.pairs_ref() = std::move(data);
folly::Promise<StatusOr<bool>> promise;
auto future = promise.getFuture();
getResponse(
std::move(req),
[](auto client, auto request) { return client->future_multiPut(request); },
[](cpp2::ExecResp&& resp) -> bool {
return resp.get_code() == nebula::cpp2::ErrorCode::SUCCEEDED;
},
std::move(promise));
return future;
}

folly::Future<StatusOr<std::string>> MetaClient::get(std::string segment, std::string key) {
if (!nebula::meta::checkSegment(segment) || key.empty()) {
return Status::Error("arguments invalid!");
}

cpp2::GetReq req;
req.segment_ref() = std::move(segment);
req.key_ref() = std::move(key);
folly::Promise<StatusOr<std::string>> promise;
auto future = promise.getFuture();
getResponse(
std::move(req),
[](auto client, auto request) { return client->future_get(request); },
[](cpp2::GetResp&& resp) -> std::string { return resp.get_value(); },
std::move(promise));
return future;
}

folly::Future<StatusOr<std::vector<std::string>>> MetaClient::multiGet(
std::string segment, std::vector<std::string> keys) {
if (!nebula::meta::checkSegment(segment) || keys.empty()) {
return Status::Error("arguments invalid!");
}

cpp2::MultiGetReq req;
req.segment_ref() = std::move(segment);
req.keys_ref() = std::move(keys);
folly::Promise<StatusOr<std::vector<std::string>>> promise;
auto future = promise.getFuture();
getResponse(
std::move(req),
[](auto client, auto request) { return client->future_multiGet(request); },
[](cpp2::MultiGetResp&& resp) -> std::vector<std::string> { return resp.get_values(); },
std::move(promise));
return future;
}

folly::Future<StatusOr<std::vector<std::string>>> MetaClient::scan(std::string segment,
std::string start,
std::string end) {
if (!nebula::meta::checkSegment(segment) || start.empty() || end.empty()) {
return Status::Error("arguments invalid!");
}

cpp2::ScanReq req;
req.segment_ref() = std::move(segment);
req.start_ref() = std::move(start);
req.end_ref() = std::move(end);
folly::Promise<StatusOr<std::vector<std::string>>> promise;
auto future = promise.getFuture();
getResponse(
std::move(req),
[](auto client, auto request) { return client->future_scan(request); },
[](cpp2::ScanResp&& resp) -> std::vector<std::string> { return resp.get_values(); },
std::move(promise));
return future;
}

folly::Future<StatusOr<bool>> MetaClient::remove(std::string segment, std::string key) {
if (!nebula::meta::checkSegment(segment) || key.empty()) {
return Status::Error("arguments invalid!");
}

cpp2::RemoveReq req;
req.segment_ref() = std::move(segment);
req.key_ref() = std::move(key);
folly::Promise<StatusOr<bool>> promise;
auto future = promise.getFuture();
getResponse(
std::move(req),
[](auto client, auto request) { return client->future_remove(request); },
[](cpp2::ExecResp&& resp) -> bool {
return resp.get_code() == nebula::cpp2::ErrorCode::SUCCEEDED;
},
std::move(promise));
return future;
}

folly::Future<StatusOr<bool>> MetaClient::removeRange(std::string segment,
std::string start,
std::string end) {
if (!nebula::meta::checkSegment(segment) || start.empty() || end.empty()) {
return Status::Error("arguments invalid!");
}

cpp2::RemoveRangeReq req;
req.segment_ref() = std::move(segment);
req.start_ref() = std::move(start);
req.end_ref() = std::move(end);
folly::Promise<StatusOr<bool>> promise;
auto future = promise.getFuture();
getResponse(
std::move(req),
[](auto client, auto request) { return client->future_removeRange(request); },
[](cpp2::ExecResp&& resp) -> bool {
return resp.get_code() == nebula::cpp2::ErrorCode::SUCCEEDED;
},
std::move(promise));
return future;
}

PartsMap MetaClient::getPartsMapFromCache(const HostAddr& host) {
folly::rcu_reader guard;
const auto& metadata = *metadata_.load();
Expand Down Expand Up @@ -2790,50 +2646,6 @@ folly::Future<StatusOr<std::vector<cpp2::RoleItem>>> MetaClient::getUserRoles(st
return future;
}

folly::Future<StatusOr<std::string>> MetaClient::getTagDefaultValue(GraphSpaceID spaceId,
TagID tagId,
const std::string& field) {
cpp2::GetReq req;
static std::string defaultKey = "__default__";
req.segment_ref() = defaultKey;
std::string key;
key.reserve(64);
key.append(reinterpret_cast<const char*>(&spaceId), sizeof(GraphSpaceID));
key.append(reinterpret_cast<const char*>(&tagId), sizeof(TagID));
key.append(field);
req.key_ref() = std::move(key);
folly::Promise<StatusOr<std::string>> promise;
auto future = promise.getFuture();
getResponse(
std::move(req),
[](auto client, auto request) { return client->future_get(request); },
[](cpp2::GetResp&& resp) -> std::string { return resp.get_value(); },
std::move(promise));
return future;
}

folly::Future<StatusOr<std::string>> MetaClient::getEdgeDefaultValue(GraphSpaceID spaceId,
EdgeType edgeType,
const std::string& field) {
cpp2::GetReq req;
static std::string defaultKey = "__default__";
req.segment_ref() = defaultKey;
std::string key;
key.reserve(64);
key.append(reinterpret_cast<const char*>(&spaceId), sizeof(GraphSpaceID));
key.append(reinterpret_cast<const char*>(&edgeType), sizeof(EdgeType));
key.append(field);
req.key_ref() = std::move(key);
folly::Promise<StatusOr<std::string>> promise;
auto future = promise.getFuture();
getResponse(
std::move(req),
[](auto client, auto request) { return client->future_get(request); },
[](cpp2::GetResp&& resp) -> std::string { return resp.get_value(); },
std::move(promise));
return future;
}

folly::Future<StatusOr<bool>> MetaClient::regConfig(const std::vector<cpp2::ConfigItem>& items) {
cpp2::RegConfigReq req;
req.items_ref() = items;
Expand Down
35 changes: 3 additions & 32 deletions src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,6 @@ struct SpaceInfoCache {
std::vector<cpp2::IndexItem> edgeIndexItemVec_;
Indexes edgeIndexes_;
Listeners listeners_;
// objPool used to decode when adding field
ObjectPool pool_;
std::unordered_map<PartitionID, TermID> termOfPartition_;

SpaceInfoCache() = default;
Expand Down Expand Up @@ -373,25 +371,6 @@ class MetaClient {

folly::Future<StatusOr<std::vector<cpp2::IndexStatus>>> listEdgeIndexStatus(GraphSpaceID spaceId);

// Operations for custom kv
folly::Future<StatusOr<bool>> multiPut(std::string segment,
std::vector<std::pair<std::string, std::string>> pairs);

folly::Future<StatusOr<std::string>> get(std::string segment, std::string key);

folly::Future<StatusOr<std::vector<std::string>>> multiGet(std::string segment,
std::vector<std::string> keys);

folly::Future<StatusOr<std::vector<std::string>>> scan(std::string segment,
std::string start,
std::string end);

folly::Future<StatusOr<bool>> remove(std::string segment, std::string key);

folly::Future<StatusOr<bool>> removeRange(std::string segment,
std::string start,
std::string end);

// Operations for users.
folly::Future<StatusOr<bool>> createUser(std::string account,
std::string password,
Expand Down Expand Up @@ -591,14 +570,6 @@ class MetaClient {

const std::vector<HostAddr>& getAddresses();

folly::Future<StatusOr<std::string>> getTagDefaultValue(GraphSpaceID spaceId,
TagID tagId,
const std::string& field);

folly::Future<StatusOr<std::string>> getEdgeDefaultValue(GraphSpaceID spaceId,
EdgeType edgeType,
const std::string& field);

std::vector<cpp2::RoleItem> getRolesByUserFromCache(const std::string& user);

Status authCheckFromCache(const std::string& account, const std::string& password);
Expand Down Expand Up @@ -816,10 +787,10 @@ class MetaClient {
ServiceClientsList serviceClientList_;
};

void addSchemaField(NebulaSchemaProvider* schema, const cpp2::ColumnDef& col, ObjectPool* pool);
void addSchemaField(NebulaSchemaProvider* schema, const cpp2::ColumnDef& col);

TagSchemas buildTagSchemas(std::vector<cpp2::TagItem> tagItemVec, ObjectPool* pool);
EdgeSchemas buildEdgeSchemas(std::vector<cpp2::EdgeItem> edgeItemVec, ObjectPool* pool);
TagSchemas buildTagSchemas(std::vector<cpp2::TagItem> tagItemVec);
EdgeSchemas buildEdgeSchemas(std::vector<cpp2::EdgeItem> edgeItemVec);

std::unique_ptr<thread::GenericWorker> bgThread_;
SpaceNameIdMap spaceIndexByName_;
Expand Down
2 changes: 1 addition & 1 deletion src/codec/RowReaderV2.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class RowReaderV2 : public RowReader {
FRIEND_TEST(ScanEdgePropBench, ProcessEdgeProps);

public:
virtual ~RowReaderV2() = default;
~RowReaderV2() override = default;

Value getValueByName(const std::string& prop) const noexcept override;
Value getValueByIndex(const int64_t index) const noexcept override;
Expand Down
6 changes: 4 additions & 2 deletions src/codec/RowWriterV2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -815,7 +815,9 @@ WriteResult RowWriterV2::checkUnsetFields() noexcept {

WriteResult r = WriteResult::SUCCEEDED;
if (field->hasDefault()) {
auto expr = field->defaultValue()->clone();
ObjectPool pool;
auto& exprStr = field->defaultValue();
auto expr = Expression::decode(&pool, folly::StringPiece(exprStr.data(), exprStr.size()));
auto defVal = Expression::eval(expr, expCtx);
switch (defVal.type()) {
case Value::Type::NULLVALUE:
Expand Down Expand Up @@ -851,7 +853,7 @@ WriteResult RowWriterV2::checkUnsetFields() noexcept {
default:
LOG(FATAL) << "Unsupported default value type: " << defVal.typeName()
<< ", default value: " << defVal
<< ", default value expr: " << field->defaultValue()->toString();
<< ", default value expr: " << field->defaultValue();
}
} else {
// Set NULL
Expand Down
Loading

0 comments on commit aa2326e

Please sign in to comment.