
[SPARK-27915][SQL][WIP] Update logical Filter's output nullability based on IsNotNull conditions #24765

Conversation

@JoshRosen (Contributor) commented Jun 1, 2019

What changes were proposed in this pull request?

This PR changes the logical Filter operator to update its outputs' nullability when filter conditions imply that outputs cannot be null.

In addition, I refined similar logic in the physical FilterExec (making its non-nullability inference more precise / less conservative) and improved propagation of inferred nullability information in Project.

This is useful because of how it composes with other optimizations: Spark has several logical and physical optimizations which leverage non-nullability, so improving nullability inference increases the value of those existing optimizations.

⚠️ Disclaimers ⚠️

  • This is a work-in-progress / skunkworks side project; I'm not working on this full time.
  • Nullability has been a major source of bugs in the past: this PR requires careful review.
  • I haven't run analyzer / optimizer performance benchmarks, so there's a decent chance that this WIP changeset regresses query planning performance.
  • DataFrames / Datasets / queries' .schema may change as a result of this optimization: this may have consequences if nullability information is consumed by downstream systems (e.g. for CREATE TABLE DDL).
  • The schemas of analyzed and optimized logical plans may now differ in terms of field nullability (because optimization might infer additional constraints which allow us to prove that fields are non-null).

Examples

Consider the query

SELECT key
FROM t
WHERE key IS NOT NULL

where t.key is nullable.

Because of the key IS NOT NULL filter condition, key will always be non-null. Prior to this patch, this query's result schema was overly conservative, continuing to mark key as nullable. By taking advantage of the key IS NOT NULL condition, however, we can set nullable = false for key.
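
As a concrete illustration, here is a hypothetical spark-shell session (the table contents are made up, schema rendering is abbreviated, and the "after" line shows the behavior this patch aims for, not current master):

```scala
// Assume t is a registered table with a nullable integer column `key`.
val filtered = spark.table("t").where("key IS NOT NULL").select("key")

filtered.schema
// Before this patch: StructType(StructField(key,IntegerType,true))
// With this patch:   StructType(StructField(key,IntegerType,false))
```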

This was a somewhat trivial example, so let's look at some more complicated cases:

Consider

SELECT A.key, A.value
FROM A, B
WHERE
    A.key = B.key AND
    (A.num + B.num) > 0

where all columns of A and B are nullable. Because of the equality join condition, we know that key must be non-null in both tables. In addition, the condition (A.num + B.num) > 0 can only hold if both num values are not null: addition is a null-intolerant operator, meaning that it returns null if any of its operands is null.

Leveraging this, we should be able to mark the key and num columns as non-null in the join result's schema (even though all of them are nullable in the underlying input relations).
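
To see why a null operand rules the predicate out, here is a quick check one could run in spark-shell (output rendering approximate):

```scala
// Add and GreaterThan are both null-intolerant: a null operand yields a null
// result, and a predicate that evaluates to null never passes a Filter.
spark.sql("SELECT (CAST(NULL AS INT) + 1) > 0").collect()
// res0: Array[org.apache.spark.sql.Row] = Array([null])
```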

Finally, let's look at an example involving a null-tolerant operator: coalesce(a, b) can be non-null even when a or b is null, so in

SELECT key, foo, COALESCE(foo, bar) as qux
FROM A
WHERE COALESCE(foo, bar) > 0

we can infer that qux is not null but cannot make any claims about foo or bar's nullability.
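
Again as a quick spark-shell check (output rendering approximate), this is why IsNotNull over a null-tolerant expression tells us nothing about its inputs:

```scala
// COALESCE returns a non-null result here even though its first input is null,
// so IsNotNull(Coalesce(...)) implies nothing about the individual inputs.
spark.sql("SELECT COALESCE(CAST(NULL AS INT), 1) IS NOT NULL").collect()
// res0: Array[org.apache.spark.sql.Row] = Array([true])
```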

Description of changes

  • Introduce a PredicateHelper.getImpliedNotNullExprIds(IsNotNull) helper, which takes an IsNotNull expression and returns the ExprIds of expressions which cannot be null. This handles simple cases like IsNotNull(columnFromTable), as well as more complex cases involving expression trees (properly accounting for null-(in)tolerance); a minimal sketch follows this list.
    • There was similar existing logic in FilterExec, but I think it was overly conservative: given IsNotNull(x), it claimed that x and all of its descendants were non-null only when every node on the path was NullIntolerant. However, even if x itself is null-tolerant, IsNotNull(x) still proves that x is non-null; we just can't make further claims about its children.
  • Update logical.Filter to leverage this new function to update output nullability.
  • Modify FilterExec to re-use this logic. This part is a bit tricky because the FilterExec code looks at IsNotNull expressions both for optimizing the order of expression evaluation and for refining nullability to elide null checks in downstream operators.
  • Modify logical.Project so that inferred non-nullability information from child operators is preserved.
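
A minimal sketch of the shape of the new helper, assuming it walks the expression tree and descends only through NullIntolerant nodes (names and structure are illustrative, not the exact code in this PR):

```scala
import org.apache.spark.sql.catalyst.expressions._

// Given IsNotNull(e): e itself is provably non-null. Because a null-intolerant
// expression returns null whenever any child is null, a non-null result also
// lets us recurse into the children of NullIntolerant nodes; null-tolerant
// nodes stop the descent.
def getImpliedNotNullExprIds(isNotNull: IsNotNull): Set[ExprId] = {
  def collect(e: Expression): Set[ExprId] = {
    val self = e match {
      case ne: NamedExpression => Set(ne.exprId)
      case _ => Set.empty[ExprId]
    }
    val fromChildren = e match {
      case ni: NullIntolerant => ni.children.flatMap(collect).toSet
      case _ => Set.empty[ExprId] // null-tolerant: cannot descend further
    }
    self ++ fromChildren
  }
  collect(isNotNull.child)
}
```

Note how this matches the coalesce example above: Coalesce is not NullIntolerant, so the walk stops there without claiming anything about foo or bar.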

Background on related historical changes / bugs

While developing this patch, I found the following historical PRs to be useful references (note: many of these original PRs contained correctness bugs which were subsequently fixed in later PRs):

How was this patch tested?

Added new tests for the new PredicateHelper.getImpliedNotNullExprIds helper.

TODO: add new end-to-end tests reflecting the examples listed above (in order to properly test the integration of this new logic into logical.Filter and logical.Project).
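
Such an end-to-end test might look roughly like this (a sketch assuming the patch's intended behavior; the test name and setup are illustrative):

```scala
test("IS NOT NULL filter marks the filtered attribute as non-nullable") {
  // A boxed Integer column so the input schema starts out nullable.
  val df = Seq[Integer](1, null, 3).toDF("key")
  assert(df.schema("key").nullable)

  val filtered = df.where($"key".isNotNull)
  assert(!filtered.schema("key").nullable)
}
```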

Inline review on the new helper:

```scala
// However, if g is NOT NullIntolerant (e.g. if g(null) is non-null) then we cannot
// conclude anything about x's nullability.
def getExprIdIfNamed(expr: Expression): Set[ExprId] = expr match {
  case ne: NamedExpression => Set(ne.toAttribute.exprId)
```
@JoshRosen (author): Maybe this should be AttributeReference? I couldn't remember offhand how to get ExprIds from arbitrary expressions, hence this hack.

Member: Use AttributeSet?

@SparkQA commented Jun 1, 2019

Test build #106056 has finished for PR 24765 at commit 05e4bcf.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen (author): jenkins retest this please

Inline review on FilterExec:

```scala
// Mark this as empty. We'll evaluate the input during doConsume(). We don't want to evaluate
// all the variables at the beginning to take advantage of short circuiting.
override def usedInputs: AttributeSet = AttributeSet.empty

// Split out all the IsNotNulls from condition.
private val (notNullPreds, otherPreds) = splitConjunctivePredicates(condition).partition {
```
@JoshRosen (author):
I found the old code here to be slightly confusing because it seemed to be using notNullPreds for two different purposes:

  1. If we see IsNotNull conjuncts in the filter then evaluate them first / earlier because (a) these expressions are cheap to evaluate and may allow for short-circuiting and skipping more expensive expressions, and (b) evaluating these earlier allows other expressions to omit null checks (for example, if we have IsNotNull(x) and x * 100 < 10 then we already implicitly need to null-check x as part of the second expression so we might as well do the explicit null check expression first).
  2. Given that tuples have successfully passed through the filter, we can rely on the presence of IsNotNull checks to default subsequent expressions' null checks to false. For example, let's say we had a .filter().select() which gets compiled into a single whole stage codegen: after tuples have passed through the filter we know that certain fields cannot possibly be null, so we can elide null checks at codegen time by just setting nullable = false in subsequent code.

There might be some subtleties in (1) related to non-deterministic expressions, but I think those are accounted for further down, where we actually generate the checks.

In the old code, the (notNullPreds, otherPreds) on this line was being used for both purposes: for (1) I think we could simply collect all IsNotNull expressions, but the existing implementation of (2) relied on the additional nullIntolerant / a.references checks in order to be correct.

In this PR, I've separated these two usages: the "update nullability for downstream operators" now uses the more precise condition implemented in getImpliedNotNullExprIds, while the "optimize short-circuiting" simply checks for IsNotNull and ignores child attributes.
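
For reference, a hedged sketch of what this separation might look like (illustrative only, not the PR's exact code, and it assumes the getImpliedNotNullExprIds helper sketched earlier):

```scala
// (1) Evaluation-order optimization: every IsNotNull conjunct is cheap and may
//     short-circuit the remaining predicates, so evaluate those first,
//     regardless of null-(in)tolerance.
val (isNotNullPreds, otherPreds) =
  splitConjunctivePredicates(condition).partition(_.isInstanceOf[IsNotNull])

// (2) Nullability refinement for downstream operators: only attributes that
//     can actually be proven non-null are marked as such.
val provenNotNull: Set[ExprId] = isNotNullPreds.collect {
  case inn: IsNotNull => getImpliedNotNullExprIds(inn)
}.flatten.toSet
```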

@SparkQA commented Jun 1, 2019

Test build #106058 has finished for PR 24765 at commit 1ad4d49.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 1, 2019

Test build #106057 has finished for PR 24765 at commit 05e4bcf.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 1, 2019

Test build #106059 has finished for PR 24765 at commit a10632f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen (author):

This seems to break tests in InferFiltersFromConstraintsSuite because it causes join conditions to differ in their attribute references' nullability. What I think is happening: a reference's nullability is determined during analysis, so when we analyze correctAnswer we end up recognizing the reference as non-nullable because that plan is already in its final form (with inferred isNotNull conditions), whereas in the optimized answer those conditions are added after analysis, so its references stay nullable. This change of nullability between analysis and optimization ends up breaking the tests. I'm not sure how to fix this.

@JoshRosen (author) commented Jul 15, 2019

/cc @maropu, who submitted a very similar change ~1 year prior in #21148 (I was unaware of that PR when I created this one).

Chasing down references from that PR, I discovered #23390 and #23508, both of which are concerned with fixing up nullability in attribute references; maybe one of those holds the trick to fixing the blocker identified in my previous comment.

@maropu (Member) commented Jul 16, 2019

Yea, thanks for revisiting this, @JoshRosen! I remember there were two suggestions from @gatorsmile and @cloud-fan in the previous discussion: 1) nullability is just a hint for the optimizer, so it might be good to add a new trait for this hint; and 2) optimizing Filter.output is not common in real use cases, and it is more important to fix the same issue in Join.output. So, I'm currently not sure this is the right approach (though I agree the issue should be fixed).

Inline review on logical.Project:

```scala
val childOutputNullability = child.output.map(a => a.exprId -> a.nullable).toMap
projectList
  .map(_.toAttribute)
  .map { a => childOutputNullability.get(a.exprId).map(a.withNullability).getOrElse(a) }
```
Member: Do we need to fix this part? It seems UpdateAttributeNullability could handle this case if Filter.output works well.

yiheng pushed a commit to yiheng/spark that referenced this pull request on Jul 24, 2019: add a prefix '*' to non-nullable attribute names in PlanTestBase.comparePlans failures

## What changes were proposed in this pull request?
This pr proposes to add a prefix '*' to non-nullable attribute names in PlanTestBase.comparePlans failures. In the current master, nullability mismatches might generate the same error message for the left and right logical plans, like this:
```
// This failure message was extracted from apache#24765
- constraints should be inferred from aliased literals *** FAILED ***
  == FAIL: Plans do not match ===
  !'Join Inner, (two#0 = a#0)                    'Join Inner, (two#0 = a#0)
   :- Filter (isnotnull(a#0) AND (2 <=> a#0))     :- Filter (isnotnull(a#0) AND (2 <=> a#0))
   :  +- LocalRelation <empty>, [a#0, b#0, c#0]   :  +- LocalRelation <empty>, [a#0, b#0, c#0]
   +- Project [2 AS two#0]                        +- Project [2 AS two#0]
      +- LocalRelation <empty>, [a#0, b#0, c#0]      +- LocalRelation <empty>, [a#0, b#0, c#0] (PlanTest.scala:145)
```
With this pr, the error message changes to the one below:
```
- constraints should be inferred from aliased literals *** FAILED ***
  == FAIL: Plans do not match ===
  !'Join Inner, (*two#0 = a#0)                    'Join Inner, (*two#0 = *a#0)
   :- Filter (isnotnull(a#0) AND (2 <=> a#0))     :- Filter (isnotnull(a#0) AND (2 <=> a#0))
   :  +- LocalRelation <empty>, [a#0, b#0, c#0]   :  +- LocalRelation <empty>, [a#0, b#0, c#0]
   +- Project [2 AS two#0]                        +- Project [2 AS two#0]
      +- LocalRelation <empty>, [a#0, b#0, c#0]      +- LocalRelation <empty>, [a#0, b#0, c#0] (PlanTest.scala:145)
```

## How was this patch tested?
N/A

Closes apache#25213 from maropu/MarkForNullability.

Authored-by: Takeshi Yamamuro <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
@github-actions (bot):
We're closing this PR because it hasn't been updated in a while.
This isn't a judgement on the merit of the PR in any way. It's just
a way of keeping the PR queue manageable.

If you'd like to revive this PR, please reopen it!

github-actions bot added the Stale label on Dec 29, 2019
github-actions bot closed this on Dec 30, 2019