Skip to content

Commit

Permalink
Passed yt token
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Mar 7, 2024
1 parent 56cf18c commit cc6b356
Show file tree
Hide file tree
Showing 22 changed files with 170 additions and 53 deletions.
2 changes: 1 addition & 1 deletion ydb/core/external_sources/external_source_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TStri
},
{
ToString(NYql::EDatabaseType::YT),
CreateExternalDataSource(TString{NYql::YtProviderName}, {"NONE"}, {}, hostnamePatternsRegEx)
CreateExternalDataSource(TString{NYql::YtProviderName}, {"TOKEN"}, {}, hostnamePatternsRegEx)
}
});
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1337,6 +1337,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
task.Meta.ShardId = shardId;
shardTasks.emplace(shardId, task.Id);

FillSecureParamsFromStage(task.Meta.SecureParams, stage);
BuildSinks(stage, task);

return task;
Expand Down
17 changes: 17 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -830,6 +830,15 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {


protected:
void FillSecureParamsFromStage(THashMap<TString, TString>& secureParams, const NKqpProto::TKqpPhyStage& stage) {
for (const auto& [secretName, authInfo] : stage.GetSecureParams()) {
const auto& structuredToken = NYql::CreateStructuredTokenParser(authInfo).ToBuilder().ReplaceReferences(SecureParams).ToJson();
const auto& structuredTokenParser = NYql::CreateStructuredTokenParser(structuredToken);
YQL_ENSURE(structuredTokenParser.HasIAMToken(), "only token authentification supported for compute tasks");
secureParams.emplace(secretName, structuredTokenParser.GetIAMToken());
}
}

void BuildSysViewScanTasks(TStageInfo& stageInfo) {
Y_DEBUG_ABORT_UNLESS(stageInfo.Meta.IsSysView());

Expand Down Expand Up @@ -873,6 +882,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
task.Meta.ReadInfo.Reverse = op.GetReadRange().GetReverse();
task.Meta.Type = TTaskMeta::TTaskType::Compute;

FillSecureParamsFromStage(task.Meta.SecureParams, stage);
BuildSinks(stage, task);

LOG_D("Stage " << stageInfo.Id << " create sysview scan task: " << task.Id);
Expand Down Expand Up @@ -959,6 +969,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
if (structuredToken) {
task.Meta.SecureParams.emplace(sourceName, structuredToken);
}
FillSecureParamsFromStage(task.Meta.SecureParams, stage);

if (resourceSnapshot.empty()) {
task.Meta.Type = TTaskMeta::TTaskType::Compute;
Expand Down Expand Up @@ -1051,6 +1062,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
YQL_ENSURE(!shardsResolved);
task.Meta.ShardId = taskLocation;
}
FillSecureParamsFromStage(task.Meta.SecureParams, stage);

const auto& stageSource = stage.GetSources(0);
auto& input = task.Inputs[stageSource.GetInputIndex()];
Expand Down Expand Up @@ -1306,6 +1318,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
auto& task = TasksGraph.AddTask(stageInfo);
task.Meta.Type = TTaskMeta::TTaskType::Compute;
task.Meta.ExecuterId = SelfId();
FillSecureParamsFromStage(task.Meta.SecureParams, stage);
BuildSinks(stage, task);
LOG_D("Stage " << stageInfo.Id << " create compute task: " << task.Id);
}
Expand Down Expand Up @@ -1441,13 +1454,15 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
THashMap<ui64, ui64>& assignedShardsCount,
const bool sorted, const bool isOlapScan)
{
const auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
ui64 nodeId = ShardIdToNodeId.at(shardId);
if (stageInfo.Meta.IsOlap() && sorted) {
auto& task = TasksGraph.AddTask(stageInfo);
task.Meta.ExecuterId = SelfId();
task.Meta.NodeId = nodeId;
task.Meta.ScanTask = true;
task.Meta.Type = TTaskMeta::TTaskType::Scan;
FillSecureParamsFromStage(task.Meta.SecureParams, stage);
return task;
}

Expand All @@ -1459,6 +1474,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
task.Meta.NodeId = nodeId;
task.Meta.ScanTask = true;
task.Meta.Type = TTaskMeta::TTaskType::Scan;
FillSecureParamsFromStage(task.Meta.SecureParams, stage);
tasks.push_back(task.Id);
++cnt;
return task;
Expand Down Expand Up @@ -1552,6 +1568,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
task.Meta.ScanTask = true;
task.Meta.Type = TTaskMeta::TTaskType::Scan;
task.SetMetaId(metaGlueingId);
FillSecureParamsFromStage(task.Meta.SecureParams, stage);
BuildSinks(stage, task);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ void FillCreateExternalDataSourceDesc(NKikimrSchemeOp::TExternalDataSourceDescri
aws.SetAwsAccessKeyIdSecretName(GetOrEmpty(settings, "aws_access_key_id_secret_name"));
aws.SetAwsSecretAccessKeySecretName(GetOrEmpty(settings, "aws_secret_access_key_secret_name"));
aws.SetAwsRegion(GetOrEmpty(settings, "aws_region"));
} else if (authMethod == "TOKEN") {
auto& token = *externaDataSourceDesc.MutableAuth()->MutableToken();
token.SetTokenSecretName(GetOrEmpty(settings, "token_secret_name"));
} else {
ythrow yexception() << "Internal error. Unknown auth method: " << authMethod;
}
Expand Down
15 changes: 15 additions & 0 deletions ydb/core/kqp/gateway/kqp_metadata_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,14 @@ void UpdateExternalDataSourceSecretsValue(TTableMetadataResult& externalDataSour
externalDataSourceMetadata.Metadata->ExternalSource.AwsSecretAccessKey = objectDescription.SecretValues[1];
return;
}
case NKikimrSchemeOp::TAuth::kToken: {
if (objectDescription.SecretValues.size() != 1) {
SetError(externalDataSourceMetadata, TStringBuilder{} << "Token auth contains invalid count of secrets: " << objectDescription.SecretValues.size() << " instead of 1");
return;
}
externalDataSourceMetadata.Metadata->ExternalSource.Token = objectDescription.SecretValues[0];
return;
}
case NKikimrSchemeOp::TAuth::IDENTITY_NOT_SET: {
SetError(externalDataSourceMetadata, "identity case is not specified in case of update external data source secrets");
return;
Expand Down Expand Up @@ -494,6 +502,13 @@ NThreading::TFuture<TEvDescribeSecretsResponse::TDescription> LoadExternalDataSo
return promise.GetFuture();
}

case NKikimrSchemeOp::TAuth::kToken: {
const TString& tokenSecretId = authDescription.GetToken().GetTokenSecretName();
auto promise = NewPromise<TEvDescribeSecretsResponse::TDescription>();
actorSystem->Register(CreateDescribeSecretsActor(userToken ? userToken->GetUserSID() : "", {tokenSecretId}, promise, maximalSecretsSnapshotWaitTime));
return promise.GetFuture();
}

case NKikimrSchemeOp::TAuth::IDENTITY_NOT_SET:
return MakeFuture(TEvDescribeSecretsResponse::TDescription(Ydb::StatusIds::BAD_REQUEST, { NYql::TIssue("identity case is not specified") }));
}
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_datasource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ TString FillAuthProperties(THashMap<TString, TString>& properties, const TExtern
properties["awsRegion"] = externalSource.DataSourceAuth.GetAws().GetAwsRegion();
return {};

case NKikimrSchemeOp::TAuth::kToken:
properties["authMethod"] = "TOKEN";
properties["token"] = externalSource.Token;
properties["tokenReference"] = externalSource.DataSourceAuth.GetToken().GetTokenSecretName();
return {};

case NKikimrSchemeOp::TAuth::IDENTITY_NOT_SET:
return {"Identity case is not specified"};
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/provider/yql_kikimr_gateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ struct TExternalSource {
TString Password;
TString AwsAccessKeyId;
TString AwsSecretAccessKey;
TString Token;
NKikimrSchemeOp::TAuth DataSourceAuth;
NKikimrSchemeOp::TExternalDataSourceProperties Properties;
};
Expand Down
50 changes: 32 additions & 18 deletions ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,29 @@ void FillOlapProgram(const T& node, const NKikimr::NMiniKQL::TType* miniKqlResul
CompileOlapProgram(node.Process(), tableMeta, readProto, resultColNames, ctx);
}

THashMap<TString, TString> FindSecureParams(const TExprNode::TPtr& node, const TTypeAnnotationContext& typesCtx, TSet<TString>& SecretNames) {
THashMap<TString, TString> secureParams;
NYql::NCommon::FillSecureParams(node, typesCtx, secureParams);

for (auto& [secretName, structuredToken] : secureParams) {
const auto& tokenParser = CreateStructuredTokenParser(structuredToken);
tokenParser.ListReferences(SecretNames);
structuredToken = tokenParser.ToBuilder().RemoveSecrets().ToJson();
}

return secureParams;
}

std::optional<std::pair<TString, TString>> FindOneSecureParam(const TExprNode::TPtr& node, const TTypeAnnotationContext& typesCtx, const TString& nodeName, TSet<TString>& SecretNames) {
const auto& secureParams = FindSecureParams(node, typesCtx, SecretNames);
if (secureParams.empty()) {
return std::nullopt;
}

YQL_ENSURE(secureParams.size() == 1, "Only one SecureParams per " << nodeName << " allowed");
return *secureParams.begin();
}

class TKqpQueryCompiler : public IKqpQueryCompiler {
public:
TKqpQueryCompiler(const TString& cluster, const TIntrusivePtr<TKikimrTablesData> tablesData,
Expand Down Expand Up @@ -707,6 +730,9 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
return true;
});

const auto& secureParams = FindSecureParams(stage.Program().Ptr(), TypesCtx, SecretNames);
stageProto.MutableSecureParams()->insert(secureParams.begin(), secureParams.end());

auto result = stage.Program().Body();
auto resultType = result.Ref().GetTypeAnn();
ui32 outputsCount = 0;
Expand Down Expand Up @@ -976,15 +1002,9 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
externalSource.AddPartitionedTaskParams(partitionParam);
}

THashMap<TString, TString> secureParams;
NYql::NCommon::FillSecureParams(source.Ptr(), TypesCtx, secureParams);
if (!secureParams.empty()) {
YQL_ENSURE(secureParams.size() == 1, "Only one SecureParams per source allowed");
auto it = secureParams.begin();
externalSource.SetSourceName(it->first);
auto token = it->second;
externalSource.SetAuthInfo(CreateStructuredTokenParser(token).ToBuilder().RemoveSecrets().ToJson());
CreateStructuredTokenParser(token).ListReferences(SecretNames);
if (const auto& secureParams = FindOneSecureParam(source.Ptr(), TypesCtx, "source", SecretNames)) {
externalSource.SetSourceName(secureParams->first);
externalSource.SetAuthInfo(secureParams->second);
}

google::protobuf::Any& settings = *externalSource.MutableSettings();
Expand Down Expand Up @@ -1062,15 +1082,9 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
YQL_ENSURE(!settings.type_url().empty(), "Data sink provider \"" << dataSinkCategory << "\" did't fill dq sink settings for its dq sink node");
YQL_ENSURE(sinkType, "Data sink provider \"" << dataSinkCategory << "\" did't fill dq sink settings type for its dq sink node");

THashMap<TString, TString> secureParams;
NYql::NCommon::FillSecureParams(sink.Ptr(), TypesCtx, secureParams);
if (!secureParams.empty()) {
YQL_ENSURE(secureParams.size() == 1, "Only one SecureParams per sink allowed");
auto it = secureParams.begin();
externalSink.SetSinkName(it->first);
auto token = it->second;
externalSink.SetAuthInfo(CreateStructuredTokenParser(token).ToBuilder().RemoveSecrets().ToJson());
CreateStructuredTokenParser(token).ListReferences(SecretNames);
if (const auto& secureParams = FindOneSecureParam(sink.Ptr(), TypesCtx, "sink", SecretNames)) {
externalSink.SetSinkName(secureParams->first);
externalSink.SetAuthInfo(secureParams->second);
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1863,13 +1863,18 @@ message TBasic {
optional string PasswordSecretName = 2;
}

message TToken {
optional string TokenSecretName = 2;
}

message TAuth {
oneof identity {
TNoneAuth None = 3;
TServiceAccountAuth ServiceAccount = 4;
TBasic Basic = 5;
TMdbBasic MdbBasic = 6;
TAws Aws = 7;
TToken Token = 8;
}
}

Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/kqp_physical.proto
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ message TKqpPhyStage {
repeated TKqpSource Sources = 9;
bool IsSinglePartition = 10;
repeated TKqpSink Sinks = 11;
map<string, string> SecureParams = 12;
}

message TKqpPhyResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ bool ValidateAuth(const NKikimrSchemeOp::TAuth& auth,
return CheckAuth("BASIC", availableAuthMethods, errStr);
case NKikimrSchemeOp::TAuth::kAws:
return CheckAuth("AWS", availableAuthMethods, errStr);
case NKikimrSchemeOp::TAuth::kToken:
return CheckAuth("TOKEN", availableAuthMethods, errStr);
case NKikimrSchemeOp::TAuth::kNone:
return CheckAuth("NONE", availableAuthMethods, errStr);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ TStructuredTokenBuilder& TStructuredTokenBuilder::SetBasicAuthWithSecret(const T
return *this;
}

TStructuredTokenBuilder& TStructuredTokenBuilder::SetTokenAuthWithSecret(const TString& tokenReference, const TString& token) {
Data.SetField("token_ref", tokenReference);
Data.SetField("token", token);
return *this;
}

TStructuredTokenBuilder& TStructuredTokenBuilder::SetIAMToken(const TString& token) {
Data.SetField("token", token);
return *this;
Expand All @@ -56,12 +62,18 @@ TStructuredTokenBuilder& TStructuredTokenBuilder::ReplaceReferences(const std::m
Data.ClearField("sa_id_signature_ref");
Data.SetField("sa_id_signature", secrets.at(reference));
}
if (Data.HasField("token_ref")) {
auto reference = Data.GetField("token_ref");
Data.ClearField("token_ref");
Data.SetField("token", secrets.at(reference));
}
return *this;
}

TStructuredTokenBuilder& TStructuredTokenBuilder::RemoveSecrets() {
Data.ClearField("basic_password");
Data.ClearField("sa_id_signature");
Data.ClearField("token");
return *this;
}

Expand Down Expand Up @@ -127,6 +139,9 @@ void TStructuredTokenParser::ListReferences(TSet<TString>& references) const {
if (Data.HasField("sa_id_signature_ref")) {
references.insert(Data.GetField("sa_id_signature_ref"));
}
if (Data.HasField("token_ref")) {
references.insert(Data.GetField("token_ref"));
}
}

TStructuredTokenBuilder TStructuredTokenParser::ToBuilder() const {
Expand Down Expand Up @@ -178,4 +193,16 @@ TString ComposeStructuredTokenJsonForBasicAuthWithSecret(const TString& login, c
return result.ToJson();
}

TString ComposeStructuredTokenJsonForTokenAuthWithSecret(const TString& tokenSecretName, const TString& token) {
TStructuredTokenBuilder result;

if (tokenSecretName && token) {
result.SetTokenAuthWithSecret(tokenSecretName, token);
return result.ToJson();
}

result.SetNoAuth();
return result.ToJson();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class TStructuredTokenBuilder {
TStructuredTokenBuilder& SetServiceAccountIdAuthWithSecret(const TString& accountId, const TString& accountIdSignatureReference, const TString& accountIdSignature);
TStructuredTokenBuilder& SetBasicAuth(const TString& login, const TString& password);
TStructuredTokenBuilder& SetBasicAuthWithSecret(const TString& login, const TString& passwordReference);
TStructuredTokenBuilder& SetTokenAuthWithSecret(const TString& tokenReference, const TString& token);
TStructuredTokenBuilder& SetIAMToken(const TString& token);
TStructuredTokenBuilder& SetNoAuth();
TStructuredTokenBuilder& ReplaceReferences(const std::map<TString, TString>& secrets);
Expand Down Expand Up @@ -51,4 +52,5 @@ TStructuredTokenParser CreateStructuredTokenParser(const TString& content);
TString ComposeStructuredTokenJsonForServiceAccount(const TString& serviceAccountId, const TString& serviceAccountIdSignature, const TString& token);
TString ComposeStructuredTokenJsonForServiceAccountWithSecret(const TString& serviceAccountId, const TString& serviceAccountIdSignatureSecretName, const TString& serviceAccountIdSignature);
TString ComposeStructuredTokenJsonForBasicAuthWithSecret(const TString& login, const TString& passwordSecretName, const TString& password);
TString ComposeStructuredTokenJsonForTokenAuthWithSecret(const TString& tokenSecretName, const TString& token);
}
5 changes: 5 additions & 0 deletions ydb/library/yql/providers/yt/provider/yql_yt_datasource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "yql_yt_dq_integration.h"
#include "yql_yt_dq_optimize.h"

#include <ydb/library/yql/providers/common/structured_token/yql_token_builder.h>
#include <ydb/library/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.h>
#include <ydb/library/yql/providers/result/expr_nodes/yql_res_expr_nodes.h>
#include <ydb/library/yql/providers/yt/common/yql_names.h>
Expand Down Expand Up @@ -95,11 +96,15 @@ class TYtDataSource : public TDataProviderBase {
}

void AddCluster(const TString& name, const THashMap<TString, TString>& properties) override {
const TString& token = properties.Value("token", "");

State_->Configuration->AddValidCluster(name);
State_->Configuration->Tokens[name] = ComposeStructuredTokenJsonForTokenAuthWithSecret(properties.Value("tokenReference", ""), token);

TYtClusterConfig cluster;
cluster.SetName(name);
cluster.SetCluster(properties.Value("location", ""));
cluster.SetYTToken(token);
State_->Gateway->AddCluster(cluster);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,8 +506,8 @@ class TYtDqIntegration: public TDqIntegrationBase {
TExprNode::TPtr WrapRead(const TDqSettings&, const TExprNode::TPtr& read, TExprContext& ctx) override {
if (auto maybeYtReadTable = TMaybeNode<TYtReadTable>(read)) {
TMaybeNode<TCoSecureParam> secParams;
if (State_->Configuration->Auth.Get().GetOrElse(TString())) {
const auto cluster = maybeYtReadTable.Cast().DataSource().Cluster();
const auto cluster = maybeYtReadTable.Cast().DataSource().Cluster();
if (State_->Configuration->Auth.Get().GetOrElse(TString()) || State_->Configuration->Tokens.contains(cluster)) {
secParams = Build<TCoSecureParam>(ctx, read->Pos()).Name().Build(TString("cluster:default_").append(cluster)).Done();
}
return Build<TDqReadWrap>(ctx, read->Pos())
Expand Down
6 changes: 4 additions & 2 deletions ydb/library/yql/sql/v1/sql_translation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4453,14 +4453,16 @@ bool TSqlTranslation::ValidateAuthMethod(const std::map<TString, TDeferredAtom>&
"password_secret_name",
"aws_access_key_id_secret_name",
"aws_secret_access_key_secret_name",
"aws_region"
"aws_region",
"token_secret_name"
};
const static TMap<TStringBuf, TSet<TStringBuf>> authMethodFields{
{"NONE", {}},
{"SERVICE_ACCOUNT", {"service_account_id", "service_account_secret_name"}},
{"BASIC", {"login", "password_secret_name"}},
{"AWS", {"aws_access_key_id_secret_name", "aws_secret_access_key_secret_name", "aws_region"}},
{"MDB_BASIC", {"service_account_id", "service_account_secret_name", "login", "password_secret_name"}}
{"MDB_BASIC", {"service_account_id", "service_account_secret_name", "login", "password_secret_name"}},
{"TOKEN", {"token_secret_name"}}
};
auto authMethodIt = result.find("auth_method");
if (authMethodIt == result.end() || authMethodIt->second.GetLiteral() == nullptr) {
Expand Down
Loading

0 comments on commit cc6b356

Please sign in to comment.