Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YDB FQ: Support MDB MySQL in DatabaseResolver #6257

Merged
merged 13 commits into from
Jul 8, 2024
50 changes: 49 additions & 1 deletion ydb/core/fq/libs/actors/database_resolver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,54 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>

return TDatabaseDescription{"", endpoint.first, endpoint.second, "", useTls};
};
Parsers[NYql::EDatabaseType::MySQL] = [](
NJson::TJsonValue& databaseInfo,
const NYql::IMdbEndpointGenerator::TPtr& mdbEndpointGenerator,
bool useTls,
NConnector::NApi::EProtocol protocol
) {
NYql::IMdbEndpointGenerator::TEndpoint endpoint;
TVector<TString> aliveHosts;

const auto& hostsArray = databaseInfo.GetMap().at("hosts").GetArraySafe();

for (const auto& host : hostsArray) {
const auto& hostMap = host.GetMap();

if (!hostMap.contains("services")) {
skywalker-jpg marked this conversation as resolved.
Show resolved Hide resolved
// indicates that cluster is down
continue;
}

// check if all services of a particular host are alive
const bool alive = std::all_of(
hostMap.at("services").GetArraySafe().begin(),
hostMap.at("services").GetArraySafe().end(),
[](const auto& service) {
return service["health"].GetString() == "ALIVE";
}
);

if (alive) {
aliveHosts.push_back(host["name"].GetString());
}
}

if (aliveHosts.empty()) {
ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "No ALIVE MySQL hosts found";
}

NYql::IMdbEndpointGenerator::TParams params = {
.DatabaseType = NYql::EDatabaseType::MySQL,
.MdbHost = aliveHosts[std::rand() % static_cast<int>(aliveHosts.size())],
.UseTls = useTls,
.Protocol = protocol,
};

endpoint = mdbEndpointGenerator->ToEndpoint(params);

return TDatabaseDescription{"", endpoint.first, endpoint.second, "", useTls};
};
}

static constexpr char ActorName[] = "YQ_DATABASE_RESOLVER";
Expand Down Expand Up @@ -538,7 +586,7 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
url = TUrlBuilder(ev->Get()->YdbMvpEndpoint + "/database")
.AddUrlParam("databaseId", databaseId)
.Build();
} else if (IsIn({NYql::EDatabaseType::ClickHouse, NYql::EDatabaseType::PostgreSQL}, databaseType)) {
} else if (IsIn({NYql::EDatabaseType::ClickHouse, NYql::EDatabaseType::PostgreSQL, NYql::EDatabaseType::MySQL}, databaseType)) {
YQL_ENSURE(ev->Get()->MdbGateway, "empty MDB Gateway");
url = TUrlBuilder(
ev->Get()->MdbGateway + "/managed-" + NYql::DatabaseTypeLowercase(databaseType) + "/v1/clusters/")
Expand Down
73 changes: 73 additions & 0 deletions ydb/core/fq/libs/actors/ut/database_resolver_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,79 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) {
NYql::TDatabaseResolverResponse::TDatabaseDescription{},
issues);
}
Y_UNIT_TEST(MySQL) {
Test(
NYql::EDatabaseType::MySQL,
NYql::NConnector::NApi::EProtocol::NATIVE,
"https://mdb.api.cloud.yandex.net:443/managed-mysql/v1/clusters/etn021us5r9rhld1vgbh/hosts",
"200",
R"({
"hosts": [
{
"services": [
{
"type": "POOLER",
"health": "ALIVE"
},
{
"type": "MYSQL",
"health": "ALIVE"
}
],
"name": "rc1b-eyt6dtobu96rwydq.mdb.yandexcloud.net",
"clusterId": "c9qb2bjghs8onbncpamk",
"zoneId": "ru-central1-b",
"role": "MASTER",
"health": "ALIVE"
}
]
})",
NYql::TDatabaseResolverResponse::TDatabaseDescription{
TString{""},
TString{"rc1b-eyt6dtobu96rwydq.db.yandex.net"},
3306,
TString(""),
true
},
{}
);
}

Y_UNIT_TEST(MySQL_PermissionDenied) {
NYql::TIssues issues{
NYql::TIssue(
TStringBuilder{} << MakeErrorPrefix(
"mdb.api.cloud.yandex.net:443",
"/managed-mysql/v1/clusters/etn021us5r9rhld1vgbh/hosts",
"etn021us5r9rhld1vgbh",
NYql::EDatabaseType::MySQL
) << NoPermissionStr
)
};

Test(
NYql::EDatabaseType::MySQL,
NYql::NConnector::NApi::EProtocol::NATIVE,
"https://mdb.api.cloud.yandex.net:443/managed-mysql/v1/clusters/etn021us5r9rhld1vgbh/hosts",
"403",
R"(
{
"code": 7,
"message": "Permission denied",
"details": [
{
"@type": "type.googleapis.com/google.rpc.RequestInfo",
"requestId": "a943c092-d596-4e0e-ae7b-1f67f9d8164e"
}
]
}
)",
NYql::TDatabaseResolverResponse::TDatabaseDescription{
},
skywalker-jpg marked this conversation as resolved.
Show resolved Hide resolved
issues
);
}


Y_UNIT_TEST(DataStreams_PermissionDenied) {
NYql::TIssues issues{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ namespace NFq {

constexpr ui32 GREENPLUM_PORT = 6432;

constexpr ui32 MYSQL_PORT = 3306;

// TMdbEndpointGeneratorLegacy implements behavior required by YQL legacy ClickHouse provider
class TMdbEndpointGeneratorLegacy: public NYql::IMdbEndpointGenerator {
TEndpoint ToEndpoint(const NYql::IMdbEndpointGenerator::TParams& params) const override {
Expand Down Expand Up @@ -76,13 +78,21 @@ namespace NFq {
ythrow yexception() << "Unexpected protocol for PostgreSQL " << NYql::NConnector::NApi::EProtocol_Name(params.Protocol);
}
case NYql::EDatabaseType::Greenplum:
// https://cloud.yandex.ru/docs/managed-postgresql/operations/connect
// https://cloud.yandex.ru/docs/managed-greenplum/operations/connect
switch (params.Protocol) {
case NYql::NConnector::NApi::EProtocol::NATIVE:
return TEndpoint(fixedHost, GREENPLUM_PORT);
default:
ythrow yexception() << "Unexpected protocol for Greenplum: " << NYql::NConnector::NApi::EProtocol_Name(params.Protocol);
}
case NYql::EDatabaseType::MySQL:
// https://cloud.yandex.ru/docs/managed-mysql/operations/connect
switch (params.Protocol) {
case NYql::NConnector::NApi::EProtocol::NATIVE:
return TEndpoint(fixedHost, MYSQL_PORT);
default:
ythrow yexception() << "Unexpected protocol for MySQL: " << NYql::NConnector::NApi::EProtocol_Name(params.Protocol);
}
default:
ythrow yexception() << "Unexpected database type: " << ToString(params.DatabaseType);
};
Expand Down
Loading