In [1]:
/*
* Community Extraction: Clique Percolation Method
*/
import io.arabesque._
import io.arabesque.embedding.{ResultEmbedding, VEmbedding}
import io.arabesque.utils.collection.UnionFindOps // Union-Find structure for merging adjacency sets
import scala.collection.mutable.Map
// Target clique size and location of the input graph file.
val maxsize = 3
// Fail fast when ARABESQUE_HOME is unset: System.getenv would return null in that case and the
// interpolated path would silently become "null/data/...".
val inputPath = sys.env.get("ARABESQUE_HOME") match {
  case Some(home) => s"$home/data/citeseer-single-label.graph"
  case None => sys.error("ARABESQUE_HOME is not set; cannot locate the input graph")
}
// Arabesque context (built on the notebook's SparkContext `sc`) and the Arabesque graph.
// NOTE(review): @transient presumably keeps these handles out of serialized closures — confirm.
@transient val arab = new ArabesqueContext(sc)
@transient val arabGraph = arab.textFile(inputPath)
// 1. First we use Arabesque to extract all the maxsize-cliques from the graph.
// The result is an RDD of ResultEmbedding (vertex-induced embeddings) representing the cliques.
// NOTE(review): `cliques` is traversed twice below (steps 2 and 4); consider persisting it if
// `embeddings` does not already do so — confirm against the Arabesque API.
@transient val cliquesRes = arabGraph.cliques(maxsize)
@transient val cliques = cliquesRes.embeddings
// 2. Next we extract the clique adjacencies w.r.t. the clique percolation method.
// The result is a mapping from each sub-clique of size maxsize-1 to its parent.
// Parents link sub-cliques that are adjacent; the mapping is only ever read and written
// through UnionFindOps.find / UnionFindOps.union (union-find structure).
val cliqueAdjacencies = cliques.map { clique =>
  // union-find mapping local to this single clique
  val m = Map.empty[ResultEmbedding,ResultEmbedding]
  // accessors handed to UnionFindOps: the lookup yields null when a key has no entry yet
  val parentOf = (x: ResultEmbedding) => m.getOrElse(x, null)
  val setParent = (child: ResultEmbedding, parent: ResultEmbedding) => m.update(child, parent)
  // generate all combinations of maxsize-1
  val combs = clique.combinations(maxsize - 1)
  // representative of the first combination (first sub-clique)
  val repr = UnionFindOps.find[ResultEmbedding](parentOf, setParent, combs.next())
  // guarantee that every other sub-clique is adjacent to the first
  while (combs.hasNext) {
    UnionFindOps.union[ResultEmbedding](parentOf, setParent, combs.next(), repr)
  }
  m
}.fold(Map.empty[ResultEmbedding,ResultEmbedding]) { (m1, m2) =>
  // `fold` with an empty mapping (instead of `reduce`) keeps this step from throwing when the
  // graph contains no maxsize-clique at all.
  // Merge union-find structures by making sure that every pair of (sub-clique,parent) in m2 is
  // also adjacent in m1.
  // Iterate over a snapshot of the keys: `find` receives an updater that writes to m2, and a
  // mutable map must not be changed while it is being traversed.
  for (subClique <- m2.keys.toList) {
    val parent = UnionFindOps.find[ResultEmbedding](
      x => m2.getOrElse(x, null),
      (child, p) => m2.update(child, p),
      subClique
    )
    // entries that are their own parent carry no adjacency of their own to replay
    if (parent != subClique) {
      UnionFindOps.union[ResultEmbedding](
        x => m1.getOrElse(x, null),
        (child, p) => m1.update(child, p),
        parent,
        subClique
      )
    }
  }
  m1
}
// 3. We broadcast the global adjacency for community extraction
val cliqueAdjacenciesBc = sc.broadcast(cliqueAdjacencies)
// 4. For each maxsize-clique, we find the adjacency set it belongs to and emit a pair (adj-set,clique).
// Finally we merge embeddings (cliques) that belong to the same adjacency set.
// The result is an RDD of embeddings representing communities.
val communities = cliques.mapPartitions { partitionCliques =>
  // Work on a partition-private copy of the mapping: `find` is handed an updater that writes to
  // the map, while the broadcast value is shared by all tasks of an executor and must not be
  // modified after it has been broadcast. The copy costs one map clone per partition.
  val m = cliqueAdjacenciesBc.value.clone()
  partitionCliques.map { clique =>
    // the first sub-clique is enough to locate the adjacency set of the whole clique
    val key = UnionFindOps.find[ResultEmbedding](
      x => m.getOrElse(x, null),
      (child, parent) => m.update(child, parent),
      clique.combinations(maxsize - 1).next())
    (key, clique)
  }
}.reduceByKey(
  // a community is the union of the vertices of all cliques in the same adjacency set
  (e1, e2) => new VEmbedding((e1.words.toSet ++ e2.words.toSet).toArray)
).values
// 5. Sort by decreasing order of size and display.
// Collect once and reuse the local array for both the count and the listing, instead of
// running a separate `count` job followed by a `collect` job over the same RDD.
val collectedCommunities = communities.collect()
println(s"number of communities = ${collectedCommunities.length}")
// last expression of the cell: one community per line, largest first
collectedCommunities.sortBy(c => -c.words.size).mkString("\n")
Out[1]: