[SPARK-19851][SQL] Add support for EVERY and ANY (SOME) aggregates #22809

Closed
Changes from 3 commits
@@ -300,6 +300,10 @@ object FunctionRegistry {
expression[CollectList]("collect_list"),
expression[CollectSet]("collect_set"),
expression[CountMinSketchAgg]("count_min_sketch"),
expression[EveryAgg]("every"),
expression[AnyAgg]("any"),
expression[SomeAgg]("some"),

Contributor

nit: unneeded newline


// string functions
expression[Ascii]("ascii"),
@@ -21,6 +21,7 @@ import java.util.Locale

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, TypeCoercion}
import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.trees.TreeNode
@@ -282,6 +283,31 @@ trait RuntimeReplaceable extends UnaryExpression with Unevaluable {
override lazy val canonicalized: Expression = child.canonicalized
}

/**
* An aggregate expression that gets rewritten (currently by the optimizer) into a
* different aggregate expression for evaluation. This is mainly used to provide compatibility
* with other databases. For example, we use this to support every, any/some aggregates by rewriting
* them with Min and Max respectively.
*/
trait UnevaluableAggrgate extends DeclarativeAggregate {
Contributor

typo: UnevaluableAggrgate -> UnevaluableAggregate


  override def nullable: Boolean = true
Contributor

why do we set them always as nullable?

Contributor Author

@mgaido91 Most aggregates are nullable, no? Did you have a suggestion here?

Contributor

shouldn't this be nullable only if the incoming expression is?

Contributor Author

@dilipbiswal Oct 26, 2018

@mgaido91 I think it's different for aggregates. Please see Max and Min: they both define it as nullable. They work on a group of rows and can return null on empty input.

Contributor

Ah, right, I was missing that case, sorry, thanks.
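
A minimal sketch of the point settled in this thread, in plain Scala rather than Catalyst: an aggregate that skips nulls still returns null when no non-null input remains, so its result is nullable even when the input column is not. The names `NullSkippingAggSketch`, `sqlMin` and `sqlMax` are hypothetical, and SQL NULL is modelled as `None`; this is not Spark's implementation of Min or Max.

```scala
object NullSkippingAggSketch {
  // SQL NULL is modelled as None, a non-null value v as Some(v).
  // NULL inputs are skipped; with no non-null input the result is NULL (None).
  def sqlMin[T](values: Seq[Option[T]])(implicit ord: Ordering[T]): Option[T] =
    values.flatten.reduceOption((a, b) => if (ord.lteq(a, b)) a else b)

  def sqlMax[T](values: Seq[Option[T]])(implicit ord: Ordering[T]): Option[T] =
    values.flatten.reduceOption((a, b) => if (ord.gteq(a, b)) a else b)

  // A column without any NULLs.
  val rows: Seq[Option[Boolean]] = Seq(Some(true), Some(false))

  // Scala orders false before true, so min acts like every(v) and max like any(v).
  val everyLike: Option[Boolean] = sqlMin(rows) // Some(false)
  val anyLike: Option[Boolean] = sqlMax(rows)   // Some(true)

  // No input rows at all: the result is NULL although no input value was NULL.
  val onEmptyInput: Option[Boolean] = sqlMin(Seq.empty[Option[Boolean]]) // None
}
```

The empty-input case is the one that makes `nullable = true` necessary regardless of the child expression's nullability.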


  override lazy val aggBufferAttributes =
    throw new UnsupportedOperationException(s"Cannot evaluate aggBufferAttributes: $this")

  override lazy val initialValues: Seq[Expression] =
    throw new UnsupportedOperationException(s"Cannot evaluate initialValues: $this")

  override lazy val updateExpressions: Seq[Expression] =
    throw new UnsupportedOperationException(s"Cannot evaluate updateExpressions: $this")

  override lazy val mergeExpressions: Seq[Expression] =
    throw new UnsupportedOperationException(s"Cannot evaluate mergeExpressions: $this")

  override lazy val evaluateExpression: Expression =
    throw new UnsupportedOperationException(s"Cannot evaluate evaluateExpression: $this")
}

/**
* Expressions that don't have SQL representation should extend this trait. Examples are
@@ -57,3 +57,34 @@ case class Max(child: Expression) extends DeclarativeAggregate {

override lazy val evaluateExpression: AttributeReference = max
}

abstract class AnyAggBase(arg: Expression)
Contributor

can we change this to something like UnevaluableBooleanAggBase and make also EveryAgg extend this, in order to avoid code duplication?

Contributor Author

@dilipbiswal Oct 25, 2018

@mgaido91 I wasn't sure where to house this class; that's why I kept it separate :-). Is it okay if I just rename it and keep it in Max.scala?

Contributor

maybe we can move it close to UnevaluableAggrgate. @cloud-fan @dilipbiswal WDYT?

Contributor

If it's hard to decide where to put it, I think putting it in a new file can be considered.

  extends UnevaluableAggrgate with ImplicitCastInputTypes {

  override def children: Seq[Expression] = arg :: Nil

  override def dataType: DataType = BooleanType

  override def inputTypes: Seq[AbstractDataType] = Seq(BooleanType)

  override def checkInputDataTypes(): TypeCheckResult = {
    arg.dataType match {
      case dt if dt != BooleanType =>
        TypeCheckResult.TypeCheckFailure(s"Input to function '$prettyName' should have been " +
          s"${BooleanType.simpleString}, but it's [${arg.dataType.catalogString}].")
      case _ => TypeCheckResult.TypeCheckSuccess
    }
  }
}

@ExpressionDescription(
usage = "_FUNC_(expr) - Returns true if at least one value of `expr` is true.")
Contributor

add since = "3.0.0"

Contributor Author

I will add it. One observation:

I see that we specify @Since only in the APIs. For example, for the Max aggregate we have it on the API function in functions.scala (max). These aggregates are not exposed in the Dataset APIs, and none of the other aggregates seem to have it.

Contributor

I think the point is that it was not available until 2.3, so earlier methods don't have it. Am I missing something?

Contributor Author

@mgaido91 Yeah... If we look at the definitions of other aggregate functions like Max and Min, they don't seem to have @Since. However, max and min are defined in functions.scala with @Since 1.3. So basically I was not sure what the rule is when a function is not exposed in the Dataset API but only through SQL.

Contributor

just to be clear, here I am not talking about @Since, I am talking about since as parameter of @ExpressionDescription

Contributor

Ideally we need Since here. Some functions don't have them because at that time the Since method was not there. We should add missing Since to them as well, if other people have time to do it.

Contributor Author

@mgaido91 @cloud-fan Thanks .. will add.

Member

(btw, let's add since at ExpressionDescription wherever possible .. )

Contributor Author

@HyukjinKwon Sure.. I will open a pr shortly.

case class AnyAgg(arg: Expression) extends AnyAggBase(arg) {
  override def nodeName: String = "Any"
}

@ExpressionDescription(
usage = "_FUNC_(expr) - Returns true if at least one value of `expr` is true.")
Contributor

ditto

case class SomeAgg(arg: Expression) extends AnyAggBase(arg) {
  override def nodeName: String = "Some"
}
@@ -57,3 +57,27 @@ case class Min(child: Expression) extends DeclarativeAggregate {

override lazy val evaluateExpression: AttributeReference = min
}

@ExpressionDescription(
usage = "_FUNC_(expr) - Returns true if all values of `expr` are true.")
Contributor

ditto

case class EveryAgg(arg: Expression)
  extends UnevaluableAggrgate with ImplicitCastInputTypes {

  override def nodeName: String = "Every"

  override def children: Seq[Expression] = arg :: Nil

  override def dataType: DataType = BooleanType

  override def inputTypes: Seq[AbstractDataType] = Seq(BooleanType)

  override def checkInputDataTypes(): TypeCheckResult = {
    arg.dataType match {
      case dt if dt != BooleanType =>
        TypeCheckResult.TypeCheckFailure(s"Input to function '$prettyName' should have been " +
          s"${BooleanType.simpleString}, but it's [${arg.dataType.catalogString}].")
      case _ => TypeCheckResult.TypeCheckSuccess
    }
  }
}
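
EveryAgg repeats the type check already present in AnyAggBase, which is the duplication raised in the review. A rough sketch of the suggested shared base follows, assuming the proposed name UnevaluableBooleanAggBase. It uses stand-in types (`Expr`, `DataType`, `TypeCheckResult` are simplified placeholders defined here, not the Catalyst classes) so that it compiles without Spark, and it is not the code that was eventually merged.

```scala
object BooleanAggBaseSketch {
  // Simplified stand-ins for the Catalyst types used in the diff.
  sealed trait DataType { def simpleString: String }
  case object BooleanType extends DataType { val simpleString = "boolean" }
  case object IntegerType extends DataType { val simpleString = "int" }

  sealed trait TypeCheckResult
  case object TypeCheckSuccess extends TypeCheckResult
  final case class TypeCheckFailure(message: String) extends TypeCheckResult

  // Stand-in for an expression: only its data type matters for this check.
  final case class Expr(dataType: DataType)

  // Hypothetical shared base holding the boolean-only input check once.
  abstract class UnevaluableBooleanAggBase(arg: Expr) {
    def prettyName: String

    def checkInputDataTypes(): TypeCheckResult =
      if (arg.dataType == BooleanType) TypeCheckSuccess
      else TypeCheckFailure(
        s"Input to function '$prettyName' should have been " +
          s"${BooleanType.simpleString}, but it's [${arg.dataType.simpleString}].")
  }

  // The three aggregates then differ only in their name.
  final case class EveryAgg(arg: Expr) extends UnevaluableBooleanAggBase(arg) {
    val prettyName = "every"
  }
  final case class AnyAgg(arg: Expr) extends UnevaluableBooleanAggBase(arg) {
    val prettyName = "any"
  }
  final case class SomeAgg(arg: Expr) extends UnevaluableBooleanAggBase(arg) {
    val prettyName = "some"
  }
}
```

With a shared base, the error message and the accepted input type stay in one place, whichever file the base ends up in.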

@@ -21,24 +21,32 @@ import scala.collection.mutable

import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._


/**
* Finds all [[RuntimeReplaceable]] expressions and replace them with the expressions that can
* be evaluated. This is mainly used to provide compatibility with other databases.
* For example, we use this to support "nvl" by replacing it with "coalesce".
* Finds all the expressions that are unevaluable and replace/rewrite them with semantically
* equivalent expressions that can be evaluated. Currently we replace two kinds of expressions :
Contributor

nit: extra space before :

* 1) [[RuntimeReplaceable]] expressions
* 2) [[UnevaluableAggrgate]] expressions such as Every, Some, Any
* This is mainly used to provide compatibility with other databases.
* Few examples are :
* we use this to support "nvl" by replacing it with "coalesce".
* we use this to replace Every and Any with Min and Max respectively.
*/
object ReplaceExpressions extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
    case e: RuntimeReplaceable => e.child
    case SomeAgg(arg) => Max(arg)
    case AnyAgg(arg) => Max(arg)
    case EveryAgg(arg) => Min(arg)
  }
}
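
The rewrite in ReplaceExpressions can be illustrated on a toy expression tree. This is a sketch under loud assumptions: `Expr`, `Col` and `Not` are hypothetical stand-ins, and the hand-written recursion in `replace` takes the place of Catalyst's generic `transformAllExpressions` traversal. Only the three rewrite cases mirror the diff.

```scala
object ReplaceSketch {
  // Toy expression tree; not the Catalyst Expression hierarchy.
  sealed trait Expr
  final case class Col(name: String) extends Expr
  final case class Not(child: Expr) extends Expr
  final case class Min(child: Expr) extends Expr
  final case class Max(child: Expr) extends Expr
  final case class EveryAgg(arg: Expr) extends Expr
  final case class AnyAgg(arg: Expr) extends Expr
  final case class SomeAgg(arg: Expr) extends Expr

  // Replace the unevaluable aggregates wherever they occur in the tree.
  def replace(e: Expr): Expr = e match {
    case EveryAgg(arg) => Min(replace(arg)) // every(x) becomes min(x)
    case AnyAgg(arg)   => Max(replace(arg)) // any(x) becomes max(x)
    case SomeAgg(arg)  => Max(replace(arg)) // some(x) becomes max(x)
    case Min(child)    => Min(replace(child))
    case Max(child)    => Max(replace(child))
    case Not(child)    => Not(replace(child))
    case c: Col        => c
  }

  // The aggregate is nested one level down and is still rewritten.
  val rewritten: Expr = replace(Not(EveryAgg(Col("v")))) // Not(Min(Col("v")))
}
```

Because boolean `false` orders before `true`, the minimum of a boolean column is true only when every non-null value is true, and the maximum is true when at least one is.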


Contributor

nit: unneeded change

/**
* Computes the current date and time to make sure we return the same result in a single query.
*/
@@ -144,6 +144,8 @@ class ExpressionTypeCheckingSuite extends SparkFunSuite {
assertSuccess(Sum('stringField))
assertSuccess(Average('stringField))
assertSuccess(Min('arrayField))
assertSuccess(new EveryAgg('booleanField))
assertSuccess(new AnyAgg('booleanField))
Contributor

shall we also add SomeAgg?


assertError(Min('mapField), "min does not support ordering on type")
assertError(Max('mapField), "max does not support ordering on type")
66 changes: 66 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/group-by.sql
@@ -80,3 +80,69 @@ SELECT 1 FROM range(10) HAVING true;
SELECT 1 FROM range(10) HAVING MAX(id) > 0;

SELECT id FROM range(10) HAVING id > 0;

-- Test data
CREATE OR REPLACE TEMPORARY VIEW test_agg AS SELECT * FROM VALUES
  (1, true), (1, false),
  (2, true),
  (3, false), (3, null),
  (4, null), (4, null),
  (5, null), (5, true), (5, false) AS test_agg(k, v);

-- empty table
SELECT every(v), some(v), any(v) FROM test_agg WHERE 1 = 0;

-- all null values
SELECT every(v), some(v), any(v) FROM test_agg WHERE k = 4;

-- aggregates are null-filtering
SELECT every(v), some(v), any(v) FROM test_agg WHERE k = 5;

-- group by
SELECT k, every(v), some(v), any(v) FROM test_agg GROUP BY k;

-- having
SELECT k, every(v) FROM test_agg GROUP BY k HAVING every(v) = false;
SELECT k, every(v) FROM test_agg GROUP BY k HAVING every(v) IS NULL;

-- basic subquery path to make sure rewrite happens in both parent and child plans.
SELECT k,
       Every(v) AS every
FROM   test_agg
WHERE  k = 2
       AND v IN (SELECT Any(v)
                 FROM   test_agg
                 WHERE  k = 1)
GROUP  BY k;

-- basic subquery path to make sure rewrite happens in both parent and child plans.
SELECT k,
       Every(v) AS every
FROM   test_agg
WHERE  k = 2
       AND v IN (SELECT Every(v)
                 FROM   test_agg
                 WHERE  k = 1)
GROUP  BY k;

-- input type checking Int
SELECT every(1);

-- input type checking Short
SELECT some(1S);

-- input type checking Long
SELECT any(1L);

-- input type checking String
SELECT every("true");

-- every/some/any aggregates are supported as windows expression.
SELECT k, v, every(v) OVER (PARTITION BY k ORDER BY v) FROM test_agg;
Contributor Author

@dilipbiswal Oct 24, 2018

@cloud-fan here are a few window tests. (fyi)

SELECT k, v, some(v) OVER (PARTITION BY k ORDER BY v) FROM test_agg;
SELECT k, v, any(v) OVER (PARTITION BY k ORDER BY v) FROM test_agg;

-- simple explain of queries having every/some/any aggregates. Optimized
-- plan should show the rewritten aggregate expression.
EXPLAIN EXTENDED SELECT k, every(v), some(v), any(v) FROM test_agg GROUP BY k;
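
For orientation, the group-by query over test_agg can be modelled in plain Scala. This is a sketch that assumes the aggregates behave like the null-skipping Min and Max they are rewritten to; it is not taken from the expected-output file of this test, which is not shown here. `TestAggSketch`, `every`, `some` and `groupByKey` are hypothetical helper names, and SQL NULL is modelled as `None`.

```scala
object TestAggSketch {
  // Rows of the test_agg view as (k, v); SQL NULL is modelled as None.
  val testAgg: Seq[(Int, Option[Boolean])] = Seq(
    (1, Some(true)), (1, Some(false)),
    (2, Some(true)),
    (3, Some(false)), (3, None),
    (4, None), (4, None),
    (5, None), (5, Some(true)), (5, Some(false)))

  // NULL when there is no non-null input, else true iff all non-null inputs are true.
  def every(vs: Seq[Option[Boolean]]): Option[Boolean] = {
    val nonNull = vs.flatten
    if (nonNull.isEmpty) None else Some(nonNull.forall(identity))
  }

  // NULL when there is no non-null input, else true iff some non-null input is true.
  def some(vs: Seq[Option[Boolean]]): Option[Boolean] = {
    val nonNull = vs.flatten
    if (nonNull.isEmpty) None else Some(nonNull.exists(identity))
  }

  // Models: SELECT k, every(v), some(v) FROM test_agg GROUP BY k
  def groupByKey(
      rows: Seq[(Int, Option[Boolean])]): Map[Int, (Option[Boolean], Option[Boolean])] =
    rows.groupBy(_._1).map { case (k, group) =>
      val vs = group.map(_._2)
      (k, (every(vs), some(vs)))
    }
}
```

Under this model the group k = 4, which holds only nulls, yields NULL for both aggregates, and k = 3 yields false for both because its single non-null value is false.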
