Skip to content

Commit

Permalink
Formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
szehon-ho committed Apr 3, 2024
1 parent 8053e58 commit ad61b0c
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@
public interface ReducibleFunction<I, O> {

/**
* This method is for parameterized functions.
* This method is for the bucket function.
*
* If this parameterized function is 'reducible' on another bucket function,
* If this bucket function is 'reducible' on another bucket function,
* return the {@link Reducer} function.
* <p>
* Example to return reducer for reducing f_source = bucket(4, x) on f_target = bucket(2, x)
* For example, to return reducer for reducing f_source = bucket(4, x) on f_target = bucket(2, x)
* <ul>
* <li>thisBucketFunction = bucket</li>
* <li>thisNumBuckets = 4</li>
Expand All @@ -79,10 +79,10 @@ public interface ReducibleFunction<I, O> {
* @param otherNumBuckets parameter for the other function
* @return a reduction function if it is reducible, null if not
*/
default Reducer<I, O> bucketReducer(
int thisNumBuckets,
ReducibleFunction<?, ?> otherBucketFunction,
int otherNumBuckets) {
default Reducer<I, O> reducer(
int thisNumBuckets,
ReducibleFunction<?, ?> otherBucketFunction,
int otherNumBuckets) {
throw new UnsupportedOperationException();
}

Expand All @@ -100,7 +100,7 @@ default Reducer<I, O> bucketReducer(
* @param otherFunction the other function
* @return a reduction function if it is reducible, null if not.
*/
default Reducer<I, O> bucketReducer(ReducibleFunction<?, ?> otherFunction) {
default Reducer<I, O> reducer(ReducibleFunction<?, ?> otherFunction) {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,15 @@ case class TransformExpression(
}

// Return a Reducer for a reducible function on another reducible function
private def reducer(thisFunction: ReducibleFunction[_, _],
thisNumBucketsOpt: Option[Int],
otherFunction: ReducibleFunction[_, _],
otherNumBucketsOpt: Option[Int]): Option[Reducer[_, _]] = {
private def reducer(
thisFunction: ReducibleFunction[_, _],
thisNumBucketsOpt: Option[Int],
otherFunction: ReducibleFunction[_, _],
otherNumBucketsOpt: Option[Int]): Option[Reducer[_, _]] = {
val res = (thisNumBucketsOpt, otherNumBucketsOpt) match {
case (Some(numBuckets), Some(otherNumBuckets)) =>
thisFunction.bucketReducer(numBuckets, otherFunction, otherNumBuckets)
case _ => thisFunction.bucketReducer(otherFunction)
thisFunction.reducer(numBuckets, otherFunction, otherNumBuckets)
case _ => thisFunction.reducer(otherFunction)
}
Option(res)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -881,9 +881,9 @@ case class KeyGroupedShuffleSpec(

object KeyGroupedShuffleSpec {
def reducePartitionValue(
row: InternalRow,
expressions: Seq[Expression],
reducers: Seq[Option[Reducer[_, _]]]):
row: InternalRow,
expressions: Seq[Expression],
reducers: Seq[Option[Reducer[_, _]]]):
InternalRowComparableWrapper = {
val partitionVals = row.toSeq(expressions.map(_.dataType))
val reducedRow = partitionVals.zip(reducers).map{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -569,8 +569,8 @@ case class EnsureRequirements(
}

private def reduceCommonPartValues(commonPartValues: Seq[(InternalRow, Int)],
expressions: Seq[Expression],
reducers: Option[Seq[Option[Reducer[_, _]]]]) = {
expressions: Seq[Expression],
reducers: Option[Seq[Option[Reducer[_, _]]]]) = {
reducers match {
case Some(reducers) => commonPartValues.groupBy { case (row, _) =>
KeyGroupedShuffleSpec.reducePartitionValue(row, expressions, reducers)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ object BucketFunction extends ScalarFunction[Int] with ReducibleFunction[Int, In
(input.getLong(1) % input.getInt(0)).toInt
}

override def bucketReducer(
thisNumBuckets: Int,
otherFunc: ReducibleFunction[_, _],
otherNumBuckets: Int): Reducer[Int, Int] = {
override def reducer(
thisNumBuckets: Int,
otherFunc: ReducibleFunction[_, _],
otherNumBuckets: Int): Reducer[Int, Int] = {

if (otherFunc == BucketFunction) {
if ((thisNumBuckets > otherNumBuckets)
Expand Down

0 comments on commit ad61b0c

Please sign in to comment.