Skip to content

Commit

Permalink
fix a race condition in zipWithIndex
Browse files Browse the repository at this point in the history
  • Loading branch information
mengxr committed Nov 16, 2014
1 parent 825709a commit c284d9f
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 14 deletions.
31 changes: 17 additions & 14 deletions core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,24 @@ class ZippedWithIndexRDDPartition(val prev: Partition, val startIndex: Long)
private[spark]
class ZippedWithIndexRDD[T: ClassTag](@transient prev: RDD[T]) extends RDD[(T, Long)](prev) {

/**
 * The start index of each partition.
 *
 * Computed eagerly, exactly once, when the RDD is constructed — rather than inside
 * getPartitions — so that concurrent calls to getPartitions cannot race while the
 * counting job runs (per this commit's stated fix for a race condition, SPARK-4433).
 * Marked @transient: the indices are only meaningful on the driver.
 */
@transient private val startIndices: Array[Long] = {
  val n = prev.partitions.size
  if (n == 0) {
    // No partitions: nothing to index.
    Array[Long]()
  } else if (n == 1) {
    // A single partition always starts at offset 0; no counting job is needed.
    Array(0L)
  } else {
    // Count the elements of every partition except the last, then scanLeft converts
    // those per-partition counts into cumulative start offsets (the last partition's
    // size is never needed to know where it starts).
    prev.context.runJob(
      prev,
      Utils.getIteratorSize _,
      0 until n - 1, // do not need to count the last partition
      allowLocal = false
    ).scanLeft(0L)(_ + _)
  }
}

/** Wraps each parent partition together with its precomputed start index. */
override def getPartitions: Array[Partition] = {
  firstParent[T].partitions.map { parentPartition =>
    new ZippedWithIndexRDDPartition(parentPartition, startIndices(parentPartition.index))
  }
}

Expand Down
5 changes: 5 additions & 0 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,11 @@ class RDDSuite extends FunSuite with SharedSparkContext {
}
}

test("zipWithIndex chained with other RDDs (SPARK-4433)") {
  // Repartitioning after zipWithIndex exercises the regression path from SPARK-4433;
  // count() forces evaluation of the whole chain.
  val indexed = sc.parallelize(0 until 10, 2).zipWithIndex().repartition(4)
  assert(indexed.count() === 10)
}

test("zipWithUniqueId") {
val n = 10
val data = sc.parallelize(0 until n, 3)
Expand Down

0 comments on commit c284d9f

Please sign in to comment.