[SPARK-45663][CORE][MLLIB] Replace IterableOnceOps#aggregate with `IterableOnceOps#foldLeft`

### What changes were proposed in this pull request?
This PR replaces `IterableOnceOps#aggregate` with `IterableOnceOps#foldLeft`, because `aggregate` has been marked as deprecated since Scala 2.13.0.

```scala
  @deprecated("`aggregate` is not relevant for sequential collections. Use `foldLeft(z)(seqop)` instead.", "2.13.0")
  def aggregate[B](z: => B)(seqop: (B, A) => B, combop: (B, B) => B): B = foldLeft(z)(seqop)
```
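
Because the deprecated `aggregate` is defined as `foldLeft(z)(seqop)` and never invokes `combop` on a sequential collection, the replacement is behavior-preserving. A minimal standalone sketch (not part of this patch) illustrating the equivalence on a plain `Iterator`:

```scala
// Standalone sketch: on a sequential Iterator the deprecated aggregate is
// literally foldLeft(z)(seqop), so combop is never used.
object AggregateVsFoldLeftSketch {
  def main(args: Array[String]): Unit = {
    val seqOp: (Int, Int) => Int = _ + _
    val combOp: (Int, Int) => Int = _ * _ // deliberately different: it is ignored anyway

    val viaAggregate = (1 to 5).iterator.aggregate(0)(seqOp, combOp) // deprecation warning; result 15
    val viaFoldLeft  = (1 to 5).iterator.foldLeft(0)(seqOp)          // result 15

    assert(viaAggregate == viaFoldLeft)
    println(s"aggregate = $viaAggregate, foldLeft = $viaFoldLeft")
  }
}
```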

### Why are the changes needed?
Clean up deprecated API usage.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#43527 from LuciferYang/SPARK-45663.

Authored-by: yangjie01 <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
LuciferYang committed Oct 26, 2023
1 parent 35c628d commit a3146c8
Showing 3 changed files with 5 additions and 7 deletions.
5 changes: 2 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
```diff
@@ -1219,8 +1219,7 @@ abstract class RDD[T: ClassTag](
     // Clone the zero value since we will also be serializing it as part of tasks
     var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
     val cleanSeqOp = sc.clean(seqOp)
-    val cleanCombOp = sc.clean(combOp)
-    val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
+    val aggregatePartition = (it: Iterator[T]) => it.foldLeft(zeroValue)(cleanSeqOp)
     val mergeResult = (_: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
     sc.runJob(this, aggregatePartition, mergeResult)
     jobResult
@@ -1258,7 +1257,7 @@ abstract class RDD[T: ClassTag](
       val cleanSeqOp = context.clean(seqOp)
       val cleanCombOp = context.clean(combOp)
       val aggregatePartition =
-        (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
+        (it: Iterator[T]) => it.foldLeft(zeroValue)(cleanSeqOp)
       var partiallyAggregated: RDD[U] = mapPartitions(it => Iterator(aggregatePartition(it)))
       var numPartitions = partiallyAggregated.partitions.length
       val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
```
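
Note that `combOp` is still applied when merging per-partition results (via `mergeResult` in `aggregate`, and via the later tree-level reduction in `treeAggregate`); only the within-partition call changes. A hypothetical local-mode check, assuming spark-core on the classpath (the object name and values are illustrative, not from the patch):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical check: the observable result of RDD.aggregate is unchanged,
// since combOp still merges the per-partition results; only the per-partition
// call switched from Iterator#aggregate to Iterator#foldLeft.
object RddAggregateSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch"))
    try {
      val rdd = sc.parallelize(1 to 100, numSlices = 4)
      // (sum, count) accumulated per partition with seqOp, merged across partitions with combOp.
      val (sum, count) = rdd.aggregate((0, 0))(
        (acc, x) => (acc._1 + x, acc._2 + 1),
        (a, b) => (a._1 + b._1, a._2 + b._2)
      )
      println(s"sum = $sum, count = $count") // sum = 5050, count = 100
    } finally {
      sc.stop()
    }
  }
}
```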
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala
```diff
@@ -69,7 +69,7 @@ private[spark] object StratifiedSamplingUtils extends Logging {
       val rng = new RandomDataGenerator()
       rng.reSeed(seed + partition)
       val seqOp = getSeqOp(withReplacement, fractions, rng, counts)
-      Iterator(iter.aggregate(zeroU)(seqOp, combOp))
+      Iterator(iter.foldLeft(zeroU)(seqOp))
     }
     mappedPartitionRDD.reduce(combOp)
   }
```
5 changes: 2 additions & 3 deletions mllib/src/main/scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala
```diff
@@ -78,9 +78,8 @@ private[evaluation] object AreaUnderCurve {
    * @param curve an iterator over ordered 2D points stored in pairs representing a curve
    */
   def of(curve: Iterable[(Double, Double)]): Double = {
-    curve.iterator.sliding(2).withPartial(false).aggregate(0.0)(
-      seqop = (auc: Double, points: Seq[(Double, Double)]) => auc + trapezoid(points),
-      combop = _ + _
+    curve.iterator.sliding(2).withPartial(false).foldLeft(0.0)(
+      op = (auc: Double, points: Seq[(Double, Double)]) => auc + trapezoid(points)
     )
   }
 }
```
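
Here the fold is strictly sequential over the sliding windows, so the old `combop = _ + _` never contributed to the result. A standalone sketch of the same pattern, with a simplified `trapezoid` stand-in for MLlib's private helper (hypothetical, for illustration only):

```scala
// Standalone sketch of the sliding(2) + foldLeft pattern; `trapezoid` is a
// simplified stand-in for the private MLlib helper, not the actual implementation.
object AucFoldLeftSketch {
  private def trapezoid(points: Seq[(Double, Double)]): Double = {
    val (x1, y1) = points.head
    val (x2, y2) = points.last
    (x2 - x1) * (y1 + y2) / 2.0
  }

  def of(curve: Iterable[(Double, Double)]): Double =
    curve.iterator.sliding(2).withPartial(false).foldLeft(0.0)(
      (auc, points) => auc + trapezoid(points)
    )

  def main(args: Array[String]): Unit = {
    // Area under (0,0) -> (1,1) -> (2,1): 0.5 + 1.0 = 1.5
    println(of(Seq((0.0, 0.0), (1.0, 1.0), (2.0, 1.0)))) // 1.5
  }
}
```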
