diff --git a/ydb/core/fq/libs/actors/database_resolver.cpp b/ydb/core/fq/libs/actors/database_resolver.cpp index d098a1ed76a4..a097c62be648 100644 --- a/ydb/core/fq/libs/actors/database_resolver.cpp +++ b/ydb/core/fq/libs/actors/database_resolver.cpp @@ -457,6 +457,54 @@ class TDatabaseResolver: public TActor endpoint = mdbEndpointGenerator->ToEndpoint(params); + return TDatabaseDescription{"", endpoint.first, endpoint.second, "", useTls}; + }; + Parsers[NYql::EDatabaseType::MySQL] = []( + NJson::TJsonValue& databaseInfo, + const NYql::IMdbEndpointGenerator::TPtr& mdbEndpointGenerator, + bool useTls, + NConnector::NApi::EProtocol protocol + ) { + NYql::IMdbEndpointGenerator::TEndpoint endpoint; + TVector aliveHosts; + + const auto& hostsArray = databaseInfo.GetMap().at("hosts").GetArraySafe(); + + for (const auto& host : hostsArray) { + const auto& hostMap = host.GetMap(); + + if (!hostMap.contains("services")) { + // indicates that cluster is down + continue; + } + + // check if all services of a particular host are alive + const bool alive = std::all_of( + hostMap.at("services").GetArraySafe().begin(), + hostMap.at("services").GetArraySafe().end(), + [](const auto& service) { + return service["health"].GetString() == "ALIVE"; + } + ); + + if (alive) { + aliveHosts.push_back(host["name"].GetString()); + } + } + + if (aliveHosts.empty()) { + ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "No ALIVE MySQL hosts found"; + } + + NYql::IMdbEndpointGenerator::TParams params = { + .DatabaseType = NYql::EDatabaseType::MySQL, + .MdbHost = aliveHosts[std::rand() % static_cast(aliveHosts.size())], + .UseTls = useTls, + .Protocol = protocol, + }; + + endpoint = mdbEndpointGenerator->ToEndpoint(params); + return TDatabaseDescription{"", endpoint.first, endpoint.second, "", useTls}; }; } @@ -538,7 +586,7 @@ class TDatabaseResolver: public TActor url = TUrlBuilder(ev->Get()->YdbMvpEndpoint + "/database") .AddUrlParam("databaseId", databaseId) .Build(); - } else if (IsIn({NYql::EDatabaseType::ClickHouse, NYql::EDatabaseType::PostgreSQL}, databaseType)) { + } else if (IsIn({NYql::EDatabaseType::ClickHouse, NYql::EDatabaseType::PostgreSQL, NYql::EDatabaseType::MySQL}, databaseType)) { YQL_ENSURE(ev->Get()->MdbGateway, "empty MDB Gateway"); url = TUrlBuilder( ev->Get()->MdbGateway + "/managed-" + NYql::DatabaseTypeLowercase(databaseType) + "/v1/clusters/") diff --git a/ydb/core/fq/libs/actors/ut/database_resolver_ut.cpp b/ydb/core/fq/libs/actors/ut/database_resolver_ut.cpp index 2cfc32baa9c4..840a5f200c25 100644 --- a/ydb/core/fq/libs/actors/ut/database_resolver_ut.cpp +++ b/ydb/core/fq/libs/actors/ut/database_resolver_ut.cpp @@ -474,6 +474,7 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) { issues ); } + Y_UNIT_TEST(Greenplum_MasterNode) { Test( NYql::EDatabaseType::Greenplum, @@ -505,7 +506,7 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) { TString(""), true}, {}); - } + } Y_UNIT_TEST(Greenplum_PermissionDenied) { NYql::TIssues issues{ @@ -536,7 +537,79 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) { )", NYql::TDatabaseResolverResponse::TDatabaseDescription{}, issues); - } + } + + Y_UNIT_TEST(MySQL) { + Test( + NYql::EDatabaseType::MySQL, + NYql::NConnector::NApi::EProtocol::NATIVE, + "https://mdb.api.cloud.yandex.net:443/managed-mysql/v1/clusters/etn021us5r9rhld1vgbh/hosts", + "200", + R"({ + "hosts": [ + { + "services": [ + { + "type": "POOLER", + "health": "ALIVE" + }, + { + "type": "MYSQL", + "health": "ALIVE" + } + ], + "name": "rc1b-eyt6dtobu96rwydq.mdb.yandexcloud.net", + "clusterId": "c9qb2bjghs8onbncpamk", + "zoneId": "ru-central1-b", + "role": "MASTER", + "health": "ALIVE" + } + ] + })", + NYql::TDatabaseResolverResponse::TDatabaseDescription{ + TString{""}, + TString{"rc1b-eyt6dtobu96rwydq.db.yandex.net"}, + 3306, + TString(""), + true + }, + {}); + } + + Y_UNIT_TEST(MySQL_PermissionDenied) { + NYql::TIssues issues{ + NYql::TIssue( + TStringBuilder{} << MakeErrorPrefix( + "mdb.api.cloud.yandex.net:443", + "/managed-mysql/v1/clusters/etn021us5r9rhld1vgbh/hosts", + "etn021us5r9rhld1vgbh", + NYql::EDatabaseType::MySQL + ) << NoPermissionStr + ) + }; + + Test( + NYql::EDatabaseType::MySQL, + NYql::NConnector::NApi::EProtocol::NATIVE, + "https://mdb.api.cloud.yandex.net:443/managed-mysql/v1/clusters/etn021us5r9rhld1vgbh/hosts", + "403", + R"( + { + "code": 7, + "message": "Permission denied", + "details": [ + { + "@type": "type.googleapis.com/google.rpc.RequestInfo", + "requestId": "a943c092-d596-4e0e-ae7b-1f67f9d8164e" + } + ] + } + )", + NYql::TDatabaseResolverResponse::TDatabaseDescription{}, + issues + ); + } + Y_UNIT_TEST(DataStreams_PermissionDenied) { NYql::TIssues issues{ diff --git a/ydb/core/fq/libs/db_id_async_resolver_impl/mdb_endpoint_generator.cpp b/ydb/core/fq/libs/db_id_async_resolver_impl/mdb_endpoint_generator.cpp index ada5a7709fc7..634d835070df 100644 --- a/ydb/core/fq/libs/db_id_async_resolver_impl/mdb_endpoint_generator.cpp +++ b/ydb/core/fq/libs/db_id_async_resolver_impl/mdb_endpoint_generator.cpp @@ -18,6 +18,8 @@ namespace NFq { constexpr ui32 GREENPLUM_PORT = 6432; + constexpr ui32 MYSQL_PORT = 3306; + // TMdbEndpointGeneratorLegacy implements behavior required by YQL legacy ClickHouse provider class TMdbEndpointGeneratorLegacy: public NYql::IMdbEndpointGenerator { TEndpoint ToEndpoint(const NYql::IMdbEndpointGenerator::TParams& params) const override { @@ -76,13 +78,21 @@ namespace NFq { ythrow yexception() << "Unexpected protocol for PostgreSQL " << NYql::NConnector::NApi::EProtocol_Name(params.Protocol); } case NYql::EDatabaseType::Greenplum: - // https://cloud.yandex.ru/docs/managed-postgresql/operations/connect + // https://cloud.yandex.ru/docs/managed-greenplum/operations/connect switch (params.Protocol) { case NYql::NConnector::NApi::EProtocol::NATIVE: return TEndpoint(fixedHost, GREENPLUM_PORT); default: ythrow yexception() << "Unexpected protocol for Greenplum: " << NYql::NConnector::NApi::EProtocol_Name(params.Protocol); } + case NYql::EDatabaseType::MySQL: + // https://cloud.yandex.ru/docs/managed-mysql/operations/connect + switch (params.Protocol) { + case NYql::NConnector::NApi::EProtocol::NATIVE: + return TEndpoint(fixedHost, MYSQL_PORT); + default: + ythrow yexception() << "Unexpected protocol for MySQL: " << NYql::NConnector::NApi::EProtocol_Name(params.Protocol); + } default: ythrow yexception() << "Unexpected database type: " << ToString(params.DatabaseType); };