Skip to content

Commit

Permalink
More timing logging and fixing the iterator issue.
Browse files Browse the repository at this point in the history
  • Loading branch information
dcrankshaw committed Mar 25, 2014
1 parent 0645183 commit a61be7a
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ object WikiPipelineBenchmark extends Logging {
val startTime = System.currentTimeMillis
logWarning("starting pagerank")
// GRAPH VIEW
val ccStartTime = System.currentTimeMillis
val ccGraph = ConnectedComponents.run(currentGraph).cache
val zeroVal = new JTreeSet[VertexId]()
val seqOp = (s: JTreeSet[VertexId], vtuple: (VertexId, VertexId)) => {
Expand All @@ -89,9 +90,14 @@ object WikiPipelineBenchmark extends Logging {
}
// TABLE VIEW
val numCCs = ccGraph.vertices.aggregate(zeroVal)(seqOp, combOp).size()
val ccEndTime = System.currentTimeMillis
logWarning(s"Connected Components TIMEX: ${(ccEndTime - ccStartTime)/1000.0}")
logWarning(s"Number of connected components for iteration $i: $numCCs")
val prStartTime = System.currentTimeMillis
val pr = PageRank.run(currentGraph, 20).cache
pr.vertices.count
val prEndTime = System.currentTimeMillis
logWarning(s"Pagerank TIMEX: ${(prEndTime - prStartTime)/1000.0}")
logWarning("Pagerank completed")
// TABLE VIEW
val prAndTitle = currentGraph.outerJoinVertices(pr.vertices)({(id: VertexId, title: String, rank: Option[Double]) => (title, rank.getOrElse(0.0))}).cache
Expand All @@ -106,7 +112,11 @@ object WikiPipelineBenchmark extends Logging {
}
val newGraph = currentGraph.subgraph(x => true, filterTop20).cache
newGraph.vertices.count
logWarning(s"TIMEX iter $i ${(System.currentTimeMillis - startTime)/1000.0}")
logWarning(s"TOTAL_TIMEX iter $i ${(System.currentTimeMillis - startTime)/1000.0}")
currentGraph.unpersistVertices(blocking = false)
ccGraph.unpersistVertices(blocking = false)
pr.unpersistVertices(blocking = false)
prAndTitle.unpersistVertices(blocking = false)
currentGraph = newGraph
}
currentGraph
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,24 +49,28 @@ class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) {
includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexId]]] = {
// Determine which vertices each edge partition needs by creating a mapping from vid to pid.
val vid2pid: RDD[(VertexId, PartitionID)] = edges.partitionsRDD.mapPartitions { iter =>
val (pid: PartitionID, edgePartition: EdgePartition[_]) = iter.next()
val numEdges = edgePartition.size
val vSet = new VertexSet
if (includeSrcAttr) { // Add src vertices to the set.
var i = 0
while (i < numEdges) {
vSet.add(edgePartition.srcIds(i))
i += 1
if (iter.hasNext) {
val (pid: PartitionID, edgePartition: EdgePartition[_]) = iter.next()
val numEdges = edgePartition.size
val vSet = new VertexSet
if (includeSrcAttr) { // Add src vertices to the set.
var i = 0
while (i < numEdges) {
vSet.add(edgePartition.srcIds(i))
i += 1
}
}
}
if (includeDstAttr) { // Add dst vertices to the set.
var i = 0
while (i < numEdges) {
vSet.add(edgePartition.dstIds(i))
i += 1
if (includeDstAttr) { // Add dst vertices to the set.
var i = 0
while (i < numEdges) {
vSet.add(edgePartition.dstIds(i))
i += 1
}
}
vSet.iterator.map { vid => (vid, pid) }
} else {
Iterator.empty
}
vSet.iterator.map { vid => (vid, pid) }
}

val numPartitions = vertices.partitions.size
Expand Down

0 comments on commit a61be7a

Please sign in to comment.