diff --git a/graphx/src/main/scala/org/apache/spark/graphx/WikiPipelineBenchmark.scala b/graphx/src/main/scala/org/apache/spark/graphx/WikiPipelineBenchmark.scala
index 86e52ce20f5e4..4aa2e793152e0 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/WikiPipelineBenchmark.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/WikiPipelineBenchmark.scala
@@ -77,6 +77,7 @@ object WikiPipelineBenchmark extends Logging {
       val startTime = System.currentTimeMillis
       logWarning("starting pagerank")
       // GRAPH VIEW
+      val ccStartTime = System.currentTimeMillis
       val ccGraph = ConnectedComponents.run(currentGraph).cache
       val zeroVal = new JTreeSet[VertexId]()
       val seqOp = (s: JTreeSet[VertexId], vtuple: (VertexId, VertexId)) => {
@@ -89,9 +90,14 @@ object WikiPipelineBenchmark extends Logging {
       }
       // TABLE VIEW
       val numCCs = ccGraph.vertices.aggregate(zeroVal)(seqOp, combOp).size()
+      val ccEndTime = System.currentTimeMillis
+      logWarning(s"Connected Components TIMEX: ${(ccEndTime - ccStartTime)/1000.0}")
       logWarning(s"Number of connected components for iteration $i: $numCCs")
+      val prStartTime = System.currentTimeMillis
       val pr = PageRank.run(currentGraph, 20).cache
       pr.vertices.count
+      val prEndTime = System.currentTimeMillis
+      logWarning(s"Pagerank TIMEX: ${(prEndTime - prStartTime)/1000.0}")
       logWarning("Pagerank completed")
       // TABLE VIEW
       val prAndTitle = currentGraph.outerJoinVertices(pr.vertices)({(id: VertexId, title: String, rank: Option[Double]) => (title, rank.getOrElse(0.0))}).cache
@@ -106,7 +112,11 @@ object WikiPipelineBenchmark extends Logging {
       }
       val newGraph = currentGraph.subgraph(x => true, filterTop20).cache
       newGraph.vertices.count
-      logWarning(s"TIMEX iter $i ${(System.currentTimeMillis - startTime)/1000.0}")
+      logWarning(s"TOTAL_TIMEX iter $i ${(System.currentTimeMillis - startTime)/1000.0}")
+      currentGraph.unpersistVertices(blocking = false)
+      ccGraph.unpersistVertices(blocking = false)
+      pr.unpersistVertices(blocking = false)
+      prAndTitle.unpersistVertices(blocking = false)
       currentGraph = newGraph
     }
     currentGraph
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala
index fe44e1ee0c391..35245971912d7 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala
@@ -49,24 +49,28 @@ class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) {
       includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexId]]] = {
     // Determine which vertices each edge partition needs by creating a mapping from vid to pid.
     val vid2pid: RDD[(VertexId, PartitionID)] = edges.partitionsRDD.mapPartitions { iter =>
-      val (pid: PartitionID, edgePartition: EdgePartition[_]) = iter.next()
-      val numEdges = edgePartition.size
-      val vSet = new VertexSet
-      if (includeSrcAttr) { // Add src vertices to the set.
-        var i = 0
-        while (i < numEdges) {
-          vSet.add(edgePartition.srcIds(i))
-          i += 1
+      if (iter.hasNext) {
+        val (pid: PartitionID, edgePartition: EdgePartition[_]) = iter.next()
+        val numEdges = edgePartition.size
+        val vSet = new VertexSet
+        if (includeSrcAttr) { // Add src vertices to the set.
+          var i = 0
+          while (i < numEdges) {
+            vSet.add(edgePartition.srcIds(i))
+            i += 1
+          }
         }
-      }
-      if (includeDstAttr) { // Add dst vertices to the set.
-        var i = 0
-        while (i < numEdges) {
-          vSet.add(edgePartition.dstIds(i))
-          i += 1
+        if (includeDstAttr) { // Add dst vertices to the set.
+          var i = 0
+          while (i < numEdges) {
+            vSet.add(edgePartition.dstIds(i))
+            i += 1
+          }
         }
+        vSet.iterator.map { vid => (vid, pid) }
+      } else {
+        Iterator.empty
       }
-      vSet.iterator.map { vid => (vid, pid) }
     }
     val numPartitions = vertices.partitions.size
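Note on the RoutingTable change: a `mapPartitions` closure that begins with an unconditional `iter.next()` throws `NoSuchElementException` whenever the RDD contains an empty partition, so the body is now wrapped in an `iter.hasNext` guard that yields `Iterator.empty` for partitions with no elements. Below is a minimal, self-contained sketch of that same guard pattern on a plain RDD, not the patched GraphX code itself; the object and variable names are illustrative only.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object EmptyPartitionGuardExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("EmptyPartitionGuardExample").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // Two elements spread over four partitions, so some partitions are empty.
    val rdd = sc.parallelize(Seq(1, 2), numSlices = 4)

    // Calling iter.next() unconditionally here would throw NoSuchElementException
    // on the empty partitions; checking hasNext first makes the closure total.
    val firstPerPartition = rdd.mapPartitions { iter =>
      if (iter.hasNext) {
        Iterator.single(iter.next()) // safe: this partition has at least one element
      } else {
        Iterator.empty               // empty partition: contribute nothing
      }
    }

    println(firstPerPartition.collect().mkString(", ")) // prints "1, 2"
    sc.stop()
  }
}
```

Empty partitions arise routinely in practice (e.g. after `filter`, or when there are fewer elements than slices, as above), which is why the guard belongs in the closure rather than relying on the input's partitioning.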