Skip to content

Commit

Permalink
[dq] Fallback selfjoin without spilling (#5453)
Browse files Browse the repository at this point in the history
  • Loading branch information
rvu1024 authored Jun 11, 2024
1 parent f2d1054 commit 32c0c65
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 7 deletions.
16 changes: 10 additions & 6 deletions ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,19 @@ class TDqExecutionValidator {
}


bool hasJoin = false;
bool hasMapJoin = false;
VisitExpr(TDqStageBase(&node).Program().Body().Ptr(),
[](const TExprNode::TPtr& n) {
return !TDqConnection::Match(n.Get()) && !TDqPhyPrecompute::Match(n.Get()) && !TDqReadWrapBase::Match(n.Get());
},
[&readPerProvider_ = ReadsPerProvider_, &hasErrors, &hasMapJoin, &ctx = Ctx_, &typeCtx = TypeCtx_](const TExprNode::TPtr& n) {
[&readPerProvider_ = ReadsPerProvider_, &hasErrors, &hasJoin, &hasMapJoin, &ctx = Ctx_, &typeCtx = TypeCtx_](const TExprNode::TPtr& n) {
if (TDqPhyMapJoin::Match(n.Get())) {
hasMapJoin = true;
hasJoin = hasMapJoin = true;
} else if (TCoGraceJoinCore::Match(n.Get()) || TCoGraceSelfJoinCore::Match(n.Get())) {
hasJoin = true;
}

if (TCoScriptUdf::Match(n.Get()) && NKikimr::NMiniKQL::IsSystemPython(NKikimr::NMiniKQL::ScriptTypeFromStr(n->Head().Content()))) {
ReportError(ctx, *n, TStringBuilder() << "Cannot execute system python udf " << n->Content() << " in DQ");
hasErrors = true;
Expand All @@ -69,7 +73,7 @@ class TDqExecutionValidator {
);

HasMapJoin_ |= hasMapJoin;
if (hasMapJoin && CheckSelfMapJoin_) {
if (hasJoin && CheckSelfJoin_) {
TNodeSet unitedVisitedStages;
bool nonUniqStages = false;
for (auto n: TDqStageBase(&node).Inputs()) {
Expand All @@ -80,7 +84,7 @@ class TDqExecutionValidator {
nonUniqStages |= (expectedSize != unitedVisitedStages.size()); // Found duplicates - some stage was visited twice from different inputs
}
if (nonUniqStages) {
ReportError(Ctx_, node, TStringBuilder() << "Cannot execute self join using mapjoin strategy in DQ");
ReportError(Ctx_, node, TStringBuilder() << "Cannot execute self join in DQ");
hasErrors = true;
}
if (visitedStages) {
Expand Down Expand Up @@ -141,7 +145,7 @@ class TDqExecutionValidator {
: TypeCtx_(typeCtx)
, Ctx_(ctx)
, State_(state)
, CheckSelfMapJoin_(!TypeCtx_.ForceDq
, CheckSelfJoin_(!TypeCtx_.ForceDq
&& !State_->Settings->SplitStageOnDqReplicate.Get().GetOrElse(TDqSettings::TDefault::SplitStageOnDqReplicate)
&& !State_->Settings->IsSpillingEnabled())
{}
Expand Down Expand Up @@ -205,7 +209,7 @@ class TDqExecutionValidator {
const TTypeAnnotationContext& TypeCtx_;
TExprContext& Ctx_;
const TDqState::TPtr State_;
const bool CheckSelfMapJoin_;
const bool CheckSelfJoin_;

TNodeSet Visited_;
THashMap<IDqIntegration*, TVector<const TExprNode*>> ReadsPerProvider_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

<tmp_path>/program.sql:<main>: Info: Optimization

<tmp_path>/program.sql:<main>:7:22: Info: Cannot execute self join using mapjoin strategy in DQ
<tmp_path>/program.sql:<main>:7:22: Info: Cannot execute self join in DQ
select * from $in as a inner join $in as b on a.key = b.key;
^

0 comments on commit 32c0c65

Please sign in to comment.