diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/ReducibleFunction.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/ReducibleFunction.java
index 86156c7896304..ef1a14e50cdad 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/ReducibleFunction.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/ReducibleFunction.java
@@ -61,12 +61,12 @@
public interface ReducibleFunction<I, O> {
/**
- * This method is for parameterized functions.
+ * This method is for the bucket function.
*
- * If this parameterized function is 'reducible' on another bucket function,
+ * If this bucket function is 'reducible' on another bucket function,
* return the {@link Reducer} function.
*
- * Example to return reducer for reducing f_source = bucket(4, x) on f_target = bucket(2, x)
+ * For example, to return reducer for reducing f_source = bucket(4, x) on f_target = bucket(2, x)
*
* - thisBucketFunction = bucket
* - thisNumBuckets = 4
@@ -79,10 +79,10 @@ public interface ReducibleFunction<I, O> {
* @param otherNumBuckets parameter for the other function
* @return a reduction function if it is reducible, null if not
*/
- default Reducer<I, O> bucketReducer(
- int thisNumBuckets,
- ReducibleFunction<?, ?> otherBucketFunction,
- int otherNumBuckets) {
+ default Reducer<I, O> reducer(
+ int thisNumBuckets,
+ ReducibleFunction<?, ?> otherBucketFunction,
+ int otherNumBuckets) {
throw new UnsupportedOperationException();
}
@@ -100,7 +100,7 @@ default Reducer<I, O> bucketReducer(
* @param otherFunction the other function
* @return a reduction function if it is reducible, null if not.
*/
- default Reducer<I, O> bucketReducer(ReducibleFunction<?, ?> otherFunction) {
+ default Reducer<I, O> reducer(ReducibleFunction<?, ?> otherFunction) {
throw new UnsupportedOperationException();
}
}
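
A minimal Scala sketch of what an implementation of the renamed reducer API can look like, mirroring the modulo semantics of the test BucketFunction further below. ExampleBucketFunction and ModuloBucketReducer are hypothetical names, not part of this patch:

```scala
import org.apache.spark.sql.connector.catalog.functions.{Reducer, ReducibleFunction}

// Hypothetical reducer: collapses a bucket id from the coarser bucketing
// onto the finer one by taking it modulo the other side's bucket count.
case class ModuloBucketReducer(otherNumBuckets: Int) extends Reducer[Int, Int] {
  override def reduce(bucket: Int): Int = bucket % otherNumBuckets
}

// Hypothetical function that is 'reducible' on itself whenever this side's
// bucket count is a larger multiple of the other side's, per the Javadoc above.
object ExampleBucketFunction extends ReducibleFunction[Int, Int] {
  override def reducer(
      thisNumBuckets: Int,
      otherFunction: ReducibleFunction[_, _],
      otherNumBuckets: Int): Reducer[Int, Int] = {
    if (otherFunction == ExampleBucketFunction &&
        thisNumBuckets > otherNumBuckets &&
        thisNumBuckets % otherNumBuckets == 0) {
      ModuloBucketReducer(otherNumBuckets)
    } else {
      null // contract: null signals "not reducible"
    }
  }
}
```

Returning null rather than an Option keeps the default-method contract implementable from Java as well.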
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TransformExpression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TransformExpression.scala
index 371e6622d5a6b..ed44fdd838fa8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TransformExpression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TransformExpression.scala
@@ -102,8 +102,8 @@ case class TransformExpression(
otherNumBucketsOpt: Option[Int]): Option[Reducer[_, _]] = {
val res = (thisNumBucketsOpt, otherNumBucketsOpt) match {
case (Some(numBuckets), Some(otherNumBuckets)) =>
- thisFunction.bucketReducer(numBuckets, otherFunction, otherNumBuckets)
- case _ => thisFunction.bucketReducer(otherFunction)
+ thisFunction.reducer(numBuckets, otherFunction, otherNumBuckets)
+ case _ => thisFunction.reducer(otherFunction)
}
Option(res)
}
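
Note the Java-side contract here: reducer() returns null when no reduction applies, and Option(res) above lifts that into None. A hedged illustration, reusing the hypothetical ExampleBucketFunction from the sketch above:

```scala
import org.apache.spark.sql.connector.catalog.functions.Reducer

// bucket(4, x) reduces onto bucket(2, x): a Reducer is returned.
val some: Option[Reducer[Int, Int]] =
  Option(ExampleBucketFunction.reducer(4, ExampleBucketFunction, 2))
assert(some.isDefined)

// 4 buckets on 3: not a multiple, so reducer() yields null, hence None.
val none: Option[Reducer[Int, Int]] =
  Option(ExampleBucketFunction.reducer(4, ExampleBucketFunction, 3))
assert(none.isEmpty)
```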
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index 86295ec615757..2364130f79e4c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -881,9 +881,9 @@ case class KeyGroupedShuffleSpec(
object KeyGroupedShuffleSpec {
def reducePartitionValue(
-    row: InternalRow,
-    expressions: Seq[Expression],
-    reducers: Seq[Option[Reducer[_, _]]]):
+      row: InternalRow,
+      expressions: Seq[Expression],
+      reducers: Seq[Option[Reducer[_, _]]]):
InternalRowComparableWrapper = {
val partitionVals = row.toSeq(expressions.map(_.dataType))
val reducedRow = partitionVals.zip(reducers).map{
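
For intuition, the zip-and-map at the end of this hunk applies each optional reducer elementwise to the row's partition values. A standalone toy sketch of that semantics, using plain Ints in place of InternalRow values (not the actual Spark types):

```scala
// Some(reducer) rewrites a column's partition value; None keeps it as-is.
def reduceValues(
    partitionVals: Seq[Int],
    reducers: Seq[Option[Int => Int]]): Seq[Int] =
  partitionVals.zip(reducers).map {
    case (v, Some(reduce)) => reduce(v)
    case (v, None) => v
  }

// e.g. bucket ids (3, 7) with a modulo-2 reducer on the first column only:
assert(reduceValues(Seq(3, 7), Seq(Some((b: Int) => b % 2), None)) == Seq(1, 7))
```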
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index 7ff682178ad27..105bded78549c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -569,8 +569,8 @@ case class EnsureRequirements(
}
private def reduceCommonPartValues(commonPartValues: Seq[(InternalRow, Int)],
-    expressions: Seq[Expression],
-    reducers: Option[Seq[Option[Reducer[_, _]]]]) = {
+      expressions: Seq[Expression],
+      reducers: Option[Seq[Option[Reducer[_, _]]]]) = {
reducers match {
case Some(reducers) => commonPartValues.groupBy { case (row, _) =>
KeyGroupedShuffleSpec.reducePartitionValue(row, expressions, reducers)
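
Downstream of the groupBy shown above, partition values that reduce to the same key can be merged, with their split counts summed. A toy sketch of that merge step, again with Ints standing in for InternalRow:

```scala
// Group (partition value, numSplits) pairs by their reduced key and
// sum the split counts of values that collapse together.
def reduceCommon(
    commonPartValues: Seq[(Int, Int)],
    reduce: Int => Int): Seq[(Int, Int)] =
  commonPartValues
    .groupBy { case (value, _) => reduce(value) }
    .map { case (reduced, group) => (reduced, group.map(_._2).sum) }
    .toSeq

// Buckets 0..3 reduced modulo 2: 0 merges with 2, and 1 merges with 3.
assert(reduceCommon(Seq((0, 1), (1, 2), (2, 3), (3, 4)), _ % 2).toMap ==
  Map(0 -> 4, 1 -> 6))
```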
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala
index 68f5e774a385f..c4207fd3e0920 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala
@@ -86,10 +86,10 @@ object BucketFunction extends ScalarFunction[Int] with ReducibleFunction[Int, Int] {
(input.getLong(1) % input.getInt(0)).toInt
}
- override def bucketReducer(
- thisNumBuckets: Int,
- otherFunc: ReducibleFunction[_, _],
- otherNumBuckets: Int): Reducer[Int, Int] = {
+ override def reducer(
+ thisNumBuckets: Int,
+ otherFunc: ReducibleFunction[_, _],
+ otherNumBuckets: Int): Reducer[Int, Int] = {
if (otherFunc == BucketFunction) {
if ((thisNumBuckets > otherNumBuckets)