import scala.reflect.ClassTag

import org.apache.spark.graphx._

/** Label Propagation algorithm. */
object LabelPropagation {
  /**
   * Run static Label Propagation for detecting communities in networks.
   *
   * Each node in the network is initially assigned to its own community. At every superstep, nodes
   * send their community affiliation to all neighbors and update their state to the mode community
   * affiliation of incoming messages. When several communities are tied for the mode, the one with
   * the smallest label is chosen, so the outcome does not depend on the order in which messages
   * happen to be merged.
   *
   * LPA is a standard community detection algorithm for graphs. It is very inexpensive
   * computationally, although (1) convergence is not guaranteed and (2) one can end up with
   * trivial solutions (all nodes are identified into a single community).
   *
   * @tparam ED the edge attribute type (not used in the computation)
   *
   * @param graph the graph for which to compute the community affiliation
   * @param maxSteps the number of supersteps of LPA to be performed. Because this is a static
   * implementation, the algorithm will run for exactly this many supersteps.
   *
   * @return a graph with vertex attributes containing the label of community affiliation
   */
  def run[ED: ClassTag](graph: Graph[_, ED], maxSteps: Int): Graph[VertexId, ED] = {
    // Every vertex starts in a community of its own, labelled with its vertex id.
    val lpaGraph = graph.mapVertices { case (vid, _) => vid }

    // Each edge tells both endpoints about the label currently held by the other endpoint.
    def sendMessage(e: EdgeTriplet[VertexId, ED]): Iterator[(VertexId, Map[VertexId, Long])] = {
      Iterator((e.srcId, Map(e.dstAttr -> 1L)), (e.dstId, Map(e.srcAttr -> 1L)))
    }

    // Add the per-label counts of two partial messages, folding one map into the other
    // instead of materialising an intermediate key set and tuple set.
    def mergeMessage(count1: Map[VertexId, Long], count2: Map[VertexId, Long])
      : Map[VertexId, Long] = {
      count2.foldLeft(count1) { case (merged, (label, count)) =>
        merged.updated(label, merged.getOrElse(label, 0L) + count)
      }
    }

    // Adopt the most frequent incoming label; keep the current one if nothing was received.
    def vertexProgram(vid: VertexId, attr: Long, message: Map[VertexId, Long]): VertexId = {
      if (message.isEmpty) {
        attr
      } else {
        // Highest count wins; equal counts are resolved towards the smallest label so that the
        // choice is independent of the map's iteration order. Counts are positive, so negating
        // them cannot overflow.
        message.minBy { case (label, count) => (-count, label) }._1
      }
    }

    // The initial message is empty, so the first vertex-program call leaves labels untouched.
    val initialMessage = Map[VertexId, Long]()
    Pregel(lpaGraph, initialMessage, maxIterations = maxSteps)(
      vprog = vertexProgram,
      sendMsg = sendMessage,
      mergeMsg = mergeMessage)
  }
}
import org.scalatest.FunSuite

import org.apache.spark.graphx._

/** Checks that label propagation tells apart two cliques joined by a single edge. */
class LabelPropagationSuite extends FunSuite with LocalSparkContext {
  test("Label Propagation") {
    withSpark { sc =>
      val n = 5
      // First clique: every ordered pair (self-loops included) over vertices 0 until n.
      val firstClique = (0L until n).flatMap { u =>
        (0L until n).map(v => Edge(u, v, 1))
      }
      // Second clique: built with an inclusive range, so it spans the n + 1 vertices n to 2n.
      val secondClique = (0L to n).flatMap { u =>
        (0L to n).map(v => Edge(u + n, v + n, 1))
      }
      // A single edge bridges the two cliques.
      val bridge = Edge(0L, n, 1)
      val edges = sc.parallelize((firstClique ++ secondClique) :+ bridge)
      val graph = Graph.fromEdges(edges, 1)

      // Run label propagation
      val labels = LabelPropagation.run(graph, n * 4).cache()

      // Collects the labels of all vertices whose id satisfies the given predicate.
      def labelsWhere(keep: VertexId => Boolean): Array[VertexId] = {
        labels.vertices
          .filter { case (id, _) => keep(id) }
          .map { case (_, label) => label }
          .collect()
      }

      // All vertices within a clique should have the same label
      val firstLabels = labelsWhere(_ < n)
      assert(firstLabels.forall(_ == firstLabels(0)))
      val secondLabels = labelsWhere(_ >= n)
      assert(secondLabels.forall(_ == secondLabels(0)))

      // The two cliques should have different labels
      assert(firstLabels(0) != secondLabels(0))
    }
  }
}