Skip to content

Commit

Permalink
Address @rxin comments on apache#3054 and apache#3100
Browse files Browse the repository at this point in the history
  • Loading branch information
ankurdave committed Nov 12, 2014
1 parent 1e80aca commit f5b65d0
Show file tree
Hide file tree
Showing 10 changed files with 295 additions and 256 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ package org.apache.spark.graphx
* Represents an edge along with its neighboring vertices and allows sending messages along the
* edge. Used in [[Graph#aggregateMessages]].
*/
trait EdgeContext[VD, ED, A] {
abstract class EdgeContext[VD, ED, A] {
/** The vertex id of the edge's source vertex. */
def srcId: VertexId
/** The vertex id of the edge's destination vertex. */
Expand Down
93 changes: 74 additions & 19 deletions graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,33 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
def mapEdges[ED2: ClassTag](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2])
: Graph[VD, ED2]

/**
* Transforms each edge attribute using the map function, passing it the adjacent vertex
* attributes as well. If adjacent vertex values are not required,
* consider using `mapEdges` instead.
*
* @note This does not change the structure of the
* graph or modify the values of this graph. As a consequence
* the underlying index structures can be reused.
*
* @param map the function from an edge object to a new edge value.
*
* @tparam ED2 the new edge data type
*
* @example This function might be used to initialize edge
* attributes based on the attributes associated with each vertex.
* {{{
* val rawGraph: Graph[Int, Int] = someLoadFunction()
* val graph = rawGraph.mapTriplets[Int]( edge =>
* edge.src.data - edge.dst.data)
* }}}
*
*/
def mapTriplets[ED2: ClassTag](
map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
mapTriplets((pid, iter) => iter.map(map), TripletFields.All)
}

/**
* Transforms each edge attribute using the map function, passing it the adjacent vertex
* attributes as well. If adjacent vertex values are not required,
Expand All @@ -211,7 +238,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
*/
def mapTriplets[ED2: ClassTag](
map: EdgeTriplet[VD, ED] => ED2,
tripletFields: TripletFields = TripletFields.All): Graph[VD, ED2] = {
tripletFields: TripletFields): Graph[VD, ED2] = {
mapTriplets((pid, iter) => iter.map(map), tripletFields)
}

Expand Down Expand Up @@ -305,13 +332,15 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
* be commutative and associative and is used to combine the output
* of the map phase
*
* @param activeSetOpt optionally, a set of "active" vertices and a direction of edges to
* consider when running `mapFunc`. If the direction is `In`, `mapFunc` will only be run on
* edges with destination in the active set. If the direction is `Out`,
* `mapFunc` will only be run on edges originating from vertices in the active set. If the
* direction is `Either`, `mapFunc` will be run on edges with *either* vertex in the active set
* . If the direction is `Both`, `mapFunc` will be run on edges with *both* vertices in the
* active set. The active set must have the same index as the graph's vertices.
* @param activeSetOpt an efficient way to run the aggregation on a subset of the edges if
* desired. This is done by specifying a set of "active" vertices and an edge direction. The
* `sendMsg` function will then run only on edges connected to active vertices by edges in the
* specified direction. If the direction is `In`, `sendMsg` will only be run on edges with
* destination in the active set. If the direction is `Out`, `sendMsg` will only be run on edges
* originating from vertices in the active set. If the direction is `Either`, `sendMsg` will be
* run on edges with *either* vertex in the active set. If the direction is `Both`, `sendMsg`
* will be run on edges with *both* vertices in the active set. The active set must have the
* same index as the graph's vertices.
*
* @example We can use this function to compute the in-degree of each
* vertex
Expand Down Expand Up @@ -349,15 +378,6 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
* combiner should be commutative and associative.
* @param tripletFields which fields should be included in the [[EdgeContext]] passed to the
* `sendMsg` function. If not all fields are needed, specifying this can improve performance.
* @param activeSetOpt an efficient way to run the aggregation on a subset of the edges if
* desired. This is done by specifying a set of "active" vertices and an edge direction. The
* `sendMsg` function will then run on only edges connected to active vertices by edges in the
* specified direction. If the direction is `In`, `sendMsg` will only be run on edges with
* destination in the active set. If the direction is `Out`, `sendMsg` will only be run on edges
* originating from vertices in the active set. If the direction is `Either`, `sendMsg` will be
* run on edges with *either* vertex in the active set. If the direction is `Both`, `sendMsg`
* will be run on edges with *both* vertices in the active set. The active set must have the
* same index as the graph's vertices.
*
* @example We can use this function to compute the in-degree of each
* vertex
Expand All @@ -377,8 +397,43 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
def aggregateMessages[A: ClassTag](
sendMsg: EdgeContext[VD, ED, A] => Unit,
mergeMsg: (A, A) => A,
tripletFields: TripletFields = TripletFields.All,
activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None)
tripletFields: TripletFields = TripletFields.All)
: VertexRDD[A] = {
aggregateMessagesWithActiveSet(sendMsg, mergeMsg, tripletFields, None)
}

/**
* Aggregates values from the neighboring edges and vertices of each vertex. The user-supplied
* `sendMsg` function is invoked on each edge of the graph, generating 0 or more messages to be
* sent to either vertex in the edge. The `mergeMsg` function is then used to combine all messages
* destined to the same vertex.
*
* This variant can take an active set to restrict the computation and is intended for internal
* use only.
*
* @tparam A the type of message to be sent to each vertex
*
* @param sendMsg runs on each edge, sending messages to neighboring vertices using the
* [[EdgeContext]].
* @param mergeMsg used to combine messages from `sendMsg` destined to the same vertex. This
* combiner should be commutative and associative.
* @param tripletFields which fields should be included in the [[EdgeContext]] passed to the
* `sendMsg` function. If not all fields are needed, specifying this can improve performance.
* @param activeSetOpt an efficient way to run the aggregation on a subset of the edges if
* desired. This is done by specifying a set of "active" vertices and an edge direction. The
* `sendMsg` function will then run on only edges connected to active vertices by edges in the
* specified direction. If the direction is `In`, `sendMsg` will only be run on edges with
* destination in the active set. If the direction is `Out`, `sendMsg` will only be run on edges
* originating from vertices in the active set. If the direction is `Either`, `sendMsg` will be
* run on edges with *either* vertex in the active set. If the direction is `Both`, `sendMsg`
* will be run on edges with *both* vertices in the active set. The active set must have the
* same index as the graph's vertices.
*/
private[graphx] def aggregateMessagesWithActiveSet[A: ClassTag](
sendMsg: EdgeContext[VD, ED, A] => Unit,
mergeMsg: (A, A) => A,
tripletFields: TripletFields,
activeSetOpt: Option[(VertexRDD[_], EdgeDirection)])
: VertexRDD[A]

/**
Expand Down
51 changes: 51 additions & 0 deletions graphx/src/main/scala/org/apache/spark/graphx/TripletFields.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.graphx;

import java.io.Serializable;

/**
* Represents a subset of the fields of an [[EdgeTriplet]] or [[EdgeContext]]. This allows the
* system to populate only those fields for efficiency.
*/
public class TripletFields implements Serializable {
public final boolean useSrc;
public final boolean useDst;
public final boolean useEdge;

public TripletFields() {
this(true, true, true);
}

public TripletFields(boolean useSrc, boolean useDst, boolean useEdge) {
this.useSrc = useSrc;
this.useDst = useDst;
this.useEdge = useEdge;
}

public static final TripletFields None = new TripletFields(false, false, false);
public static final TripletFields EdgeOnly = new TripletFields(false, false, true);
public static final TripletFields SrcOnly = new TripletFields(true, false, false);
public static final TripletFields DstOnly = new TripletFields(false, true, false);
public static final TripletFields SrcDstOnly = new TripletFields(true, true, false);
public static final TripletFields SrcAndEdge = new TripletFields(true, false, true);
public static final TripletFields Src = SrcAndEdge;
public static final TripletFields DstAndEdge = new TripletFields(false, true, true);
public static final TripletFields Dst = DstAndEdge;
public static final TripletFields All = new TripletFields(true, true, true);
}
59 changes: 0 additions & 59 deletions graphx/src/main/scala/org/apache/spark/graphx/TripletFields.scala

This file was deleted.

Loading

0 comments on commit f5b65d0

Please sign in to comment.