Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-3666] Extract interfaces for EdgeRDD and VertexRDD #2530

Closed
wants to merge 8 commits into from
112 changes: 16 additions & 96 deletions graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,112 +17,42 @@

package org.apache.spark.graphx

import scala.reflect.{classTag, ClassTag}
import scala.reflect.ClassTag

import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

import org.apache.spark.graphx.impl.EdgePartition
import org.apache.spark.graphx.impl.EdgePartitionBuilder
import org.apache.spark.graphx.impl.EdgeRDDImpl

/**
* `EdgeRDD[ED, VD]` extends `RDD[Edge[ED]]` by storing the edges in columnar format on each
* partition for performance. It may additionally store the vertex attributes associated with each
* edge to provide the triplet view. Shipping of the vertex attributes is managed by
* `impl.ReplicatedVertexView`.
*/
class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])],
val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {

override def setName(_name: String): this.type = {
if (partitionsRDD.name != null) {
partitionsRDD.setName(partitionsRDD.name + ", " + _name)
} else {
partitionsRDD.setName(_name)
}
this
}
setName("EdgeRDD")

override protected def getPartitions: Array[Partition] = partitionsRDD.partitions

/**
* If `partitionsRDD` already has a partitioner, use it. Otherwise assume that the
* [[PartitionID]]s in `partitionsRDD` correspond to the actual partitions and create a new
* partitioner that allows co-partitioning with `partitionsRDD`.
*/
override val partitioner =
partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))

override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
val p = firstParent[(PartitionID, EdgePartition[ED, VD])].iterator(part, context)
if (p.hasNext) {
p.next._2.iterator.map(_.copy())
} else {
Iterator.empty
}
}

override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()

/**
* Persists the edge partitions at the specified storage level, ignoring any existing target
* storage level.
*/
override def persist(newLevel: StorageLevel): this.type = {
partitionsRDD.persist(newLevel)
this
}

override def unpersist(blocking: Boolean = true): this.type = {
partitionsRDD.unpersist(blocking)
this
}

/** Persists the vertex partitions using `targetStorageLevel`, which defaults to MEMORY_ONLY. */
override def cache(): this.type = {
partitionsRDD.persist(targetStorageLevel)
this
}

private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag](
f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2] = {
this.withPartitionsRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter =>
if (iter.hasNext) {
val (pid, ep) = iter.next()
Iterator(Tuple2(pid, f(pid, ep)))
} else {
Iterator.empty
}
}, preservesPartitioning = true))
}

trait EdgeRDD[@specialized ED, VD] extends RDD[Edge[ED]] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think we should probably remove the specialized and test performance. maybe we can do that in a a separate pr

/**
* Map the values in an edge partitioning preserving the structure but changing the values.
*
* @tparam ED2 the new edge value type
* @param f the function from an edge to a new edge value
* @return a new EdgeRDD containing the new edge values
*/
def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2, VD] =
mapEdgePartitions((pid, part) => part.map(f))
def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2, VD]

/**
* Reverse all the edges in this RDD.
*
* @return a new EdgeRDD containing all the edges reversed
*/
def reverse: EdgeRDD[ED, VD] = mapEdgePartitions((pid, part) => part.reverse)
def reverse: EdgeRDD[ED, VD]

/** Removes all edges but those matching `epred` and where both vertices match `vpred`. */
def filter(
epred: EdgeTriplet[VD, ED] => Boolean,
vpred: (VertexId, VD) => Boolean): EdgeRDD[ED, VD] = {
mapEdgePartitions((pid, part) => part.filter(epred, vpred))
}
vpred: (VertexId, VD) => Boolean): EdgeRDD[ED, VD]

/**
* Inner joins this EdgeRDD with another EdgeRDD, assuming both are partitioned using the same
Expand All @@ -135,22 +65,16 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
*/
def innerJoin[ED2: ClassTag, ED3: ClassTag]
(other: EdgeRDD[ED2, _])
(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD] = {
val ed2Tag = classTag[ED2]
val ed3Tag = classTag[ED3]
this.withPartitionsRDD[ED3, VD](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
(thisIter, otherIter) =>
val (pid, thisEPart) = thisIter.next()
val (_, otherEPart) = otherIter.next()
Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag)))
})
}
(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD]

private[graphx] def partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])]

private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag](
f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2]

/** Replaces the vertex partitions while preserving all other properties of the VertexRDD. */
/** Replaces the edge partitions while preserving all other properties of the EdgeRDD. */
private[graphx] def withPartitionsRDD[ED2: ClassTag, VD2: ClassTag](
partitionsRDD: RDD[(PartitionID, EdgePartition[ED2, VD2])]): EdgeRDD[ED2, VD2] = {
new EdgeRDD(partitionsRDD, this.targetStorageLevel)
}
partitionsRDD: RDD[(PartitionID, EdgePartition[ED2, VD2])]): EdgeRDD[ED2, VD2]

/**
* Changes the target storage level while preserving all other properties of the
Expand All @@ -159,11 +83,7 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
* This does not actually trigger a cache; to do this, call
* [[org.apache.spark.graphx.EdgeRDD#cache]] on the returned EdgeRDD.
*/
private[graphx] def withTargetStorageLevel(
targetStorageLevel: StorageLevel): EdgeRDD[ED, VD] = {
new EdgeRDD(this.partitionsRDD, targetStorageLevel)
}

private[graphx] def withTargetStorageLevel(targetStorageLevel: StorageLevel): EdgeRDD[ED, VD]
}

object EdgeRDD {
Expand Down Expand Up @@ -192,6 +112,6 @@ object EdgeRDD {
*/
def fromEdgePartitions[ED: ClassTag, VD: ClassTag](
edgePartitions: RDD[(Int, EdgePartition[ED, VD])]): EdgeRDD[ED, VD] = {
new EdgeRDD(edgePartitions)
new EdgeRDDImpl(edgePartitions)
}
}
Loading