Skip to content

Commit

Permalink
[SPARK-26757][GRAPHX] Return 0 for count on empty Edge/Vertex RDDs
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

Previously a "java.lang.UnsupportedOperationException: empty
collection" exception would be thrown due to using `reduce`, rather
than `fold` or similar that can tolerate empty RDDs.

This behaviour has existed for the Vertex RDDs since it was introduced
in b30e0ae. It seems this behaviour
was inherited by the Edge RDDs via copy-paste in
ee29ef3.

## How was this patch tested?

Two new unit tests.

Closes apache#23681 from huonw/empty-graphx.

Authored-by: Huon Wilson <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
  • Loading branch information
huonw authored and srowen committed Jan 31, 2019
1 parent 2514163 commit da52698
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] (

/** The number of edges in the RDD. */
override def count(): Long = {
partitionsRDD.map(_._2.size.toLong).reduce(_ + _)
partitionsRDD.map(_._2.size.toLong).fold(0)(_ + _)
}

override def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDDImpl[ED2, VD] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class VertexRDDImpl[VD] private[graphx] (

/** The number of vertices in the RDD. */
override def count(): Long = {
partitionsRDD.map(_.size.toLong).reduce(_ + _)
partitionsRDD.map(_.size.toLong).fold(0)(_ + _)
}

override private[graphx] def mapVertexPartitions[VD2: ClassTag](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ object SVDPlusPlus {

// calculate global rating mean
edges.cache()
val (rs, rc) = edges.map(e => (e.attr, 1L)).reduce((a, b) => (a._1 + b._1, a._2 + b._2))
val (rs, rc) = edges.map(e => (e.attr, 1L)).fold((0, 0))((a, b) => (a._1 + b._1, a._2 + b._2))
val u = rs / rc

// construct graph
Expand Down
10 changes: 10 additions & 0 deletions graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,14 @@ class EdgeRDDSuite extends SparkFunSuite with LocalSparkContext {
}
}

test("count") {
withSpark { sc =>
val empty = EdgeRDD.fromEdges(sc.emptyRDD[Edge[Int]])
assert(empty.count === 0)

val edges = List(Edge(0, 1, ()), Edge(1, 2, ()), Edge(2, 0, ()))
val nonempty = EdgeRDD.fromEdges(sc.parallelize(edges))
assert(nonempty.count === edges.size)
}
}
}
11 changes: 11 additions & 0 deletions graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -223,4 +223,15 @@ class VertexRDDSuite extends SparkFunSuite with LocalSparkContext {
assert(verts.collect().toSeq === data) // test checkpointed RDD
}
}

test("count") {
withSpark { sc =>
val empty = VertexRDD(sc.emptyRDD[(Long, Unit)])
assert(empty.count === 0)

val n = 100
val nonempty = vertices(sc, n)
assert(nonempty.count === n + 1)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,13 @@ class SVDPlusPlusSuite extends SparkFunSuite with LocalSparkContext {
}
}

test("Test SVD++ with no edges") {
withSpark { sc =>
val edges = sc.emptyRDD[Edge[Double]]
val conf = new SVDPlusPlus.Conf(10, 2, 0.0, 5.0, 0.007, 0.007, 0.005, 0.015) // 2 iterations
val (graph, _) = SVDPlusPlus.run(edges, conf)
assert(graph.vertices.count == 0)
assert(graph.edges.count == 0)
}
}
}

0 comments on commit da52698

Please sign in to comment.