Skip to content

Commit

Permalink
Take activeSet in ExistingEdgePartitionBuilder
Browse files Browse the repository at this point in the history
Also rename VertexPreservingEdgePartitionBuilder to
ExistingEdgePartitionBuilder to better reflect its usage.
  • Loading branch information
ankurdave committed Nov 4, 2014
1 parent c85076d commit e0f8ecc
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,6 @@ class EdgePartition[
Some(activeSet))
}

/** Return a new `EdgePartition` with the specified active set. */
def withActiveSet(activeSet: Option[VertexSet]): EdgePartition[ED, VD] = {
new EdgePartition(
localSrcIds, localDstIds, data, index, global2local, local2global, vertexAttrs, activeSet)
}

/** Return a new `EdgePartition` with updates to vertex attributes specified in `iter`. */
def updateVertices(iter: Iterator[(VertexId, VD)]): EdgePartition[ED, VD] = {
val newVertexAttrs = new Array[VD](vertexAttrs.length)
Expand Down Expand Up @@ -116,8 +110,8 @@ class EdgePartition[
* @return a new edge partition with all edges reversed.
*/
def reverse: EdgePartition[ED, VD] = {
val builder = new VertexPreservingEdgePartitionBuilder(
global2local, local2global, vertexAttrs, size)(classTag[ED], classTag[VD])
val builder = new ExistingEdgePartitionBuilder[ED, VD](
global2local, local2global, vertexAttrs, activeSet, size)
var i = 0
while (i < size) {
val localSrcId = localSrcIds(i)
Expand All @@ -128,7 +122,7 @@ class EdgePartition[
builder.add(dstId, srcId, localDstId, localSrcId, attr)
i += 1
}
builder.toEdgePartition.withActiveSet(activeSet)
builder.toEdgePartition
}

/**
Expand Down Expand Up @@ -189,8 +183,8 @@ class EdgePartition[
def filter(
epred: EdgeTriplet[VD, ED] => Boolean,
vpred: (VertexId, VD) => Boolean): EdgePartition[ED, VD] = {
val builder = new VertexPreservingEdgePartitionBuilder[ED, VD](
global2local, local2global, vertexAttrs)
val builder = new ExistingEdgePartitionBuilder[ED, VD](
global2local, local2global, vertexAttrs, activeSet)
var i = 0
while (i < size) {
// The user sees the EdgeTriplet, so we can't reuse it and must create one per edge.
Expand All @@ -207,7 +201,7 @@ class EdgePartition[
}
i += 1
}
builder.toEdgePartition.withActiveSet(activeSet)
builder.toEdgePartition
}

/**
Expand All @@ -227,8 +221,8 @@ class EdgePartition[
* @return a new edge partition without duplicate edges
*/
def groupEdges(merge: (ED, ED) => ED): EdgePartition[ED, VD] = {
val builder = new VertexPreservingEdgePartitionBuilder[ED, VD](
global2local, local2global, vertexAttrs)
val builder = new ExistingEdgePartitionBuilder[ED, VD](
global2local, local2global, vertexAttrs, activeSet)
var currSrcId: VertexId = null.asInstanceOf[VertexId]
var currDstId: VertexId = null.asInstanceOf[VertexId]
var currLocalSrcId = -1
Expand Down Expand Up @@ -260,7 +254,7 @@ class EdgePartition[
if (size > 0) {
builder.add(currSrcId, currDstId, currLocalSrcId, currLocalDstId, currAttr)
}
builder.toEdgePartition.withActiveSet(activeSet)
builder.toEdgePartition
}

/**
Expand All @@ -276,8 +270,8 @@ class EdgePartition[
def innerJoin[ED2: ClassTag, ED3: ClassTag]
(other: EdgePartition[ED2, _])
(f: (VertexId, VertexId, ED, ED2) => ED3): EdgePartition[ED3, VD] = {
val builder = new VertexPreservingEdgePartitionBuilder[ED3, VD](
global2local, local2global, vertexAttrs)
val builder = new ExistingEdgePartitionBuilder[ED3, VD](
global2local, local2global, vertexAttrs, activeSet)
var i = 0
var j = 0
// For i = index of each edge in `this`...
Expand All @@ -296,7 +290,7 @@ class EdgePartition[
}
i += 1
}
builder.toEdgePartition.withActiveSet(activeSet)
builder.toEdgePartition
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,15 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: Cla

/**
* Constructs an EdgePartition from an existing EdgePartition with the same vertex set. This enables
* reuse of the local vertex ids.
* reuse of the local vertex ids. Intended for internal use in EdgePartition only.
*/
private[graphx]
class VertexPreservingEdgePartitionBuilder[
private[impl]
class ExistingEdgePartitionBuilder[
@specialized(Long, Int, Double) ED: ClassTag, VD: ClassTag](
global2local: GraphXPrimitiveKeyOpenHashMap[VertexId, Int],
local2global: Array[VertexId],
vertexAttrs: Array[VD],
activeSet: Option[VertexSet],
size: Int = 64) {
var edges = new PrimitiveVector[EdgeWithLocalIds[ED]](size)

Expand Down Expand Up @@ -119,14 +120,14 @@ class VertexPreservingEdgePartitionBuilder[
}

new EdgePartition(
localSrcIds, localDstIds, data, index, global2local, local2global, vertexAttrs)
localSrcIds, localDstIds, data, index, global2local, local2global, vertexAttrs, activeSet)
}
}

private[graphx] case class EdgeWithLocalIds[@specialized ED](
private[impl] case class EdgeWithLocalIds[@specialized ED](
srcId: VertexId, dstId: VertexId, localSrcId: Int, localDstId: Int, attr: ED)

private[graphx] object EdgeWithLocalIds {
private[impl] object EdgeWithLocalIds {
implicit def lexicographicOrdering[ED] = new Ordering[EdgeWithLocalIds[ED]] {
override def compare(a: EdgeWithLocalIds[ED], b: EdgeWithLocalIds[ED]): Int = {
if (a.srcId == b.srcId) {
Expand Down

0 comments on commit e0f8ecc

Please sign in to comment.