Skip to content

Commit

Permalink
Optimizations for mapReduceTriplets and EdgePartition
Browse files Browse the repository at this point in the history
1. EdgePartition now stores local vertex ids instead of global ids. This
   avoids hash lookups when looking up vertex attributes and aggregating
   messages.

2. Internal iterators in mapReduceTriplets are inlined into a while
   loop.
  • Loading branch information
ankurdave committed Nov 1, 2014
1 parent 26d31d1 commit 4a566dc
Show file tree
Hide file tree
Showing 7 changed files with 310 additions and 175 deletions.
262 changes: 187 additions & 75 deletions graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import scala.reflect.{classTag, ClassTag}

import org.apache.spark.graphx._
import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
import org.apache.spark.util.collection.BitSet

/**
* A collection of edges stored in columnar format, along with any vertex attributes referenced. The
Expand All @@ -30,54 +31,76 @@ import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
* @tparam ED the edge attribute type
* @tparam VD the vertex attribute type
*
* @param srcIds the source vertex id of each edge
* @param dstIds the destination vertex id of each edge
* @param localSrcIds the local source vertex id of each edge as an index into `local2global` and
* `vertexAttrs`
* @param localDstIds the local destination vertex id of each edge as an index into `local2global`
* and `vertexAttrs`
* @param data the attribute associated with each edge
* @param index a clustered index on source vertex id
* @param vertices a map from referenced vertex ids to their corresponding attributes. Must
* contain all vertex ids from `srcIds` and `dstIds`, though not necessarily valid attributes for
* those vertex ids. The mask is not used.
* @param index a clustered index on source vertex id as a map from each global source vertex id to
* the offset in the edge arrays where the cluster for that vertex id begins
* @param global2local a map from referenced vertex ids to local ids which index into vertexAttrs
* @param local2global an array of global vertex ids where the offsets are local vertex ids
* @param vertexAttrs an array of vertex attributes where the offsets are local vertex ids
* @param activeSet an optional active vertex set for filtering computation on the edges
*/
private[graphx]
class EdgePartition[
@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag, VD: ClassTag](
val srcIds: Array[VertexId] = null,
val dstIds: Array[VertexId] = null,
val localSrcIds: Array[Int] = null,
val localDstIds: Array[Int] = null,
val data: Array[ED] = null,
val index: GraphXPrimitiveKeyOpenHashMap[VertexId, Int] = null,
val vertices: VertexPartition[VD] = null,
val global2local: GraphXPrimitiveKeyOpenHashMap[VertexId, Int] = null,
val local2global: Array[VertexId] = null,
val vertexAttrs: Array[VD] = null,
val activeSet: Option[VertexSet] = None
) extends Serializable {

/** Return a new `EdgePartition` with the specified edge data. */
def withData[ED2: ClassTag](data_ : Array[ED2]): EdgePartition[ED2, VD] = {
new EdgePartition(srcIds, dstIds, data_, index, vertices, activeSet)
}

/** Return a new `EdgePartition` with the specified vertex partition. */
def withVertices[VD2: ClassTag](
vertices_ : VertexPartition[VD2]): EdgePartition[ED, VD2] = {
new EdgePartition(srcIds, dstIds, data, index, vertices_, activeSet)
def withData[ED2: ClassTag](data: Array[ED2]): EdgePartition[ED2, VD] = {
  // Only the edge attribute array is swapped; the parameter `data` shadows the field of the same
  // name, so the new array is the one passed on. Every other component is shared with `this`.
  new EdgePartition(
    localSrcIds = localSrcIds,
    localDstIds = localDstIds,
    data = data,
    index = index,
    global2local = global2local,
    local2global = local2global,
    vertexAttrs = vertexAttrs,
    activeSet = activeSet)
}

/** Return a new `EdgePartition` with the specified active set, provided as an iterator. */
def withActiveSet(iter: Iterator[VertexId]): EdgePartition[ED, VD] = {
val newActiveSet = new VertexSet
iter.foreach(newActiveSet.add(_))
new EdgePartition(srcIds, dstIds, data, index, vertices, Some(newActiveSet))
val activeSet = new VertexSet
iter.foreach(activeSet.add(_))
new EdgePartition(
localSrcIds, localDstIds, data, index, global2local, local2global, vertexAttrs,
Some(activeSet))
}

/** Return a new `EdgePartition` with the specified active set. */
def withActiveSet(activeSet_ : Option[VertexSet]): EdgePartition[ED, VD] = {
new EdgePartition(srcIds, dstIds, data, index, vertices, activeSet_)
def withActiveSet(activeSet: Option[VertexSet]): EdgePartition[ED, VD] = {
  // The parameter shadows the `activeSet` field, so the caller-supplied set is installed while
  // the edge arrays, index and vertex tables are shared with `this`.
  new EdgePartition(
    localSrcIds = localSrcIds,
    localDstIds = localDstIds,
    data = data,
    index = index,
    global2local = global2local,
    local2global = local2global,
    vertexAttrs = vertexAttrs,
    activeSet = activeSet)
}

/** Return a new `EdgePartition` with updates to vertex attributes specified in `iter`. */
def updateVertices(iter: Iterator[(VertexId, VD)]): EdgePartition[ED, VD] = {
this.withVertices(vertices.innerJoinKeepLeft(iter))
val newVertexAttrs = new Array[VD](vertexAttrs.length)
System.arraycopy(vertexAttrs, 0, newVertexAttrs, 0, vertexAttrs.length)
iter.foreach { kv =>
newVertexAttrs(global2local(kv._1)) = kv._2
}
new EdgePartition(
localSrcIds, localDstIds, data, index, global2local, local2global, newVertexAttrs,
activeSet)
}

/**
 * Return a new `EdgePartition` without any locally cached vertex attributes.
 *
 * The edge structure, index, id mappings and active set are shared with this partition; only the
 * vertex attribute array is replaced by a freshly allocated one of the same length.
 *
 * @tparam VD2 the vertex attribute type of the returned partition
 */
def clearVertices[VD2: ClassTag](): EdgePartition[ED, VD2] = {
  // A new array contains only the element type's default value, and keeps one slot per local
  // vertex id so existing local ids remain valid indices.
  val blankAttrs = Array.ofDim[VD2](vertexAttrs.length)
  new EdgePartition(
    localSrcIds, localDstIds, data, index, global2local, local2global, blankAttrs, activeSet)
}

/** Global source vertex id of the edge stored at position `i` of the edge arrays. */
def srcIds(i: Int): VertexId = {
  // Edges store local ids; `local2global` translates a local id back to the global vertex id.
  val localId = localSrcIds(i)
  local2global(localId)
}

/** Global destination vertex id of the edge stored at position `i` of the edge arrays. */
def dstIds(i: Int): VertexId = {
  // Edges store local ids; `local2global` translates a local id back to the global vertex id.
  val localId = localDstIds(i)
  local2global(localId)
}

/** Look up vid in activeSet, throwing an exception if it is None. */
def isActive(vid: VertexId): Boolean = {
activeSet.get.contains(vid)
Expand All @@ -92,11 +115,19 @@ class EdgePartition[
* @return a new edge partition with all edges reversed.
*/
def reverse: EdgePartition[ED, VD] = {
val builder = new EdgePartitionBuilder(size)(classTag[ED], classTag[VD])
for (e <- iterator) {
builder.add(e.dstId, e.srcId, e.attr)
val builder = new VertexPreservingEdgePartitionBuilder(
global2local, local2global, vertexAttrs, size)(classTag[ED], classTag[VD])
var i = 0
while (i < size) {
val localSrcId = localSrcIds(i)
val localDstId = localDstIds(i)
val srcId = local2global(localSrcId)
val dstId = local2global(localDstId)
val attr = data(i)
builder.add(dstId, srcId, localDstId, localSrcId, attr)
i += 1
}
builder.toEdgePartition.withVertices(vertices).withActiveSet(activeSet)
builder.toEdgePartition.withActiveSet(activeSet)
}

/**
Expand Down Expand Up @@ -157,13 +188,25 @@ class EdgePartition[
def filter(
epred: EdgeTriplet[VD, ED] => Boolean,
vpred: (VertexId, VD) => Boolean): EdgePartition[ED, VD] = {
val filtered = tripletIterator().filter(et =>
vpred(et.srcId, et.srcAttr) && vpred(et.dstId, et.dstAttr) && epred(et))
val builder = new EdgePartitionBuilder[ED, VD]
for (e <- filtered) {
builder.add(e.srcId, e.dstId, e.attr)
val builder = new VertexPreservingEdgePartitionBuilder[ED, VD](
global2local, local2global, vertexAttrs)
var i = 0
while (i < size) {
// The user sees the EdgeTriplet, so we can't reuse it and must create one per edge.
val localSrcId = localSrcIds(i)
val localDstId = localDstIds(i)
val et = new EdgeTriplet[VD, ED]
et.srcId = local2global(localSrcId)
et.dstId = local2global(localDstId)
et.srcAttr = vertexAttrs(localSrcId)
et.dstAttr = vertexAttrs(localDstId)
et.attr = data(i)
if (vpred(et.srcId, et.srcAttr) && vpred(et.dstId, et.dstAttr) && epred(et)) {
builder.add(et.srcId, et.dstId, localSrcId, localDstId, et.attr)
}
i += 1
}
builder.toEdgePartition.withVertices(vertices).withActiveSet(activeSet)
builder.toEdgePartition.withActiveSet(activeSet)
}

/**
Expand All @@ -183,7 +226,8 @@ class EdgePartition[
* @return a new edge partition without duplicate edges
*/
def groupEdges(merge: (ED, ED) => ED): EdgePartition[ED, VD] = {
val builder = new EdgePartitionBuilder[ED, VD]
val builder = new VertexPreservingEdgePartitionBuilder[ED, VD](
global2local, local2global, vertexAttrs)
var currSrcId: VertexId = null.asInstanceOf[VertexId]
var currDstId: VertexId = null.asInstanceOf[VertexId]
var currAttr: ED = null.asInstanceOf[ED]
Expand All @@ -193,7 +237,7 @@ class EdgePartition[
currAttr = merge(currAttr, data(i))
} else {
if (i > 0) {
builder.add(currSrcId, currDstId, currAttr)
builder.add(currSrcId, currDstId, localSrcIds(i - 1), localDstIds(i - 1), currAttr)
}
currSrcId = srcIds(i)
currDstId = dstIds(i)
Expand All @@ -202,9 +246,9 @@ class EdgePartition[
i += 1
}
if (size > 0) {
builder.add(currSrcId, currDstId, currAttr)
builder.add(currSrcId, currDstId, localSrcIds(i - 1), localDstIds(i - 1), currAttr)
}
builder.toEdgePartition.withVertices(vertices).withActiveSet(activeSet)
builder.toEdgePartition.withActiveSet(activeSet)
}

/**
Expand All @@ -220,7 +264,8 @@ class EdgePartition[
def innerJoin[ED2: ClassTag, ED3: ClassTag]
(other: EdgePartition[ED2, _])
(f: (VertexId, VertexId, ED, ED2) => ED3): EdgePartition[ED3, VD] = {
val builder = new EdgePartitionBuilder[ED3, VD]
val builder = new VertexPreservingEdgePartitionBuilder[ED3, VD](
global2local, local2global, vertexAttrs)
var i = 0
var j = 0
// For i = index of each edge in `this`...
Expand All @@ -233,20 +278,21 @@ class EdgePartition[
while (j < other.size && other.srcIds(j) == srcId && other.dstIds(j) < dstId) { j += 1 }
if (j < other.size && other.srcIds(j) == srcId && other.dstIds(j) == dstId) {
// ... run `f` on the matching edge
builder.add(srcId, dstId, f(srcId, dstId, this.data(i), other.data(j)))
builder.add(srcId, dstId, localSrcIds(i), localDstIds(i),
f(srcId, dstId, this.data(i), other.data(j)))
}
}
i += 1
}
builder.toEdgePartition.withVertices(vertices).withActiveSet(activeSet)
builder.toEdgePartition.withActiveSet(activeSet)
}

/**
* The number of edges in this partition
*
* @return size of the partition
*/
val size: Int = srcIds.size
// One entry of `localSrcIds` exists per edge, so its length is the edge count.
val size: Int = localSrcIds.length

/**
 * The number of unique source vertices in the partition, taken as the number of entries in the
 * clustered source-vertex `index`.
 */
def indexSize: Int = {
  index.size
}
Expand Down Expand Up @@ -285,50 +331,116 @@ class EdgePartition[
}

/**
* Upgrade the given edge iterator into a triplet iterator.
* Send messages along edges and aggregate them at the receiving vertices. Implemented by scanning
* all edges sequentially and filtering them with `idPred`.
*
* @param mapFunc the edge map function which generates messages to neighboring vertices
* @param reduceFunc the combiner applied to messages destined to the same vertex
* @param mapUsesSrcAttr whether or not `mapFunc` uses the edge's source vertex attribute
* @param mapUsesDstAttr whether or not `mapFunc` uses the edge's destination vertex attribute
* @param idPred a predicate to filter edges based on their source and destination vertex ids
*
* Be careful not to keep references to the objects from this iterator. To improve GC performance
* the same object is re-used in `next()`.
* @return an iterator over the aggregated messages, keyed by the receiving vertex id
*/
def upgradeIterator(
edgeIter: Iterator[Edge[ED]], includeSrc: Boolean = true, includeDst: Boolean = true)
: Iterator[EdgeTriplet[VD, ED]] = {
new ReusingEdgeTripletIterator(edgeIter, this, includeSrc, includeDst)
def mapReduceTriplets[A: ClassTag](
    mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
    reduceFunc: (A, A) => A,
    mapUsesSrcAttr: Boolean,
    mapUsesDstAttr: Boolean,
    idPred: (VertexId, VertexId) => Boolean): Iterator[(VertexId, A)] = {
  // Per-vertex accumulators indexed by local vertex id. `received` marks the slots that already
  // hold a message, so an unset slot is never handed to `reduceFunc`.
  val aggregates = new Array[A](vertexAttrs.length)
  val received = new BitSet(vertexAttrs.length)

  // One triplet is mutated in place for every edge instead of allocating one per edge, so
  // `mapFunc` must not retain it. The reference itself never changes, hence `val`.
  val edge = new EdgeTriplet[VD, ED]
  var i = 0
  while (i < size) {
    val localSrcId = localSrcIds(i)
    val srcId = local2global(localSrcId)
    val localDstId = localDstIds(i)
    val dstId = local2global(localDstId)
    if (idPred(srcId, dstId)) {
      edge.srcId = srcId
      edge.dstId = dstId
      edge.attr = data(i)
      // Attributes are copied only when the caller declared that `mapFunc` reads them; otherwise
      // the triplet keeps whatever was written for an earlier edge.
      if (mapUsesSrcAttr) { edge.srcAttr = vertexAttrs(localSrcId) }
      if (mapUsesDstAttr) { edge.dstAttr = vertexAttrs(localDstId) }

      mapFunc(edge).foreach { kv =>
        val globalId = kv._1
        val msg = kv._2
        // Messages are resolved to a local slot without a hash lookup, which only works for the
        // two endpoints of the current edge. Any other target used to be silently aggregated
        // into the destination's slot; reject it explicitly instead of corrupting that result.
        val localId =
          if (globalId == srcId) {
            localSrcId
          } else {
            require(globalId == dstId,
              s"mapFunc sent a message to vertex $globalId, which is neither the source " +
                s"($srcId) nor the destination ($dstId) of the edge being processed")
            localDstId
          }
        if (received.get(localId)) {
          aggregates(localId) = reduceFunc(aggregates(localId), msg)
        } else {
          aggregates(localId) = msg
          received.set(localId)
        }
      }
    }
    i += 1
  }

  // Translate the local slots that received messages back to global vertex ids.
  received.iterator.map { localId => (local2global(localId), aggregates(localId)) }
}

/**
* Get an iterator over the edges in this partition whose source vertex ids match srcIdPred. The
* iterator is generated using an index scan, so it is efficient at skipping edges that don't
* match srcIdPred.
* Send messages along edges and aggregate them at the receiving vertices. Implemented by
* filtering the source vertex index with `srcIdPred`, then scanning edge clusters and filtering
* with `dstIdPred`. Both `srcIdPred` and `dstIdPred` must match for an edge to run.
*
* Be careful not to keep references to the objects from this iterator. To improve GC performance
* the same object is re-used in `next()`.
*/
def indexIterator(srcIdPred: VertexId => Boolean): Iterator[Edge[ED]] =
index.iterator.filter(kv => srcIdPred(kv._1)).flatMap(Function.tupled(clusterIterator))

/**
* Get an iterator over the cluster of edges in this partition with source vertex id `srcId`. The
* cluster must start at position `index`.
* @param mapFunc the edge map function which generates messages to neighboring vertices
* @param reduceFunc the combiner applied to messages destined to the same vertex
* @param mapUsesSrcAttr whether or not `mapFunc` uses the edge's source vertex attribute
* @param mapUsesDstAttr whether or not `mapFunc` uses the edge's destination vertex attribute
* @param srcIdPred a predicate to filter edges based on their source vertex id
* @param dstIdPred a predicate to filter edges based on their destination vertex id
*
* Be careful not to keep references to the objects from this iterator. To improve GC performance
* the same object is re-used in `next()`.
* @return an iterator over the aggregated messages, keyed by the receiving vertex id
*/
private def clusterIterator(srcId: VertexId, index: Int) = new Iterator[Edge[ED]] {
private[this] val edge = new Edge[ED]
private[this] var pos = index
def mapReduceTripletsWithIndex[A: ClassTag](
mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
reduceFunc: (A, A) => A,
mapUsesSrcAttr: Boolean,
mapUsesDstAttr: Boolean,
srcIdPred: VertexId => Boolean,
dstIdPred: VertexId => Boolean): Iterator[(VertexId, A)] = {
val aggregates = new Array[A](vertexAttrs.length)
val bitset = new BitSet(vertexAttrs.length)

override def hasNext: Boolean = {
pos >= 0 && pos < EdgePartition.this.size && srcIds(pos) == srcId
}
var edge = new EdgeTriplet[VD, ED]
index.iterator.foreach { cluster =>
val clusterSrcId = cluster._1
val clusterPos = cluster._2
val clusterLocalSrcId = localSrcIds(clusterPos)
if (srcIdPred(clusterSrcId)) {
var pos = clusterPos
edge.srcId = clusterSrcId
if (mapUsesSrcAttr) { edge.srcAttr = vertexAttrs(clusterLocalSrcId) }
while (pos < size && localSrcIds(pos) == clusterLocalSrcId) {
val localDstId = localDstIds(pos)
val dstId = local2global(localDstId)
if (dstIdPred(dstId)) {
edge.dstId = dstId
edge.attr = data(pos)
if (mapUsesDstAttr) { edge.dstAttr = vertexAttrs(localDstId) }

override def next(): Edge[ED] = {
assert(srcIds(pos) == srcId)
edge.srcId = srcIds(pos)
edge.dstId = dstIds(pos)
edge.attr = data(pos)
pos += 1
edge
mapFunc(edge).foreach { kv =>
val globalId = kv._1
val msg = kv._2
val localId = if (globalId == clusterSrcId) clusterLocalSrcId else localDstId
if (bitset.get(localId)) {
aggregates(localId) = reduceFunc(aggregates(localId), msg)
} else {
aggregates(localId) = msg
bitset.set(localId)
}
}
}
pos += 1
}
}
}

bitset.iterator.map { localId => (local2global(localId), aggregates(localId)) }
}
}
Loading

0 comments on commit 4a566dc

Please sign in to comment.