Commit

Updated pipeline benchmark to handle other systems.
dcrankshaw committed Mar 24, 2014
1 parent 0a04cb9 commit e6fc93b
Showing 1 changed file with 87 additions and 156 deletions.
@@ -14,7 +14,6 @@ import java.util.{HashSet => JHashSet, TreeSet => JTreeSet}

object WikiPipelineBenchmark extends Logging {


def main(args: Array[String]) = {

val host = args(0)
@@ -24,31 +23,29 @@ object WikiPipelineBenchmark extends Logging {
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")

val sc = new SparkContext(host, "ETL", sparkconf)
val sc = new SparkContext(host, "WikiPipeline", sparkconf)

val start = System.currentTimeMillis
process match {
case "pre" => {
val fname = args(2)
val outbase = args(3)
preProcess(sc, fname, outbase)
}
case "post" => {
val rankPath = args(2)
val attributePath = args(3)
val result = postProcess(sc, rankPath, attributePath)
logWarning(result)
}
case "graphx" => {
val rawData = args(2)
val numIters = args(3).toInt
val result = benchmarkGraphx(sc, rawData, numIters)
// logWarning(result)
logWarning(result)
}
case "prep" => {

case "extract" => {
val rawData = args(2)
val outBase = args(3)
prep(sc, rawData, outBase)
val (vertices, edges) = extractLinkGraph(sc, rawData)
writeGraphAsText(outBase, vertices, edges, 0)
}

case "analyze" => {
val outBase = args(2)
val iter = args(3).toInt
pipelinePostProcessing(sc, outBase, iter)

}

case _ => throw new IllegalArgumentException("Please provide a valid process")
@@ -59,93 +56,8 @@ object WikiPipelineBenchmark extends Logging {

}



def prep(sc: SparkContext, rawData: String, outBase: String) {

val hadoopconf = new org.apache.hadoop.conf.Configuration
hadoopconf.set("key.value.separator.in.input.line", " ");
hadoopconf.set("xmlinput.start", "<page>");
hadoopconf.set("xmlinput.end", "</page>");

val vertPath = outBase + "_vertices"
val rankPath = outBase + "_ranks"

val xmlRDD = sc.newAPIHadoopFile(rawData, classOf[XmlInputFormat], classOf[LongWritable], classOf[Text], hadoopconf)
.map(t => t._2.toString)

val allArtsRDD = xmlRDD.map { raw => new WikiArticle(raw) }

val wikiRDD = allArtsRDD.filter { art => art.relevant }.repartition(128)
val vertices: RDD[(VertexId, String)] = wikiRDD.map { art => (art.vertexID, art.title) }
val verticesToSave = vertices.map {v => v._1 + "\t"+ v._2}
verticesToSave.saveAsTextFile(vertPath)
val edges: RDD[Edge[Double]] = wikiRDD.flatMap { art => art.edges }
val g = Graph(vertices, edges) //TODO what to do about partitionStrategy???
val pr = PageRank.run(g, 20)
val prToSave = pr.vertices.map {v => v._1 + "\t"+ v._2}
prToSave.saveAsTextFile(rankPath)
}

// def extractLinkGraph(sc: SparkContext, rawData: String): (RDD[(VertexId, String)], RDD[Edge[Double]]) = {
// val conf = new Configuration
// conf.set("key.value.separator.in.input.line", " ")
// conf.set("xmlinput.start", "<page>")
// conf.set("xmlinput.end", "</page>")
//
// logWarning("about to load xml rdd")
// val xmlRDD = sc.newAPIHadoopFile(rawData, classOf[XmlInputFormat], classOf[LongWritable], classOf[Text], conf)
// .map(t => t._2.toString)
// // xmlRDD.count
// logWarning(s"XML RDD counted. Found ${xmlRDD.count} raw articles.")
// val repartXMLRDD = xmlRDD.repartition(128)
// logWarning(s"XML RDD repartitioned. Found ${repartXMLRDD.count} raw articles.")
//
// val allArtsRDD = repartXMLRDD.map { raw => new WikiArticle(raw) }.cache
// logWarning(s"Total articles: Found ${allArtsRDD.count} UNPARTITIONED articles.")
//
// val wikiRDD = allArtsRDD.filter { art => art.relevant }.cache //.repartition(128)
// logWarning(s"wikiRDD counted. Found ${wikiRDD.count} relevant articles in ${wikiRDD.partitions.size} partitions")
//
// }

def benchmarkGraphx(sc: SparkContext, rawData: String, numIters: Int) {

val conf = new Configuration
conf.set("key.value.separator.in.input.line", " ")
conf.set("xmlinput.start", "<page>")
conf.set("xmlinput.end", "</page>")

logWarning("about to load xml rdd")
val xmlRDD = sc.newAPIHadoopFile(rawData, classOf[XmlInputFormat], classOf[LongWritable], classOf[Text], conf)
.map(t => t._2.toString)
// xmlRDD.count
logWarning(s"XML RDD counted. Found ${xmlRDD.count} raw articles.")
val repartXMLRDD = xmlRDD.repartition(128)
logWarning(s"XML RDD repartitioned. Found ${repartXMLRDD.count} raw articles.")

val allArtsRDD = repartXMLRDD.map { raw => new WikiArticle(raw) }.cache
logWarning(s"Total articles: Found ${allArtsRDD.count} UNPARTITIONED articles.")

val wikiRDD = allArtsRDD.filter { art => art.relevant }.cache //.repartition(128)
logWarning(s"wikiRDD counted. Found ${wikiRDD.count} relevant articles in ${wikiRDD.partitions.size} partitions")


// val repartAllArtsRDD = allArtsRDD.repartition(128)
// logWarning(s"Total articles: Found ${repartAllArtsRDD.count} PARTITIONED articles.")
// val wikiRDD = unpartWikiRDD.repartition(128).cache
// val wikiRDD = unpartWikiRDD.coalesce(128, false).cache
// logWarning(s"WikiRDD partitions size: ${wikiRDD.partitions.size}")

// val wikiRDD = allArtsRDD.filter { art => art.relevant }.repartition(128)

// val wikiRDDCount = wikiRDD.count
// logWarning(s"wikiRDD counted. Found ${wikiRDDCount} relevant articles.")
// logWarning("Counting differently")


val vertices: RDD[(VertexId, String)] = wikiRDD.map { art => (art.vertexID, art.title) }
val edges: RDD[Edge[Double]] = wikiRDD.flatMap { art => art.edges }
val (vertices, edges) = extractLinkGraph(sc, rawData)
logWarning("creating graph")
val g = Graph(vertices, edges)
val cleanG = g.subgraph(x => true, (vid, vd) => vd != null).cache
@@ -154,7 +66,6 @@ object WikiPipelineBenchmark extends Logging {
val resultG = pagerankConnComponentsAlt(numIters, cleanG)
logWarning(s"ORIGINAL graph has ${cleanG.triplets.count()} EDGES, ${cleanG.vertices.count()} VERTICES")
logWarning(s"FINAL graph has ${resultG.triplets.count()} EDGES, ${resultG.vertices.count()} VERTICES")

}

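// Each repetition: count the connected components of the current graph, run 20 PageRank iterations,
// log the 20 highest-ranked articles, then drop those vertices and continue with the reduced graph.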
def pagerankConnComponentsAlt(numRepetitions: Int, g: Graph[String, Double]): Graph[String, Double] = {
@@ -164,12 +75,27 @@ object WikiPipelineBenchmark extends Logging {
currentGraph.cache
val startTime = System.currentTimeMillis
logWarning("starting pagerank")
// GRAPH VIEW
val ccGraph = ConnectedComponents.run(currentGraph).cache
val zeroVal = new JTreeSet[VertexId]()
val seqOp = (s: JTreeSet[VertexId], vtuple: (VertexId, VertexId)) => {
s.add(vtuple._2)
s
}
val combOp = (s1: JTreeSet[VertexId], s2: JTreeSet[VertexId]) => {
s1.addAll(s2)
s1
}
// TABLE VIEW
val numCCs = ccGraph.vertices.aggregate(zeroVal)(seqOp, combOp).size()
logWarning(s"Number of connected components for iteration $i: $numCCs")
val pr = PageRank.run(currentGraph, 20).cache
pr.vertices.count
logWarning("Pagerank completed")
// TABLE VIEW
val prAndTitle = currentGraph.outerJoinVertices(pr.vertices)({(id: VertexId, title: String, rank: Option[Double]) => (title, rank.getOrElse(0.0))}).cache
prAndTitle.vertices.count
logWarning("join completed.")
// logWarning("join completed.")
val top20 = prAndTitle.vertices.top(20)(Ordering.by((entry: (VertexId, (String, Double))) => entry._2._2))
logWarning(s"Top20 for iteration $i:\n${top20.mkString("\n")}")
val top20verts = top20.map(_._1).toSet
@@ -178,72 +104,77 @@ object WikiPipelineBenchmark extends Logging {
!top20verts.contains(v)
}
val newGraph = currentGraph.subgraph(x => true, filterTop20).cache
val ccGraph = ConnectedComponents.run(newGraph).cache
// val zeroVal = new mutable.HashSet[VertexId]()
// val seqOp = (s: mutable.HashSet[VertexId], vtuple: (VertexId, VertexId)) => {
// s.add(vtuple._2)
// s
// }
// val combOp = (s1: mutable.HashSet[VertexId], s2: mutable.HashSet[VertexId]) => { s1 union s2}
// val numCCs = ccGraph.vertices.aggregate(zeroVal)(seqOp, combOp)


val zeroVal = new JTreeSet[VertexId]()
val seqOp = (s: JTreeSet[VertexId], vtuple: (VertexId, VertexId)) => {
s.add(vtuple._2)
s
}
val combOp = (s1: JTreeSet[VertexId], s2: JTreeSet[VertexId]) => {
s1.addAll(s2)
s1
}
val numCCs = ccGraph.vertices.aggregate(zeroVal)(seqOp, combOp).size()


//(new mutable.HashSet[Int]())((s: mutable.HashSet[Int], vtuple: (VertexId, Int)) => { s.add(vtuple._2); s },(s1: mutable.HashSet[Int], s2: mutable.HashSet[Int]) => { s1 union s2})

//(((set, vtuple) => set.add(vtuple._2)), ((set1, set2) => set1 union set2)).size
logWarning(s"Number of connected components for iteration $i: $numCCs")
newGraph.vertices.count
logWarning(s"TIMEX iter $i ${(System.currentTimeMillis - startTime)/1000.0}")
// TODO will this result in too much memory overhead???
currentGraph = newGraph
}
currentGraph
}

// parse wikipedia xml dump and
def preProcess(sc: SparkContext, rawData: String, outBase: String) = {
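// Writes the vertex and edge RDDs for one pipeline iteration as tab-separated text files under basePath.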
def writeGraphAsText[V](basePath: String, vertices: RDD[(VertexId, V)], edges: RDD[Edge[Double]], iter: Int = 0) {
val verticesToSave = vertices.map {v => s"${v._1}\t${v._2}"}
val edgesToSave = edges.map {e => s"${e.srcId}\t${e.dstId}"}
verticesToSave.saveAsTextFile(s"${basePath}_vertices_$iter")
edgesToSave.saveAsTextFile(s"${basePath}_edges_$iter")
}

// assumes vertex attr is string, can be parsed afterwards
def readEdgesFromText(sc: SparkContext, path: String): RDD[(VertexId, VertexId)] = {
sc.textFile(path, 128).map { line =>
val lineSplits = line.split("\\s+")
(lineSplits(0).toLong, lineSplits(1).toLong)
}
}

def extractLinkGraph(sc: SparkContext, rawData: String): (RDD[(VertexId, String)], RDD[Edge[Double]]) = {
val conf = new Configuration
conf.set("key.value.separator.in.input.line", " ");
conf.set("xmlinput.start", "<page>");
conf.set("xmlinput.end", "</page>");
conf.set("key.value.separator.in.input.line", " ")
conf.set("xmlinput.start", "<page>")
conf.set("xmlinput.end", "</page>")

logWarning("about to load xml rdd")
val xmlRDD = sc.newAPIHadoopFile(rawData, classOf[XmlInputFormat], classOf[LongWritable], classOf[Text], conf)
.map(t => t._2.toString)
val wikiRDD = xmlRDD.map { raw => new WikiArticle(raw) }
.filter { art => art.relevant }.repartition(128)
// xmlRDD.count
logWarning(s"XML RDD counted. Found ${xmlRDD.count} raw articles.")
val repartXMLRDD = xmlRDD.repartition(128)
logWarning(s"XML RDD repartitioned. Found ${repartXMLRDD.count} raw articles.")

val allArtsRDD = repartXMLRDD.map { raw => new WikiArticle(raw) }.cache
logWarning(s"Total articles: Found ${allArtsRDD.count} UNPARTITIONED articles.")

val wikiRDD = allArtsRDD.filter { art => art.relevant }.cache //.repartition(128)
logWarning(s"wikiRDD counted. Found ${wikiRDD.count} relevant articles in ${wikiRDD.partitions.size} partitions")
val vertices: RDD[(VertexId, String)] = wikiRDD.map { art => (art.vertexID, art.title) }
val verticesToSave = vertices.map {v => v._1 + "\t"+ v._2}
verticesToSave.saveAsTextFile(outBase + "_vertices")
val edges: RDD[Edge[Double]] = wikiRDD.flatMap { art => art.edges }
val edgesToSave = edges.map(e => e.srcId + "\t" + e.dstId)
edgesToSave.saveAsTextFile(outBase + "_edges")
(vertices, edges)

}


def postProcess(sc: SparkContext, rankPath: String, attrPath: String): String = {
val ranks = GraphLoader.loadVertices(sc, rankPath).map {v => (v._1, v._2.toDouble)}
val attrs = GraphLoader.loadVertices(sc, attrPath)

// slightly cheating, but not really
val ranksAndAttrs = ranks.join(attrs)
val top20 = ranksAndAttrs.top(20)(Ordering.by((entry: (VertexId, (Double, String))) => entry._2._1))
top20.mkString("\n")
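// Reads pagerank and connected-component results produced elsewhere in the pipeline, joins them with this
// iteration's vertex and edge files, logs the top-20 articles and the component count, then writes the graph
// with those top-20 vertices removed for iteration iter + 1.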
def pipelinePostProcessing(sc: SparkContext, basePath: String, iter: Int) {
val pageranks = GraphLoader.loadVertices(sc, s"${basePath}_prs")
.map {v => (v._1, v._2.toDouble) }
val connComponents = GraphLoader.loadVertices(sc, s"${basePath}_ccs")
.map {v => (v._1, v._2.toLong) }
val edges = readEdgesFromText(sc, s"${basePath}_edges_$iter")
val artNames = GraphLoader.loadVertices(sc, s"${basePath}_vertices_$iter")
val rankAndTitle = artNames.join(pageranks)
val top20 = rankAndTitle.top(20)(Ordering.by((entry: (VertexId, (String, Double))) => entry._2._2))
logWarning(s"Top20 for iteration $iter:\n${top20.mkString("\n")}")
val zeroVal = new JTreeSet[VertexId]()
val seqOp = (s: JTreeSet[VertexId], vtuple: (VertexId, VertexId)) => {
s.add(vtuple._2)
s
}
val combOp = (s1: JTreeSet[VertexId], s2: JTreeSet[VertexId]) => {
s1.addAll(s2)
s1
}
val numCCs = connComponents.aggregate(zeroVal)(seqOp, combOp).size()
logWarning(s"Number of connected components for iteration $iter: $numCCs")
val top20verts = top20.map(_._1).toSet
val newVertices = artNames.filter { case (v, d) => !top20verts.contains(v) }
val newEdges = edges.filter { case (s, d) => !(top20verts.contains(s) || top20verts.contains(d)) }
writeGraphAsText(basePath, newVertices, newEdges, iter + 1)
}
}

class MakeString(tup: (LongWritable, Text)) {
val str = tup._2.toString
}
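
For reference, a minimal sketch of how the updated modes can be driven, inferred from the match in main; the master URL and HDFS paths below are placeholders, not taken from the commit:

// args are (master, mode, arg2, arg3); each call is an alternative invocation
// "extract": parse the XML dump and write <outBase>_vertices_0 / <outBase>_edges_0 as text
WikiPipelineBenchmark.main(Array("spark://master:7077", "extract", "hdfs:///wiki/enwiki.xml", "hdfs:///wiki/out"))
// "analyze": read <outBase>_prs / <outBase>_ccs plus the iteration's vertices/edges, log the top-20 ranks
// and the component count, then write the files for iteration iter + 1
WikiPipelineBenchmark.main(Array("spark://master:7077", "analyze", "hdfs:///wiki/out", "0"))
// "graphx": run the full PageRank / connected-components loop in GraphX for numIters repetitions
WikiPipelineBenchmark.main(Array("spark://master:7077", "graphx", "hdfs:///wiki/enwiki.xml", "3"))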
