Skip to content

Commit

Permalink
Merge 61d79b1 into 24cc689
Browse files Browse the repository at this point in the history
  • Loading branch information
jepett0 authored Sep 27, 2024
2 parents 24cc689 + 61d79b1 commit 142e642
Show file tree
Hide file tree
Showing 31 changed files with 276 additions and 78 deletions.
19 changes: 11 additions & 8 deletions ydb/core/kqp/gateway/behaviour/view/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <ydb/core/base/path.h>
#include <ydb/core/kqp/gateway/actors/scheme.h>
#include <ydb/core/kqp/gateway/utils/scheme_helpers.h>
#include <ydb/core/kqp/provider/yql_kikimr_provider.h>
#include <ydb/core/tx/tx_proxy/proxy.h>

namespace NKikimr::NKqp {
Expand All @@ -11,13 +12,14 @@ namespace {

using TYqlConclusionStatus = TViewManager::TYqlConclusionStatus;
using TInternalModificationContext = TViewManager::TInternalModificationContext;
using TExternalModificationContext = TViewManager::TExternalModificationContext;

TString GetByKeyOrDefault(const NYql::TCreateObjectSettings& container, const TString& key) {
const auto value = container.GetFeaturesExtractor().Extract(key);
return value ? *value : TString{};
}

TYqlConclusionStatus CheckFeatureFlag(TInternalModificationContext& context) {
TYqlConclusionStatus CheckFeatureFlag(const TInternalModificationContext& context) {
auto* const actorSystem = context.GetExternalData().GetActorSystem();
if (!actorSystem) {
ythrow yexception() << "This place needs an actor system. Please contact internal support";
Expand Down Expand Up @@ -48,15 +50,16 @@ std::pair<TString, TString> SplitPathByObjectId(const TString& objectId) {

void FillCreateViewProposal(NKikimrSchemeOp::TModifyScheme& modifyScheme,
const NYql::TCreateObjectSettings& settings,
const TString& database) {
const TExternalModificationContext& context) {

const auto pathPair = SplitPathByDb(settings.GetObjectId(), database);
const auto pathPair = SplitPathByDb(settings.GetObjectId(), context.GetDatabase());
modifyScheme.SetWorkingDir(pathPair.first);
modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateView);

auto& viewDesc = *modifyScheme.MutableCreateView();
viewDesc.SetName(pathPair.second);
viewDesc.SetQueryText(GetByKeyOrDefault(settings, "query_text"));
NSQLTranslation::Serialize(context.GetTranslationSettings(), *viewDesc.MutableCapturedContext());

if (!settings.GetFeaturesExtractor().IsFinished()) {
ythrow TBadArgumentException() << "Unknown property: " << settings.GetFeaturesExtractor().GetRemainedParamsString();
Expand Down Expand Up @@ -92,20 +95,20 @@ NThreading::TFuture<TYqlConclusionStatus> SendSchemeRequest(TEvTxUserProxy::TEvP
}

NThreading::TFuture<TYqlConclusionStatus> CreateView(const NYql::TCreateObjectSettings& settings,
TInternalModificationContext& context) {
const TInternalModificationContext& context) {
auto proposal = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>();
proposal->Record.SetDatabaseName(context.GetExternalData().GetDatabase());
if (context.GetExternalData().GetUserToken()) {
proposal->Record.SetUserToken(context.GetExternalData().GetUserToken()->GetSerializedToken());
}
auto& schemeTx = *proposal->Record.MutableTransaction()->MutableModifyScheme();
FillCreateViewProposal(schemeTx, settings, context.GetExternalData().GetDatabase());
FillCreateViewProposal(schemeTx, settings, context.GetExternalData());

return SendSchemeRequest(proposal.Release(), context.GetExternalData().GetActorSystem(), true);
}

NThreading::TFuture<TYqlConclusionStatus> DropView(const NYql::TDropObjectSettings& settings,
TInternalModificationContext& context) {
const TInternalModificationContext& context) {
auto proposal = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>();
proposal->Record.SetDatabaseName(context.GetExternalData().GetDatabase());
if (context.GetExternalData().GetUserToken()) {
Expand All @@ -119,8 +122,8 @@ NThreading::TFuture<TYqlConclusionStatus> DropView(const NYql::TDropObjectSettin

void PrepareCreateView(NKqpProto::TKqpSchemeOperation& schemeOperation,
const NYql::TObjectSettingsImpl& settings,
TInternalModificationContext& context) {
FillCreateViewProposal(*schemeOperation.MutableCreateView(), settings, context.GetExternalData().GetDatabase());
const TInternalModificationContext& context) {
FillCreateViewProposal(*schemeOperation.MutableCreateView(), settings, context.GetExternalData());
}

void PrepareDropView(NKqpProto::TKqpSchemeOperation& schemeOperation,
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/gateway/behaviour/view/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ SRCS(
PEERDIR(
ydb/core/base
ydb/core/kqp/gateway/actors
ydb/core/kqp/provider
ydb/core/tx/tx_proxy
ydb/services/metadata/abstract
ydb/services/metadata/manager
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/gateway/kqp_metadata_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ TTableMetadataResult GetViewMetadataResult(
metadata->SchemaVersion = description.GetVersion();
metadata->Kind = NYql::EKikimrTableKind::View;
metadata->Attributes = schemeEntry.Attributes;
metadata->ViewPersistedData = {description.GetQueryText()};
metadata->ViewPersistedData = {description.GetQueryText(), description.GetCapturedContext()};

return builtResult;
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/host/kqp_gateway_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1100,6 +1100,7 @@ class TKqpGatewayProxy : public IKikimrGateway {
if (SessionCtx->GetUserToken()) {
context.SetUserToken(*SessionCtx->GetUserToken());
}
context.SetTranslationSettings(SessionCtx->Query().TranslationSettings);

auto& phyTx = phyTxRemover.Capture(SessionCtx->Query().PreparingQuery->MutablePhysicalQuery());
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/kqp/host/kqp_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1098,12 +1098,15 @@ class TKqpHost : public IKqpHost {
{
std::shared_ptr<NYql::TAstParseResult> queryAst;
if (!query.AstResult) {
NSQLTranslation::TTranslationSettings effectiveSettings;
auto astRes = ParseQuery(SessionCtx->Query().Type, usePgParser,
query.Text, query.ParameterTypes, isSql, sqlAutoCommit, sqlVersion, TypesCtx->DeprecatedSQL,
Cluster, SessionCtx->Config()._KqpTablePathPrefix.Get().GetRef(),
SessionCtx->Config()._KqpYqlSyntaxVersion.Get().GetRef(), SessionCtx->Config().BindingsMode,
SessionCtx->Config().FeatureFlags.GetEnableExternalDataSources(), ctx, SessionCtx->Config().EnablePgConstsToParams,
SessionCtx->Config().FeatureFlags.GetEnablePgSyntax());
SessionCtx->Config().FeatureFlags.GetEnablePgSyntax(),
&effectiveSettings);
SessionCtx->Query().TranslationSettings = std::move(effectiveSettings);
queryAst = std::make_shared<NYql::TAstParseResult>(std::move(astRes));
} else {
queryAst = query.AstResult->Ast;
Expand Down
10 changes: 6 additions & 4 deletions ydb/core/kqp/host/kqp_translate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,13 @@ NYql::TAstParseResult ParseQuery(NYql::EKikimrQueryType queryType, const TMaybe<
std::shared_ptr<std::map<TString, Ydb::Type>> queryParameters, bool isSql, bool sqlAutoCommit,
TMaybe<ui16>& sqlVersion, bool& deprecatedSQL, TString cluster, TString kqpTablePathPrefix,
ui16 kqpYqlSyntaxVersion, NSQLTranslation::EBindingsMode bindingsMode, bool isEnableExternalDataSources,
NYql::TExprContext& ctx, bool isEnablePgConstsToParams, bool isEnablePgSyntax) {
NYql::TExprContext& ctx, bool isEnablePgConstsToParams, bool isEnablePgSyntax,
NSQLTranslation::TTranslationSettings* effectiveSettings) {
NYql::TAstParseResult astRes;
if (isSql) {
auto settings = GetTranslationSettings(queryType, usePgParser, sqlAutoCommit, queryText, queryParameters, sqlVersion, cluster, kqpTablePathPrefix, kqpYqlSyntaxVersion, bindingsMode, isEnableExternalDataSources, ctx, isEnablePgConstsToParams, isEnablePgSyntax);
ui16 actualSyntaxVersion = 0;
auto ast = NSQLTranslation::SqlToYql(queryText, settings, nullptr, &actualSyntaxVersion);
auto ast = NSQLTranslation::SqlToYql(queryText, settings, nullptr, &actualSyntaxVersion, effectiveSettings);
deprecatedSQL = (actualSyntaxVersion == 0);
sqlVersion = actualSyntaxVersion;
return std::move(ast);
Expand All @@ -169,7 +170,8 @@ NYql::TAstParseResult ParseQuery(NYql::EKikimrQueryType queryType, const TMaybe<
}

TQueryAst ParseQuery(NYql::EKikimrQueryType queryType, const TMaybe<Ydb::Query::Syntax>& syntax, const TString& queryText, std::shared_ptr<std::map<TString, Ydb::Type>> queryParameters, bool isSql,
TString cluster, TString kqpTablePathPrefix, ui16 kqpYqlSyntaxVersion, NSQLTranslation::EBindingsMode bindingsMode, bool isEnableExternalDataSources, bool isEnablePgConstsToParams, bool isEnablePgSyntax) {
TString cluster, TString kqpTablePathPrefix, ui16 kqpYqlSyntaxVersion, NSQLTranslation::EBindingsMode bindingsMode, bool isEnableExternalDataSources, bool isEnablePgConstsToParams, bool isEnablePgSyntax,
NSQLTranslation::TTranslationSettings* effectiveSettings) {
bool deprecatedSQL;
TMaybe<ui16> sqlVersion;
TMaybe<bool> usePgParser;
Expand All @@ -192,7 +194,7 @@ TQueryAst ParseQuery(NYql::EKikimrQueryType queryType, const TMaybe<Ydb::Query::
} else {
sqlAutoCommit = false;
}
auto astRes = ParseQuery(queryType, usePgParser, queryText, queryParameters, isSql, sqlAutoCommit, sqlVersion, deprecatedSQL, cluster, kqpTablePathPrefix, kqpYqlSyntaxVersion, bindingsMode, isEnableExternalDataSources, ctx, isEnablePgConstsToParams, isEnablePgSyntax);
auto astRes = ParseQuery(queryType, usePgParser, queryText, queryParameters, isSql, sqlAutoCommit, sqlVersion, deprecatedSQL, cluster, kqpTablePathPrefix, kqpYqlSyntaxVersion, bindingsMode, isEnableExternalDataSources, ctx, isEnablePgConstsToParams, isEnablePgSyntax, effectiveSettings);
return TQueryAst(std::make_shared<NYql::TAstParseResult>(std::move(astRes)), sqlVersion, deprecatedSQL);
}

Expand Down
7 changes: 5 additions & 2 deletions ydb/core/kqp/host/kqp_translate.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@ NSQLTranslation::TTranslationSettings GetTranslationSettings(NYql::EKikimrQueryT
NYql::TAstParseResult ParseQuery(NYql::EKikimrQueryType queryType, const TMaybe<bool>& usePgParser, const TString& queryText,
std::shared_ptr<std::map<TString, Ydb::Type>> queryParameters, bool isSql, bool sqlAutoCommit, TMaybe<ui16>& sqlVersion,
bool& deprecatedSQL, TString cluster, TString kqpTablePathPrefix, ui16 kqpYqlSyntaxVersion, NSQLTranslation::EBindingsMode bindingsMode,
bool isEnableExternalDataSources, NYql::TExprContext& ctx, bool isEnablePgConstsToParams, bool isEnablePgSyntax);
bool isEnableExternalDataSources, NYql::TExprContext& ctx, bool isEnablePgConstsToParams, bool isEnablePgSyntax,
NSQLTranslation::TTranslationSettings* effectiveSettings = nullptr);

TQueryAst ParseQuery(NYql::EKikimrQueryType queryType, const TMaybe<Ydb::Query::Syntax>& syntax, const TString& queryText,
std::shared_ptr<std::map<TString, Ydb::Type>> queryParameters, bool isSql, TString cluster, TString kqpTablePathPrefix,
ui16 kqpYqlSyntaxVersion, NSQLTranslation::EBindingsMode bindingsMode, bool isEnableExternalDataSources, bool isEnablePgConstsToParams, bool isEnablePgSyntax);
ui16 kqpYqlSyntaxVersion, NSQLTranslation::EBindingsMode bindingsMode,
bool isEnableExternalDataSources, bool isEnablePgConstsToParams, bool isEnablePgSyntax,
NSQLTranslation::TTranslationSettings* effectiveSettings = nullptr);

} // namespace NKqp
} // namespace NKikimr
33 changes: 14 additions & 19 deletions ydb/core/kqp/provider/rewrite_io_utils.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "rewrite_io_utils.h"

#include <ydb/core/kqp/provider/yql_kikimr_expr_nodes.h>
#include <ydb/core/kqp/provider/yql_kikimr_provider.h>
#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h>
#include <ydb/library/yql/core/yql_expr_optimize.h>
#include <ydb/library/yql/providers/common/provider/yql_provider.h>
Expand All @@ -15,24 +16,18 @@ using namespace NNodes;

constexpr const char* QueryGraphNodeSignature = "SavedQueryGraph";

NSQLTranslation::TTranslationSettings CreateViewTranslationSettings(const TString& cluster) {
NSQLTranslation::TTranslationSettings settings;

settings.DefaultCluster = cluster;
settings.ClusterMapping[cluster] = TString(NYql::KikimrProviderName);
settings.Mode = NSQLTranslation::ESqlMode::LIMITED_VIEW;

return settings;
}

TExprNode::TPtr CompileViewQuery(
const TString& query,
TExprContext& ctx,
const TString& cluster,
IModuleResolver::TPtr moduleResolver
const NSQLTranslation::TTranslationSettings& outerSettings,
IModuleResolver::TPtr moduleResolver,
const TViewPersistedData& viewData
) {
auto translationSettings = outerSettings;
translationSettings.Mode = NSQLTranslation::ESqlMode::LIMITED_VIEW;
NSQLTranslation::Deserialize(viewData.CapturedContext, translationSettings);

TAstParseResult queryAst;
queryAst = NSQLTranslation::SqlToYql(query, CreateViewTranslationSettings(cluster));
queryAst = NSQLTranslation::SqlToYql(viewData.QueryText, translationSettings);

ctx.IssueManager.AddIssues(queryAst.Issues);
if (!queryAst.IsOk()) {
Expand Down Expand Up @@ -123,16 +118,16 @@ TExprNode::TPtr FindTopLevelRead(const TExprNode::TPtr& queryGraph) {
TExprNode::TPtr RewriteReadFromView(
const TExprNode::TPtr& node,
TExprContext& ctx,
const TString& query,
const TString& cluster,
IModuleResolver::TPtr moduleResolver
const NSQLTranslation::TTranslationSettings& outerSettings,
IModuleResolver::TPtr moduleResolver,
const TViewPersistedData& viewData
) {
const TCoRead readNode(node->ChildPtr(0));
const auto worldBeforeThisRead = readNode.World().Ptr();

TExprNode::TPtr queryGraph = FindSavedQueryGraph(readNode.Ptr());
if (!queryGraph) {
queryGraph = CompileViewQuery(query, ctx, cluster, moduleResolver);
queryGraph = CompileViewQuery(ctx, outerSettings, moduleResolver, viewData);
if (!queryGraph) {
ctx.AddError(TIssue(ctx.GetPosition(readNode.Pos()),
"The query stored in the view cannot be compiled."));
Expand All @@ -156,4 +151,4 @@ TExprNode::TPtr RewriteReadFromView(
return Build<TCoLeft>(ctx, node->Pos()).Input(topLevelRead).Done().Ptr();
}

}
}
9 changes: 5 additions & 4 deletions ydb/core/kqp/provider/rewrite_io_utils.h
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
#pragma once

#include <ydb/core/kqp/provider/yql_kikimr_gateway.h>
#include <ydb/library/yql/ast/yql_expr.h>

namespace NYql {

TExprNode::TPtr RewriteReadFromView(
const TExprNode::TPtr& node,
TExprContext& ctx,
const TString& query,
const TString& cluster,
IModuleResolver::TPtr moduleResolver
const NSQLTranslation::TTranslationSettings& outerSettings,
IModuleResolver::TPtr moduleResolver,
const TViewPersistedData& viewData
);

}
}
6 changes: 3 additions & 3 deletions ydb/core/kqp/provider/yql_kikimr_datasink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,9 @@ class TKiSinkIntentDeterminationTransformer: public TKiSinkVisitorTransformer {
}

TStatus HandleDropObject(TKiDropObject node, TExprContext& ctx) override {
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder()
<< "DropObject is not yet implemented for intent determination transformer"));
return TStatus::Error;
Y_UNUSED(node);
Y_UNUSED(ctx);
return TStatus::Ok;
}

TStatus HandleCreateGroup(TKiCreateGroup node, TExprContext& ctx) override {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/provider/yql_kikimr_datasource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -762,8 +762,8 @@ class TKikimrDataSource : public TDataProviderBase {
.Repeat(TExprStep::LoadTablesMetadata)
.Repeat(TExprStep::RewriteIO);

const auto& query = tableDesc.Metadata->ViewPersistedData.QueryText;
return RewriteReadFromView(node, ctx, query, cluster, Types.Modules);
const auto& viewData = tableDesc.Metadata->ViewPersistedData;
return RewriteReadFromView(node, ctx, SessionCtx->Query().TranslationSettings, Types.Modules, viewData);
}
}

Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_gateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <ydb/core/kqp/query_data/kqp_prepared_query.h>
#include <ydb/core/protos/flat_scheme_op.pb.h>
#include <ydb/core/protos/kqp.pb.h>
#include <ydb/core/protos/yql_translation_settings.pb.h>
#include <ydb/core/scheme/scheme_types_proto.h>

#include <library/cpp/json/json_reader.h>
Expand Down Expand Up @@ -396,6 +397,7 @@ enum EMetaSerializationType : ui64 {

struct TViewPersistedData {
TString QueryText;
NYql::NProto::TTranslationSettings CapturedContext;
};

struct TKikimrTableMetadata : public TThrRefBase {
Expand Down
33 changes: 33 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -937,3 +937,36 @@ TCoNameValueTupleList TKiExecDataQuerySettings::BuildNode(TExprContext& ctx, TPo
}

} // namespace NYql

namespace NSQLTranslation {

void Serialize(const TTranslationSettings& settings, NYql::NProto::TTranslationSettings& serializedSettings) {
serializedSettings.SetPathPrefix(settings.PathPrefix);
serializedSettings.SetSyntaxVersion(settings.SyntaxVersion);
serializedSettings.SetAnsiLexer(settings.AnsiLexer);
serializedSettings.SetPgParser(settings.PgParser);

auto* pragmas = serializedSettings.MutablePragmas();
pragmas->Clear();
pragmas->Add(settings.Flags.begin(), settings.Flags.end());
}

void Deserialize(const NYql::NProto::TTranslationSettings& serializedSettings, TTranslationSettings& settings) {
#define DeserializeSetting(settingName) \
if (serializedSettings.Has##settingName()) { \
settings.settingName = serializedSettings.Get##settingName(); \
}

DeserializeSetting(PathPrefix);
DeserializeSetting(SyntaxVersion);
DeserializeSetting(AnsiLexer);
DeserializeSetting(PgParser);

#undef DeserializeSetting

// overwrite existing pragmas
settings.Flags.clear();
settings.Flags.insert(serializedSettings.GetPragmas().begin(), serializedSettings.GetPragmas().end());
}

}
10 changes: 10 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ struct TKikimrQueryContext : TThrRefBase {
// we do not want add extra life time for query context here
std::shared_ptr<NKikimr::NGRpcService::IRequestCtxMtSafe> RpcCtx;

NSQLTranslation::TTranslationSettings TranslationSettings;

void Reset() {
PrepareOnly = false;
SuppressDdlChecks = false;
Expand All @@ -141,6 +143,7 @@ struct TKikimrQueryContext : TThrRefBase {

RlPath.Clear();
RpcCtx.reset();
TranslationSettings = NSQLTranslation::TTranslationSettings();
}
};

Expand Down Expand Up @@ -561,3 +564,10 @@ TIntrusivePtr<IDataProvider> CreateKikimrDataSink(
TIntrusivePtr<IKikimrQueryExecutor> queryExecutor);

} // namespace NYql

namespace NSQLTranslation {

void Serialize(const TTranslationSettings& settings, NYql::NProto::TTranslationSettings& serializedSettings);
void Deserialize(const NYql::NProto::TTranslationSettings& serializedSettings, TTranslationSettings& settings);

}
Loading

0 comments on commit 142e642

Please sign in to comment.