Skip to content

Commit

Permalink
Merge branch 'master' into haowen/kvsep
Browse files Browse the repository at this point in the history
  • Loading branch information
yixinglu authored Nov 23, 2021
2 parents b8d2898 + bf0c3a1 commit 0064d29
Show file tree
Hide file tree
Showing 12 changed files with 123 additions and 74 deletions.
20 changes: 17 additions & 3 deletions src/common/plugin/fulltext/FTUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,19 @@ struct HttpClient {
HostAddr host;
std::string user;
std::string password;
std::string connType{"http"};

HttpClient() = default;
~HttpClient() = default;

explicit HttpClient(HttpClient&& v) noexcept
: host(std::move(v.host)), user(std::move(v.user)), password(std::move(v.password)) {}
: host(std::move(v.host)),
user(std::move(v.user)),
password(std::move(v.password)),
connType(std::move(v.connType)) {}

explicit HttpClient(const HttpClient& v) noexcept
: host(v.host), user(v.user), password(v.password) {}
: host(v.host), user(v.user), password(v.password), connType(v.connType) {}

explicit HttpClient(HostAddr&& h) noexcept : host(std::move(h)) {}

Expand All @@ -58,10 +62,20 @@ struct HttpClient {
HttpClient(const HostAddr& h, const std::string& u, const std::string& p) noexcept
: host(h), user(u), password(p) {}

HttpClient(HostAddr&& h, std::string&& u, std::string&& p, std::string&& c) noexcept
: host(std::move(h)), user(std::move(u)), password(std::move(p)), connType(std::move(c)) {}

HttpClient(const HostAddr& h,
const std::string& u,
const std::string& p,
const std::string& c) noexcept
: host(h), user(u), password(p), connType(std::move(c)) {}

void clear() {
host.clear();
user.clear();
password.clear();
connType.clear();
}

std::string toString() const {
Expand All @@ -72,7 +86,7 @@ struct HttpClient {
os << ":" << password;
}
}
os << " \"http://" << host.host << ":" << host.port << "/";
os << " -k \"" << connType << "://" << host.host << ":" << host.port << "/";
return os.str();
}
};
Expand Down
12 changes: 6 additions & 6 deletions src/common/plugin/fulltext/test/FulltextPluginTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ TEST(FulltextPluginTest, ESIndexCheckTest) {
auto ret = ESGraphAdapter().indexExistsCmd(client, "test_index");
auto expected =
"/usr/bin/curl -H \"Content-Type: application/json; charset=utf-8\" "
"-XGET \"http://127.0.0.1:9200/_cat/indices/test_index?format=json\"";
"-XGET -k \"http://127.0.0.1:9200/_cat/indices/test_index?format=json\"";
ASSERT_EQ(expected, ret);
}

Expand All @@ -51,7 +51,7 @@ TEST(FulltextPluginTest, ESCreateIndexTest) {
auto ret = ESGraphAdapter().createIndexCmd(client, "test_index");
auto expected =
"/usr/bin/curl -H \"Content-Type: application/json; charset=utf-8\" "
"-XPUT \"http://127.0.0.1:9200/test_index\"";
"-XPUT -k \"http://127.0.0.1:9200/test_index\"";
ASSERT_EQ(expected, ret);
}

Expand All @@ -61,7 +61,7 @@ TEST(FulltextPluginTest, ESDropIndexTest) {
auto ret = ESGraphAdapter().dropIndexCmd(client, "test_index");
auto expected =
"/usr/bin/curl -H \"Content-Type: application/json; charset=utf-8\" "
"-XDELETE \"http://127.0.0.1:9200/test_index\"";
"-XDELETE -k \"http://127.0.0.1:9200/test_index\"";
ASSERT_EQ(expected, ret);
}

Expand All @@ -72,7 +72,7 @@ TEST(FulltextPluginTest, ESPutTest) {
auto header = ESStorageAdapter().putHeader(hc, item);
std::string expected =
"/usr/bin/curl -H \"Content-Type: application/json; charset=utf-8\" "
"-XPUT \"http://127.0.0.1:9200/index1/_doc/"
"-XPUT -k \"http://127.0.0.1:9200/index1/_doc/"
"00000000018c43de7b01bca674276c43e09b3ec5baYWFhYQ==\"";
ASSERT_EQ(expected, header);

Expand All @@ -97,7 +97,7 @@ TEST(FulltextPluginTest, ESBulkTest) {
auto header = ESStorageAdapter().bulkHeader(hc);
std::string expected =
"/usr/bin/curl -H \"Content-Type: application/x-ndjson; "
"charset=utf-8\" -XPOST \"http://127.0.0.1:9200/_bulk\"";
"charset=utf-8\" -XPOST -k \"http://127.0.0.1:9200/_bulk\"";
ASSERT_EQ(expected, header);

std::vector<folly::dynamic> bodies;
Expand Down Expand Up @@ -251,7 +251,7 @@ TEST(FulltextPluginTest, ESPrefixTest) {
auto header = ESGraphAdapter().header(client, item, limit);
std::string expected =
"/usr/bin/curl -H \"Content-Type: application/json; charset=utf-8\" "
"-XGET \"http://127.0.0.1:9200/index1/_search?timeout=10ms\"";
"-XGET -k \"http://127.0.0.1:9200/index1/_search?timeout=10ms\"";
ASSERT_EQ(expected, header);

auto body = ESGraphAdapter().prefixBody("aa");
Expand Down
7 changes: 5 additions & 2 deletions src/graph/executor/admin/ShowTSClientsExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@ folly::Future<Status> ShowTSClientsExecutor::showTSClients() {
return resp.status();
}
auto value = std::move(resp).value();
DataSet v({"Host", "Port"});
DataSet v({"Host", "Port", "Connection type"});
for (const auto &client : value) {
nebula::Row r({client.host.host, client.host.port});
nebula::Row r;
r.values.emplace_back(client.host.host);
r.values.emplace_back(client.host.port);
r.values.emplace_back(client.conn_type_ref().has_value() ? *client.get_conn_type() : "http");
v.emplace_back(std::move(r));
}
return finish(std::move(v));
Expand Down
1 change: 1 addition & 0 deletions src/graph/util/FTIndexUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ StatusOr<std::vector<nebula::plugin::HttpClient>> FTIndexUtils::getTSClients(
hc.user = *c.user_ref();
hc.password = *c.pwd_ref();
}
hc.connType = c.conn_type_ref().has_value() ? *c.get_conn_type() : "http";
tsClients.emplace_back(std::move(hc));
}
return tsClients;
Expand Down
1 change: 1 addition & 0 deletions src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -991,6 +991,7 @@ struct FTClient {
1: required common.HostAddr host,
2: optional binary user,
3: optional binary pwd,
4: optional binary conn_type,
}

struct SignInFTServiceReq {
Expand Down
1 change: 1 addition & 0 deletions src/kvstore/plugins/elasticsearch/ESListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ void ESListener::init() {
hc.user = *c.user_ref();
hc.password = *c.pwd_ref();
}
hc.connType = c.conn_type_ref().has_value() ? *c.get_conn_type() : "http";
esClients_.emplace_back(std::move(hc));
}

Expand Down
58 changes: 0 additions & 58 deletions src/meta/processors/job/BalanceJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -755,64 +755,6 @@ ErrorOr<nebula::cpp2::ErrorCode, HostAddr> DataBalanceJobExecutor::hostWithMinim
return nebula::cpp2::ErrorCode::E_NO_HOSTS;
}

nebula::cpp2::ErrorCode BalanceJobExecutor::collectZoneParts(const std::string& groupName,
HostParts& hostParts) {
auto groupKey = MetaKeyUtils::groupKey(groupName);
std::string groupValue;
auto retCode = kvstore_->get(kDefaultSpaceId, kDefaultPartId, groupKey, &groupValue);
if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Get group " << groupName
<< " failed, error: " << apache::thrift::util::enumNameSafe(retCode);
return retCode;
}

// zoneHosts use to record this host belong to zone's hosts
std::unordered_map<std::pair<HostAddr, std::string>, std::vector<HostAddr>> zoneHosts;
auto zoneNames = MetaKeyUtils::parseZoneNames(std::move(groupValue));
for (auto zoneName : zoneNames) {
auto zoneKey = MetaKeyUtils::zoneKey(zoneName);
std::string zoneValue;
retCode = kvstore_->get(kDefaultSpaceId, kDefaultPartId, zoneKey, &zoneValue);
if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Get zone " << zoneName
<< " failed, error: " << apache::thrift::util::enumNameSafe(retCode);
return retCode;
}

auto hosts = MetaKeyUtils::parseZoneHosts(std::move(zoneValue));
for (const auto& host : hosts) {
auto pair = std::pair<HostAddr, std::string>(std::move(host), zoneName);
auto& hs = zoneHosts[std::move(pair)];
hs.insert(hs.end(), hosts.begin(), hosts.end());
}
}

for (auto it = hostParts.begin(); it != hostParts.end(); it++) {
auto host = it->first;
auto zoneIter =
std::find_if(zoneHosts.begin(), zoneHosts.end(), [host](const auto& pair) -> bool {
return host == pair.first.first;
});

if (zoneIter == zoneHosts.end()) {
LOG(INFO) << it->first << " have lost";
continue;
}

auto& hosts = zoneIter->second;
auto name = zoneIter->first.second;
for (auto hostIter = hosts.begin(); hostIter != hosts.end(); hostIter++) {
auto partIter = hostParts.find(*hostIter);
if (partIter == hostParts.end()) {
zoneParts_[it->first] = ZoneNameAndParts(name, std::vector<PartitionID>());
} else {
zoneParts_[it->first] = ZoneNameAndParts(name, partIter->second);
}
}
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

bool DataBalanceJobExecutor::checkZoneLegal(const HostAddr& source, const HostAddr& target) {
VLOG(3) << "Check " << source << " : " << target;
auto sourceIter = std::find_if(zoneParts_.begin(), zoneParts_.end(), [&source](const auto& pair) {
Expand Down
2 changes: 0 additions & 2 deletions src/meta/processors/job/BalanceJobExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ class BalanceJobExecutor : public MetaJobExecutor {

nebula::cpp2::ErrorCode assembleZoneParts(const std::string& groupName, HostParts& hostParts);

nebula::cpp2::ErrorCode collectZoneParts(const std::string& groupName, HostParts& hostParts);

nebula::cpp2::ErrorCode save(const std::string& k, const std::string& v);

protected:
Expand Down
11 changes: 9 additions & 2 deletions src/parser/AdminSentences.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,13 @@ std::string SignInTextServiceSentence::toString() const {
buf += client.get_host().host;
buf += ":";
buf += std::to_string(client.get_host().port);
if (client.conn_type_ref().has_value()) {
std::string connType = *client.get_conn_type();
auto toupper = [](auto c) { return ::toupper(c); };
std::transform(connType.begin(), connType.end(), connType.begin(), toupper);
buf += ", ";
buf += connType;
}
if (client.user_ref().has_value() && !(*client.user_ref()).empty()) {
buf += ", \"";
buf += *client.get_user();
Expand All @@ -310,10 +317,10 @@ std::string SignInTextServiceSentence::toString() const {
buf += "\"";
}
buf += ")";
buf += ",";
buf += ", ";
}
if (!buf.empty()) {
buf.resize(buf.size() - 1);
buf.resize(buf.size() - 2);
}
return buf;
}
Expand Down
36 changes: 35 additions & 1 deletion src/parser/parser.yy
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ static constexpr size_t kCommentLengthLimit = 256;
%token KW_UNWIND KW_SKIP KW_OPTIONAL
%token KW_CASE KW_THEN KW_ELSE KW_END
%token KW_GROUP KW_ZONE KW_GROUPS KW_ZONES KW_INTO
%token KW_LISTENER KW_ELASTICSEARCH KW_FULLTEXT
%token KW_LISTENER KW_ELASTICSEARCH KW_FULLTEXT KW_HTTPS KW_HTTP
%token KW_AUTO KW_FUZZY KW_PREFIX KW_REGEXP KW_WILDCARD
%token KW_TEXT KW_SEARCH KW_CLIENTS KW_SIGN KW_SERVICE KW_TEXT_SEARCH
%token KW_ANY KW_SINGLE KW_NONE
Expand Down Expand Up @@ -532,6 +532,8 @@ unreserved_keyword
| KW_POINT { $$ = new std::string("point"); }
| KW_LINESTRING { $$ = new std::string("linestring"); }
| KW_POLYGON { $$ = new std::string("polygon"); }
| KW_HTTP { $$ = new std::string("http"); }
| KW_HTTPS { $$ = new std::string("https"); }
;

expression
Expand Down Expand Up @@ -1807,6 +1809,18 @@ text_search_client_item
$$->set_host(*$2);
delete $2;
}
| L_PAREN host_item COMMA KW_HTTP R_PAREN {
$$ = new nebula::meta::cpp2::FTClient();
$$->set_host(*$2);
$$->set_conn_type("http");
delete $2;
}
| L_PAREN host_item COMMA KW_HTTPS R_PAREN {
$$ = new nebula::meta::cpp2::FTClient();
$$->set_host(*$2);
$$->set_conn_type("https");
delete $2;
}
| L_PAREN host_item COMMA STRING COMMA STRING R_PAREN {
$$ = new nebula::meta::cpp2::FTClient();
$$->set_host(*$2);
Expand All @@ -1816,6 +1830,26 @@ text_search_client_item
delete $4;
delete $6;
}
| L_PAREN host_item COMMA KW_HTTP COMMA STRING COMMA STRING R_PAREN {
$$ = new nebula::meta::cpp2::FTClient();
$$->set_host(*$2);
$$->set_user(*$6);
$$->set_pwd(*$8);
$$->set_conn_type("http");
delete $2;
delete $6;
delete $8;
}
| L_PAREN host_item COMMA KW_HTTPS COMMA STRING COMMA STRING R_PAREN {
$$ = new nebula::meta::cpp2::FTClient();
$$->set_host(*$2);
$$->set_user(*$6);
$$->set_pwd(*$8);
$$->set_conn_type("https");
delete $2;
delete $6;
delete $8;
}
;

text_search_client_list
Expand Down
2 changes: 2 additions & 0 deletions src/parser/scanner.lex
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ IP_OCTET ([0-9]|[1-9][0-9]|1[0-9][0-9]|2[0-4][0-9]|25[0-5])
"INTO" { return TokenType::KW_INTO; }
"LISTENER" { return TokenType::KW_LISTENER; }
"ELASTICSEARCH" { return TokenType::KW_ELASTICSEARCH; }
"HTTP" { return TokenType::KW_HTTP; }
"HTTPS" { return TokenType::KW_HTTPS; }
"FULLTEXT" { return TokenType::KW_FULLTEXT; }
"AUTO" { return TokenType::KW_AUTO; }
"FUZZY" { return TokenType::KW_FUZZY; }
Expand Down
46 changes: 46 additions & 0 deletions src/parser/test/ParserTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2945,23 +2945,69 @@ TEST_F(ParserTest, FullTextServiceTest) {
std::string query = "SIGN IN TEXT SERVICE (127.0.0.1:9200)";
auto result = parse(query);
ASSERT_TRUE(result.ok()) << result.status();
ASSERT_EQ(result.value()->toString(), query);
}
{
std::string query = "SIGN IN TEXT SERVICE (127.0.0.1:9200, HTTP)";
auto result = parse(query);
ASSERT_TRUE(result.ok()) << result.status();
ASSERT_EQ(result.value()->toString(), query);
}
{
std::string query = "SIGN IN TEXT SERVICE (127.0.0.1:9200, HTTPS)";
auto result = parse(query);
ASSERT_TRUE(result.ok()) << result.status();
ASSERT_EQ(result.value()->toString(), query);
}
{
std::string query = "SIGN IN TEXT SERVICE (127.0.0.1:9200, HTTPS, \"user\")";
auto result = parse(query);
ASSERT_FALSE(result.ok());
}
{
std::string query = "SIGN IN TEXT SERVICE (127.0.0.1:9200), (127.0.0.1:9300)";
auto result = parse(query);
ASSERT_TRUE(result.ok()) << result.status();
ASSERT_EQ(result.value()->toString(), query);
}
{
std::string query = "SIGN IN TEXT SERVICE (127.0.0.1:9200, HTTPS), (127.0.0.1:9300)";
auto result = parse(query);
ASSERT_TRUE(result.ok()) << result.status();
ASSERT_EQ(result.value()->toString(), query);
}
{
std::string query = "SIGN IN TEXT SERVICE (127.0.0.1:9200, \"user\", \"password\")";
auto result = parse(query);
ASSERT_TRUE(result.ok()) << result.status();
}
{
std::string query = "SIGN IN TEXT SERVICE (127.0.0.1:9200, HTTP, \"user\", \"password\")";
auto result = parse(query);
ASSERT_TRUE(result.ok()) << result.status();
ASSERT_EQ(result.value()->toString(), query);
}
{
std::string query = "SIGN IN TEXT SERVICE (127.0.0.1:9200, HTTPS, \"user\", \"password\")";
auto result = parse(query);
ASSERT_TRUE(result.ok()) << result.status();
ASSERT_EQ(result.value()->toString(), query);
}
{
std::string query =
"SIGN IN TEXT SERVICE (127.0.0.1:9200, \"user\", \"password\"), "
"(127.0.0.1:9200, \"user\", \"password\")";
auto result = parse(query);
ASSERT_TRUE(result.ok()) << result.status();
ASSERT_EQ(result.value()->toString(), query);
}
{
std::string query =
"SIGN IN TEXT SERVICE (127.0.0.1:9200, HTTP, \"user\", \"password\"), "
"(127.0.0.1:9200, HTTPS, \"user\", \"password\")";
auto result = parse(query);
ASSERT_TRUE(result.ok()) << result.status();
ASSERT_EQ(result.value()->toString(), query);
}
{
std::string query = "SIGN OUT TEXT SERVICE";
Expand Down

0 comments on commit 0064d29

Please sign in to comment.