diff --git a/ydb/core/kqp/common/compilation/events.h b/ydb/core/kqp/common/compilation/events.h index bac8acb5f469..68cadacba81a 100644 --- a/ydb/core/kqp/common/compilation/events.h +++ b/ydb/core/kqp/common/compilation/events.h @@ -14,15 +14,16 @@ namespace NKikimr::NKqp::NPrivateEvents { struct TEvCompileRequest: public TEventLocal { TEvCompileRequest(const TIntrusiveConstPtr& userToken, const TMaybe& uid, - TMaybe&& query, bool keepInCache, bool isQueryActionPrepare, TInstant deadline, + TMaybe&& query, bool keepInCache, bool isQueryActionPrepare, bool perStatementResult, TInstant deadline, TKqpDbCountersPtr dbCounters, std::shared_ptr> intrestedInResult, const TIntrusivePtr& userRequestContext, NLWTrace::TOrbit orbit = {}, - TKqpTempTablesState::TConstPtr tempTablesState = nullptr, bool collectDiagnostics = false) + TKqpTempTablesState::TConstPtr tempTablesState = nullptr, bool collectDiagnostics = false, TMaybe queryAst = Nothing()) : UserToken(userToken) , Uid(uid) , Query(std::move(query)) , KeepInCache(keepInCache) , IsQueryActionPrepare(isQueryActionPrepare) + , PerStatementResult(perStatementResult) , Deadline(deadline) , DbCounters(dbCounters) , UserRequestContext(userRequestContext) @@ -30,6 +31,7 @@ struct TEvCompileRequest: public TEventLocal Query; bool KeepInCache = false; bool IsQueryActionPrepare = false; + bool PerStatementResult = false; // it is allowed for local event to use absolute time (TInstant) instead of time interval (TDuration) TInstant Deadline; TKqpDbCountersPtr DbCounters; @@ -51,6 +54,8 @@ struct TEvCompileRequest: public TEventLocal> IntrestedInResult; bool CollectDiagnostics = false; + + TMaybe QueryAst; }; struct TEvRecompileRequest: public TEventLocal { @@ -103,12 +108,14 @@ struct TEvCompileResponse: public TEventLocal { - TEvParseResponse(const TKqpQueryId& query, TMaybe astResult) - : AstResult(std::move(astResult)) - , Query(query) {} + TEvParseResponse(const TKqpQueryId& query, TVector astStatements, NLWTrace::TOrbit orbit = {}) + : AstStatements(std::move(astStatements)) + , Query(query) + , Orbit(std::move(orbit)) {} - TMaybe AstResult; + TVector AstStatements; TKqpQueryId Query; + NLWTrace::TOrbit Orbit; }; struct TEvCompileInvalidateRequest: public TEventLocal { TKqpDbCountersPtr dbCounters, std::optional federatedQuerySetup, const TIntrusivePtr& userRequestContext, NWilson::TTraceId traceId, TKqpTempTablesState::TConstPtr tempTablesState, bool collectFullDiagnostics, - ECompileActorAction compileAction, TMaybe astResult) + bool perStatementResult, ECompileActorAction compileAction, TMaybe queryAst) : Owner(owner) , ModuleResolverState(moduleResolverState) , Counters(counters) , FederatedQuerySetup(federatedQuerySetup) , Uid(uid) , QueryId(queryId) - , QueryRef(QueryId.Text, QueryId.QueryParameterTypes, astResult) + , QueryRef(QueryId.Text, QueryId.QueryParameterTypes, queryAst) , UserToken(userToken) , DbCounters(dbCounters) , Config(MakeIntrusive()) @@ -70,8 +70,9 @@ class TKqpCompileActor : public TActorBootstrapped { , CompileActorSpan(TWilsonKqp::CompileActor, std::move(traceId), "CompileActor") , TempTablesState(std::move(tempTablesState)) , CollectFullDiagnostics(collectFullDiagnostics) + , PerStatementResult(perStatementResult) , CompileAction(compileAction) - , AstResult(std::move(astResult)) + , QueryAst(std::move(queryAst)) { Config->Init(kqpSettings->DefaultSettings.GetDefaultSettings(), QueryId.Cluster, kqpSettings->Settings, false); @@ -127,26 +128,22 @@ class TKqpCompileActor : public TActorBootstrapped { } private: - void SetQueryAst(const TActorContext &ctx) { - TString cluster = QueryId.Cluster; + + TVector GetAstStatements(const TActorContext &ctx) { + TString cluster = QueryId.Cluster; TString kqpTablePathPrefix = Config->_KqpTablePathPrefix.Get().GetRef(); ui16 kqpYqlSyntaxVersion = Config->_KqpYqlSyntaxVersion.Get().GetRef(); NSQLTranslation::EBindingsMode bindingsMode = Config->BindingsMode; bool isEnableExternalDataSources = AppData(ctx)->FeatureFlags.GetEnableExternalDataSources(); bool isEnablePgConstsToParams = Config->EnablePgConstsToParams; + bool perStatementExecution = Config->EnablePerStatementQueryExecution && PerStatementResult; - auto astResult = ParseQuery(ConvertType(QueryId.Settings.QueryType), QueryId.Settings.Syntax, QueryId.Text, QueryId.QueryParameterTypes, QueryId.IsSql(), cluster, kqpTablePathPrefix, kqpYqlSyntaxVersion, bindingsMode, isEnableExternalDataSources, isEnablePgConstsToParams); - YQL_ENSURE(astResult.Ast); - if (astResult.Ast->IsOk()) { - AstResult = std::move(astResult); - } + return ParseStatements(ConvertType(QueryId.Settings.QueryType), QueryId.Settings.Syntax, QueryId.Text, QueryId.QueryParameterTypes, cluster, kqpTablePathPrefix, kqpYqlSyntaxVersion, bindingsMode, isEnableExternalDataSources, isEnablePgConstsToParams, QueryId.IsSql(), perStatementExecution); } void StartParsing(const TActorContext &ctx) { - SetQueryAst(ctx); - Become(&TKqpCompileActor::CompileState); - ReplyParseResult(ctx); + ReplyParseResult(ctx, GetAstStatements(ctx)); } void StartCompilation(const TActorContext &ctx) { @@ -352,15 +349,38 @@ class TKqpCompileActor : public TActorBootstrapped { << ", at state:" << state); } - void ReplyParseResult(const TActorContext &ctx) { + void ReplyParseResult(const TActorContext &ctx, TVector&& astStatements) { Y_UNUSED(ctx); + + if (astStatements.empty()) { + NYql::TIssue issue(NYql::TPosition(), "Parsing result of query is empty"); + ReplyError(Ydb::StatusIds::INTERNAL_ERROR, {issue}); + return; + } + + for (size_t statementId = 0; statementId < astStatements.size(); ++statementId) { + if (!astStatements[statementId].Ast || !astStatements[statementId].Ast->IsOk() || !astStatements[statementId].Ast->Root) { + ALOG_ERROR(NKikimrServices::KQP_COMPILE_ACTOR, "Get parsing result with error" + << ", self: " << SelfId() + << ", owner: " << Owner + << ", statement id: " << statementId); + + NYql::TIssue issue(NYql::TPosition(), "Error while parsing query."); + for (const auto& i : astStatements[statementId].Ast->Issues) { + issue.AddSubIssue(MakeIntrusive(i)); + } + + ReplyError(Ydb::StatusIds::INTERNAL_ERROR, {issue}); + return; + } + } + ALOG_DEBUG(NKikimrServices::KQP_COMPILE_ACTOR, "Send parsing result" << ", self: " << SelfId() << ", owner: " << Owner - << (AstResult && AstResult->Ast->IsOk() ? ", parsing is successful" : ", parsing is not successful")); + << ", statements size: " << astStatements.size()); - auto responseEv = MakeHolder(QueryId, std::move(AstResult)); - AstResult = Nothing(); + auto responseEv = MakeHolder(QueryId, std::move(astStatements)); Send(Owner, responseEv.Release()); Counters->ReportCompileFinish(DbCounters); @@ -379,8 +399,8 @@ class TKqpCompileActor : public TActorBootstrapped { KqpCompileResult->PreparedQuery = preparedQueryHolder; KqpCompileResult->AllowCache = CanCacheQuery(KqpCompileResult->PreparedQuery->GetPhysicalQuery()); - if (AstResult) { - KqpCompileResult->Ast = AstResult->Ast; + if (QueryAst) { + KqpCompileResult->Ast = QueryAst->Ast; } } @@ -482,8 +502,9 @@ class TKqpCompileActor : public TActorBootstrapped { TKqpTempTablesState::TConstPtr TempTablesState; bool CollectFullDiagnostics; + const bool PerStatementResult; ECompileActorAction CompileAction; - TMaybe AstResult; + TMaybe QueryAst; }; void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConfig& serviceConfig) { @@ -512,6 +533,7 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf kqpConfig.IndexAutoChooserMode = serviceConfig.GetIndexAutoChooseMode(); kqpConfig.EnablePgConstsToParams = serviceConfig.GetEnablePgConstsToParams() && serviceConfig.GetEnableAstCache(); kqpConfig.ExtractPredicateRangesLimit = serviceConfig.GetExtractPredicateRangesLimit(); + kqpConfig.EnablePerStatementQueryExecution = serviceConfig.GetEnablePerStatementQueryExecution(); if (const auto limit = serviceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit()) { kqpConfig._KqpYqlCombinerMemoryLimit = std::max(1_GB, limit - (limit >> 2U)); @@ -527,14 +549,15 @@ IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstP std::optional federatedQuerySetup, TKqpDbCountersPtr dbCounters, const TIntrusivePtr& userRequestContext, NWilson::TTraceId traceId, TKqpTempTablesState::TConstPtr tempTablesState, - ECompileActorAction compileAction, TMaybe astResult, bool collectFullDiagnostics) + ECompileActorAction compileAction, TMaybe queryAst, bool collectFullDiagnostics, + bool perStatementResult) { return new TKqpCompileActor(owner, kqpSettings, tableServiceConfig, queryServiceConfig, metadataProviderConfig, moduleResolverState, counters, uid, query, userToken, dbCounters, federatedQuerySetup, userRequestContext, std::move(traceId), std::move(tempTablesState), collectFullDiagnostics, - compileAction, std::move(astResult)); + perStatementResult, compileAction, std::move(queryAst)); } } // namespace NKqp diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp index dcba901ec2f8..b498192040bc 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp @@ -51,8 +51,10 @@ class TKqpQueryCache { AstIndex.emplace(GetQueryIdWithAst(*compileResult->Query, *compileResult->Ast), compileResult->Uid); } - bool Insert(const TKqpCompileResult::TConstPtr& compileResult, bool isEnableAstCache) { - InsertQuery(compileResult); + bool Insert(const TKqpCompileResult::TConstPtr& compileResult, bool isEnableAstCache, bool isPerStatementExecution) { + if (!isPerStatementExecution) { + InsertQuery(compileResult); + } if (isEnableAstCache && compileResult->Ast) { InsertAst(compileResult); } @@ -81,7 +83,6 @@ class TKqpQueryCache { } Y_ABORT_UNLESS(List.GetSize() == Index.size()); - Y_ABORT_UNLESS(List.GetSize() == QueryIndex.size()); return removedItem != nullptr; } @@ -168,7 +169,6 @@ class TKqpQueryCache { Index.erase(it); Y_ABORT_UNLESS(List.GetSize() == Index.size()); - Y_ABORT_UNLESS(List.GetSize() == QueryIndex.size()); return true; } @@ -189,7 +189,6 @@ class TKqpQueryCache { } Y_ABORT_UNLESS(List.GetSize() == Index.size()); - Y_ABORT_UNLESS(List.GetSize() == QueryIndex.size()); return prevSize - Size(); } @@ -233,15 +232,18 @@ class TKqpQueryCache { }; struct TKqpCompileSettings { - TKqpCompileSettings(bool keepInCache, bool isQueryActionPrepare, const TInstant& deadline, ECompileActorAction action = ECompileActorAction::COMPILE) + TKqpCompileSettings(bool keepInCache, bool isQueryActionPrepare, bool perStatementResult, + const TInstant& deadline, ECompileActorAction action = ECompileActorAction::COMPILE) : KeepInCache(keepInCache) , IsQueryActionPrepare(isQueryActionPrepare) + , PerStatementResult(perStatementResult) , Deadline(deadline) , Action(action) {} bool KeepInCache; bool IsQueryActionPrepare; + bool PerStatementResult; TInstant Deadline; ECompileActorAction Action; }; @@ -253,7 +255,7 @@ struct TKqpCompileRequest { const TIntrusivePtr& userRequestContext, NLWTrace::TOrbit orbit = {}, NWilson::TSpan span = {}, TKqpTempTablesState::TConstPtr tempTablesState = {}, - TMaybe astResult = {}) + TMaybe queryAst = {}) : Sender(sender) , Query(std::move(query)) , Uid(uid) @@ -266,7 +268,7 @@ struct TKqpCompileRequest { , Cookie(cookie) , TempTablesState(std::move(tempTablesState)) , IntrestedInResult(std::move(intrestedInResult)) - , AstResult(std::move(astResult)) + , QueryAst(std::move(queryAst)) {} TActorId Sender; @@ -283,7 +285,7 @@ struct TKqpCompileRequest { ui64 Cookie; TKqpTempTablesState::TConstPtr TempTablesState; std::shared_ptr> IntrestedInResult; - TMaybe AstResult; + TMaybe QueryAst; bool IsIntrestedInResult() const { return IntrestedInResult->load(); @@ -645,12 +647,17 @@ class TKqpCompileService : public TActorBootstrapped { ev->Get()->Orbit, ev->Get()->Query ? ev->Get()->Query->UserSid : 0); - TKqpCompileSettings compileSettings(request.KeepInCache, request.IsQueryActionPrepare, request.Deadline, TableServiceConfig.GetEnableAstCache() ? ECompileActorAction::PARSE : ECompileActorAction::COMPILE); + TKqpCompileSettings compileSettings(request.KeepInCache, request.IsQueryActionPrepare, request.PerStatementResult, + request.Deadline, TableServiceConfig.GetEnableAstCache() ? ECompileActorAction::PARSE : ECompileActorAction::COMPILE); TKqpCompileRequest compileRequest(ev->Sender, CreateGuidAsString(), std::move(*request.Query), compileSettings, request.UserToken, dbCounters, ev->Cookie, std::move(ev->Get()->IntrestedInResult), ev->Get()->UserRequestContext, std::move(ev->Get()->Orbit), std::move(compileServiceSpan), std::move(ev->Get()->TempTablesState)); + if (TableServiceConfig.GetEnableAstCache() && request.QueryAst) { + return CompileByAst(*request.QueryAst, compileRequest, ctx); + } + if (!RequestsQueue.Enqueue(std::move(compileRequest))) { Counters->ReportCompileRequestRejected(dbCounters); @@ -702,7 +709,7 @@ class TKqpCompileService : public TActorBootstrapped { NWilson::TSpan compileServiceSpan(TWilsonKqp::CompileService, ev->Get() ? std::move(ev->TraceId) : NWilson::TTraceId(), "CompileService"); - TKqpCompileSettings compileSettings(true, request.IsQueryActionPrepare, request.Deadline, TableServiceConfig.GetEnableAstCache() ? ECompileActorAction::PARSE : ECompileActorAction::COMPILE); + TKqpCompileSettings compileSettings(true, request.IsQueryActionPrepare, false, request.Deadline, TableServiceConfig.GetEnableAstCache() ? ECompileActorAction::PARSE : ECompileActorAction::COMPILE); TKqpCompileRequest compileRequest(ev->Sender, request.Uid, compileResult ? *compileResult->Query : *request.Query, compileSettings, request.UserToken, dbCounters, ev->Cookie, std::move(ev->Get()->IntrestedInResult), @@ -760,13 +767,14 @@ class TKqpCompileService : public TActorBootstrapped { << ", compileActor: " << ev->Sender); bool keepInCache = compileRequest.CompileSettings.KeepInCache && compileResult->AllowCache; + bool isPerStatementExecution = TableServiceConfig.GetEnableAstCache() && compileRequest.QueryAst; bool hasTempTablesNameClashes = HasTempTablesNameClashes(compileResult, compileRequest.TempTablesState, true); try { if (compileResult->Status == Ydb::StatusIds::SUCCESS) { if (!hasTempTablesNameClashes) { - UpdateQueryCache(compileResult, keepInCache, compileRequest.CompileSettings.IsQueryActionPrepare); + UpdateQueryCache(compileResult, keepInCache, compileRequest.CompileSettings.IsQueryActionPrepare, isPerStatementExecution); } if (ev->Get()->ReplayMessage) { @@ -847,43 +855,44 @@ class TKqpCompileService : public TActorBootstrapped { return compileResult->PreparedQuery->HasTempTables(tempTablesState, withSessionId); } - void UpdateQueryCache(TKqpCompileResult::TConstPtr compileResult, bool keepInCache, bool isQueryActionPrepare) { + void UpdateQueryCache(TKqpCompileResult::TConstPtr compileResult, bool keepInCache, bool isQueryActionPrepare, bool isPerStatementExecution) { if (QueryCache.FindByUid(compileResult->Uid, false)) { QueryCache.Replace(compileResult); } else if (keepInCache) { - if (QueryCache.Insert(compileResult, TableServiceConfig.GetEnableAstCache())) { + if (QueryCache.Insert(compileResult, TableServiceConfig.GetEnableAstCache(), isPerStatementExecution)) { Counters->CompileQueryCacheEvicted->Inc(); } if (compileResult->Query && isQueryActionPrepare) { - if (InsertPreparingQuery(compileResult, true)) { + if (InsertPreparingQuery(compileResult, true, isPerStatementExecution)) { Counters->CompileQueryCacheEvicted->Inc(); }; } } } - void Handle(TEvKqp::TEvParseResponse::TPtr& ev, const TActorContext& ctx) { - auto& parseResult = ev->Get()->AstResult; - auto& query = ev->Get()->Query; - auto compileRequest = RequestsQueue.FinishActiveRequest(query); - if (parseResult && parseResult->Ast->IsOk()) { - auto compileResult = QueryCache.FindByAst(query, *parseResult->Ast, compileRequest.CompileSettings.KeepInCache); - if (HasTempTablesNameClashes(compileResult, compileRequest.TempTablesState)) { - compileResult = nullptr; - } - if (compileResult) { - Counters->ReportQueryCacheHit(compileRequest.DbCounters, true); + void CompileByAst(const TQueryAst& queryAst, TKqpCompileRequest& compileRequest, const TActorContext& ctx) { + YQL_ENSURE(queryAst.Ast); + YQL_ENSURE(queryAst.Ast->IsOk()); + YQL_ENSURE(queryAst.Ast->Root); + auto compileResult = QueryCache.FindByAst(compileRequest.Query, *queryAst.Ast, compileRequest.CompileSettings.KeepInCache); + + if (HasTempTablesNameClashes(compileResult, compileRequest.TempTablesState)) { + compileResult = nullptr; + } - LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Served query from cache from ast" - << ", sender: " << compileRequest.Sender - << ", queryUid: " << compileResult->Uid); + if (compileResult) { + Counters->ReportQueryCacheHit(compileRequest.DbCounters, true); - compileResult->Ast->PgAutoParamValues = std::move(parseResult->Ast->PgAutoParamValues); + LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Served query from cache from ast" + << ", sender: " << compileRequest.Sender + << ", queryUid: " << compileResult->Uid); - ReplyFromCache(compileRequest.Sender, compileResult, ctx, compileRequest.Cookie, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan)); - return; - } + compileResult->Ast->PgAutoParamValues = std::move(queryAst.Ast->PgAutoParamValues); + + ReplyFromCache(compileRequest.Sender, compileResult, ctx, compileRequest.Cookie, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan)); + return; } + Counters->ReportQueryCacheHit(compileRequest.DbCounters, false); LWTRACK(KqpCompileServiceEnqueued, @@ -891,30 +900,43 @@ class TKqpCompileService : public TActorBootstrapped { compileRequest.Query.UserSid); compileRequest.CompileSettings.Action = ECompileActorAction::COMPILE; - compileRequest.AstResult = std::move(parseResult); + compileRequest.QueryAst = std::move(queryAst); if (!RequestsQueue.Enqueue(std::move(compileRequest))) { Counters->ReportCompileRequestRejected(compileRequest.DbCounters); LOG_WARN_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Requests queue size limit exceeded" - << ", sender: " << ev->Sender + << ", sender: " << compileRequest.Sender << ", queueSize: " << RequestsQueue.Size()); NYql::TIssue issue(NYql::TPosition(), TStringBuilder() << "Exceeded maximum number of requests in compile service queue."); - ReplyError(ev->Sender, "", Ydb::StatusIds::OVERLOADED, {issue}, ctx, compileRequest.Cookie, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan)); + ReplyError(compileRequest.Sender, "", Ydb::StatusIds::OVERLOADED, {issue}, ctx, compileRequest.Cookie, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan)); return; } LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Added request to queue" - << ", sender: " << ev->Sender + << ", sender: " << compileRequest.Sender << ", queueSize: " << RequestsQueue.Size()); ProcessQueue(ctx); } + void Handle(TEvKqp::TEvParseResponse::TPtr& ev, const TActorContext& ctx) { + auto& astStatements = ev->Get()->AstStatements; + YQL_ENSURE(astStatements.size()); + auto& query = ev->Get()->Query; + auto compileRequest = RequestsQueue.FinishActiveRequest(query); + if (astStatements.size() > 1) { + ReplyQueryStatements(compileRequest.Sender, astStatements, query, ctx, compileRequest.Cookie, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan)); + return; + } + + CompileByAst(astStatements.front(), compileRequest, ctx); + } + private: - bool InsertPreparingQuery(const TKqpCompileResult::TConstPtr& compileResult, bool keepInCache) { + bool InsertPreparingQuery(const TKqpCompileResult::TConstPtr& compileResult, bool keepInCache, bool isPerStatementExecution) { YQL_ENSURE(compileResult->Query); auto query = *compileResult->Query; @@ -939,7 +961,7 @@ class TKqpCompileService : public TActorBootstrapped { auto newCompileResult = TKqpCompileResult::Make(CreateGuidAsString(), compileResult->Status, compileResult->Issues, compileResult->MaxReadType, std::move(query), compileResult->Ast); newCompileResult->AllowCache = compileResult->AllowCache; newCompileResult->PreparedQuery = compileResult->PreparedQuery; - return QueryCache.Insert(newCompileResult, TableServiceConfig.GetEnableAstCache()); + return QueryCache.Insert(newCompileResult, TableServiceConfig.GetEnableAstCache(), isPerStatementExecution); } void ProcessQueue(const TActorContext& ctx) { @@ -972,7 +994,7 @@ class TKqpCompileService : public TActorBootstrapped { void StartCompilation(TKqpCompileRequest&& request, const TActorContext& ctx) { auto compileActor = CreateKqpCompileActor(ctx.SelfID, KqpSettings, TableServiceConfig, QueryServiceConfig, MetadataProviderConfig, ModuleResolverState, Counters, request.Uid, request.Query, request.UserToken, FederatedQuerySetup, request.DbCounters, request.UserRequestContext, - request.CompileServiceSpan.GetTraceId(), request.TempTablesState, request.CompileSettings.Action, std::move(request.AstResult), CollectDiagnostics); + request.CompileServiceSpan.GetTraceId(), request.TempTablesState, request.CompileSettings.Action, std::move(request.QueryAst), CollectDiagnostics, request.CompileSettings.PerStatementResult); auto compileActorId = ctx.ExecutorThread.RegisterActor(compileActor, TMailboxType::HTSwap, AppData(ctx)->UserPoolId); @@ -1049,6 +1071,32 @@ class TKqpCompileService : public TActorBootstrapped { << ", message: " << e.what()); } + void Reply(const TActorId& sender, const TVector astStatements, const TKqpQueryId query, + const TActorContext& ctx, ui64 cookie, NLWTrace::TOrbit orbit, NWilson::TSpan span) + { + LWTRACK(KqpCompileServiceReply, + orbit, + query.UserSid, + {}); + + LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Send ast statements response"); + + auto responseEv = MakeHolder(std::move(query), astStatements, std::move(orbit)); + + if (span) { + span.End(); + } + + ctx.Send(sender, responseEv.Release(), 0, cookie); + } + + void ReplyQueryStatements(const TActorId& sender, const TVector astStatements, + const TKqpQueryId query, const TActorContext& ctx, ui64 cookie, NLWTrace::TOrbit orbit, NWilson::TSpan span) + { + LWTRACK(KqpCompileServiceReplyStatements, orbit); + Reply(sender, astStatements, query, ctx, cookie, std::move(orbit), std::move(span)); + } + private: TTableServiceConfig TableServiceConfig; TQueryServiceConfig QueryServiceConfig; diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.h b/ydb/core/kqp/compile_service/kqp_compile_service.h index 213dc1ecffc2..b2d4c43c7a71 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_service.h +++ b/ydb/core/kqp/compile_service/kqp_compile_service.h @@ -36,8 +36,9 @@ IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstP NWilson::TTraceId traceId = {}, TKqpTempTablesState::TConstPtr tempTablesState = nullptr, ECompileActorAction compileAction = ECompileActorAction::COMPILE, - TMaybe astResult = {}, - bool collectFullDiagnostics = false); + TMaybe queryAst = {}, + bool collectFullDiagnostics = false, + bool PerStatementResult = false); IActor* CreateKqpCompileRequestActor(const TActorId& owner, const TIntrusiveConstPtr& userToken, const TMaybe& uid, TMaybe&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters, diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index a5122d01188a..6ff10294b4c2 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -128,9 +128,9 @@ class TKqpDataExecuter : public TKqpExecuterBase& userRequestContext, - const bool enableOlapSink) + const bool enableOlapSink, ui32 statementResultIndex) : TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, aggregation, - maximalSecretsSnapshotWaitTime, userRequestContext, TWilsonKqp::DataExecuter, "DataExecuter", streamResult + maximalSecretsSnapshotWaitTime, userRequestContext, statementResultIndex, TWilsonKqp::DataExecuter, "DataExecuter", streamResult ) , AsyncIoFactory(std::move(asyncIoFactory)) , EnableOlapSink(enableOlapSink) @@ -2437,10 +2437,10 @@ IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator, - TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr& userRequestContext, const bool enableOlapSink) + TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr& userRequestContext, const bool enableOlapSink, ui32 statementResultIndex) { return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, executerRetriesConfig, - std::move(asyncIoFactory), chanTransportVersion, aggregation, creator, maximalSecretsSnapshotWaitTime, userRequestContext, enableOlapSink); + std::move(asyncIoFactory), chanTransportVersion, aggregation, creator, maximalSecretsSnapshotWaitTime, userRequestContext, enableOlapSink, statementResultIndex); } } // namespace NKqp diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h index 242c64027224..bab811687132 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer.h +++ b/ydb/core/kqp/executer_actor/kqp_executer.h @@ -92,7 +92,7 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator, TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr& userRequestContext, - const bool enableOlapSink); + const bool enableOlapSink, ui32 statementResultIndex); IActor* CreateKqpSchemeExecuter( TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target, diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp index b3aa7fd98b6d..2262fe3d6295 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp @@ -83,12 +83,12 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator, TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr& userRequestContext, - const bool enableOlapSink) + const bool enableOlapSink, ui32 statementResultIndex) { if (request.Transactions.empty()) { // commit-only or rollback-only data transaction YQL_ENSURE(request.LocksOp == ELocksOp::Commit || request.LocksOp == ELocksOp::Rollback); - return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext, enableOlapSink); + return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext, enableOlapSink, statementResultIndex); } TMaybe txsType; @@ -104,13 +104,13 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt switch (*txsType) { case NKqpProto::TKqpPhyTx::TYPE_COMPUTE: case NKqpProto::TKqpPhyTx::TYPE_DATA: - return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext, enableOlapSink); + return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext, enableOlapSink, statementResultIndex); case NKqpProto::TKqpPhyTx::TYPE_SCAN: - return CreateKqpScanExecuter(std::move(request), database, userToken, counters, aggregation, executerRetriesConfig, preparedQuery, chanTransportVersion, maximalSecretsSnapshotWaitTime, userRequestContext); + return CreateKqpScanExecuter(std::move(request), database, userToken, counters, aggregation, executerRetriesConfig, preparedQuery, chanTransportVersion, maximalSecretsSnapshotWaitTime, userRequestContext, statementResultIndex); case NKqpProto::TKqpPhyTx::TYPE_GENERIC: - return CreateKqpDataExecuter(std::move(request), database, userToken, counters, true, aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext, enableOlapSink); + return CreateKqpDataExecuter(std::move(request), database, userToken, counters, true, aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext, enableOlapSink, statementResultIndex); default: YQL_ENSURE(false, "Unsupported physical tx type: " << (ui32)*txsType); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index fc944349ea13..3f83142232f6 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -122,7 +122,7 @@ class TKqpExecuterBase : public TActorBootstrapped { const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr& userRequestContext, - ui64 spanVerbosity = 0, TString spanName = "KqpExecuterBase", bool streamResult = false) + ui32 statementResultIndex, ui64 spanVerbosity = 0, TString spanName = "KqpExecuterBase", bool streamResult = false) : Request(std::move(request)) , Database(database) , UserToken(userToken) @@ -134,6 +134,7 @@ class TKqpExecuterBase : public TActorBootstrapped { , AggregationSettings(aggregation) , HasOlapTable(false) , StreamResult(streamResult) + , StatementResultIndex(statementResultIndex) { TasksGraph.GetMeta().Snapshot = IKqpGateway::TKqpSnapshot(Request.Snapshot.Step, Request.Snapshot.TxId); TasksGraph.GetMeta().Arena = MakeIntrusive(); @@ -294,7 +295,7 @@ class TKqpExecuterBase : public TActorBootstrapped { if (!trailingResults) { auto streamEv = MakeHolder(); streamEv->Record.SetSeqNo(computeData.Proto.GetSeqNo()); - streamEv->Record.SetQueryResultIndex(*txResult.QueryResultIndex); + streamEv->Record.SetQueryResultIndex(*txResult.QueryResultIndex + StatementResultIndex); streamEv->Record.SetChannelId(channel.Id); streamEv->Record.MutableResultSet()->Swap(&resultSet); @@ -1771,7 +1772,7 @@ class TKqpExecuterBase : public TActorBootstrapped { IActor* proxy; if (txResult.IsStream && txResult.QueryResultIndex.Defined()) { proxy = CreateResultStreamChannelProxy(TxId, channel.Id, txResult.MkqlItemType, - txResult.ColumnOrder, *txResult.QueryResultIndex, Target, this->SelfId()); + txResult.ColumnOrder, *txResult.QueryResultIndex, Target, this->SelfId(), StatementResultIndex); } else { proxy = CreateResultDataChannelProxy(TxId, channel.Id, this->SelfId(), channel.DstInputIndex, ResponseEv.get()); @@ -1907,6 +1908,8 @@ class TKqpExecuterBase : public TActorBootstrapped { THashMap ResultChannelToComputeActor; THashMap> SourceScanStageIdToParititions; + ui32 StatementResultIndex; + private: static constexpr TDuration ResourceUsageUpdateInterval = TDuration::MilliSeconds(100); }; @@ -1920,14 +1923,14 @@ IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator, TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr& userRequestContext, - const bool enableOlapSink); + const bool enableOlapSink, ui32 statementResultIndex); IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr& userToken, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, TPreparedQueryHolder::TConstPtr preparedQuery, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, - TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr& userRequestContext); + TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr& userRequestContext, ui32 statementResultIndex); } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/executer_actor/kqp_result_channel.cpp b/ydb/core/kqp/executer_actor/kqp_result_channel.cpp index 3d3d9d00d199..f2538e598a10 100644 --- a/ydb/core/kqp/executer_actor/kqp_result_channel.cpp +++ b/ydb/core/kqp/executer_actor/kqp_result_channel.cpp @@ -136,12 +136,13 @@ class TResultStreamChannelProxy : public TResultCommonChannelProxy { public: TResultStreamChannelProxy(ui64 txId, ui64 channelId, NKikimr::NMiniKQL::TType* itemType, const TVector* columnOrder, ui32 queryResultIndex, TActorId target, - TActorId executer) + TActorId executer, size_t statementResultIndex) : TResultCommonChannelProxy(txId, channelId, executer) , ColumnOrder(columnOrder) , ItemType(itemType) , QueryResultIndex(queryResultIndex) - , Target(target) {} + , Target(target) + , StatementResultIndex(statementResultIndex) {} private: void SendResults(TEvComputeChannelDataOOB& computeData, TActorId sender) override { @@ -158,7 +159,7 @@ class TResultStreamChannelProxy : public TResultCommonChannelProxy { auto streamEv = MakeHolder(); streamEv->Record.SetSeqNo(computeData.Proto.GetSeqNo()); - streamEv->Record.SetQueryResultIndex(QueryResultIndex); + streamEv->Record.SetQueryResultIndex(QueryResultIndex + StatementResultIndex); streamEv->Record.MutableResultSet()->Swap(&resultSet); LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_EXECUTER, @@ -173,6 +174,7 @@ class TResultStreamChannelProxy : public TResultCommonChannelProxy { NKikimr::NMiniKQL::TType* ItemType; ui32 QueryResultIndex = 0; const NActors::TActorId Target; + size_t StatementResultIndex; }; class TResultDataChannelProxy : public TResultCommonChannelProxy { @@ -211,7 +213,7 @@ class TResultDataChannelProxy : public TResultCommonChannelProxy { NActors::IActor* CreateResultStreamChannelProxy(ui64 txId, ui64 channelId, NKikimr::NMiniKQL::TType* itemType, const TVector* columnOrder, ui32 queryResultIndex, TActorId target, - TActorId executer) + TActorId executer, ui32 statementResultIndex) { LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_EXECUTER, "CreateResultStreamChannelProxy: TxId: " << txId << @@ -219,7 +221,7 @@ NActors::IActor* CreateResultStreamChannelProxy(ui64 txId, ui64 channelId, NKiki ); return new TResultStreamChannelProxy(txId, channelId, itemType, columnOrder, queryResultIndex, target, - executer); + executer, statementResultIndex); } NActors::IActor* CreateResultDataChannelProxy(ui64 txId, ui64 channelId, TActorId executer, diff --git a/ydb/core/kqp/executer_actor/kqp_result_channel.h b/ydb/core/kqp/executer_actor/kqp_result_channel.h index 1eddf0d033e1..e1820ea015bf 100644 --- a/ydb/core/kqp/executer_actor/kqp_result_channel.h +++ b/ydb/core/kqp/executer_actor/kqp_result_channel.h @@ -27,7 +27,7 @@ struct TKqpExecuterTxResult; NActors::IActor* CreateResultStreamChannelProxy(ui64 txId, ui64 channelId, NKikimr::NMiniKQL::TType* itemType, const TVector* columnOrder, ui32 queryResultIndex, NActors::TActorId target, - NActors::TActorId executer); + NActors::TActorId executer, ui32 statementResultIndex); NActors::IActor* CreateResultDataChannelProxy(ui64 txId, ui64 channelId, NActors::TActorId executer, ui32 inputIndex, TEvKqpExecuter::TEvTxResponse* receiver); diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index 62d799717d20..6561de470465 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -49,9 +49,10 @@ class TKqpScanExecuter : public TKqpExecuterBase& userRequestContext) + TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr& userRequestContext, + ui32 statementResultIndex) : TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, aggregation, - maximalSecretsSnapshotWaitTime, userRequestContext, TWilsonKqp::ScanExecuter, "ScanExecuter", + maximalSecretsSnapshotWaitTime, userRequestContext, statementResultIndex, TWilsonKqp::ScanExecuter, "ScanExecuter", false ) , PreparedQuery(preparedQuery) @@ -367,10 +368,10 @@ IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, TPreparedQueryHolder::TConstPtr preparedQuery, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, - TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr& userRequestContext) + TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr& userRequestContext, ui32 statementResultIndex) { return new TKqpScanExecuter(std::move(request), database, userToken, counters, aggregation, executerRetriesConfig, - preparedQuery, chanTransportVersion, maximalSecretsSnapshotWaitTime, userRequestContext); + preparedQuery, chanTransportVersion, maximalSecretsSnapshotWaitTime, userRequestContext, statementResultIndex); } } // namespace NKqp diff --git a/ydb/core/kqp/host/kqp_translate.cpp b/ydb/core/kqp/host/kqp_translate.cpp index a0c97da736b0..48eaa8f5b7d8 100644 --- a/ydb/core/kqp/host/kqp_translate.cpp +++ b/ydb/core/kqp/host/kqp_translate.cpp @@ -199,5 +199,58 @@ TQueryAst ParseQuery(NYql::EKikimrQueryType queryType, const TMaybe(std::move(astRes)), sqlVersion, deprecatedSQL); } +TVector ParseStatements(NYql::EKikimrQueryType queryType, const TMaybe& usePgParser, const TString& queryText, + std::shared_ptr> queryParameters, bool sqlAutoCommit, + TMaybe& sqlVersion, TString cluster, TString kqpTablePathPrefix, + ui16 kqpYqlSyntaxVersion, NSQLTranslation::EBindingsMode bindingsMode, bool isEnableExternalDataSources, + NYql::TExprContext& ctx, bool isEnablePgConstsToParams, bool isSql) { + + TVector result; + NYql::TAstParseResult astRes; + if (isSql) { + auto settings = GetTranslationSettings(queryType, usePgParser, sqlAutoCommit, queryText, queryParameters, sqlVersion, cluster, kqpTablePathPrefix, kqpYqlSyntaxVersion, bindingsMode, isEnableExternalDataSources, ctx, isEnablePgConstsToParams); + ui16 actualSyntaxVersion = 0; + auto astStatements = NSQLTranslation::SqlToAstStatements(queryText, settings, nullptr, &actualSyntaxVersion); + sqlVersion = actualSyntaxVersion; + for (auto&& ast : astStatements) { + result.push_back({std::make_shared(std::move(ast)), sqlVersion, (actualSyntaxVersion == 0)}); + } + return result; + } else { + sqlVersion = {}; + return {{std::make_shared(NYql::ParseAst(queryText)), sqlVersion, true}}; + } +} + +TVector ParseStatements(NYql::EKikimrQueryType queryType, const TMaybe& syntax, const TString& queryText, std::shared_ptr> queryParameters, + TString cluster, TString kqpTablePathPrefix, ui16 kqpYqlSyntaxVersion, NSQLTranslation::EBindingsMode bindingsMode, bool isEnableExternalDataSources, bool isEnablePgConstsToParams, bool isSql, + bool perStatementExecution) { + if (!perStatementExecution) { + return {ParseQuery(queryType, syntax, queryText, queryParameters, isSql, cluster, kqpTablePathPrefix, kqpYqlSyntaxVersion, bindingsMode, isEnableExternalDataSources, isEnablePgConstsToParams)}; + } + TMaybe sqlVersion; + TMaybe usePgParser; + if (syntax) + switch (*syntax) { + case Ydb::Query::Syntax::SYNTAX_YQL_V1: + usePgParser = false; + break; + case Ydb::Query::Syntax::SYNTAX_PG: + usePgParser = true; + break; + default: + break; + } + + NYql::TExprContext ctx; + bool sqlAutoCommit; + if (queryType == NYql::EKikimrQueryType::YqlScript || queryType == NYql::EKikimrQueryType::YqlScriptStreaming) { + sqlAutoCommit = true; + } else { + sqlAutoCommit = false; + } + return ParseStatements(queryType, usePgParser, queryText, queryParameters, sqlAutoCommit, sqlVersion, cluster, kqpTablePathPrefix, kqpYqlSyntaxVersion, bindingsMode, isEnableExternalDataSources, ctx, isEnablePgConstsToParams, isSql); +} + } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/host/kqp_translate.h b/ydb/core/kqp/host/kqp_translate.h index bf218b8464a0..15e7868470c2 100644 --- a/ydb/core/kqp/host/kqp_translate.h +++ b/ydb/core/kqp/host/kqp_translate.h @@ -24,5 +24,8 @@ TQueryAst ParseQuery(NYql::EKikimrQueryType queryType, const TMaybe> queryParameters, bool isSql, TString cluster, TString kqpTablePathPrefix, ui16 kqpYqlSyntaxVersion, NSQLTranslation::EBindingsMode bindingsMode, bool isEnableExternalDataSources, bool isEnablePgConstsToParams); +TVector ParseStatements(NYql::EKikimrQueryType queryType, const TMaybe& syntax, const TString& queryText, std::shared_ptr> queryParameters, + TString cluster, TString kqpTablePathPrefix, ui16 kqpYqlSyntaxVersion, NSQLTranslation::EBindingsMode bindingsMode, bool isEnableExternalDataSources, bool isEnablePgConstsToParams, bool isSql, bool perStatementExecution); + } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.h b/ydb/core/kqp/provider/yql_kikimr_settings.h index f6fb2beb1e2e..c7ae3179a204 100644 --- a/ydb/core/kqp/provider/yql_kikimr_settings.h +++ b/ydb/core/kqp/provider/yql_kikimr_settings.h @@ -162,6 +162,7 @@ struct TKikimrConfiguration : public TKikimrSettings, public NCommon::TSettingDi bool EnableAstCache = false; bool EnablePgConstsToParams = false; ui64 ExtractPredicateRangesLimit = 0; + bool EnablePerStatementQueryExecution = false; }; } diff --git a/ydb/core/kqp/session_actor/kqp_query_state.cpp b/ydb/core/kqp/session_actor/kqp_query_state.cpp index 488187358f50..4d83cc4a57d4 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.cpp +++ b/ydb/core/kqp/session_actor/kqp_query_state.cpp @@ -134,6 +134,13 @@ bool TKqpQueryState::SaveAndCheckCompileResult(TEvKqp::TEvCompileResponse* ev) { return true; } +bool TKqpQueryState::SaveAndCheckParseResult(TEvKqp::TEvParseResponse&& ev) { + Statements = std::move(ev.AstStatements); + CurrentStatementId = 0; + Orbit = std::move(ev.Orbit); + return true; +} + std::unique_ptr TKqpQueryState::BuildCompileRequest(std::shared_ptr> cookie) { TMaybe query; TMaybe uid; @@ -144,6 +151,7 @@ std::unique_ptr TKqpQueryState::BuildCompileRequest(s settings.Syntax = GetSyntax(); bool keepInCache = false; + const bool perStatementResult = !HasTxControl() && GetAction() == NKikimrKqp::QUERY_ACTION_EXECUTE; switch (GetAction()) { case NKikimrKqp::QUERY_ACTION_EXECUTE: query = TKqpQueryId(Cluster, Database, GetQuery(), settings, GetQueryParameterTypes()); @@ -173,10 +181,18 @@ std::unique_ptr TKqpQueryState::BuildCompileRequest(s if (QueryDeadlines.CancelAt) { compileDeadline = Min(compileDeadline, QueryDeadlines.CancelAt); } + bool isQueryActionPrepare = GetAction() == NKikimrKqp::QUERY_ACTION_PREPARE; - return std::make_unique(UserToken, uid, - std::move(query), keepInCache, isQueryActionPrepare, compileDeadline, DbCounters, std::move(cookie), UserRequestContext, std::move(Orbit), TempTablesState, GetCollectDiagnostics()); + TMaybe statementAst; + if (!Statements.empty()) { + YQL_ENSURE(CurrentStatementId < Statements.size()); + statementAst = Statements[CurrentStatementId]; + } + + return std::make_unique(UserToken, uid, std::move(query), keepInCache, + isQueryActionPrepare, perStatementResult, compileDeadline, DbCounters, std::move(cookie), UserRequestContext, + std::move(Orbit), TempTablesState, GetCollectDiagnostics(), statementAst); } std::unique_ptr TKqpQueryState::BuildReCompileRequest(std::shared_ptr> cookie) { diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index 48f32a7342a2..7efe26dbdd14 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -125,6 +125,11 @@ class TKqpQueryState : public TNonCopyable { NYql::TIssues Issues; + TVector Statements; + ui32 CurrentStatementId = 0; + ui32 StatementResultIndex = 0; + ui32 StatementResultSize = 0; + NKikimrKqp::EQueryAction GetAction() const { return RequestEv->GetAction(); } @@ -370,6 +375,44 @@ class TKqpQueryState : public TNonCopyable { return RequestEv->GetTxControl(); } + bool ProcessingLastStatement() const { + return CurrentStatementId + 1 >= Statements.size(); + } + + void PrepareCurrentStatement() { + QueryData = {}; + PreparedQuery = {}; + CompileResult = {}; + TxCtx = {}; + CurrentTx = 0; + TableVersions = {}; + MaxReadType = ETableReadType::Other; + Commit = false; + Commited = false; + TopicOperations = {}; + ReplayMessage = {}; + } + + void PrepareNextStatement() { + CurrentStatementId++; + StatementResultIndex += StatementResultSize; + StatementResultSize = 0; + PrepareCurrentStatement(); + } + + void PrepareStatementTransaction(NKqpProto::TKqpPhyTx_EType txType) { + if (!HasTxControl()) { + switch (txType) { + case NKqpProto::TKqpPhyTx::TYPE_SCHEME: + TxCtx->EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_UNDEFINED; + break; + default: + Commit = true; + TxCtx->EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE; + } + } + } + // validate the compiled query response and ensure that all table versions are not // changed since the last compilation. bool EnsureTableVersions(const TEvTxProxySchemeCache::TEvNavigateKeySetResult& response); @@ -378,6 +421,7 @@ class TKqpQueryState : public TNonCopyable { std::unique_ptr BuildNavigateKeySet(); // same the context of the compiled query to the query state. bool SaveAndCheckCompileResult(TEvKqp::TEvCompileResponse* ev); + bool SaveAndCheckParseResult(TEvKqp::TEvParseResponse&& ev); // build the compilation request. std::unique_ptr BuildCompileRequest(std::shared_ptr> cookie); // TODO(gvit): get rid of code duplication in these requests, diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 04c47ab0baba..e43117fa8dd3 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -513,6 +514,19 @@ class TKqpSessionActor : public TActorBootstrapped { OnSuccessCompileRequest(); } + void Handle(TEvKqp::TEvParseResponse::TPtr& ev) { + QueryState->SaveAndCheckParseResult(std::move(*ev->Get())); + CompileStatement(); + } + + void CompileStatement() { + auto request = QueryState->BuildCompileRequest(CompilationCookie); + LOG_D("Sending CompileQuery request"); + + Send(MakeKqpCompileServiceID(SelfId().NodeId()), request.release(), 0, QueryState->QueryId, + QueryState->KqpSessionSpan.GetTraceId()); + } + void OnSuccessCompileRequest() { if (QueryState->GetAction() == NKikimrKqp::QUERY_ACTION_PREPARE || QueryState->GetAction() == NKikimrKqp::QUERY_ACTION_EXPLAIN) @@ -799,7 +813,7 @@ class TKqpSessionActor : public TActorBootstrapped { request.QueryType = queryState->GetType(); if (Y_LIKELY(queryState->PreparedQuery)) { ui64 resultSetsCount = queryState->PreparedQuery->GetPhysicalQuery().ResultBindingsSize(); - request.AllowTrailingResults = (resultSetsCount == 1); + request.AllowTrailingResults = (resultSetsCount == 1 && queryState->Statements.size() <= 1); request.AllowTrailingResults &= (QueryState->RequestEv->GetSupportsStreamTrailingResult()); } } @@ -969,6 +983,7 @@ class TKqpSessionActor : public TActorBootstrapped { bool ExecutePhyTx(const TKqpPhyTxHolder::TConstPtr& tx, bool commit) { if (tx) { + QueryState->PrepareStatementTransaction(tx->GetType()); switch (tx->GetType()) { case NKqpProto::TKqpPhyTx::TYPE_SCHEME: YQL_ENSURE(tx->StagesSize() == 0); @@ -1118,9 +1133,18 @@ class TKqpSessionActor : public TActorBootstrapped { ExecuterId = RegisterWithSameMailbox(executerActor); } + static ui32 GetResultsCount(const IKqpGateway::TExecPhysicalRequest& req) { + ui32 results = 0; + for (const auto& transaction : req.Transactions) { + results += transaction.Body->ResultsSize(); + } + return results; + } + void SendToExecuter(IKqpGateway::TExecPhysicalRequest&& request, bool isRollback = false) { if (QueryState) { request.Orbit = std::move(QueryState->Orbit); + QueryState->StatementResultSize = GetResultsCount(request); } request.PerRequestDataSizeLimit = RequestControls.PerRequestDataSizeLimit; request.MaxShardCount = RequestControls.MaxShardCount; @@ -1132,7 +1156,7 @@ class TKqpSessionActor : public TActorBootstrapped { RequestCounters, Settings.TableService.GetAggregationConfig(), Settings.TableService.GetExecuterRetriesConfig(), AsyncIoFactory, QueryState ? QueryState->PreparedQuery : nullptr, Settings.TableService.GetChannelTransportVersion(), SelfId(), 2 * TDuration::Seconds(MetadataProviderConfig.GetRefreshPeriodSeconds()), QueryState ? QueryState->UserRequestContext : MakeIntrusive("", Settings.Database, SessionId), - Settings.TableService.GetEnableOlapSink()); + Settings.TableService.GetEnableOlapSink(), QueryState ? QueryState->StatementResultIndex : 0); auto exId = RegisterWithSameMailbox(executerActor); LOG_D("Created new KQP executer: " << exId << " isRollback: " << isRollback); @@ -1623,7 +1647,16 @@ class TKqpSessionActor : public TActorBootstrapped { QueryResponse = std::move(resEv); - Cleanup(); + ProcessNextStatement(); + } + + void ProcessNextStatement() { + if (QueryState->ProcessingLastStatement()) { + Cleanup(); + return; + } + QueryState->PrepareNextStatement(); + CompileStatement(); } void ReplyQueryCompileError() { @@ -2108,6 +2141,7 @@ class TKqpSessionActor : public TActorBootstrapped { // forgotten messages from previous aborted request hFunc(TEvKqp::TEvCompileResponse, Handle); + hFunc(TEvKqp::TEvParseResponse, Handle); hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); hFunc(TEvents::TEvUndelivered, HandleNoop); diff --git a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp index 5f01ec1ef5db..89bb1bc299fe 100644 --- a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp +++ b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp @@ -3680,6 +3680,7 @@ Y_UNIT_TEST_SUITE(KqpPg) { } appConfig.MutableTableServiceConfig()->SetEnablePgConstsToParams(true); + appConfig.MutableTableServiceConfig()->SetEnablePerStatementQueryExecution(true); auto setting = NKikimrKqp::TKqpSetting(); auto serverSettings = TKikimrSettings() .SetAppConfig(appConfig) @@ -3716,6 +3717,32 @@ Y_UNIT_TEST_SUITE(KqpPg) { } } + { + // Check NoTx + { + const auto query = Q_(R"( + CREATE TABLE PgTable3 ( + key int4 PRIMARY KEY, + value text + ); + SELECT * FROM PgTable3 WHERE key = 3; + )"); + auto result = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + + { + const auto query = Q_(R"( + SELECT * FROM PgTable3 WHERE key = 4; + SELECT * FROM PgTable3 WHERE key = 5; + )"); + auto result = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), true); + } + } + { // Check values without table { @@ -3763,7 +3790,7 @@ Y_UNIT_TEST_SUITE(KqpPg) { SELECT (1, 2); )"); auto result = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::INTERNAL_ERROR, result.GetIssues().ToString()); UNIT_ASSERT(result.GetIssues().ToString().Contains("alternative is not implemented yet : 138")); } } diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp index f7a2639a5968..6b334262a9d9 100644 --- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp @@ -1764,6 +1764,212 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { UNIT_ASSERT(!service->IsUnsafeToShutdown()); } } + + Y_UNIT_TEST(Ddl_Dml) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true); + appConfig.MutableTableServiceConfig()->SetEnableAstCache(true); + appConfig.MutableTableServiceConfig()->SetEnablePerStatementQueryExecution(true); + auto setting = NKikimrKqp::TKqpSetting(); + auto serverSettings = TKikimrSettings() + .SetAppConfig(appConfig) + .SetKqpSettings({setting}); + + TKikimrRunner kikimr(serverSettings); + auto db = kikimr.GetQueryClient(); + + { + // Base test with ddl and dml statements + auto result = db.ExecuteQuery(R"( + DECLARE $name AS Text; + $a = (SELECT * FROM TestDdl1); + CREATE TABLE TestDdl1 ( + Key Uint64, + Value String, + PRIMARY KEY (Key) + ); + UPSERT INTO TestDdl1 (Key, Value) VALUES (1, "One"); + CREATE TABLE TestDdl2 ( + Key Uint64, + Value String, + PRIMARY KEY (Key) + ); + UPSERT INTO TestDdl1 (Key, Value) VALUES (2, "Two"); + SELECT * FROM $a; + )", TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1); + CompareYson(R"([[[1u];["One"]];[[2u];["Two"]]])", FormatResultSetYson(result.GetResultSet(0))); + UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString()); + + result = db.ExecuteQuery(R"( + UPSERT INTO TestDdl1 (Key, Value) VALUES (3, "Three"); + )", TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString()); + + result = db.ExecuteQuery(R"( + SELECT * FROM TestDdl1; + )", TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1); + CompareYson(R"([[[1u];["One"]];[[2u];["Two"]];[[3u];["Three"]]])", FormatResultSetYson(result.GetResultSet(0))); + UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString()); + + result = db.ExecuteQuery(R"( + CREATE TABLE TestDdl1 ( + Key Uint64, + Value String, + PRIMARY KEY (Key) + ); + )", TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); + UNIT_ASSERT(result.GetIssues().ToOneLineString().Contains("Check failed: path: '/Root/TestDdl1', error: path exist")); + + result = db.ExecuteQuery(R"( + CREATE TABLE TestDdl2 ( + Key Uint64, + Value String, + PRIMARY KEY (Key) + ); + )", TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); + UNIT_ASSERT(result.GetIssues().ToOneLineString().Contains("Check failed: path: '/Root/TestDdl2', error: path exist")); + + result = db.ExecuteQuery(R"( + UPSERT INTO TestDdl2 SELECT * FROM TestDdl1; + )", TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 0); + UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString()); + } + + { + // Test with query with error + auto result = db.ExecuteQuery(R"( + UPSERT INTO TestDdl2 (Key, Value) VALUES (1, "One"); + CREATE TABLE TestDdl3 ( + Key Uint64, + Value String, + PRIMARY KEY (Key) + ); + UPSERT INTO TestDdl2 (Key, Value) VALUES (4, "Four"); + CREATE TABLE TestDdl2 ( + Key Uint64, + Value String, + PRIMARY KEY (Key) + ); + CREATE TABLE TestDdl4 ( + Key Uint64, + Value String, + PRIMARY KEY (Key) + ); + UPSERT INTO TestDdl1 (Key, Value) VALUES (3, "Three"); + )", TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 0); + + result = db.ExecuteQuery(R"( + SELECT * FROM TestDdl2; + )", TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1); + CompareYson(R"([[[1u];["One"]];[[2u];["Two"]];[[3u];["Three"]];[[4u];["Four"]]])", FormatResultSetYson(result.GetResultSet(0))); + UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString()); + } + + { + // Check result sets + auto result = db.ExecuteQuery(R"( + $a = (SELECT * FROM TestDdl1); + SELECT * FROM $a; + UPSERT INTO TestDdl1 (Key, Value) VALUES (4, "Four"); + SELECT * FROM $a; + CREATE TABLE TestDdl4 ( + Key Uint64, + Value Uint64, + PRIMARY KEY (Key) + ); + UPSERT INTO TestDdl4 (Key, Value) VALUES (1, 1); + SELECT * FROM TestDdl4; + )", TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 3); + CompareYson(R"([[[1u];["One"]];[[2u];["Two"]];[[3u];["Three"]]])", FormatResultSetYson(result.GetResultSet(0))); + CompareYson(R"([[[1u];["One"]];[[2u];["Two"]];[[3u];["Three"]];[[4u];["Four"]]])", FormatResultSetYson(result.GetResultSet(1))); + CompareYson(R"([[[1u];[1u]]])", FormatResultSetYson(result.GetResultSet(2))); + UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString()); + + result = db.ExecuteQuery(R"( + UPSERT INTO TestDdl2 SELECT * FROM TestDdl1; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 0); + UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString()); + } + + { + // Check EVALUATE FOR + auto result = db.ExecuteQuery(R"( + EVALUATE FOR $i IN AsList(1, 2, 3) DO BEGIN + SELECT $i; + SELECT $i; + END DO; + )", TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::UNSUPPORTED, result.GetIssues().ToString()); + } + + { + // Check parser errors + auto result = db.ExecuteQuery(R"( + UPSERT INTO TestDdl4 (Key, Value) VALUES (2, 2); + SELECT * FROM $a; + UPSERT INTO TestDdl4 (Key, Value) VALUES (3, 3); + )", TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::INTERNAL_ERROR, result.GetIssues().ToString()); + UNIT_ASSERT(result.GetIssues().ToOneLineString().Contains("Unknown name: $a")); + + result = db.ExecuteQuery(R"( + SELECT * FROM TestDdl4; + )", TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1); + CompareYson(R"([[[1u];[1u]]])", FormatResultSetYson(result.GetResultSet(0))); + + result = db.ExecuteQuery(R"( + UPSERT INTO TestDdl4 (Key, Value) VALUES (2, 2); + UPSERT INTO TestDdl4 (Key, Value) VALUES (3, "3"); + )", TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); + UNIT_ASSERT(result.GetIssues().ToOneLineString().Contains("Error: Failed to convert 'Value': String to Optional")); + + result = db.ExecuteQuery(R"( + SELECT * FROM TestDdl4; + )", TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1); + CompareYson(R"([[[1u];[1u]];[[2u];[2u]]])", FormatResultSetYson(result.GetResultSet(0))); + + result = db.ExecuteQuery(R"( + CREATE TABLE TestDdl5 ( + Key Uint64, + Value Uint64, + PRIMARY KEY (Key) + ); + UPSERT INTO TestDdl5 (Key, Value) VALUES (1, 1); + UPSERT INTO TestDdl5 (Key, Value) VALUES (3, "3"); + )", TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); + UNIT_ASSERT(result.GetIssues().ToOneLineString().Contains("Error: Failed to convert 'Value': String to Optional")); + + result = db.ExecuteQuery(R"( + SELECT * FROM TestDdl5; + )", TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1); + CompareYson(R"([[[1u];[1u]]])", FormatResultSetYson(result.GetResultSet(0))); + } + } } } // namespace NKqp diff --git a/ydb/core/protos/table_service_config.proto b/ydb/core/protos/table_service_config.proto index 9f00e30fbf7d..05a913f54e59 100644 --- a/ydb/core/protos/table_service_config.proto +++ b/ydb/core/protos/table_service_config.proto @@ -272,5 +272,8 @@ message TTableServiceConfig { optional bool EnablePgConstsToParams = 53 [default = false]; optional uint64 ExtractPredicateRangesLimit = 54 [default = 10000]; + optional bool EnableOlapSink = 55 [default = false]; + + optional bool EnablePerStatementQueryExecution = 56 [default = false]; }; diff --git a/ydb/library/yql/parser/pg_wrapper/interface/parser.h b/ydb/library/yql/parser/pg_wrapper/interface/parser.h index d481820e7ef1..e5a019213c7e 100644 --- a/ydb/library/yql/parser/pg_wrapper/interface/parser.h +++ b/ydb/library/yql/parser/pg_wrapper/interface/parser.h @@ -11,5 +11,6 @@ struct TTranslationSettings; namespace NSQLTranslationPG { NYql::TAstParseResult PGToYql(const TString& query, const NSQLTranslation::TTranslationSettings& settings); +TVector PGToYqlStatements(const TString& query, const NSQLTranslation::TTranslationSettings& settings); } // NSQLTranslationPG diff --git a/ydb/library/yql/sql/pg/pg_sql.cpp b/ydb/library/yql/sql/pg/pg_sql.cpp index 04d1388fd941..4f0e13083d38 100644 --- a/ydb/library/yql/sql/pg/pg_sql.cpp +++ b/ydb/library/yql/sql/pg/pg_sql.cpp @@ -152,7 +152,7 @@ std::shared_ptr ListMake1(void* cell) { #define CAST_NODE(nodeType, nodeptr) CastNode(nodeptr, T_##nodeType) #define CAST_NODE_EXT(nodeType, tag, nodeptr) CastNode(nodeptr, tag) -#define LIST_CAST_NTH(nodeType, list, index) CAST_NODE(nodeType, list_nth(list, i)) +#define LIST_CAST_NTH(nodeType, list, index) CAST_NODE(nodeType, list_nth(list, index)) #define LIST_CAST_EXT_NTH(nodeType, tag, list, index) CAST_NODE_EXT(nodeType, tag, list_nth(list, i)) const Node* ListNodeNth(const List* list, int index) { @@ -273,13 +273,26 @@ class TConverter : public IPGParseEvents { using TViews = THashMap; - TConverter(TAstParseResult& astParseResult, const NSQLTranslation::TTranslationSettings& settings, const TString& query) - : AstParseResult(astParseResult) + struct TState { + TString CostBasedOptimizer; + TVector Statements; + ui32 ReadIndex = 0; + TViews Views; + TVector CTE; + TVector Positions = {NYql::TPosition()}; + THashMap ParamNameToPgTypeName; + THashMap AutoParamValues; + }; + + TConverter(TVector& astParseResults, const NSQLTranslation::TTranslationSettings& settings, + const TString& query, bool perStatementResult) + : AstParseResults(astParseResults) , Settings(settings) , DqEngineEnabled(Settings.DqDefaultAuto->Allow()) , BlockEngineEnabled(Settings.BlockDefaultAuto->Allow()) + , PerStatementResult(perStatementResult) { - Positions.push_back({}); + AstParseResults.push_back({}); ScanRows(query); for (auto& flag : Settings.Flags) { @@ -314,77 +327,95 @@ class TConverter : public IPGParseEvents { const auto typeOid = Settings.PgParameterTypeOids[i]; const auto& typeName = typeOid != UNKNOWNOID ? NPg::LookupType(typeOid).Name : DEFAULT_PARAM_TYPE; - ParamNameToPgTypeName[paramName] = typeName; + State.ParamNameToPgTypeName[paramName] = typeName; } } void OnResult(const List* raw) { - AstParseResult.Pool = std::make_unique(4096); - AstParseResult.Root = ParseResult(raw); - if (!AutoParamValues.empty()) { - AstParseResult.PgAutoParamValues = std::move(AutoParamValues); + if (!PerStatementResult) { + AstParseResults[StatementId].Pool = std::make_unique(4096); + AstParseResults[StatementId].Root = ParseResult(raw); + if (!State.AutoParamValues.empty()) { + AstParseResults[StatementId].PgAutoParamValues = std::move(State.AutoParamValues); + } + return; + } + AstParseResults.resize(ListLength(raw)); + for (; StatementId < AstParseResults.size(); ++StatementId) { + AstParseResults[StatementId].Pool = std::make_unique(4096); + AstParseResults[StatementId].Root = ParseResult(raw, StatementId); + if (!State.AutoParamValues.empty()) { + AstParseResults[StatementId].PgAutoParamValues = std::move(State.AutoParamValues); + } + State = {}; } } void OnError(const TIssue& issue) { - AstParseResult.Issues.AddIssue(issue); + AstParseResults[StatementId].Issues.AddIssue(issue); } - TAstNode* ParseResult(const List* raw) { + TAstNode* ParseResult(const List* raw, const TMaybe statementId = Nothing()) { auto configSource = L(A("DataSource"), QA(TString(NYql::ConfigProviderName))); - Statements.push_back(L(A("let"), A("world"), L(A(TString(NYql::ConfigureName)), A("world"), configSource, + State.Statements.push_back(L(A("let"), A("world"), L(A(TString(NYql::ConfigureName)), A("world"), configSource, QA("OrderedColumns")))); - ui32 blockEnginePgmPos = Statements.size(); - Statements.push_back(configSource); - ui32 costBasedOptimizerPos = Statements.size(); - Statements.push_back(configSource); - ui32 dqEnginePgmPos = Statements.size(); - Statements.push_back(configSource); + ui32 blockEnginePgmPos = State.Statements.size(); + State.Statements.push_back(configSource); + ui32 costBasedOptimizerPos = State.Statements.size(); + State.Statements.push_back(configSource); + ui32 dqEnginePgmPos = State.Statements.size(); + State.Statements.push_back(configSource); - for (int i = 0; i < ListLength(raw); ++i) { - if (!ParseRawStmt(LIST_CAST_NTH(RawStmt, raw, i))) { + if (statementId) { + if (!ParseRawStmt(LIST_CAST_NTH(RawStmt, raw, *statementId))) { return nullptr; } + } else { + for (int i = 0; i < ListLength(raw); ++i) { + if (!ParseRawStmt(LIST_CAST_NTH(RawStmt, raw, i))) { + return nullptr; + } + } } - if (!Views.empty()) { + if (!State.Views.empty()) { AddError("Not all views have been dropped"); return nullptr; } if (Settings.EndOfQueryCommit) { - Statements.push_back(L(A("let"), A("world"), L(A("CommitAll!"), + State.Statements.push_back(L(A("let"), A("world"), L(A("CommitAll!"), A("world")))); } AddVariableDeclarations(); - Statements.push_back(L(A("return"), A("world"))); + State.Statements.push_back(L(A("return"), A("world"))); if (DqEngineEnabled) { - Statements[dqEnginePgmPos] = L(A("let"), A("world"), L(A(TString(NYql::ConfigureName)), A("world"), configSource, + State.Statements[dqEnginePgmPos] = L(A("let"), A("world"), L(A(TString(NYql::ConfigureName)), A("world"), configSource, QA("DqEngine"), QA(DqEngineForce ? "force" : "auto"))); } else { - Statements.erase(Statements.begin() + dqEnginePgmPos); + State.Statements.erase(State.Statements.begin() + dqEnginePgmPos); } - if (CostBasedOptimizer) { - Statements[costBasedOptimizerPos] = L(A("let"), A("world"), L(A(TString(NYql::ConfigureName)), A("world"), configSource, - QA("CostBasedOptimizer"), QA(CostBasedOptimizer))); + if (State.CostBasedOptimizer) { + State.Statements[costBasedOptimizerPos] = L(A("let"), A("world"), L(A(TString(NYql::ConfigureName)), A("world"), configSource, + QA("CostBasedOptimizer"), QA(State.CostBasedOptimizer))); } else { - Statements.erase(Statements.begin() + costBasedOptimizerPos); + State.Statements.erase(State.Statements.begin() + costBasedOptimizerPos); } if (BlockEngineEnabled) { - Statements[blockEnginePgmPos] = L(A("let"), A("world"), L(A(TString(NYql::ConfigureName)), A("world"), configSource, + State.Statements[blockEnginePgmPos] = L(A("let"), A("world"), L(A(TString(NYql::ConfigureName)), A("world"), configSource, QA("BlockEngine"), QA(BlockEngineForce ? "force" : "auto"))); } else { - Statements.erase(Statements.begin() + blockEnginePgmPos); + State.Statements.erase(State.Statements.begin() + blockEnginePgmPos); } - return VL(Statements.data(), Statements.size()); + return VL(State.Statements.data(), State.Statements.size()); } [[nodiscard]] @@ -542,8 +573,8 @@ class TConverter : public IPGParseEvents { using TAutoParamName = TString; TAutoParamName AddAutoParam(Ydb::TypedValue&& val) { - auto nextName = TString(AUTO_PARAM_PREFIX) + ToString(AutoParamValues.size()); - AutoParamValues.emplace(nextName, std::move(val)); + auto nextName = TString(AUTO_PARAM_PREFIX) + ToString(State.AutoParamValues.size()); + State.AutoParamValues.emplace(nextName, std::move(val)); return nextName; } @@ -570,9 +601,9 @@ class TConverter : public IPGParseEvents { const auto paramType = L(A("ListType"), VL(autoParamTupleType)); const auto paramName = AddAutoParam(MakeYdbListTupleParamValue(std::move(ydbValues), std::move(columnTypes))); - Statements.push_back(L(A("declare"), A(paramName), paramType)); + State.Statements.push_back(L(A("declare"), A(paramName), paramType)); - YQL_CLOG(INFO, Default) << "Successfully autoparametrized VALUES at" << Positions.back(); + YQL_CLOG(INFO, Default) << "Successfully autoparametrized VALUES at" << State.Positions.back(); return A(paramName); } @@ -708,9 +739,9 @@ class TConverter : public IPGParseEvents { ) { bool isValuesClauseOfInsertStmt = fillTargetColumns; - CTE.emplace_back(); + State.CTE.emplace_back(); Y_DEFER { - CTE.pop_back(); + State.CTE.pop_back(); }; if (value->withClause) { @@ -1215,13 +1246,13 @@ class TConverter : public IPGParseEvents { } auto resOptions = QL(QL(QA("type")), QL(QA("autoref"))); - Statements.push_back(L(A("let"), A("output"), output)); - Statements.push_back(L(A("let"), A("result_sink"), L(A("DataSink"), QA(TString(NYql::ResultProviderName))))); - Statements.push_back(L(A("let"), A("world"), L(A("Write!"), + State.Statements.push_back(L(A("let"), A("output"), output)); + State.Statements.push_back(L(A("let"), A("result_sink"), L(A("DataSink"), QA(TString(NYql::ResultProviderName))))); + State.Statements.push_back(L(A("let"), A("world"), L(A("Write!"), A("world"), A("result_sink"), L(A("Key")), A("output"), resOptions))); - Statements.push_back(L(A("let"), A("world"), L(A("Commit!"), + State.Statements.push_back(L(A("let"), A("world"), L(A("Commit!"), A("world"), A("result_sink")))); - return Statements.back(); + return State.Statements.back(); } [[nodiscard]] @@ -1273,7 +1304,7 @@ class TConverter : public IPGParseEvents { return false; } - auto& currentCTEs = CTE.back(); + auto& currentCTEs = State.CTE.back(); if (currentCTEs.find(view.Name) != currentCTEs.end()) { AddError(TStringBuilder() << "CTE already exists: '" << view.Name << "'"); return false; @@ -1426,7 +1457,7 @@ class TConverter : public IPGParseEvents { const auto writeOptions = BuildWriteOptions(value, std::move(returningList)); - Statements.push_back(L( + State.Statements.push_back(L( A("let"), A("world"), L( @@ -1439,7 +1470,7 @@ class TConverter : public IPGParseEvents { ) )); - return Statements.back(); + return State.Statements.back(); } [[nodiscard]] @@ -1498,13 +1529,13 @@ class TConverter : public IPGParseEvents { L(A("Void")), QVL(options.data(), options.size()))) )); - Statements.push_back(L( + State.Statements.push_back(L( A("let"), A("world"), writeUpdate )); - return Statements.back(); + return State.Statements.back(); } [[nodiscard]] @@ -1562,14 +1593,14 @@ class TConverter : public IPGParseEvents { return nullptr; } - auto it = Views.find(view.Name); - if (it != Views.end() && !value->replace) { + auto it = State.Views.find(view.Name); + if (it != State.Views.end() && !value->replace) { AddError(TStringBuilder() << "View already exists: '" << view.Name << "'"); return nullptr; } - Views[view.Name] = view; - return Statements.back(); + State.Views[view.Name] = view; + return State.Statements.back(); } #pragma region CreateTable @@ -1942,12 +1973,12 @@ class TConverter : public IPGParseEvents { } } - Statements.push_back( + State.Statements.push_back( L(A("let"), A("world"), L(A("Write!"), A("world"), sink, key, L(A("Void")), BuildCreateTableOptions(ctx)))); - return Statements.back(); + return State.Statements.back(); } #pragma endregion CreateTable @@ -1996,18 +2027,18 @@ class TConverter : public IPGParseEvents { } const auto name = StrVal(nameNode); - auto it = Views.find(name); - if (!value->missing_ok && it == Views.end()) { + auto it = State.Views.find(name); + if (!value->missing_ok && it == State.Views.end()) { AddError(TStringBuilder() << "View not found: '" << name << "'"); return nullptr; } - if (it != Views.end()) { - Views.erase(it); + if (it != State.Views.end()) { + State.Views.erase(it); } } - return Statements.back(); + return State.Statements.back(); } TAstNode* ParseDropTableStmt(const DropStmt* value, const TVector& names) { @@ -2030,7 +2061,7 @@ class TConverter : public IPGParseEvents { } TString mode = (value->missing_ok) ? "drop_if_exists" : "drop"; - Statements.push_back(L( + State.Statements.push_back(L( A("let"), A("world"), L( @@ -2046,7 +2077,7 @@ class TConverter : public IPGParseEvents { )); } - return Statements.back(); + return State.Statements.back(); } TAstNode* ParseDropIndexStmt(const DropStmt* value, const TVector& names) { @@ -2070,7 +2101,7 @@ class TConverter : public IPGParseEvents { ); TString missingOk = (value->missing_ok) ? "true" : "false"; - Statements.push_back(L( + State.Statements.push_back(L( A("let"), A("world"), L( @@ -2087,7 +2118,7 @@ class TConverter : public IPGParseEvents { )); } - return Statements.back(); + return State.Statements.back(); } [[nodiscard]] @@ -2120,7 +2151,7 @@ class TConverter : public IPGParseEvents { if (Settings.GUCSettings) { Settings.GUCSettings->Set(name, rawStr, value->is_local); } - return Statements.back(); + return State.Statements.back(); } if (name == "useblocks" || name == "emitaggapply") { @@ -2133,7 +2164,7 @@ class TConverter : public IPGParseEvents { if (NodeTag(arg) == T_A_Const && (NodeTag(CAST_NODE(A_Const, arg)->val) == T_String)) { TString rawStr = StrVal(CAST_NODE(A_Const, arg)->val); auto configSource = L(A("DataSource"), QA(TString(NYql::ConfigProviderName))); - Statements.push_back(L(A("let"), A("world"), L(A(TString(NYql::ConfigureName)), A("world"), configSource, + State.Statements.push_back(L(A("let"), A("world"), L(A(TString(NYql::ConfigureName)), A("world"), configSource, QA(TString(rawStr == "true" ? "" : "Disable") + TString((name == "useblocks") ? "UseBlocks" : "PgEmitAggApply"))))); } else { AddError(TStringBuilder() << "VariableSetStmt, expected string literal for " << value->name << " option"); @@ -2192,7 +2223,7 @@ class TConverter : public IPGParseEvents { auto rawStr = StrVal(CAST_NODE(A_Const, arg)->val); - Statements.push_back(L(A("let"), A("world"), L(A(TString(NYql::ConfigureName)), A("world"), providerSource, + State.Statements.push_back(L(A("let"), A("world"), L(A(TString(NYql::ConfigureName)), A("world"), providerSource, QA("Attr"), QAX(name.substr(dotPos + 1)), QAX(rawStr)))); } else { AddError(TStringBuilder() << "VariableSetStmt, expected string literal for " << value->name << " option"); @@ -2227,7 +2258,7 @@ class TConverter : public IPGParseEvents { return nullptr; } - CostBasedOptimizer = str; + State.CostBasedOptimizer = str; } else { AddError(TStringBuilder() << "VariableSetStmt, expected string literal for " << value->name << " option"); return nullptr; @@ -2237,7 +2268,7 @@ class TConverter : public IPGParseEvents { return nullptr; } - return Statements.back(); + return State.Statements.back(); } [[nodiscard]] @@ -2316,7 +2347,7 @@ class TConverter : public IPGParseEvents { if (!returningList.empty()) { options.push_back(QL(QA("returning"), QVL(returningList.data(), returningList.size()))); } - Statements.push_back(L( + State.Statements.push_back(L( A("let"), A("world"), L( @@ -2328,7 +2359,7 @@ class TConverter : public IPGParseEvents { QVL(options.data(), options.size()) ) )); - return Statements.back(); + return State.Statements.back(); } TMaybe GetConfigVariable(const TString& varName) { @@ -2387,15 +2418,15 @@ class TConverter : public IPGParseEvents { const auto selectOptions = QL(setItems, setOps); const auto output = L(A("PgSelect"), selectOptions); - Statements.push_back(L(A("let"), A("output"), output)); - Statements.push_back(L(A("let"), A("result_sink"), L(A("DataSink"), QA(TString(NYql::ResultProviderName))))); + State.Statements.push_back(L(A("let"), A("output"), output)); + State.Statements.push_back(L(A("let"), A("result_sink"), L(A("DataSink"), QA(TString(NYql::ResultProviderName))))); const auto resOptions = QL(QL(QA("type")), QL(QA("autoref"))); - Statements.push_back(L(A("let"), A("world"), L(A("Write!"), + State.Statements.push_back(L(A("let"), A("world"), L(A("Write!"), A("world"), A("result_sink"), L(A("Key")), A("output"), resOptions))); - Statements.push_back(L(A("let"), A("world"), L(A("Commit!"), + State.Statements.push_back(L(A("let"), A("world"), L(A("Commit!"), A("world"), A("result_sink")))); - return Statements.back(); + return State.Statements.back(); } [[nodiscard]] @@ -2408,14 +2439,14 @@ class TConverter : public IPGParseEvents { case TRANS_STMT_ROLLBACK_TO: return true; case TRANS_STMT_COMMIT: - Statements.push_back(L(A("let"), A("world"), L(A("CommitAll!"), + State.Statements.push_back(L(A("let"), A("world"), L(A("CommitAll!"), A("world")))); if (Settings.GUCSettings) { Settings.GUCSettings->Commit(); } return true; case TRANS_STMT_ROLLBACK: - Statements.push_back(L(A("let"), A("world"), L(A("CommitAll!"), + State.Statements.push_back(L(A("let"), A("world"), L(A("CommitAll!"), A("world"), QL(QL(QA("mode"), QA("rollback")))))); if (Settings.GUCSettings) { Settings.GUCSettings->RollBack(); @@ -2485,7 +2516,7 @@ class TConverter : public IPGParseEvents { desc.emplace_back(QL(QA("dataColumns"), QVL(coverColumns->data(), coverColumns->size()))); desc.emplace_back(QL(QA("flags"), QVL(flags.data(), flags.size()))); - Statements.push_back(L( + State.Statements.push_back(L( A("let"), A("world"), L( @@ -2501,7 +2532,7 @@ class TConverter : public IPGParseEvents { ) )); - return Statements.back(); + return State.Statements.back(); } TFromDesc ParseFromClause(const Node* node) { @@ -2527,11 +2558,11 @@ class TConverter : public IPGParseEvents { auto colNamesTuple = QVL(colNamesNodes.data(), colNamesNodes.size()); if (p.InjectRead) { - auto label = "read" + ToString(ReadIndex); - Statements.push_back(L(A("let"), A(label), p.Source)); - Statements.push_back(L(A("let"), A("world"), L(A("Left!"), A(label)))); + auto label = "read" + ToString(State.ReadIndex); + State.Statements.push_back(L(A("let"), A(label), p.Source)); + State.Statements.push_back(L(A("let"), A("world"), L(A("Left!"), A(label)))); fromList.push_back(QL(L(A("Right!"), A(label)), aliasNode, colNamesTuple)); - ++ReadIndex; + ++State.ReadIndex; } else { fromList.push_back(QL(p.Source, aliasNode, colNamesTuple)); } @@ -2660,7 +2691,7 @@ class TConverter : public IPGParseEvents { const TView* view = nullptr; if (StrLength(value->schemaname) == 0) { - for (auto rit = CTE.rbegin(); rit != CTE.rend(); ++rit) { + for (auto rit = State.CTE.rbegin(); rit != State.CTE.rend(); ++rit) { auto cteIt = rit->find(value->relname); if (cteIt != rit->end()) { view = &cteIt->second; @@ -2668,8 +2699,8 @@ class TConverter : public IPGParseEvents { } } if (!view) { - auto viewIt = Views.find(value->relname); - if (viewIt != Views.end()) { + auto viewIt = State.Views.find(value->relname); + if (viewIt != State.Views.end()) { view = &viewIt->second; } } @@ -2980,8 +3011,8 @@ class TConverter : public IPGParseEvents { TAstNode* ParseParamRefExpr(const ParamRef* value) { const auto varName = PREPARED_PARAM_PREFIX + ToString(value->number); - if (!ParamNameToPgTypeName.contains(varName)) { - ParamNameToPgTypeName[varName] = DEFAULT_PARAM_TYPE; + if (!State.ParamNameToPgTypeName.contains(varName)) { + State.ParamNameToPgTypeName[varName] = DEFAULT_PARAM_TYPE; } return A(varName); } @@ -3121,9 +3152,9 @@ class TConverter : public IPGParseEvents { } const auto& paramName = AddAutoParam(std::move(typedValue)); - Statements.push_back(L(A("declare"), A(paramName), pgType)); + State.Statements.push_back(L(A("declare"), A(paramName), pgType)); - YQL_CLOG(INFO, Default) << "Autoparametrized " << paramName << " at " << Positions.back(); + YQL_CLOG(INFO, Default) << "Autoparametrized " << paramName << " at " << State.Positions.back(); return A(paramName); } @@ -4236,9 +4267,9 @@ class TConverter : public IPGParseEvents { } void AddVariableDeclarations() { - for (const auto& [varName, typeName] : ParamNameToPgTypeName) { + for (const auto& [varName, typeName] : State.ParamNameToPgTypeName) { const auto pgType = L(A("PgType"), QA(typeName)); - Statements.push_back(L(A("declare"), A(varName), pgType)); + State.Statements.push_back(L(A("declare"), A(varName), pgType)); } } @@ -4277,11 +4308,11 @@ class TConverter : public IPGParseEvents { } TAstNode* VL(TAstNode** nodes, ui32 size, TPosition pos = {}) { - return TAstNode::NewList(pos.Row ? pos : Positions.back(), nodes, size, *AstParseResult.Pool); + return TAstNode::NewList(pos.Row ? pos : State.Positions.back(), nodes, size, *AstParseResults[StatementId].Pool); } TAstNode* VL(TArrayRef nodes, TPosition pos = {}) { - return TAstNode::NewList(pos.Row ? pos : Positions.back(), nodes.data(), nodes.size(), *AstParseResult.Pool); + return TAstNode::NewList(pos.Row ? pos : State.Positions.back(), nodes.data(), nodes.size(), *AstParseResults[StatementId].Pool); } TAstNode* QVL(TAstNode** nodes, ui32 size, TPosition pos = {}) { @@ -4297,11 +4328,11 @@ class TConverter : public IPGParseEvents { } TAstNode* A(const TStringBuf str, TPosition pos = {}, ui32 flags = 0) { - return TAstNode::NewAtom(pos.Row ? pos : Positions.back(), str, *AstParseResult.Pool, flags); + return TAstNode::NewAtom(pos.Row ? pos : State.Positions.back(), str, *AstParseResults[StatementId].Pool, flags); } TAstNode* AX(const TString& str, TPosition pos = {}) { - return A(str, pos.Row ? pos : Positions.back(), TNodeFlags::ArbitraryContent); + return A(str, pos.Row ? pos : State.Positions.back(), TNodeFlags::ArbitraryContent); } TAstNode* Q(TAstNode* node, TPosition pos = {}) { @@ -4320,7 +4351,7 @@ class TConverter : public IPGParseEvents { TAstNode* L(TNodes... nodes) { TLState state; LImpl(state, nodes...); - return TAstNode::NewList(state.Position.Row ? state.Position : Positions.back(), state.Nodes.data(), state.Nodes.size(), *AstParseResult.Pool); + return TAstNode::NewList(state.Position.Row ? state.Position : State.Positions.back(), state.Nodes.data(), state.Nodes.size(), *AstParseResults[StatementId].Pool); } template @@ -4344,11 +4375,11 @@ class TConverter : public IPGParseEvents { private: void AddError(const TString& value) { - AstParseResult.Issues.AddIssue(TIssue(Positions.back(), value)); + AstParseResults[StatementId].Issues.AddIssue(TIssue(State.Positions.back(), value)); } void AddWarning(int code, const TString& value) { - AstParseResult.Issues.AddIssue(TIssue(Positions.back(), value).SetCode(code, ESeverity::TSeverityIds_ESeverityId_S_WARNING)); + AstParseResults[StatementId].Issues.AddIssue(TIssue(State.Positions.back(), value).SetCode(code, ESeverity::TSeverityIds_ESeverityId_S_WARNING)); } struct TLState { @@ -4379,15 +4410,15 @@ class TConverter : public IPGParseEvents { void PushPosition(int location) { if (location == -1) { - Positions.push_back(Positions.back()); + State.Positions.push_back(State.Positions.back()); return; } - Positions.push_back(Location2Position(location)); + State.Positions.push_back(Location2Position(location)); }; void PopPosition() { - Positions.pop_back(); + State.Positions.pop_back(); } NYql::TPosition Location2Position(int location) const { @@ -4439,26 +4470,21 @@ class TConverter : public IPGParseEvents { } private: - TAstParseResult& AstParseResult; + TVector& AstParseResults; NSQLTranslation::TTranslationSettings Settings; bool DqEngineEnabled = false; bool DqEngineForce = false; - TString CostBasedOptimizer; bool BlockEngineEnabled = false; bool BlockEngineForce = false; - TVector Statements; - ui32 ReadIndex = 0; - TViews Views; - TVector CTE; TString TablePathPrefix; - TVector Positions; TVector RowStarts; ui32 QuerySize; TString Provider; static const THashMap ProviderToInsertModeMap; - THashMap ParamNameToPgTypeName; - THashMap AutoParamValues; + TState State; + ui32 StatementId = 0; + bool PerStatementResult; }; const THashMap TConverter::ProviderToInsertModeMap = { @@ -4467,10 +4493,18 @@ const THashMap TConverter::ProviderToInsertModeMap = { }; NYql::TAstParseResult PGToYql(const TString& query, const NSQLTranslation::TTranslationSettings& settings) { - NYql::TAstParseResult result; - TConverter converter(result, settings, query); + TVector results; + TConverter converter(results, settings, query, false); + NYql::PGParse(query, converter); + Y_ENSURE(!results.empty()); + return std::move(results.back()); +} + +TVector PGToYqlStatements(const TString& query, const NSQLTranslation::TTranslationSettings& settings) { + TVector results; + TConverter converter(results, settings, query, true); NYql::PGParse(query, converter); - return result; + return results; } } // NSQLTranslationPG diff --git a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp index 9811c49b5f05..5a2c605e9296 100644 --- a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp +++ b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp @@ -13,6 +13,12 @@ NYql::TAstParseResult PGToYql(const TString& query, const NSQLTranslation::TTran return result; } +TVector PGToYqlStatements(const TString& query, const NSQLTranslation::TTranslationSettings& settings) { + Y_UNUSED(query); + Y_UNUSED(settings); + return {}; +} + } // NSQLTranslationPG namespace NYql { diff --git a/ydb/library/yql/sql/sql.cpp b/ydb/library/yql/sql/sql.cpp index 8d5371894f5f..76f19ca24d9a 100644 --- a/ydb/library/yql/sql/sql.cpp +++ b/ydb/library/yql/sql/sql.cpp @@ -163,4 +163,47 @@ namespace NSQLTranslation { } } + TVector SqlToAstStatements(const TString& query, const TTranslationSettings& settings, + NYql::TWarningRules* warningRules, ui16* actualSyntaxVersion) + { + TVector result; + NYql::TIssues issues; + TTranslationSettings parsedSettings(settings); + google::protobuf::Arena arena; + if (!parsedSettings.Arena) { + parsedSettings.Arena = &arena; + } + + if (!ParseTranslationSettings(query, parsedSettings, issues)) { + return {}; + } + + if (actualSyntaxVersion) { + *actualSyntaxVersion = parsedSettings.SyntaxVersion; + } + + if (!parsedSettings.DeclaredNamedExprs.empty() && !parsedSettings.PgParser && parsedSettings.SyntaxVersion != 1) { + issues.AddIssue(NYql::YqlIssue(NYql::TPosition(), NYql::TIssuesIds::DEFAULT_ERROR, + "Externally declared named expressions not supported in V0 syntax")); + return {}; + } + + if (parsedSettings.PgParser) { + return NSQLTranslationPG::PGToYqlStatements(query, parsedSettings); + } + + switch (parsedSettings.SyntaxVersion) { + case 0: + issues.AddIssue(NYql::YqlIssue(NYql::TPosition(), NYql::TIssuesIds::DEFAULT_ERROR, + "V0 syntax is disabled")); + return {}; + case 1: + return NSQLTranslationV1::SqlToAstStatements(query, parsedSettings, warningRules); + default: + issues.AddIssue(NYql::YqlIssue(NYql::TPosition(), NYql::TIssuesIds::DEFAULT_ERROR, + TStringBuilder() << "Unknown SQL syntax version: " << parsedSettings.SyntaxVersion)); + return {}; + } + } + } // namespace NSQLTranslation diff --git a/ydb/library/yql/sql/sql.h b/ydb/library/yql/sql/sql.h index db699ebece13..d775555e3b26 100644 --- a/ydb/library/yql/sql/sql.h +++ b/ydb/library/yql/sql/sql.h @@ -21,5 +21,7 @@ namespace NSQLTranslation { const TTranslationSettings& settings = {}, ui16* actualSyntaxVersion = nullptr); ILexer::TPtr SqlLexer(const TString& query, NYql::TIssues& issues, const TTranslationSettings& settings = {}, ui16* actualSyntaxVersion = nullptr); NYql::TAstParseResult SqlASTToYql(const google::protobuf::Message& protoAst, const TSQLHints& hints, const TTranslationSettings& settings); + TVector SqlToAstStatements(const TString& query, const TTranslationSettings& settings, + NYql::TWarningRules* warningRules = nullptr, ui16* actualSyntaxVersion = nullptr); } // namespace NSQLTranslationV0 diff --git a/ydb/library/yql/sql/v1/sql.cpp b/ydb/library/yql/sql/v1/sql.cpp index c9238d3b5c1b..7d354eff8564 100644 --- a/ydb/library/yql/sql/v1/sql.cpp +++ b/ydb/library/yql/sql/v1/sql.cpp @@ -27,6 +27,20 @@ TAstNode* SqlASTToYql(const google::protobuf::Message& protoAst, TContext& ctx) return nullptr; } +TAstNode* SqlASTsToYqls(const std::vector<::NSQLv1Generated::TRule_sql_stmt_core>& ast, TContext& ctx) { + TSqlQuery query(ctx, ctx.Settings.Mode, true); + TNodePtr node(query.Build(ast)); + try { + if (node && node->Init(ctx, nullptr)) { + return node->Translate(ctx); + } + } catch (const NProtoAST::TTooManyErrors&) { + // do not add error issue, no room for it + } + + return nullptr; +} + void SqlASTToYqlImpl(NYql::TAstParseResult& res, const google::protobuf::Message& protoAst, TContext& ctx) { YQL_ENSURE(!ctx.Issues.Size()); @@ -45,6 +59,22 @@ void SqlASTToYqlImpl(NYql::TAstParseResult& res, const google::protobuf::Message } } +void SqlASTsToYqlsImpl(NYql::TAstParseResult& res, const std::vector<::NSQLv1Generated::TRule_sql_stmt_core>& ast, TContext& ctx) { + res.Root = SqlASTsToYqls(ast, ctx); + res.Pool = std::move(ctx.Pool); + if (!res.Root) { + if (ctx.Issues.Size()) { + ctx.IncrementMonCounter("sql_errors", "AstToYqlError"); + } else { + ctx.IncrementMonCounter("sql_errors", "AstToYqlSilentError"); + ctx.Error() << "Error occurred on parse SQL query, but no error is collected" << + ", please send this request over bug report into YQL interface or write on yql@ maillist"; + } + } else { + ctx.WarnUnusedHints(); + } +} + NYql::TAstParseResult SqlASTToYql(const google::protobuf::Message& protoAst, const NSQLTranslation::TSQLHints& hints, const NSQLTranslation::TTranslationSettings& settings) @@ -84,4 +114,112 @@ NYql::TAstParseResult SqlToYql(const TString& query, const NSQLTranslation::TTra return res; } +bool NeedUseForAllStatements(const TRule_sql_stmt_core::AltCase& subquery) { + switch (subquery) { + case TRule_sql_stmt_core::kAltSqlStmtCore1: // pragma + case TRule_sql_stmt_core::kAltSqlStmtCore3: // named nodes + case TRule_sql_stmt_core::kAltSqlStmtCore6: // use + case TRule_sql_stmt_core::kAltSqlStmtCore12: // declare + case TRule_sql_stmt_core::kAltSqlStmtCore13: // import + case TRule_sql_stmt_core::kAltSqlStmtCore14: // export + case TRule_sql_stmt_core::kAltSqlStmtCore18: // define action or subquery + return true; + case TRule_sql_stmt_core::ALT_NOT_SET: + case TRule_sql_stmt_core::kAltSqlStmtCore2: // select + case TRule_sql_stmt_core::kAltSqlStmtCore4: // create table + case TRule_sql_stmt_core::kAltSqlStmtCore5: // drop table + case TRule_sql_stmt_core::kAltSqlStmtCore7: // into table + case TRule_sql_stmt_core::kAltSqlStmtCore8: // commit + case TRule_sql_stmt_core::kAltSqlStmtCore9: // update + case TRule_sql_stmt_core::kAltSqlStmtCore10: // delete + case TRule_sql_stmt_core::kAltSqlStmtCore11: // rollback + case TRule_sql_stmt_core::kAltSqlStmtCore15: // alter table + case TRule_sql_stmt_core::kAltSqlStmtCore16: // alter external table + case TRule_sql_stmt_core::kAltSqlStmtCore17: // do + case TRule_sql_stmt_core::kAltSqlStmtCore19: // if + case TRule_sql_stmt_core::kAltSqlStmtCore20: // for + case TRule_sql_stmt_core::kAltSqlStmtCore21: // values + case TRule_sql_stmt_core::kAltSqlStmtCore22: // create user + case TRule_sql_stmt_core::kAltSqlStmtCore23: // alter user + case TRule_sql_stmt_core::kAltSqlStmtCore24: // create group + case TRule_sql_stmt_core::kAltSqlStmtCore25: // alter group + case TRule_sql_stmt_core::kAltSqlStmtCore26: // drop role + case TRule_sql_stmt_core::kAltSqlStmtCore27: // create object + case TRule_sql_stmt_core::kAltSqlStmtCore28: // alter object + case TRule_sql_stmt_core::kAltSqlStmtCore29: // drop object + case TRule_sql_stmt_core::kAltSqlStmtCore30: // create external data source + case TRule_sql_stmt_core::kAltSqlStmtCore31: // alter external data source + case TRule_sql_stmt_core::kAltSqlStmtCore32: // drop external data source + case TRule_sql_stmt_core::kAltSqlStmtCore33: // create replication + case TRule_sql_stmt_core::kAltSqlStmtCore34: // drop replication + case TRule_sql_stmt_core::kAltSqlStmtCore35: // create topic + case TRule_sql_stmt_core::kAltSqlStmtCore36: // alter topic + case TRule_sql_stmt_core::kAltSqlStmtCore37: // drop topic + case TRule_sql_stmt_core::kAltSqlStmtCore38: // grant permissions + case TRule_sql_stmt_core::kAltSqlStmtCore39: // revoke permissions + case TRule_sql_stmt_core::kAltSqlStmtCore40: // alter table store + case TRule_sql_stmt_core::kAltSqlStmtCore41: // upsert object + case TRule_sql_stmt_core::kAltSqlStmtCore42: // create view + case TRule_sql_stmt_core::kAltSqlStmtCore43: // drop view + return false; + } +} + +TVector SqlToAstStatements(const TString& query, const NSQLTranslation::TTranslationSettings& settings, NYql::TWarningRules* warningRules) +{ + TVector res; + const TString queryName = "query"; + TIssues issues; + + NSQLTranslation::TSQLHints hints; + auto lexer = MakeLexer(settings.AnsiLexer); + YQL_ENSURE(lexer); + if (!CollectSqlHints(*lexer, query, queryName, settings.File, hints, issues, settings.MaxErrors)) { + return res; + } + + TContext ctx(settings, hints, issues); + NSQLTranslation::TErrorCollectorOverIssues collector(issues, settings.MaxErrors, settings.File); + + google::protobuf::Message* astProto(SqlAST(query, queryName, collector, settings.AnsiLexer, settings.Arena)); + if (astProto) { + auto ast = static_cast(*astProto); + const auto& query = ast.GetRule_sql_query(); + if (query.Alt_case() == NSQLv1Generated::TRule_sql_query::kAltSqlQuery1) { + std::vector<::NSQLv1Generated::TRule_sql_stmt_core> commonStates; + std::vector<::NSQLv1Generated::TRule_sql_stmt_core> result; + const auto& statements = query.GetAlt_sql_query1().GetRule_sql_stmt_list1(); + if (NeedUseForAllStatements(statements.GetRule_sql_stmt2().GetRule_sql_stmt_core2().Alt_case())) { + commonStates.push_back(statements.GetRule_sql_stmt2().GetRule_sql_stmt_core2()); + } else { + TContext ctx(settings, hints, issues); + res.emplace_back(); + SqlASTsToYqlsImpl(res.back(), {statements.GetRule_sql_stmt2().GetRule_sql_stmt_core2()}, ctx); + res.back().Issues = std::move(issues); + issues.Clear(); + } + for (auto block: statements.GetBlock3()) { + if (NeedUseForAllStatements(block.GetRule_sql_stmt2().GetRule_sql_stmt_core2().Alt_case())) { + commonStates.push_back(block.GetRule_sql_stmt2().GetRule_sql_stmt_core2()); + continue; + } + TContext ctx(settings, hints, issues); + res.emplace_back(); + result = commonStates; + result.push_back(block.GetRule_sql_stmt2().GetRule_sql_stmt_core2()); + SqlASTsToYqlsImpl(res.back(), result, ctx); + res.back().Issues = std::move(issues); + issues.Clear(); + } + } + } else { + ctx.IncrementMonCounter("sql_errors", "AstError"); + } + if (warningRules) { + *warningRules = ctx.WarningPolicy.GetRules(); + ctx.WarningPolicy.Clear(); + } + return res; +} + } // namespace NSQLTranslationV1 diff --git a/ydb/library/yql/sql/v1/sql.h b/ydb/library/yql/sql/v1/sql.h index 82e5c97a6978..8283b9ec6f7b 100644 --- a/ydb/library/yql/sql/v1/sql.h +++ b/ydb/library/yql/sql/v1/sql.h @@ -17,5 +17,6 @@ namespace NSQLTranslationV1 { NYql::TAstParseResult SqlToYql(const TString& query, const NSQLTranslation::TTranslationSettings& settings, NYql::TWarningRules* warningRules = nullptr); NYql::TAstParseResult SqlASTToYql(const google::protobuf::Message& protoAst, const NSQLTranslation::TSQLHints& hints, const NSQLTranslation::TTranslationSettings& settings); + TVector SqlToAstStatements(const TString& query, const NSQLTranslation::TTranslationSettings& settings, NYql::TWarningRules* warningRules); } // namespace NSQLTranslationV1 diff --git a/ydb/library/yql/sql/v1/sql_query.cpp b/ydb/library/yql/sql/v1/sql_query.cpp index 38f141e0cd3d..d10d9bc71ae2 100644 --- a/ydb/library/yql/sql/v1/sql_query.cpp +++ b/ydb/library/yql/sql/v1/sql_query.cpp @@ -2660,6 +2660,75 @@ TNodePtr TSqlQuery::Build(const TSQLv1ParserAST& ast) { WarnUnusedNodes(); return result; } + +TNodePtr TSqlQuery::Build(const std::vector<::NSQLv1Generated::TRule_sql_stmt_core>& statements) { + if (Mode == NSQLTranslation::ESqlMode::QUERY) { + // inject externally declared named expressions + for (auto [name, type] : Ctx.Settings.DeclaredNamedExprs) { + if (name.empty()) { + Error() << "Empty names for externally declared expressions are not allowed"; + return nullptr; + } + TString varName = "$" + name; + if (IsAnonymousName(varName)) { + Error() << "Externally declared name '" << name << "' is anonymous"; + return nullptr; + } + + auto parsed = ParseType(type, *Ctx.Pool, Ctx.Issues, Ctx.Pos()); + if (!parsed) { + Error() << "Failed to parse type for externally declared name '" << name << "'"; + return nullptr; + } + + TNodePtr typeNode = BuildBuiltinFunc(Ctx, Ctx.Pos(), "ParseType", { BuildLiteralRawString(Ctx.Pos(), type) }); + PushNamedAtom(Ctx.Pos(), varName); + // no duplicates are possible at this stage + bool isWeak = true; + Ctx.DeclareVariable(varName, typeNode, isWeak); + // avoid 'Symbol is not used' warning for externally declared expression + YQL_ENSURE(GetNamedNode(varName)); + } + } + + TVector blocks; + Ctx.PushCurrentBlocks(&blocks); + Y_DEFER { + Ctx.PopCurrentBlocks(); + }; + for (const auto& statement : statements) { + if (!Statement(blocks, statement)) { + return nullptr; + } + } + + ui32 topLevelSelects = 0; + bool hasTailOps = false; + for (auto& block : blocks) { + if (block->SubqueryAlias()) { + continue; + } + + if (block->HasSelectResult()) { + ++topLevelSelects; + } else if (topLevelSelects) { + hasTailOps = true; + } + } + + if ((Mode == NSQLTranslation::ESqlMode::SUBQUERY || Mode == NSQLTranslation::ESqlMode::LIMITED_VIEW) && (topLevelSelects != 1 || hasTailOps)) { + Error() << "Strictly one select/process/reduce statement is expected at the end of " + << (Mode == NSQLTranslation::ESqlMode::LIMITED_VIEW ? "view" : "subquery"); + return nullptr; + } + + if (!Ctx.PragmaAutoCommit && Ctx.Settings.EndOfQueryCommit && IsQueryMode(Mode)) { + AddStatementToBlocks(blocks, BuildCommitClusters(Ctx.Pos())); + } + + auto result = BuildQuery(Ctx.Pos(), blocks, true, Ctx.Scoped); + return result; +} namespace { static bool BuildColumnFeatures(std::map& result, const TRule_column_schema& columnSchema, const NYql::TPosition& pos, TSqlTranslation& translation) { diff --git a/ydb/library/yql/sql/v1/sql_query.h b/ydb/library/yql/sql/v1/sql_query.h index e97ebd56de2c..97301f96ab8e 100644 --- a/ydb/library/yql/sql/v1/sql_query.h +++ b/ydb/library/yql/sql/v1/sql_query.h @@ -18,6 +18,7 @@ class TSqlQuery: public TSqlTranslation { } TNodePtr Build(const TSQLv1ParserAST& ast); + TNodePtr Build(const std::vector<::NSQLv1Generated::TRule_sql_stmt_core>& ast); bool Statement(TVector& blocks, const TRule_sql_stmt_core& core); private: