Skip to content

Commit

Permalink
Add 'iterator' to reduce memory consumed by join
Browse files (browse the repository at this point in the history)
  • Loading branch information
zsxwing committed Dec 11, 2014
1 parent 652b781 commit 95d59d6
Showing 1 changed file with 8 additions and 8 deletions.
16 changes: 8 additions & 8 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
Original file line number | Diff line number | Diff line change
Expand Up @@ -480,7 +480,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
*/
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
this.cogroup(other, partitioner).flatMapValues( pair =>
- for (v <- pair._1; w <- pair._2) yield (v, w)
+ for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
)
}

Expand All @@ -493,9 +493,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
this.cogroup(other, partitioner).flatMapValues { pair =>
if (pair._2.isEmpty) {
- pair._1.map(v => (v, None))
+ pair._1.iterator.map(v => (v, None): (V, Option[W]))
} else {
- for (v <- pair._1; w <- pair._2) yield (v, Some(w))
+ for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, Some(w))
}
}
}
Expand All @@ -510,9 +510,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
: RDD[(K, (Option[V], W))] = {
this.cogroup(other, partitioner).flatMapValues { pair =>
if (pair._1.isEmpty) {
- pair._2.map(w => (None, w))
+ pair._2.iterator.map(w => (None, w): (Option[V], W))
} else {
- for (v <- pair._1; w <- pair._2) yield (Some(v), w)
+ for (v <- pair._1.iterator; w <- pair._2.iterator) yield (Some(v), w)
}
}
}
Expand All @@ -528,9 +528,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
: RDD[(K, (Option[V], Option[W]))] = {
this.cogroup(other, partitioner).flatMapValues {
- case (vs, Seq()) => vs.map(v => (Some(v), None))
- case (Seq(), ws) => ws.map(w => (None, Some(w)))
- case (vs, ws) => for (v <- vs; w <- ws) yield (Some(v), Some(w))
+ case (vs, Seq()) => vs.iterator.map(v => (Some(v), None): (Option[V], Option[W]))
+ case (Seq(), ws) => ws.iterator.map(w => (None, Some(w)): (Option[V], Option[W]))
+ case (vs, ws) => for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), Some(w))
}
}

Expand Down

0 comments on commit 95d59d6

Please sign in to comment.