Skip to content

Commit

Permalink
[SPARK-12031][CORE][BUG] Integer overflow when do sampling
Browse files Browse the repository at this point in the history
Author: uncleGen <[email protected]>

Closes apache#10023 from uncleGen/1.6-bugfix.
  • Loading branch information
uncleGen authored and srowen committed Dec 9, 2015
1 parent f6883bb commit a113216
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 7 deletions.
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/Partitioner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ private[spark] object RangePartitioner {
*/
def sketch[K : ClassTag](
rdd: RDD[K],
sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = {
sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
val shift = rdd.id
// val classTagK = classTag[K] // to avoid serializing the entire partitioner object
val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
Expand All @@ -262,7 +262,7 @@ private[spark] object RangePartitioner {
iter, sampleSizePerPartition, seed)
Iterator((idx, n, sample))
}.collect()
val numItems = sketched.map(_._2.toLong).sum
val numItems = sketched.map(_._2).sum
(numItems, sketched)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ private[spark] object SamplingUtils {
input: Iterator[T],
k: Int,
seed: Long = Random.nextLong())
: (Array[T], Int) = {
: (Array[T], Long) = {
val reservoir = new Array[T](k)
// Put the first k elements in the reservoir.
var i = 0
Expand All @@ -52,16 +52,17 @@ private[spark] object SamplingUtils {
(trimReservoir, i)
} else {
// If input size > k, continue the sampling process.
var l = i.toLong
val rand = new XORShiftRandom(seed)
while (input.hasNext) {
val item = input.next()
val replacementIndex = rand.nextInt(i)
val replacementIndex = (rand.nextDouble() * l).toLong
if (replacementIndex < k) {
reservoir(replacementIndex) = item
reservoir(replacementIndex.toInt) = item
}
i += 1
l += 1
}
(reservoir, i)
(reservoir, l)
}
}

Expand Down

0 comments on commit a113216

Please sign in to comment.