Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KIKIMR-20543: ddl+dml in query service #1444

Merged
merged 13 commits into from
Feb 26, 2024
19 changes: 13 additions & 6 deletions ydb/core/kqp/common/compilation/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,24 @@ namespace NKikimr::NKqp::NPrivateEvents {

struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCompileRequest> {
TEvCompileRequest(const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TMaybe<TString>& uid,
TMaybe<TKqpQueryId>&& query, bool keepInCache, bool isQueryActionPrepare, TInstant deadline,
TMaybe<TKqpQueryId>&& query, bool keepInCache, bool isQueryActionPrepare, bool perStatementResult, TInstant deadline,
TKqpDbCountersPtr dbCounters, std::shared_ptr<std::atomic<bool>> intrestedInResult,
const TIntrusivePtr<TUserRequestContext>& userRequestContext, NLWTrace::TOrbit orbit = {},
TKqpTempTablesState::TConstPtr tempTablesState = nullptr, bool collectDiagnostics = false)
TKqpTempTablesState::TConstPtr tempTablesState = nullptr, bool collectDiagnostics = false, TMaybe<TQueryAst> queryAst = Nothing())
: UserToken(userToken)
, Uid(uid)
, Query(std::move(query))
, KeepInCache(keepInCache)
, IsQueryActionPrepare(isQueryActionPrepare)
, PerStatementResult(perStatementResult)
, Deadline(deadline)
, DbCounters(dbCounters)
, UserRequestContext(userRequestContext)
, Orbit(std::move(orbit))
, TempTablesState(std::move(tempTablesState))
, IntrestedInResult(std::move(intrestedInResult))
, CollectDiagnostics(collectDiagnostics)
, QueryAst(queryAst)
VPolka marked this conversation as resolved.
Show resolved Hide resolved
{
Y_ENSURE(Uid.Defined() != Query.Defined());
}
Expand All @@ -39,6 +41,7 @@ struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCo
TMaybe<TKqpQueryId> Query;
bool KeepInCache = false;
bool IsQueryActionPrepare = false;
bool PerStatementResult = false;
// it is allowed for local event to use absolute time (TInstant) instead of time interval (TDuration)
TInstant Deadline;
TKqpDbCountersPtr DbCounters;
Expand All @@ -51,6 +54,8 @@ struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCo
std::shared_ptr<std::atomic<bool>> IntrestedInResult;

bool CollectDiagnostics = false;

TMaybe<TQueryAst> QueryAst;
};

struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::EvRecompileRequest> {
Expand Down Expand Up @@ -103,12 +108,14 @@ struct TEvCompileResponse: public TEventLocal<TEvCompileResponse, TKqpEvents::Ev
};

struct TEvParseResponse: public TEventLocal<TEvParseResponse, TKqpEvents::EvParseResponse> {
TEvParseResponse(const TKqpQueryId& query, TMaybe<TQueryAst> astResult)
: AstResult(std::move(astResult))
, Query(query) {}
TEvParseResponse(const TKqpQueryId& query, TVector<TQueryAst> astStatements, NLWTrace::TOrbit orbit = {})
: AstStatements(std::move(astStatements))
, Query(query)
, Orbit(std::move(orbit)) {}

TMaybe<TQueryAst> AstResult;
TVector<TQueryAst> AstStatements;
TKqpQueryId Query;
NLWTrace::TOrbit Orbit;
};

struct TEvCompileInvalidateRequest: public TEventLocal<TEvCompileInvalidateRequest,
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/kqp/common/kqp_lwtrace_probes.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ struct TQueryAction {
PROBE(KqpCompileServiceReplyFromCache, GROUPS("KQP"), \
TYPES(), \
NAMES()) \
PROBE(KqpCompileServiceReplyStatements, GROUPS("KQP"), \
TYPES(), \
NAMES()) \
PROBE(KqpCompileServiceReplyError, GROUPS("KQP"), \
TYPES(), \
NAMES()) \
Expand Down
67 changes: 45 additions & 22 deletions ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
TKqpDbCountersPtr dbCounters, std::optional<TKqpFederatedQuerySetup> federatedQuerySetup,
const TIntrusivePtr<TUserRequestContext>& userRequestContext,
NWilson::TTraceId traceId, TKqpTempTablesState::TConstPtr tempTablesState, bool collectFullDiagnostics,
ECompileActorAction compileAction, TMaybe<TQueryAst> astResult)
bool perStatementResult, ECompileActorAction compileAction, TMaybe<TQueryAst> queryAst)
: Owner(owner)
, ModuleResolverState(moduleResolverState)
, Counters(counters)
, FederatedQuerySetup(federatedQuerySetup)
, Uid(uid)
, QueryId(queryId)
, QueryRef(QueryId.Text, QueryId.QueryParameterTypes, astResult)
, QueryRef(QueryId.Text, QueryId.QueryParameterTypes, queryAst)
, UserToken(userToken)
, DbCounters(dbCounters)
, Config(MakeIntrusive<TKikimrConfiguration>())
Expand All @@ -70,8 +70,9 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
, CompileActorSpan(TWilsonKqp::CompileActor, std::move(traceId), "CompileActor")
, TempTablesState(std::move(tempTablesState))
, CollectFullDiagnostics(collectFullDiagnostics)
, PerStatementResult(perStatementResult)
, CompileAction(compileAction)
, AstResult(std::move(astResult))
, QueryAst(std::move(queryAst))
{
Config->Init(kqpSettings->DefaultSettings.GetDefaultSettings(), QueryId.Cluster, kqpSettings->Settings, false);

Expand Down Expand Up @@ -127,26 +128,22 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
}

private:
void SetQueryAst(const TActorContext &ctx) {
TString cluster = QueryId.Cluster;

TVector<TQueryAst> GetAstStatements(const TActorContext &ctx) {
TString cluster = QueryId.Cluster;
TString kqpTablePathPrefix = Config->_KqpTablePathPrefix.Get().GetRef();
ui16 kqpYqlSyntaxVersion = Config->_KqpYqlSyntaxVersion.Get().GetRef();
NSQLTranslation::EBindingsMode bindingsMode = Config->BindingsMode;
bool isEnableExternalDataSources = AppData(ctx)->FeatureFlags.GetEnableExternalDataSources();
bool isEnablePgConstsToParams = Config->EnablePgConstsToParams;
bool perStatementExecution = Config->EnablePerStatementQueryExecution && PerStatementResult;

auto astResult = ParseQuery(ConvertType(QueryId.Settings.QueryType), QueryId.Settings.Syntax, QueryId.Text, QueryId.QueryParameterTypes, QueryId.IsSql(), cluster, kqpTablePathPrefix, kqpYqlSyntaxVersion, bindingsMode, isEnableExternalDataSources, isEnablePgConstsToParams);
YQL_ENSURE(astResult.Ast);
if (astResult.Ast->IsOk()) {
AstResult = std::move(astResult);
}
return ParseStatements(ConvertType(QueryId.Settings.QueryType), QueryId.Settings.Syntax, QueryId.Text, QueryId.QueryParameterTypes, cluster, kqpTablePathPrefix, kqpYqlSyntaxVersion, bindingsMode, isEnableExternalDataSources, isEnablePgConstsToParams, QueryId.IsSql(), perStatementExecution);
}

void StartParsing(const TActorContext &ctx) {
SetQueryAst(ctx);

Become(&TKqpCompileActor::CompileState);
ReplyParseResult(ctx);
ReplyParseResult(ctx, GetAstStatements(ctx));
}

void StartCompilation(const TActorContext &ctx) {
Expand Down Expand Up @@ -352,15 +349,38 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
<< ", at state:" << state);
}

void ReplyParseResult(const TActorContext &ctx) {
void ReplyParseResult(const TActorContext &ctx, TVector<TQueryAst>&& astStatements) {
Y_UNUSED(ctx);

if (astStatements.empty()) {
NYql::TIssue issue(NYql::TPosition(), "Parsing result of query is empty");
ReplyError(Ydb::StatusIds::INTERNAL_ERROR, {issue});
return;
}

for (size_t statementId = 0; statementId < astStatements.size(); ++statementId) {
if (!astStatements[statementId].Ast || !astStatements[statementId].Ast->IsOk() || !astStatements[statementId].Ast->Root) {
ALOG_ERROR(NKikimrServices::KQP_COMPILE_ACTOR, "Get parsing result with error"
<< ", self: " << SelfId()
<< ", owner: " << Owner
<< ", statement id: " << statementId);

NYql::TIssue issue(NYql::TPosition(), "Error while parsing query.");
for (const auto& i : astStatements[statementId].Ast->Issues) {
issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(i));
}

ReplyError(Ydb::StatusIds::INTERNAL_ERROR, {issue});
return;
}
}

ALOG_DEBUG(NKikimrServices::KQP_COMPILE_ACTOR, "Send parsing result"
<< ", self: " << SelfId()
<< ", owner: " << Owner
<< (AstResult && AstResult->Ast->IsOk() ? ", parsing is successful" : ", parsing is not successful"));
<< ", statements size: " << astStatements.size());

auto responseEv = MakeHolder<TEvKqp::TEvParseResponse>(QueryId, std::move(AstResult));
AstResult = Nothing();
auto responseEv = MakeHolder<TEvKqp::TEvParseResponse>(QueryId, std::move(astStatements));
Send(Owner, responseEv.Release());

Counters->ReportCompileFinish(DbCounters);
Expand All @@ -379,8 +399,8 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
KqpCompileResult->PreparedQuery = preparedQueryHolder;
KqpCompileResult->AllowCache = CanCacheQuery(KqpCompileResult->PreparedQuery->GetPhysicalQuery());

if (AstResult) {
KqpCompileResult->Ast = AstResult->Ast;
if (QueryAst) {
KqpCompileResult->Ast = QueryAst->Ast;
}
}

Expand Down Expand Up @@ -482,8 +502,9 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
TKqpTempTablesState::TConstPtr TempTablesState;
bool CollectFullDiagnostics;

const bool PerStatementResult;
ECompileActorAction CompileAction;
TMaybe<TQueryAst> AstResult;
TMaybe<TQueryAst> QueryAst;
};

void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConfig& serviceConfig) {
Expand Down Expand Up @@ -512,6 +533,7 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
kqpConfig.IndexAutoChooserMode = serviceConfig.GetIndexAutoChooseMode();
kqpConfig.EnablePgConstsToParams = serviceConfig.GetEnablePgConstsToParams() && serviceConfig.GetEnableAstCache();
kqpConfig.ExtractPredicateRangesLimit = serviceConfig.GetExtractPredicateRangesLimit();
kqpConfig.EnablePerStatementQueryExecution = serviceConfig.GetEnablePerStatementQueryExecution();

if (const auto limit = serviceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit()) {
kqpConfig._KqpYqlCombinerMemoryLimit = std::max(1_GB, limit - (limit >> 2U));
Expand All @@ -527,14 +549,15 @@ IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstP
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup,
TKqpDbCountersPtr dbCounters, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
NWilson::TTraceId traceId, TKqpTempTablesState::TConstPtr tempTablesState,
ECompileActorAction compileAction, TMaybe<TQueryAst> astResult, bool collectFullDiagnostics)
ECompileActorAction compileAction, TMaybe<TQueryAst> queryAst, bool collectFullDiagnostics,
bool perStatementResult)
{
return new TKqpCompileActor(owner, kqpSettings, tableServiceConfig, queryServiceConfig, metadataProviderConfig,
moduleResolverState, counters,
uid, query, userToken, dbCounters,
federatedQuerySetup, userRequestContext,
std::move(traceId), std::move(tempTablesState), collectFullDiagnostics,
compileAction, std::move(astResult));
perStatementResult, compileAction, std::move(queryAst));
}

} // namespace NKqp
Expand Down
Loading
Loading