Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-17897] [SQL] Fixed IsNotNull Constraint Inference Rule #16067

Closed
wants to merge 6 commits into from

Conversation

gatorsmile
Copy link
Member

@gatorsmile gatorsmile commented Nov 29, 2016

What changes were proposed in this pull request?

The constraints of an operator is the expressions that evaluate to true for all the rows produced. That means, the expression result should be neither false nor unknown (NULL). Thus, we can conclude that IsNotNull on all the constraints, which are generated by its own predicates or propagated from the children. The constraint can be a complex expression. For better usage of these constraints, we try to push down IsNotNull to the lowest-level expressions (i.e., Attribute). IsNotNull can be pushed through an expression when it is null intolerant. (When the input is NULL, the null-intolerant expression always evaluates to NULL.)

Below is the existing code we have for IsNotNull pushdown.

  private def scanNullIntolerantExpr(expr: Expression): Seq[Attribute] = expr match {
    case a: Attribute => Seq(a)
    case _: NullIntolerant | IsNotNull(_: NullIntolerant) =>
      expr.children.flatMap(scanNullIntolerantExpr)
    case _ => Seq.empty[Attribute]
  }

IsNotNull itself is not null-intolerant. It converts null to false. If the expression does not include any Not-like expression, it works; otherwise, it could generate a wrong result. This PR is to fix the above function by removing the IsNotNull from the inference. After the fix, when a constraint has a IsNotNull expression, we infer new attribute-specific IsNotNull constraints if and only if IsNotNull appears in the root.

Without the fix, the following test case will return empty.

val data = Seq[java.lang.Integer](1, null).toDF("key")
data.filter("not key is not null").show()

Before the fix, the optimized plan is like

== Optimized Logical Plan ==
Project [value#1 AS key#3]
+- Filter (isnotnull(value#1) && NOT isnotnull(value#1))
   +- LocalRelation [value#1]

After the fix, the optimized plan is like

== Optimized Logical Plan ==
Project [value#1 AS key#3]
+- Filter NOT isnotnull(value#1)
   +- LocalRelation [value#1]

How was this patch tested?

Added a test

@gatorsmile gatorsmile changed the title [SPARK-17897] [SQL] Fixed IsNotNull Inference Rule [SPARK-17897] [SQL] Fixed IsNotNull Constraint Inference Rule Nov 29, 2016
@@ -1697,6 +1697,12 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
expr = "cast((_1 + _2) as boolean)", expectedNonNullableColumns = Seq("_1", "_2"))
}

test("SPARK-17897: Attribute is not NullIntolerant") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New test case name?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

@SparkQA
Copy link

SparkQA commented Nov 30, 2016

Test build #69350 has finished for PR 16067 at commit 0722ae5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Nov 30, 2016

Test build #69354 has finished for PR 16067 at commit f693040.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* Infer the Attribute-specific IsNotNull constraints from the null intolerant child expressions
* of constraints.
*/
private def inferIsNotNullConstraints(constraint: Expression): Seq[Expression] =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this infer IsNotNull(a), IsNotNull(b) from IsNotNull(a) && IsNotNull(b)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be split before entering this function. Let me add a test case.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change simply ignores all IsNotNulls which are not the top expression. The above case works because Filter splits it. But if the constraint looks like Cast(IsNotNull(a), Integer) == 1, we won't infer IsNotNull(a) from it, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. After this PR, we do not support it. This is a pretty rare case, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah.

resolveColumn(tr, "a") === 1,
IsNotNull(resolveColumn(tr, "c")),
IsNotNull(resolveColumn(tr, "a")),
IsNotNull(resolveColumn(tr, "b")))))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test case is added.

@gatorsmile
Copy link
Member Author

gatorsmile commented Nov 30, 2016

@SparkQA
Copy link

SparkQA commented Nov 30, 2016

Test build #69377 has finished for PR 16067 at commit a835f80.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

*/
private def inferIsNotNullConstraints(constraint: Expression): Seq[Expression] =
constraint match {
case IsNotNull(_: Attribute) => constraint :: Nil
Copy link
Contributor

@cloud-fan cloud-fan Nov 30, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't need this case, I think it can be covered by the next case?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, my original idea is to do a fast stop. After rethinking it, it might be fine.

case _ => scanNullIntolerantExpr(constraint).map(IsNotNull(_))
}

/**
* Recursively explores the expressions which are null intolerant and returns all attributes
* in these expressions.
*/
private def scanNullIntolerantExpr(expr: Expression): Seq[Attribute] = expr match {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we rename it to scanNullIntolerantAttribute?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure

@@ -1697,6 +1697,12 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
expr = "cast((_1 + _2) as boolean)", expectedNonNullableColumns = Seq("_1", "_2"))
}

test("SPARK-17897: Fixed IsNotNull Constraint Inference Rule") {
val data = Seq[java.lang.Integer](1, null).toDF("key")
checkAnswer(data.filter("not key is not null"), Row(null))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we use DataFrame API? i.e. data.filter(!$"key".isNotNull). The string version looks weird...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure.

@cloud-fan
Copy link
Contributor

LGTM except some minor comments

@SparkQA
Copy link

SparkQA commented Nov 30, 2016

Test build #69388 has started for PR 16067 at commit 54c0dd1.

@cloud-fan
Copy link
Contributor

retest this please

@SparkQA
Copy link

SparkQA commented Nov 30, 2016

Test build #69396 has finished for PR 16067 at commit 54c0dd1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor

thanks, merging to master/2.1!

asfgit pushed a commit that referenced this pull request Nov 30, 2016
### What changes were proposed in this pull request?
The `constraints` of an operator is the expressions that evaluate to `true` for all the rows produced. That means, the expression result should be neither `false` nor `unknown` (NULL). Thus, we can conclude that `IsNotNull` on all the constraints, which are generated by its own predicates or propagated from the children. The constraint can be a complex expression. For better usage of these constraints, we try to push down `IsNotNull` to the lowest-level expressions (i.e., `Attribute`). `IsNotNull` can be pushed through an expression when it is null intolerant. (When the input is NULL, the null-intolerant expression always evaluates to NULL.)

Below is the existing code we have for `IsNotNull` pushdown.
```Scala
  private def scanNullIntolerantExpr(expr: Expression): Seq[Attribute] = expr match {
    case a: Attribute => Seq(a)
    case _: NullIntolerant | IsNotNull(_: NullIntolerant) =>
      expr.children.flatMap(scanNullIntolerantExpr)
    case _ => Seq.empty[Attribute]
  }
```

**`IsNotNull` itself is not null-intolerant.** It converts `null` to `false`. If the expression does not include any `Not`-like expression, it works; otherwise, it could generate a wrong result. This PR is to fix the above function by removing the `IsNotNull` from the inference. After the fix, when a constraint has a `IsNotNull` expression, we infer new attribute-specific `IsNotNull` constraints if and only if `IsNotNull` appears in the root.

Without the fix, the following test case will return empty.
```Scala
val data = Seq[java.lang.Integer](1, null).toDF("key")
data.filter("not key is not null").show()
```
Before the fix, the optimized plan is like
```
== Optimized Logical Plan ==
Project [value#1 AS key#3]
+- Filter (isnotnull(value#1) && NOT isnotnull(value#1))
   +- LocalRelation [value#1]
```

After the fix, the optimized plan is like
```
== Optimized Logical Plan ==
Project [value#1 AS key#3]
+- Filter NOT isnotnull(value#1)
   +- LocalRelation [value#1]
```

### How was this patch tested?
Added a test

Author: gatorsmile <[email protected]>

Closes #16067 from gatorsmile/isNotNull2.

(cherry picked from commit 2eb093d)
Signed-off-by: Wenchen Fan <[email protected]>
@asfgit asfgit closed this in 2eb093d Nov 30, 2016
robert3005 pushed a commit to palantir/spark that referenced this pull request Dec 2, 2016
### What changes were proposed in this pull request?
The `constraints` of an operator is the expressions that evaluate to `true` for all the rows produced. That means, the expression result should be neither `false` nor `unknown` (NULL). Thus, we can conclude that `IsNotNull` on all the constraints, which are generated by its own predicates or propagated from the children. The constraint can be a complex expression. For better usage of these constraints, we try to push down `IsNotNull` to the lowest-level expressions (i.e., `Attribute`). `IsNotNull` can be pushed through an expression when it is null intolerant. (When the input is NULL, the null-intolerant expression always evaluates to NULL.)

Below is the existing code we have for `IsNotNull` pushdown.
```Scala
  private def scanNullIntolerantExpr(expr: Expression): Seq[Attribute] = expr match {
    case a: Attribute => Seq(a)
    case _: NullIntolerant | IsNotNull(_: NullIntolerant) =>
      expr.children.flatMap(scanNullIntolerantExpr)
    case _ => Seq.empty[Attribute]
  }
```

**`IsNotNull` itself is not null-intolerant.** It converts `null` to `false`. If the expression does not include any `Not`-like expression, it works; otherwise, it could generate a wrong result. This PR is to fix the above function by removing the `IsNotNull` from the inference. After the fix, when a constraint has a `IsNotNull` expression, we infer new attribute-specific `IsNotNull` constraints if and only if `IsNotNull` appears in the root.

Without the fix, the following test case will return empty.
```Scala
val data = Seq[java.lang.Integer](1, null).toDF("key")
data.filter("not key is not null").show()
```
Before the fix, the optimized plan is like
```
== Optimized Logical Plan ==
Project [value#1 AS key#3]
+- Filter (isnotnull(value#1) && NOT isnotnull(value#1))
   +- LocalRelation [value#1]
```

After the fix, the optimized plan is like
```
== Optimized Logical Plan ==
Project [value#1 AS key#3]
+- Filter NOT isnotnull(value#1)
   +- LocalRelation [value#1]
```

### How was this patch tested?
Added a test

Author: gatorsmile <[email protected]>

Closes apache#16067 from gatorsmile/isNotNull2.
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
### What changes were proposed in this pull request?
The `constraints` of an operator is the expressions that evaluate to `true` for all the rows produced. That means, the expression result should be neither `false` nor `unknown` (NULL). Thus, we can conclude that `IsNotNull` on all the constraints, which are generated by its own predicates or propagated from the children. The constraint can be a complex expression. For better usage of these constraints, we try to push down `IsNotNull` to the lowest-level expressions (i.e., `Attribute`). `IsNotNull` can be pushed through an expression when it is null intolerant. (When the input is NULL, the null-intolerant expression always evaluates to NULL.)

Below is the existing code we have for `IsNotNull` pushdown.
```Scala
  private def scanNullIntolerantExpr(expr: Expression): Seq[Attribute] = expr match {
    case a: Attribute => Seq(a)
    case _: NullIntolerant | IsNotNull(_: NullIntolerant) =>
      expr.children.flatMap(scanNullIntolerantExpr)
    case _ => Seq.empty[Attribute]
  }
```

**`IsNotNull` itself is not null-intolerant.** It converts `null` to `false`. If the expression does not include any `Not`-like expression, it works; otherwise, it could generate a wrong result. This PR is to fix the above function by removing the `IsNotNull` from the inference. After the fix, when a constraint has a `IsNotNull` expression, we infer new attribute-specific `IsNotNull` constraints if and only if `IsNotNull` appears in the root.

Without the fix, the following test case will return empty.
```Scala
val data = Seq[java.lang.Integer](1, null).toDF("key")
data.filter("not key is not null").show()
```
Before the fix, the optimized plan is like
```
== Optimized Logical Plan ==
Project [value#1 AS key#3]
+- Filter (isnotnull(value#1) && NOT isnotnull(value#1))
   +- LocalRelation [value#1]
```

After the fix, the optimized plan is like
```
== Optimized Logical Plan ==
Project [value#1 AS key#3]
+- Filter NOT isnotnull(value#1)
   +- LocalRelation [value#1]
```

### How was this patch tested?
Added a test

Author: gatorsmile <[email protected]>

Closes apache#16067 from gatorsmile/isNotNull2.
@yhuai
Copy link
Contributor

yhuai commented Feb 10, 2017

@gatorsmile can we also add it in branch-2.0? Thanks!

@gatorsmile
Copy link
Member Author

Sure, let me back port it now.

asfgit pushed a commit that referenced this pull request Feb 12, 2017
… Rule

### What changes were proposed in this pull request?

This PR is to backport #16067 to Spark 2.0

----

The `constraints` of an operator is the expressions that evaluate to `true` for all the rows produced. That means, the expression result should be neither `false` nor `unknown` (NULL). Thus, we can conclude that `IsNotNull` on all the constraints, which are generated by its own predicates or propagated from the children. The constraint can be a complex expression. For better usage of these constraints, we try to push down `IsNotNull` to the lowest-level expressions (i.e., `Attribute`). `IsNotNull` can be pushed through an expression when it is null intolerant. (When the input is NULL, the null-intolerant expression always evaluates to NULL.)

Below is the existing code we have for `IsNotNull` pushdown.
```Scala
  private def scanNullIntolerantExpr(expr: Expression): Seq[Attribute] = expr match {
    case a: Attribute => Seq(a)
    case _: NullIntolerant | IsNotNull(_: NullIntolerant) =>
      expr.children.flatMap(scanNullIntolerantExpr)
    case _ => Seq.empty[Attribute]
  }
```

**`IsNotNull` itself is not null-intolerant.** It converts `null` to `false`. If the expression does not include any `Not`-like expression, it works; otherwise, it could generate a wrong result. This PR is to fix the above function by removing the `IsNotNull` from the inference. After the fix, when a constraint has a `IsNotNull` expression, we infer new attribute-specific `IsNotNull` constraints if and only if `IsNotNull` appears in the root.

Without the fix, the following test case will return empty.
```Scala
val data = Seq[java.lang.Integer](1, null).toDF("key")
data.filter("not key is not null").show()
```
Before the fix, the optimized plan is like
```
== Optimized Logical Plan ==
Project [value#1 AS key#3]
+- Filter (isnotnull(value#1) && NOT isnotnull(value#1))
   +- LocalRelation [value#1]
```

After the fix, the optimized plan is like
```
== Optimized Logical Plan ==
Project [value#1 AS key#3]
+- Filter NOT isnotnull(value#1)
   +- LocalRelation [value#1]
```

### How was this patch tested?
Added a test

Author: Xiao Li <[email protected]>

Closes #16894 from gatorsmile/isNotNull2.0.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants