-
Notifications
You must be signed in to change notification settings - Fork 28.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Optimizations for mapReduceTriplets and EdgePartition #3054
Conversation
1. EdgePartition now stores local vertex ids instead of global ids. This avoids hash lookups when looking up vertex attributes and aggregating messages. 2. Internal iterators in mapReduceTriplets are inlined into a while loop.
Test build #22712 has started for PR 3054 at commit
|
Test build #22712 has finished for PR 3054 at commit
|
Test PASSed. |
this.withVertices(vertices.innerJoinKeepLeft(iter)) | ||
val newVertexAttrs = new Array[VD](vertexAttrs.length) | ||
System.arraycopy(vertexAttrs, 0, newVertexAttrs, 0, vertexAttrs.length) | ||
iter.foreach { kv => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe rewrite this with while loop
Also rename VertexPreservingEdgePartitionBuilder to ExistingEdgePartitionBuilder to better reflect its usage.
@rxin Thanks for the comments. I addressed them and made some other improvements. PTAL |
Test build #22874 has started for PR 3054 at commit
|
Test FAILed. |
Jenkins, retest this please. |
Test build #22876 has started for PR 3054 at commit
|
Test build #22874 has finished for PR 3054 at commit
|
Test PASSed. |
Test build #22876 has finished for PR 3054 at commit
|
Test PASSed. |
aggregateMessages enables neighborhood computation similarly to mapReduceTriplets, but it introduces two API improvements: 1. Messages are sent using an imperative interface based on EdgeContext rather than by returning an iterator of messages. This is more efficient, providing a 20.2% speedup on PageRank over apache#3054 (uk-2007-05 graph, 10 iterations, 16 r3.2xlarge machines, sped up from 403 s to 322 s). 2. Rather than attempting bytecode inspection, the required triplet fields must be explicitly specified by the user by passing a TripletFields object. This fixes SPARK-3936. Subsumes apache#2815.
* @param activeSet an optional active vertex set for filtering computation on the edges | ||
*/ | ||
private[graphx] | ||
class EdgePartition[ | ||
@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag, VD: ClassTag]( | ||
val srcIds: Array[VertexId] = null, | ||
val dstIds: Array[VertexId] = null, | ||
val localSrcIds: Array[Int] = null, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we create an explicit empty ctor instead of having null value for everything? and in that ctor say it is only needed for serialization.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also try to make all of these private rather than val's. (just remove the val)
edgeIter: Iterator[Edge[ED]], includeSrc: Boolean = true, includeDst: Boolean = true) | ||
: Iterator[EdgeTriplet[VD, ED]] = { | ||
new ReusingEdgeTripletIterator(edgeIter, this, includeSrc, includeDst) | ||
def mapReduceTriplets[A: ClassTag]( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
might be better to rename this mapRedueTripletsEdgeScan, to contrast with the other index scan one.
Closing this in favor of #3100, which incorporates these changes. |
aggregateMessages enables neighborhood computation similarly to mapReduceTriplets, but it introduces two API improvements: 1. Messages are sent using an imperative interface based on EdgeContext rather than by returning an iterator of messages. 2. Rather than attempting bytecode inspection, the required triplet fields must be explicitly specified by the user by passing a TripletFields object. This fixes SPARK-3936. Additionally, this PR includes the following optimizations for aggregateMessages and EdgePartition: 1. EdgePartition now stores local vertex ids instead of global ids. This avoids hash lookups when looking up vertex attributes and aggregating messages. 2. Internal iterators in aggregateMessages are inlined into a while loop. In total, these optimizations were tested to provide a 37% speedup on PageRank (uk-2007-05 graph, 10 iterations, 16 r3.2xlarge machines, sped up from 513 s to 322 s). Subsumes apache#2815. Also fixes SPARK-4173. Author: Ankur Dave <[email protected]> Closes apache#3100 from ankurdave/aggregateMessages and squashes the following commits: f5b65d0 [Ankur Dave] Address @rxin comments on apache#3054 and apache#3100 1e80aca [Ankur Dave] Add aggregateMessages, which supersedes mapReduceTriplets 194a2df [Ankur Dave] Test triplet iterator in EdgePartition serialization test e0f8ecc [Ankur Dave] Take activeSet in ExistingEdgePartitionBuilder c85076d [Ankur Dave] Readability improvements b567be2 [Ankur Dave] iter.foreach -> while loop 4a566dc [Ankur Dave] Optimizations for mapReduceTriplets and EdgePartition
aggregateMessages enables neighborhood computation similarly to mapReduceTriplets, but it introduces two API improvements: 1. Messages are sent using an imperative interface based on EdgeContext rather than by returning an iterator of messages. 2. Rather than attempting bytecode inspection, the required triplet fields must be explicitly specified by the user by passing a TripletFields object. This fixes SPARK-3936. Additionally, this PR includes the following optimizations for aggregateMessages and EdgePartition: 1. EdgePartition now stores local vertex ids instead of global ids. This avoids hash lookups when looking up vertex attributes and aggregating messages. 2. Internal iterators in aggregateMessages are inlined into a while loop. In total, these optimizations were tested to provide a 37% speedup on PageRank (uk-2007-05 graph, 10 iterations, 16 r3.2xlarge machines, sped up from 513 s to 322 s). Subsumes #2815. Also fixes SPARK-4173. Author: Ankur Dave <[email protected]> Closes #3100 from ankurdave/aggregateMessages and squashes the following commits: f5b65d0 [Ankur Dave] Address @rxin comments on #3054 and #3100 1e80aca [Ankur Dave] Add aggregateMessages, which supersedes mapReduceTriplets 194a2df [Ankur Dave] Test triplet iterator in EdgePartition serialization test e0f8ecc [Ankur Dave] Take activeSet in ExistingEdgePartitionBuilder c85076d [Ankur Dave] Readability improvements b567be2 [Ankur Dave] iter.foreach -> while loop 4a566dc [Ankur Dave] Optimizations for mapReduceTriplets and EdgePartition (cherry picked from commit faeb41d) Signed-off-by: Reynold Xin <[email protected]>
These optimizations were tested to provide a 21.4% speedup on PageRank (uk-2007-05 graph, 10 iterations, 16 r3.2xlarge machines, sped up from 513 s to 403 s).
Also fixes SPARK-4173.