Skip to content

Commit

Permalink
Fix compiler warning
Browse files Browse the repository at this point in the history
  • Loading branch information
szehon-ho committed Feb 28, 2024
1 parent 8d511ac commit dc2fba1
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,6 @@ public interface ReducibleFunction<T, A> extends ScalarFunction<T> {
* @param otherArgument argument for other function instance
* @return a reduction function if it is reducible, none if not
*/
Option<Reducer<A>> reducer(ReducibleFunction<?, ?> other, Option<?> thisArgument, Option<?> otherArgument);
Option<Reducer<A>> reducer(ReducibleFunction<?, ?> other, Option<?> thisArgument,
Option<?> otherArgument);
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@ case class TransformExpression(
true
} else {
(function, other.function) match {
case (f: ReducibleFunction[Any, Any] @unchecked,
o: ReducibleFunction[Any, Any] @unchecked) =>
case (f: ReducibleFunction[_, _], o: ReducibleFunction[_, _]) =>
val reducer = f.reducer(o, numBucketsOpt, other.numBucketsOpt)
val otherReducer = o.reducer(f, other.numBucketsOpt, numBucketsOpt)
reducer.isDefined || otherReducer.isDefined
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ trait ShuffleSpec {
* Returning none also indicates that none of the partition expressions can be reduced on the
* corresponding expression on the other shuffle spec.
*/
def reducers(spec: ShuffleSpec): Option[Seq[Option[Reducer[Any]]]] = None
def reducers(spec: ShuffleSpec): Option[Seq[Option[Reducer[_]]]] = None
}

case object SinglePartitionShuffleSpec extends ShuffleSpec {
Expand Down Expand Up @@ -854,17 +854,16 @@ case class KeyGroupedShuffleSpec(
KeyGroupedPartitioning(clustering, partitioning.numPartitions, partitioning.partitionValues)
}

override def reducers(other: ShuffleSpec): Option[Seq[Option[Reducer[Any]]]] = {
override def reducers(other: ShuffleSpec): Option[Seq[Option[Reducer[_]]]] = {
other match {
case otherSpec: KeyGroupedShuffleSpec =>
val results = partitioning.expressions.zip(otherSpec.partitioning.expressions).map {
case (e1: TransformExpression, e2: TransformExpression)
if e1.function.isInstanceOf[ReducibleFunction[Any, Any]@unchecked]
&& e2.function.isInstanceOf[ReducibleFunction[Any, Any]@unchecked] =>
e1.function.asInstanceOf[ReducibleFunction[Any, Any]].reducer(
e2.function.asInstanceOf[ReducibleFunction[Any, Any]],
e1.numBucketsOpt.map(a => a.asInstanceOf[Any]),
e2.numBucketsOpt.map(a => a.asInstanceOf[Any]))
if e1.function.isInstanceOf[ReducibleFunction[_, _]]
&& e2.function.isInstanceOf[ReducibleFunction[_, _]] =>
e1.function.asInstanceOf[ReducibleFunction[_, _]].reducer(
e2.function.asInstanceOf[ReducibleFunction[_, _]],
e1.numBucketsOpt, e2.numBucketsOpt)
case (_, _) => None
}

Expand Down Expand Up @@ -892,11 +891,11 @@ case class KeyGroupedShuffleSpec(
object KeyGroupedShuffleSpec {
def reducePartitionValue(row: InternalRow,
expressions: Seq[Expression],
reducers: Seq[Option[Reducer[Any]]]):
reducers: Seq[Option[Reducer[_]]]):
InternalRowComparableWrapper = {
val partitionVals = row.toSeq(expressions.map(_.dataType))
val reducedRow = partitionVals.zip(reducers).map{
case (v, Some(reducer)) => reducer.reduce(v)
case (v, Some(reducer: Reducer[Any])) => reducer.reduce(v)
case (v, _) => v
}.toArray
InternalRowComparableWrapper(new GenericInternalRow(reducedRow), expressions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ case class StoragePartitionJoinParams(
keyGroupedPartitioning: Option[Seq[Expression]] = None,
joinKeyPositions: Option[Seq[Int]] = None,
commonPartitionValues: Option[Seq[(InternalRow, Int)]] = None,
reducers: Option[Seq[Option[Reducer[Any]]]] = None,
reducers: Option[Seq[Option[Reducer[_]]]] = None,
applyPartialClustering: Boolean = false,
replicatePartitions: Boolean = false) {
override def equals(other: Any): Boolean = other match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ case class EnsureRequirements(
plan: SparkPlan,
values: Seq[(InternalRow, Int)],
joinKeyPositions: Option[Seq[Int]],
reducers: Option[Seq[Option[Reducer[Any]]]],
reducers: Option[Seq[Option[Reducer[_]]]],
applyPartialClustering: Boolean,
replicatePartitions: Boolean): SparkPlan = plan match {
case scan: BatchScanExec =>
Expand All @@ -570,7 +570,7 @@ case class EnsureRequirements(

private def reduceCommonPartValues(commonPartValues: Seq[(InternalRow, Int)],
expressions: Seq[Expression],
reducers: Option[Seq[Option[Reducer[Any]]]]) = {
reducers: Option[Seq[Option[Reducer[_]]]]) = {
reducers match {
case Some(reducers) => commonPartValues.groupBy { case (row, _) =>
KeyGroupedShuffleSpec.reducePartitionValue(row, expressions, reducers)
Expand Down

0 comments on commit dc2fba1

Please sign in to comment.