In [1]:
 * Community Extraction: Clique Percolation Method

import io.arabesque._
import io.arabesque.embedding.{ResultEmbedding, VEmbedding}
import io.arabesque.utils.collection.UnionFindOps // Union-Find structure for merging adjacency sets
import scala.collection.mutable.Map

// clique target size and graph input file
val maxsize = 3
// Fail fast with a clear message when ARABESQUE_HOME is unset; System.getenv would return null
// and the path would silently become "null/data/...".
val arabesqueHome = sys.env.getOrElse("ARABESQUE_HOME",
    sys.error("environment variable ARABESQUE_HOME is not set"))
val inputPath = s"$arabesqueHome/data/citeseer-single-label.graph"

// arabesque context and arabesque graph
// (@transient keeps these driver-side handles out of closures serialized by the notebook)
@transient val arab = new ArabesqueContext(sc)
@transient val arabGraph = arab.textFile(inputPath)

// 1. First we use Arabesque to extract all the maxsize-cliques from the graph.
// The result is an RDD of ResultEmbedding (vertex-induced embeddings) representing the cliques
@transient val cliquesRes = arabGraph.cliques(maxsize)
@transient val cliques = cliquesRes.embeddings

// 2. Next we extract the clique adjacencies w.r.t. the clique percolation method.
// Two maxsize-cliques are adjacent when they share a sub-clique of size maxsize-1.
// For every clique we build a small union-find structure (a mutable Map from sub-clique to its
// parent sub-clique) in which all of the clique's (maxsize-1)-sub-cliques are put in one set;
// the per-clique structures are then merged into a single global one.
// In our context, parents represent sets of sub-cliques that are adjacent (union-find structure).
val cliqueAdjacencies = cliques.map { e =>
    val m = Map.empty[ResultEmbedding, ResultEmbedding]
    // generate all combinations (sub-cliques) of size maxsize-1
    val combs = e.combinations(maxsize - 1)
    // the first combination (first sub-clique) is the representative of this clique
    val repr = UnionFindOps.find[ResultEmbedding](
        x => m.getOrElse(x, null),
        (node, p) => m.update(node, p),
        combs.next())
    // guarantee that every other sub-clique is adjacent to (in the same set as) the first
    while (combs.hasNext) {
        UnionFindOps.union[ResultEmbedding](
            x => m.getOrElse(x, null),
            (node, p) => m.update(node, p),
            repr, combs.next())
    }
    m
}.fold(Map.empty[ResultEmbedding, ResultEmbedding]) { (m1, m2) =>
    // fold (instead of reduce) so an empty clique RDD yields an empty structure rather than
    // throwing; an empty map is neutral for this merge.
    // Merge union-find structures by making sure that every pair of (sub-clique, parent) of m2
    // is also adjacent in m1.
    // The keys are snapshotted first: find receives a setter that may write into m2
    // (presumably path compression), and m2 should not be modified while being iterated.
    for (k <- m2.keys.toList) {
        val parent = UnionFindOps.find[ResultEmbedding](
            x => m2.getOrElse(x, null),
            (node, p) => m2.update(node, p),
            k)
        if (parent != k) {
            UnionFindOps.union[ResultEmbedding](
                x => m1.getOrElse(x, null),
                (node, p) => m1.update(node, p),
                k, parent)
        }
    }
    m1
}

// 3. Make the merged union-find structure (sub-clique -> parent) available to the executors
// as a broadcast variable, for use in the community-extraction step below.
val cliqueAdjacenciesBc = sc.broadcast(cliqueAdjacencies)

// 4. For each maxsize-clique, we find the adjacency set it belongs to and emit a pair (adj-set, clique).
// Finally we merge embeddings (cliques) that belong to the same adjacency set.
// The result is an RDD of embeddings representing communities.
val communities = cliques.mapPartitions { iter =>
    // find receives a setter that writes into the map it walks. A broadcast value must not be
    // modified (and may be shared by concurrently running tasks), so each partition works on
    // its own copy.
    val m = cliqueAdjacenciesBc.value.clone()
    iter.map { e =>
        // every sub-clique of e lies in the same set, so the first one identifies the set
        val key = UnionFindOps.find[ResultEmbedding](
            x => m.getOrElse(x, null),
            (node, p) => m.update(node, p),
            e.combinations(maxsize - 1).next())
        (key, e)
    }
}.reduceByKey(
    // a community is the union of the vertices of all cliques in the same adjacency set
    (e1, e2) => new VEmbedding((e1.words.toSet ++ e2.words.toSet).toArray)
).values

// 5. Sort by decreasing order of size and display.
// The results are collected once and counted locally: calling communities.count and then
// communities.collect would execute the whole (uncached) pipeline twice.
val sortedCommunities = communities.collect.sortBy(-_.words.size)
println(s"number of communities = ${sortedCommunities.length}")
sortedCommunities.mkString("\n")

number of communities = 234
VEmbedding(3008, 2263, 3153, 3108, 3217, 3276, 3103, 2216, 3308, 3204, 3221, 3029, 2269, 3148, 2857, 3084, 2635, 3268, 2996, 2650, 3290, 3093, 3189, 3157, 3243, 3011, 3303, 3022, 3239, 2606, 3250, 3207, 3282, 3307, 3271, 2628, 3144, 2228, 2990, 2685, 2858, 3311, 3210, 3094, 2649, 3294, 2851, 3222, 3115, 3162, 3201, 3225, 3183, 3257, 3262, 3274, 3234, 3255, 3240, 2981, 2991, 3304, 2987, 2637, 3270, 2871, 2632, 2625, 3192, 3298, 2874, 2278, 2842, 3159, 3193, 2665, 3161, 3146, 2684, 3278, 3260, 3305, 3114, 1538, 1579, 3095, 3107, 3156, 2865, 3284, 2885, 3188, 3220, 3252, 1558, 3267, 2683, 3232, 3291, 2604, 3164, 3309, 3015, 2186, 3277, 2619, 3259, 3020, 2864, 3299, 3096, 3296)
VEmbedding(2953, 2837, 3263, 3275, 3216, 3258, 2965, 3303, 3297, 3102, 3272, 3077, 3304, 2802, 320...