Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
uzhastik committed Sep 12, 2024
1 parent 1eae50e commit cfd84af
Show file tree
Hide file tree
Showing 14 changed files with 233 additions and 13 deletions.
8 changes: 8 additions & 0 deletions ydb/core/external_sources/external_data_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ struct TExternalDataSource : public IExternalSource {
ythrow TExternalSourceException() << "Only external table supports parameters";
}

bool IsRDBMSDataSource(const TProtoStringType& sourceType) const {
return IsIn({"Greenplum", "PostgreSQL", "MySQL", "MsSQLServer", "Clickhouse"}, sourceType);
}

virtual void ValidateExternalDataSource(const TString& externalDataSourceDescription) const override {
NKikimrSchemeOp::TExternalDataSourceDescription proto;
if (!proto.ParseFromString(externalDataSourceDescription)) {
Expand All @@ -49,6 +53,10 @@ struct TExternalDataSource : public IExternalSource {
ythrow TExternalSourceException() << "Unsupported property: " << key;
}

if (IsRDBMSDataSource(proto.GetSourceType()) && !proto.GetProperties().GetProperties().contains("database_name")){
ythrow TExternalSourceException() << proto.GetSourceType() << " source must provide database_name";
}

ValidateHostname(HostnamePatterns, proto.GetLocation());
}

Expand Down
12 changes: 12 additions & 0 deletions ydb/core/fq/libs/actors/clusters_from_connections.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,18 @@ void AddClustersFromConnections(
clusters.emplace(connectionName, GenericProviderName);
break;
}
case FederatedQuery::ConnectionSetting::kMysqlCluster: {
    // MySQL connections are served by the generic provider: append a new
    // cluster mapping to the generic gateway config and fill it from the
    // connection's mysql_cluster settings.
    const auto& mysqlCluster = conn.content().setting().mysql_cluster();
    auto& clusterMapping = *gatewaysConfig.MutableGeneric()->AddClusterMapping();

    FillGenericClusterConfig(
        common,
        clusterMapping,
        mysqlCluster,
        connectionName,
        NYql::NConnector::NApi::EDataSourceKind::MYSQL,
        authToken,
        accountIdSignatures);

    // Register the connection name as belonging to the generic provider.
    clusters.emplace(connectionName, GenericProviderName);
    break;
}

// Do not replace with default. Adding a new connection should cause a compilation error
case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:
Expand Down
56 changes: 53 additions & 3 deletions ydb/core/fq/libs/actors/database_resolver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
// There are two kinds of managed YDBs: serverless and dedicated.
// While working with dedicated databases, we have to use underlay network.
// That's why we add `u-` prefix to database fqdn.
if (databaseInfo.GetMap().contains("dedicatedDatabase")) {
if (databaseInfo.GetMap().contains("storageConfig")) {
endpoint = "u-" + endpoint;
host = "u-" + host;
}
Expand All @@ -335,7 +335,7 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
{
auto ret = ydbParser(databaseInfo, mdbEndpointGenerator, useTls, protocol);
// TODO: Take explicit field from MVP
bool isDedicatedDb = databaseInfo.GetMap().contains("dedicatedDatabase");
bool isDedicatedDb = databaseInfo.GetMap().contains("storageConfig");
if (!isDedicatedDb && ret.Endpoint.StartsWith("ydb.")) {
// Replace "ydb." -> "yds."
ret.Endpoint[2] = 's';
Expand Down Expand Up @@ -457,6 +457,56 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>

endpoint = mdbEndpointGenerator->ToEndpoint(params);

return TDatabaseDescription{"", endpoint.first, endpoint.second, "", useTls};
};
// Parser for the managed MySQL "hosts" response.
// Collects the names of hosts whose every listed service reports health
// "ALIVE", picks one of them with std::rand(), and converts it to an endpoint
// through the MDB endpoint generator. Throws TCodeLineException when no host
// qualifies. The returned description carries only host, port and the TLS
// flag; the other two string fields are left empty.
Parsers[NYql::EDatabaseType::MySQL] = [](
    NJson::TJsonValue& databaseInfo,
    const NYql::IMdbEndpointGenerator::TPtr& mdbEndpointGenerator,
    bool useTls,
    NConnector::NApi::EProtocol protocol
) {
    TVector<TString> healthyHostNames;

    for (const auto& hostEntry : databaseInfo.GetMap().at("hosts").GetArraySafe()) {
        const auto& hostFields = hostEntry.GetMap();

        // A host entry without a "services" list is skipped
        // (the original author treats this as the cluster being down).
        if (!hostFields.contains("services")) {
            continue;
        }

        // The host qualifies only if no service reports anything other than
        // "ALIVE"; an empty services list therefore qualifies as well.
        bool everyServiceAlive = true;
        for (const auto& service : hostFields.at("services").GetArraySafe()) {
            if (service["health"].GetString() != "ALIVE") {
                everyServiceAlive = false;
                break;
            }
        }

        if (everyServiceAlive) {
            healthyHostNames.push_back(hostEntry["name"].GetString());
        }
    }

    if (healthyHostNames.empty()) {
        ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "No ALIVE MySQL hosts found";
    }

    // Pick one of the healthy hosts at random.
    const int healthyHostCount = static_cast<int>(healthyHostNames.size());
    NYql::IMdbEndpointGenerator::TParams params = {
        .DatabaseType = NYql::EDatabaseType::MySQL,
        .MdbHost = healthyHostNames[std::rand() % healthyHostCount],
        .UseTls = useTls,
        .Protocol = protocol,
    };

    const NYql::IMdbEndpointGenerator::TEndpoint endpoint = mdbEndpointGenerator->ToEndpoint(params);

    return TDatabaseDescription{"", endpoint.first, endpoint.second, "", useTls};
};
}
Expand Down Expand Up @@ -538,7 +588,7 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
url = TUrlBuilder(ev->Get()->YdbMvpEndpoint + "/database")
.AddUrlParam("databaseId", databaseId)
.Build();
} else if (IsIn({NYql::EDatabaseType::ClickHouse, NYql::EDatabaseType::PostgreSQL}, databaseType)) {
} else if (IsIn({NYql::EDatabaseType::ClickHouse, NYql::EDatabaseType::PostgreSQL, NYql::EDatabaseType::MySQL}, databaseType)) {
YQL_ENSURE(ev->Get()->MdbGateway, "empty MDB Gateway");
url = TUrlBuilder(
ev->Get()->MdbGateway + "/managed-" + NYql::DatabaseTypeLowercase(databaseType) + "/v1/clusters/")
Expand Down
81 changes: 77 additions & 4 deletions ydb/core/fq/libs/actors/ut/database_resolver_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) {
R"(
{
"endpoint":"grpcs://lb.etnbrtlini51k7cinbdr.ydb.mdb.yandexcloud.net:2135/?database=/ru-central1/b1gtl2kg13him37quoo6/etn021us5r9rhld1vgbh",
"dedicatedDatabase":{"resuorcePresetId": "medium"}
"storageConfig":{"storageSizeLimit":107374182400}
})",
NYql::TDatabaseResolverResponse::TDatabaseDescription{
TString{"u-lb.etnbrtlini51k7cinbdr.ydb.mdb.yandexcloud.net:2135"},
Expand Down Expand Up @@ -286,7 +286,7 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) {
R"(
{
"endpoint":"grpcs://lb.etn021us5r9rhld1vgbh.ydb.mdb.yandexcloud.net:2135/?database=/ru-central1/b1g7jdjqd07qg43c4fmp/etn021us5r9rhld1vgbh",
"dedicatedDatabase":{"resourcePresetId": "medium"}
"storageConfig":{"storageSizeLimit":107374182400}
})",
NYql::TDatabaseResolverResponse::TDatabaseDescription{
TString{"u-lb.etn021us5r9rhld1vgbh.ydb.mdb.yandexcloud.net:2135"},
Expand Down Expand Up @@ -474,6 +474,7 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) {
issues
);
}

Y_UNIT_TEST(Greenplum_MasterNode) {
Test(
NYql::EDatabaseType::Greenplum,
Expand Down Expand Up @@ -505,7 +506,7 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) {
TString(""),
true},
{});
}
}

Y_UNIT_TEST(Greenplum_PermissionDenied) {
NYql::TIssues issues{
Expand Down Expand Up @@ -536,7 +537,79 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) {
)",
NYql::TDatabaseResolverResponse::TDatabaseDescription{},
issues);
}
}

// Happy path for MySQL resolution: the managed-mysql hosts endpoint answers
// with status "200" and a single host whose services all report "ALIVE".
// The test expects a database description naming that host with port 3306.
Y_UNIT_TEST(MySQL) {
Test(
NYql::EDatabaseType::MySQL,
NYql::NConnector::NApi::EProtocol::NATIVE,
// URL of the hosts listing for the cluster under test.
"https://mdb.api.cloud.yandex.net:443/managed-mysql/v1/clusters/etn021us5r9rhld1vgbh/hosts",
// Response status, passed as a string.
"200",
// Response body: one host, two services, both "ALIVE".
R"({
"hosts": [
{
"services": [
{
"type": "POOLER",
"health": "ALIVE"
},
{
"type": "MYSQL",
"health": "ALIVE"
}
],
"name": "rc1b-eyt6dtobu96rwydq.mdb.yandexcloud.net",
"clusterId": "c9qb2bjghs8onbncpamk",
"zoneId": "ru-central1-b",
"role": "MASTER",
"health": "ALIVE"
}
]
})",
// Expected description. The host differs from the "name" in the response
// (".db.yandex.net" vs ".mdb.yandexcloud.net").
// NOTE(review): presumably the endpoint generator used by Test() rewrites the
// host and supplies port 3306 — confirm against the Test helper.
NYql::TDatabaseResolverResponse::TDatabaseDescription{
TString{""},
TString{"rc1b-eyt6dtobu96rwydq.db.yandex.net"},
3306,
TString(""),
true
},
// Last argument is an empty initializer (presumably "no issues expected" — confirm).
{});
}

// Error path for MySQL resolution: the managed-mysql hosts endpoint answers
// with status "403" and a "Permission denied" body. The test expects a
// default-constructed description and a single issue built from
// MakeErrorPrefix(...) followed by NoPermissionStr.
Y_UNIT_TEST(MySQL_PermissionDenied) {
// Expected issue text: error prefix for this host / path / cluster id /
// database type, followed by the shared "no permission" message.
NYql::TIssues issues{
NYql::TIssue(
TStringBuilder{} << MakeErrorPrefix(
"mdb.api.cloud.yandex.net:443",
"/managed-mysql/v1/clusters/etn021us5r9rhld1vgbh/hosts",
"etn021us5r9rhld1vgbh",
NYql::EDatabaseType::MySQL
) << NoPermissionStr
)
};

Test(
NYql::EDatabaseType::MySQL,
NYql::NConnector::NApi::EProtocol::NATIVE,
// URL of the hosts listing for the cluster under test.
"https://mdb.api.cloud.yandex.net:443/managed-mysql/v1/clusters/etn021us5r9rhld1vgbh/hosts",
// Response status, passed as a string.
"403",
// Response body: error payload with code 7 and message "Permission denied".
R"(
{
"code": 7,
"message": "Permission denied",
"details": [
{
"@type": "type.googleapis.com/google.rpc.RequestInfo",
"requestId": "a943c092-d596-4e0e-ae7b-1f67f9d8164e"
}
]
}
)",
// No endpoint is expected on failure: default-constructed description.
NYql::TDatabaseResolverResponse::TDatabaseDescription{},
issues
);
}


Y_UNIT_TEST(DataStreams_PermissionDenied) {
NYql::TIssues issues{
Expand Down
11 changes: 11 additions & 0 deletions ydb/core/fq/libs/common/util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ TString ExtractServiceAccountId(const FederatedQuery::ConnectionSetting& setting
case FederatedQuery::ConnectionSetting::kGreenplumCluster: {
return GetServiceAccountId(setting.greenplum_cluster().auth());
}
case FederatedQuery::ConnectionSetting::kMysqlCluster: {
return GetServiceAccountId(setting.mysql_cluster().auth());
}
// Do not replace with default. Adding a new connection should cause a compilation error
case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:
break;
Expand Down Expand Up @@ -162,6 +165,8 @@ TMaybe<TString> GetLogin(const FederatedQuery::ConnectionSetting& setting) {
return setting.postgresql_cluster().login();
case FederatedQuery::ConnectionSetting::kGreenplumCluster:
return setting.greenplum_cluster().login();
case FederatedQuery::ConnectionSetting::kMysqlCluster:
return setting.mysql_cluster().login();
}
}

Expand All @@ -183,6 +188,8 @@ TMaybe<TString> GetPassword(const FederatedQuery::ConnectionSetting& setting) {
return setting.postgresql_cluster().password();
case FederatedQuery::ConnectionSetting::kGreenplumCluster:
return setting.greenplum_cluster().password();
case FederatedQuery::ConnectionSetting::kMysqlCluster:
return setting.mysql_cluster().password();
}
}

Expand All @@ -204,6 +211,8 @@ EYdbComputeAuth GetYdbComputeAuthMethod(const FederatedQuery::ConnectionSetting&
return GetBasicAuthMethod(setting.postgresql_cluster().auth());
case FederatedQuery::ConnectionSetting::kGreenplumCluster:
return GetBasicAuthMethod(setting.greenplum_cluster().auth());
case FederatedQuery::ConnectionSetting::kMysqlCluster:
return GetBasicAuthMethod(setting.mysql_cluster().auth());
}
}

Expand All @@ -223,6 +232,8 @@ FederatedQuery::IamAuth GetAuth(const FederatedQuery::Connection& connection) {
return connection.content().setting().postgresql_cluster().auth();
case FederatedQuery::ConnectionSetting::kGreenplumCluster:
return connection.content().setting().greenplum_cluster().auth();
case FederatedQuery::ConnectionSetting::kMysqlCluster:
return connection.content().setting().mysql_cluster().auth();
case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:
return FederatedQuery::IamAuth{};
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/fq/libs/compute/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ class TComputeConfig {
case FederatedQuery::ConnectionSetting::kClickhouseCluster:
case FederatedQuery::ConnectionSetting::kPostgresqlCluster:
case FederatedQuery::ConnectionSetting::kGreenplumCluster:
case FederatedQuery::ConnectionSetting::kMysqlCluster:
case FederatedQuery::ConnectionSetting::kYdbDatabase:
return true;
case FederatedQuery::ConnectionSetting::kDataStreams:
Expand Down
13 changes: 13 additions & 0 deletions ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,19 @@ TString MakeCreateExternalDataSourceQuery(
"use_tls"_a = common.GetDisableSslForGenericDataSources() ? "false" : "true",
"schema"_a = gpschema ? ", SCHEMA=" + EncloseAndEscapeString(gpschema, '"') : TString{});

}
// The preceding case must end here. Without this `break` control falls
// through into the MySQL branch below, which overwrites `properties` with
// MySQL settings read from an unset mysql_cluster.
break;
case FederatedQuery::ConnectionSetting::kMysqlCluster: {
    // Builds the property list of the external data source for a managed
    // MySQL cluster. The cluster id and database name are quoted and escaped;
    // TLS is on unless disabled for generic data sources in the common config.
    const auto& mysqlCluster = connectionContent.setting().mysql_cluster();
    properties = fmt::format(
R"(
SOURCE_TYPE="MySQL",
MDB_CLUSTER_ID={mdb_cluster_id},
DATABASE_NAME={database_name},
USE_TLS="{use_tls}"
)",
        "mdb_cluster_id"_a = EncloseAndEscapeString(mysqlCluster.database_id(), '"'),
        "database_name"_a = EncloseAndEscapeString(mysqlCluster.database_name(), '"'),
        "use_tls"_a = common.GetDisableSslForGenericDataSources() ? "false" : "true");
}
break;
}
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/fq/libs/control_plane_proxy/utils/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ TString ExtractServiceAccountIdWithConnection(const T& setting) {
case FederatedQuery::ConnectionSetting::kGreenplumCluster: {
return GetServiceAccountId(setting.greenplum_cluster().auth());
}
case FederatedQuery::ConnectionSetting::kMysqlCluster: {
return GetServiceAccountId(setting.mysql_cluster().auth());
}
// Do not replace with default. Adding a new connection should cause a compilation error
case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:
break;
Expand Down
31 changes: 26 additions & 5 deletions ydb/core/fq/libs/control_plane_storage/request_validators.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,15 @@ void ValidateGenericConnectionSetting(
}

if (!connection.database_id() && !(connection.host() && connection.port())) {
auto msg = TStringBuilder() << "content.setting.clickhouse_cluster.{database_id or host,port} field is not specified";
issues.AddIssue( MakeErrorIssue(TIssuesIds::BAD_REQUEST,msg));
auto msg = TStringBuilder() << "content.setting." << dataSourceKind << "_cluster.{database_id or host,port} field is not specified";
issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST,msg));
}

if (!connection.database_name()) {
auto msg = TStringBuilder() << "content.setting." << dataSourceKind << "_cluster.database_name field is not specified";
issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST,msg));
}

if (!connection.login()) {
auto msg = TStringBuilder() << "content.setting." << dataSourceKind << "_cluster.login is not specified";
issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, msg));
Expand Down Expand Up @@ -70,17 +75,33 @@ NYql::TIssues ValidateConnectionSetting(
break;
}
case FederatedQuery::ConnectionSetting::kGreenplumCluster: {
const FederatedQuery::GreenplumCluster database = setting.greenplum_cluster();
if (!database.has_auth() || database.auth().identity_case() == FederatedQuery::IamAuth::IDENTITY_NOT_SET) {
const FederatedQuery::GreenplumCluster& greenplumCluster = setting.greenplum_cluster();

if (!greenplumCluster.has_auth() || greenplumCluster.auth().identity_case() == FederatedQuery::IamAuth::IDENTITY_NOT_SET) {
issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.greenplum_database.auth field is not specified"));
}

if (greenplumCluster.auth().identity_case() == FederatedQuery::IamAuth::kCurrentIam && disableCurrentIam) {
issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "current iam authorization is disabled"));
}

if (!greenplumCluster.database_id() && !greenplumCluster.database_name()) {
issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.greenplum_database.{database_id or database_name} field is not specified"));
}
break;
}
case FederatedQuery::ConnectionSetting::kMysqlCluster: {
    // Validates a MySQL connection setting: auth must be present, current-IAM
    // auth must not be used when disabled, and at least one of database_id /
    // database_name must be provided. Every violation adds a BAD_REQUEST issue.
    //
    // Bound by const reference: the checks are read-only, so copying the
    // whole protobuf message is unnecessary.
    const FederatedQuery::MySQLCluster& mysqlCluster = setting.mysql_cluster();

    // NOTE(review): the messages name the field "mysql_database" although the
    // accessor is mysql_cluster(); wording kept unchanged — confirm intent.
    if (!mysqlCluster.has_auth() || mysqlCluster.auth().identity_case() == FederatedQuery::IamAuth::IDENTITY_NOT_SET) {
        issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.mysql_database.auth field is not specified"));
    }

    if (mysqlCluster.auth().identity_case() == FederatedQuery::IamAuth::kCurrentIam && disableCurrentIam) {
        issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "current iam authorization is disabled"));
    }

    // Report the missing identifier once, under the MySQL field name only
    // (no second issue labelled with the Greenplum field).
    if (!mysqlCluster.database_id() && !mysqlCluster.database_name()) {
        issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.mysql_database.{database_id or database_name} field is not specified"));
    }
    break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ FederatedQuery::IamAuth::IdentityCase GetIamAuth(const FederatedQuery::Connectio
return setting.postgresql_cluster().auth().identity_case();
case FederatedQuery::ConnectionSetting::kGreenplumCluster:
return setting.greenplum_cluster().auth().identity_case();
case FederatedQuery::ConnectionSetting::kMysqlCluster:
return setting.mysql_cluster().auth().identity_case();
case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:
return FederatedQuery::IamAuth::IDENTITY_NOT_SET;
}
Expand Down
Loading

0 comments on commit cfd84af

Please sign in to comment.