Skip to content

Commit

Permalink
Merge 3055f36 into bd5ea97
Browse files Browse the repository at this point in the history
  • Loading branch information
VPolka authored Jan 31, 2024
2 parents bd5ea97 + 3055f36 commit 4800934
Show file tree
Hide file tree
Showing 28 changed files with 762 additions and 84 deletions.
19 changes: 13 additions & 6 deletions ydb/core/kqp/common/compilation/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,23 @@ namespace NKikimr::NKqp::NPrivateEvents {

struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCompileRequest> {
TEvCompileRequest(const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TMaybe<TString>& uid,
TMaybe<TKqpQueryId>&& query, bool keepInCache, TInstant deadline,
TMaybe<TKqpQueryId>&& query, bool keepInCache, bool canDevideIntoStatements, TInstant deadline,
TKqpDbCountersPtr dbCounters, std::shared_ptr<std::atomic<bool>> intrestedInResult,
const TIntrusivePtr<TUserRequestContext>& userRequestContext, NLWTrace::TOrbit orbit = {},
TKqpTempTablesState::TConstPtr tempTablesState = nullptr, bool collectDiagnostics = false)
TKqpTempTablesState::TConstPtr tempTablesState = nullptr, bool collectDiagnostics = false, TMaybe<TQueryAst> queryAst = Nothing())
: UserToken(userToken)
, Uid(uid)
, Query(std::move(query))
, KeepInCache(keepInCache)
, CanDevideIntoStatements(canDevideIntoStatements)
, Deadline(deadline)
, DbCounters(dbCounters)
, UserRequestContext(userRequestContext)
, Orbit(std::move(orbit))
, TempTablesState(std::move(tempTablesState))
, IntrestedInResult(std::move(intrestedInResult))
, CollectDiagnostics(collectDiagnostics)
, QueryAst(queryAst)
{
Y_ENSURE(Uid.Defined() != Query.Defined());
}
Expand All @@ -37,6 +39,7 @@ struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCo
TMaybe<TString> Uid;
TMaybe<TKqpQueryId> Query;
bool KeepInCache = false;
bool CanDevideIntoStatements = false;
// it is allowed for local event to use absolute time (TInstant) instead of time interval (TDuration)
TInstant Deadline;
TKqpDbCountersPtr DbCounters;
Expand All @@ -49,6 +52,8 @@ struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCo
std::shared_ptr<std::atomic<bool>> IntrestedInResult;

bool CollectDiagnostics = false;

TMaybe<TQueryAst> QueryAst;
};

struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::EvRecompileRequest> {
Expand Down Expand Up @@ -99,12 +104,14 @@ struct TEvCompileResponse: public TEventLocal<TEvCompileResponse, TKqpEvents::Ev
};

struct TEvParseResponse: public TEventLocal<TEvParseResponse, TKqpEvents::EvParseResponse> {
TEvParseResponse(const TKqpQueryId& query, TMaybe<TQueryAst> astResult)
: AstResult(std::move(astResult))
, Query(query) {}
TEvParseResponse(const TKqpQueryId& query, TVector<TQueryAst> astStatements, NLWTrace::TOrbit orbit = {})
: AstStatements(std::move(astStatements))
, Query(query)
, Orbit(std::move(orbit)) {}

TMaybe<TQueryAst> AstResult;
TVector<TQueryAst> AstStatements;
TKqpQueryId Query;
NLWTrace::TOrbit Orbit;
};

struct TEvCompileInvalidateRequest: public TEventLocal<TEvCompileInvalidateRequest,
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/kqp/common/kqp_lwtrace_probes.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ struct TQueryAction {
PROBE(KqpCompileServiceReplyFromCache, GROUPS("KQP"), \
TYPES(), \
NAMES()) \
PROBE(KqpCompileServiceReplyStatements, GROUPS("KQP"), \
TYPES(), \
NAMES()) \
PROBE(KqpCompileServiceReplyError, GROUPS("KQP"), \
TYPES(), \
NAMES()) \
Expand Down
64 changes: 43 additions & 21 deletions ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
TKqpDbCountersPtr dbCounters, std::optional<TKqpFederatedQuerySetup> federatedQuerySetup,
const TIntrusivePtr<TUserRequestContext>& userRequestContext,
NWilson::TTraceId traceId, TKqpTempTablesState::TConstPtr tempTablesState, bool collectFullDiagnostics,
ECompileActorAction compileAction, TMaybe<TQueryAst> astResult)
bool canDevideIntoStatements, ECompileActorAction compileAction, TMaybe<TQueryAst> queryAst)
: Owner(owner)
, ModuleResolverState(moduleResolverState)
, Counters(counters)
, FederatedQuerySetup(federatedQuerySetup)
, Uid(uid)
, QueryId(queryId)
, QueryRef(QueryId.Text, QueryId.QueryParameterTypes, astResult)
, QueryRef(QueryId.Text, QueryId.QueryParameterTypes, queryAst)
, UserToken(userToken)
, DbCounters(dbCounters)
, Config(MakeIntrusive<TKikimrConfiguration>())
Expand All @@ -70,8 +70,9 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
, CompileActorSpan(TWilsonKqp::CompileActor, std::move(traceId), "CompileActor")
, TempTablesState(std::move(tempTablesState))
, CollectFullDiagnostics(collectFullDiagnostics)
, CanDevideIntoStatements(canDevideIntoStatements)
, CompileAction(compileAction)
, AstResult(std::move(astResult))
, QueryAst(std::move(queryAst))
{
Config->Init(kqpSettings->DefaultSettings.GetDefaultSettings(), QueryId.Cluster, kqpSettings->Settings, false);

Expand Down Expand Up @@ -127,26 +128,22 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
}

private:
void SetQueryAst(const TActorContext &ctx) {

TVector<TQueryAst> GetAstStatements(const TActorContext &ctx) {
TString cluster = QueryId.Cluster;
TString kqpTablePathPrefix = Config->_KqpTablePathPrefix.Get().GetRef();
ui16 kqpYqlSyntaxVersion = Config->_KqpYqlSyntaxVersion.Get().GetRef();
NSQLTranslation::EBindingsMode bindingsMode = Config->BindingsMode;
bool isEnableExternalDataSources = AppData(ctx)->FeatureFlags.GetEnableExternalDataSources();
bool isEnablePgConstsToParams = Config->EnablePgConstsToParams;
bool perStatement = Config->EnableQueriesPerStatement && CanDevideIntoStatements;

auto astResult = ParseQuery(ConvertType(QueryId.Settings.QueryType), QueryId.Settings.Syntax, QueryId.Text, QueryId.QueryParameterTypes, QueryId.IsSql(), cluster, kqpTablePathPrefix, kqpYqlSyntaxVersion, bindingsMode, isEnableExternalDataSources, isEnablePgConstsToParams);
YQL_ENSURE(astResult.Ast);
if (astResult.Ast->IsOk()) {
AstResult = std::move(astResult);
}
return SqlToAstStatements(ConvertType(QueryId.Settings.QueryType), QueryId.Settings.Syntax, QueryId.Text, QueryId.QueryParameterTypes, cluster, kqpTablePathPrefix, kqpYqlSyntaxVersion, bindingsMode, isEnableExternalDataSources, isEnablePgConstsToParams, QueryId.IsSql(), perStatement);
}

void StartParsing(const TActorContext &ctx) {
SetQueryAst(ctx);

Become(&TKqpCompileActor::CompileState);
ReplyParseResult(ctx);
ReplyParseResult(ctx, GetAstStatements(ctx));
}

void StartCompilation(const TActorContext &ctx) {
Expand Down Expand Up @@ -352,15 +349,37 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
<< ", at state:" << state);
}

void ReplyParseResult(const TActorContext &ctx) {
void ReplyParseResult(const TActorContext &ctx, TVector<TQueryAst> astStatements) {
Y_UNUSED(ctx);

if (astStatements.empty()) {
NYql::TIssue issue(NYql::TPosition(), "Parsing result of query is empty");
ReplyError(Ydb::StatusIds::GENERIC_ERROR, {issue});
}

for (size_t statementId = 0; statementId < astStatements.size(); ++statementId) {
if (!astStatements[statementId].Ast || !astStatements[statementId].Ast->IsOk() || !astStatements[statementId].Ast->Root) {
ALOG_ERROR(NKikimrServices::KQP_COMPILE_ACTOR, "Get parsing result with error"
<< ", self: " << SelfId()
<< ", owner: " << Owner
<< ", statement id: " << statementId);

NYql::TIssue issue(NYql::TPosition(), "Internal error while parsing query.");
for (const auto& i : astStatements[statementId].Ast->Issues) {
issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(i));
}

ReplyError(Ydb::StatusIds::GENERIC_ERROR, {issue});
return;
}
}

ALOG_DEBUG(NKikimrServices::KQP_COMPILE_ACTOR, "Send parsing result"
<< ", self: " << SelfId()
<< ", owner: " << Owner
<< (AstResult && AstResult->Ast->IsOk() ? ", parsing is successful" : ", parsing is not successful"));
<< ", statements size: " << astStatements.size());

auto responseEv = MakeHolder<TEvKqp::TEvParseResponse>(QueryId, std::move(AstResult));
AstResult = Nothing();
auto responseEv = MakeHolder<TEvKqp::TEvParseResponse>(QueryId, astStatements);
Send(Owner, responseEv.Release());

Counters->ReportCompileFinish(DbCounters);
Expand Down Expand Up @@ -410,8 +429,8 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
KqpCompileResult->PreparedQuery = preparedQueryHolder;
KqpCompileResult->AllowCache = CanCacheQuery(KqpCompileResult->PreparedQuery->GetPhysicalQuery());

if (AstResult) {
KqpCompileResult->Ast = AstResult->Ast;
if (QueryAst) {
KqpCompileResult->Ast = QueryAst->Ast;
}
}

Expand Down Expand Up @@ -476,8 +495,9 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
TKqpTempTablesState::TConstPtr TempTablesState;
bool CollectFullDiagnostics;

bool CanDevideIntoStatements;
ECompileActorAction CompileAction;
TMaybe<TQueryAst> AstResult;
TMaybe<TQueryAst> QueryAst;
};

void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConfig& serviceConfig) {
Expand Down Expand Up @@ -506,6 +526,7 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
kqpConfig.IndexAutoChooserMode = serviceConfig.GetIndexAutoChooseMode();
kqpConfig.EnablePgConstsToParams = serviceConfig.GetEnablePgConstsToParams();
kqpConfig.ExtractPredicateRangesLimit = serviceConfig.GetExtractPredicateRangesLimit();
kqpConfig.EnableQueriesPerStatement = serviceConfig.GetEnableQueriesPerStatement();

if (const auto limit = serviceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit()) {
kqpConfig._KqpYqlCombinerMemoryLimit = std::max(1_GB, limit - (limit >> 2U));
Expand All @@ -521,14 +542,15 @@ IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstP
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup,
TKqpDbCountersPtr dbCounters, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
NWilson::TTraceId traceId, TKqpTempTablesState::TConstPtr tempTablesState,
ECompileActorAction compileAction, TMaybe<TQueryAst> astResult, bool collectFullDiagnostics)
ECompileActorAction compileAction, TMaybe<TQueryAst> queryAst, bool collectFullDiagnostics,
bool canDevideIntoStatements)
{
return new TKqpCompileActor(owner, kqpSettings, tableServiceConfig, queryServiceConfig, metadataProviderConfig,
moduleResolverState, counters,
uid, query, userToken, dbCounters,
federatedQuerySetup, userRequestContext,
std::move(traceId), std::move(tempTablesState), collectFullDiagnostics,
compileAction, std::move(astResult));
canDevideIntoStatements, compileAction, std::move(queryAst));
}

} // namespace NKqp
Expand Down
Loading

0 comments on commit 4800934

Please sign in to comment.