From b30e0ae0351be1cbc0b1cf179293587b466ee026 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Sat, 30 Nov 2013 14:24:18 -0800 Subject: [PATCH] Added an optimized count to VertexSetRDD. --- graph/src/main/scala/org/apache/spark/graph/Pregel.scala | 2 -- .../src/main/scala/org/apache/spark/graph/VertexSetRDD.scala | 5 +++++ .../scala/org/apache/spark/graph/impl/VertexPartition.scala | 2 ++ 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala index 29d6225f33838..55b3464b56989 100644 --- a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala +++ b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala @@ -139,8 +139,6 @@ object Pregel { * @param initialMsg the message each vertex will receive at the on * the first iteration. * - * @param numIter the number of iterations to run this computation. - * * @param vprog the user-defined vertex program which runs on each * vertex and receives the inbound message and computes a new vertex * value. On the first iteration the vertex program is invoked on diff --git a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala index 328dafa632108..4c8128f3e0f1d 100644 --- a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala +++ b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala @@ -101,6 +101,11 @@ class VertexSetRDD[@specialized VD: ClassManifest]( /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ override def cache(): VertexSetRDD[VD] = persist() + /** Return the number of vertices in this set. */ + override def count(): Long = { + partitionsRDD.map(_.size).reduce(_ + _) + } + /** * Provide the `RDD[(Vid, VD)]` equivalent output. */ diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala b/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala index f5047e7b9e851..3d80ab1bb9313 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala @@ -42,6 +42,8 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest]( val capacity: Int = index.capacity + def size: Int = mask.cardinality + /** * Pass each vertex attribute along with the vertex id through a map * function and retain the original RDD's partitioning and index.