Testing Spark with Scala kernel


In [1]:
val rdd = sc.parallelize(Array.range(1, 100), 2) // 99 integers (1 to 99) in 2 partitions

In [2]:
rdd.collect


Out[2]:
Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99)

In [3]:
rdd.count


Out[3]:
99
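
Array.range(1, 100) is end-exclusive, so the RDD holds the 99 integers 1 through 99, split across the two partitions requested above. A quick sanity check (a sketch; the exact split may vary):

rdd.partitions.length                  // 2
rdd.glom().collect().map(_.length)     // e.g. Array(49, 50)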

In [4]:
val a = rdd.map(x => 2*x).reduce(_ + _)

In [5]:
println(a)


9900
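
The same doubled sum can be written with the built-in sum action (an equivalent sketch; note that sum returns a Double):

val a2 = rdd.map(2 * _).sum // 9900.0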

In [6]:
import org.apache.spark._
import org.apache.spark.graphx._
// To make some of the examples work we will also need RDD
import org.apache.spark.rdd.RDD

In [7]:
class VertexProperty()
case class UserProperty(val name: String) extends VertexProperty
case class ProductProperty(val name: String, val price: Double) extends VertexProperty
// The graph might then have the type:
var graph: Graph[VertexProperty, String] = null
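
These property classes are only illustrative. A sketch (with made-up names) of a vertex RDD carrying them:

val mixedVertices: RDD[(VertexId, VertexProperty)] =
  sc.parallelize(Array(
    (1L, UserProperty("alice")),
    (2L, ProductProperty("widget", 9.99))))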

In [8]:
// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// Define a default user in case there are relationships that reference a missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
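
Graph exposes simple counting accessors, so the build can be sanity-checked directly (sketch):

graph.numVertices // 4
graph.numEdges    // 4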

In [9]:
//val graph: Graph[(String, String), String] // Constructed from above
// Count all the users who are postdocs
graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count
// Count all the edges where src > dst
graph.edges.filter(e => e.srcId > e.dstId).count


Out[9]:
1
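
Only the value of the cell's last expression is echoed, so Out[9] is the src > dst edge count; the postdoc count (also 1) is computed but not displayed. Printing both makes that explicit (sketch):

println(graph.vertices.filter { case (_, (_, pos)) => pos == "postdoc" }.count) // 1
println(graph.edges.filter(e => e.srcId > e.dstId).count)                       // 1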

In [10]:
graph.edges.filter { case Edge(src, dst, prop) => src > dst }.count


Out[10]:
1

In [11]:
val facts: RDD[String] =
  graph.triplets.map(triplet =>
    triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1)
facts.collect.foreach(println(_))


istoica is the colleague of franklin
rxin is the collab of jgonzal
franklin is the advisor of rxin
franklin is the pi of jgonzal
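
An EdgeTriplet joins an edge with both endpoint attributes; the same data can be dumped with the vertex ids included (sketch):

graph.triplets
  .map(t => (t.srcId, t.srcAttr, t.attr, t.dstId, t.dstAttr))
  .collect.foreach(println)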

In [12]:
// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
                       (4L, ("peter", "student"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
                       Edge(4L, 0L, "student"),   Edge(5L, 0L, "colleague")))
// Define a default user in case there are relationships that reference a missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
// Notice that there is a user 0 (for which we have no information) connected to users
// 4 (peter) and 5 (franklin).
graph.triplets.map(
    triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
  ).collect.foreach(println(_))
// Remove missing vertices as well as the edges connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// The valid subgraph will disconnect users 4 and 5 by removing user 0
validGraph.vertices.collect.foreach(println(_))
validGraph.triplets.map(
    triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
  ).collect.foreach(println(_))


istoica is the colleague of franklin
rxin is the collab of jgonzal
peter is the student of John Doe
franklin is the colleague of John Doe
franklin is the advisor of rxin
franklin is the pi of jgonzal
(4,(peter,student))
(3,(rxin,student))
(7,(jgonzal,postdoc))
(5,(franklin,prof))
(2,(istoica,prof))
istoica is the colleague of franklin
rxin is the collab of jgonzal
franklin is the advisor of rxin
franklin is the pi of jgonzal
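
subgraph also accepts an edge predicate (epred) over triplets; for example, dropping the "colleague" edges while keeping every vertex (a sketch):

val noColleagues = graph.subgraph(epred = t => t.attr != "colleague")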

In [13]:
// Run Connected Components
val ccGraph = graph.connectedComponents() // vertex attributes are replaced by component ids
// Remove missing vertices as well as the edges connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// Restrict the answer to the valid subgraph
val validCCGraph = ccGraph.mask(validGraph)
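
mask restricts ccGraph to the vertices and edges that also appear in validGraph, so the missing user 0 drops out while the component labels computed on the full graph are kept. The result can be inspected directly (sketch):

validCCGraph.vertices.collect.foreach(println) // (vertexId, componentId) pairs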

In [14]:
// Import random graph generation library
import org.apache.spark.graphx.util.GraphGenerators
// Create a graph with "age" as the vertex property.  Here we use a random graph for simplicity.
val graph: Graph[Double, Int] =
  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )
// Compute the number of older followers and their total age
val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
  triplet => { // Map Function
    if (triplet.srcAttr > triplet.dstAttr) {
      // Send message to destination vertex containing counter and age
      triplet.sendToDst(1, triplet.srcAttr)
    }
  },
  // Add counter and age
  (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
)
// Divide total age by number of older followers to get average age of older followers
val avgAgeOfOlderFollowers: VertexRDD[Double] =
  olderFollowers.mapValues( (id, value) => value match { case (count, totalAge) => totalAge / count } )
// Display the results
avgAgeOfOlderFollowers.collect.foreach(println(_))


(19,56.52173913043478)
(39,62.888888888888886)
(34,70.07142857142857)
(4,48.5)
(71,87.44444444444444)
(66,79.0)
(80,84.2)
(65,78.5)
(11,66.17647058823529)
(14,70.5)
(35,58.1764705882353)
(24,61.333333333333336)
(37,72.46153846153847)
(1,48.5)
(74,80.25)
(63,80.625)
(89,90.0)
(17,57.44444444444444)
(18,64.05555555555556)
(12,54.7)
(38,59.111111111111114)
(20,60.72727272727273)
(90,98.0)
(94,98.0)
(41,68.54545454545455)
(21,49.0)
(77,83.5)
(53,73.6923076923077)
(22,60.6)
(25,61.857142857142854)
(46,75.3)
(59,78.84615384615384)
(62,92.14285714285714)
(93,97.0)
(33,64.0)
(23,59.714285714285715)
(40,65.375)
(6,60.3)
(67,73.0)
(69,86.0)
(3,45.4)
(85,86.33333333333333)
(58,72.4)
(60,81.72727272727273)
(86,96.0)
(91,94.0)
(31,66.72222222222223)
(26,64.6923076923077)
(5,53.625)
(2,58.65)
(13,50.92307692307692)
(96,98.0)
(52,79.0)
(81,88.75)
(16,47.5)
(55,79.25)
(82,86.0)
(28,73.91666666666667)
(29,56.529411764705884)
(79,88.75)
(54,76.0)
(30,60.2)
(50,69.54545454545455)
(36,70.125)
(92,99.0)
(64,82.11111111111111)
(57,68.2)
(51,79.0)
(75,84.0)
(45,71.85714285714286)
(72,85.66666666666667)
(70,78.25)
(9,48.833333333333336)
(49,70.15384615384616)
(78,82.0)
(43,65.5)
(10,51.541666666666664)
(84,90.4)
(61,79.53333333333333)
(56,80.66666666666667)
(15,62.04761904761905)
(47,74.2)
(76,86.66666666666667)
(95,96.0)
(48,60.125)
(73,82.42857142857143)
(32,65.94444444444444)
(27,64.66666666666667)
(0,58.285714285714285)
(42,74.75)
(7,49.9)
(8,63.89473684210526)
(44,69.38095238095238)
(88,92.0)
(68,88.4)
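
aggregateMessages is GraphX's general message-passing primitive; the same mechanism computes simpler quantities such as in-degree (a minimal sketch, equivalent to graph.inDegrees):

val inDeg: VertexRDD[Int] =
  graph.aggregateMessages[Int](_.sendToDst(1), _ + _)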

In [ ]:
%AddDeps com.databricks spark-csv_2.10 1.3.0 --transitive


:: loading settings :: url = jar:file:/opt/spark-1.5.2-bin-hadoop2.6/lib/spark-assembly-1.5.2-hadoop2.6.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
:: resolving dependencies :: com.ibm.spark#spark-kernel;working
	confs: [default]
	found com.databricks#spark-csv_2.10;1.3.0 in central
	found org.apache.commons#commons-csv;1.1 in central
	found com.univocity#univocity-parsers;1.5.1 in central
	found org.scoverage#scalac-scoverage-runtime_2.10;1.1.0 in central
	found org.scoverage#scalac-scoverage-plugin_2.10;1.1.0 in central
	found org.apache.spark#spark-core_2.10;1.5.0 in central
	found com.google.guava#guava;14.0.1 in central
	found com.google.code.findbugs#jsr305;1.3.9 in central
	found javax.inject#javax.inject;1 in central
	found org.apache.avro#avro-mapred;1.7.7 in central
	found org.apache.avro#avro-ipc;1.7.7 in central
	found org.apache.avro#avro;1.7.7 in central
	found org.codehaus.jackson#jackson-core-asl;1.9.13 in central
	found org.codehaus.jackson#jackson-mapper-asl;1.9.13 in central
	found com.thoughtworks.paranamer#paranamer;2.3 in central
	found org.xerial.snappy#snappy-java;1.1.1.7 in central
	found org.apache.commons#commons-compress;1.4.1 in central
	found org.tukaani#xz;1.0 in central
	found com.twitter#chill_2.10;0.5.0 in central
	found com.twitter#chill-java;0.5.0 in central
	found com.esotericsoftware.kryo#kryo;2.21 in central
	found com.esotericsoftware.reflectasm#reflectasm;1.07 in central
	found com.esotericsoftware.minlog#minlog;1.2 in central
	found org.objenesis#objenesis;1.2 in central
	found org.apache.hadoop#hadoop-client;2.2.0 in central
	found org.apache.hadoop#hadoop-common;2.2.0 in central
	found org.apache.hadoop#hadoop-annotations;2.2.0 in central
	found commons-cli#commons-cli;1.2 in central
	found org.apache.commons#commons-math;2.1 in central
	found xmlenc#xmlenc;0.52 in central
	found commons-httpclient#commons-httpclient;3.1 in central
	found commons-codec#commons-codec;1.10 in central
	found commons-io#commons-io;2.1 in central
	found commons-net#commons-net;2.2 in central
	found log4j#log4j;1.2.17 in central
	found commons-lang#commons-lang;2.6 in central
	found commons-configuration#commons-configuration;1.6 in central
	found commons-collections#commons-collections;3.2.1 in central
	found commons-digester#commons-digester;1.8 in central
	found commons-beanutils#commons-beanutils;1.7.0 in central
	found commons-beanutils#commons-beanutils-core;1.8.0 in central
	found com.google.protobuf#protobuf-java;2.5.0 in central
	found org.apache.hadoop#hadoop-auth;2.2.0 in central
	found org.apache.hadoop#hadoop-hdfs;2.2.0 in central
	found org.mortbay.jetty#jetty-util;6.1.26 in central
	found org.apache.hadoop#hadoop-mapreduce-client-app;2.2.0 in central
	found org.apache.hadoop#hadoop-mapreduce-client-common;2.2.0 in central
	found org.apache.hadoop#hadoop-yarn-common;2.2.0 in central
	found org.apache.hadoop#hadoop-yarn-api;2.2.0 in central
	found org.slf4j#slf4j-log4j12;1.7.10 in central
	found com.google.inject#guice;3.0 in central
	found aopalliance#aopalliance;1.0 in central
	found org.sonatype.sisu.inject#cglib;2.2.1-v20090111 in central
	found com.sun.jersey.jersey-test-framework#jersey-test-framework-grizzly2;1.9 in central
	found com.sun.jersey#jersey-server;1.9 in central
	found asm#asm;3.2 in central
	found com.sun.jersey#jersey-json;1.9 in central
	found org.codehaus.jettison#jettison;1.1 in central
	found stax#stax-api;1.0.1 in central
	found com.sun.xml.bind#jaxb-impl;2.2.3-1 in central
	found javax.xml.bind#jaxb-api;2.2.2 in central
	found javax.activation#activation;1.1 in central
	found org.codehaus.jackson#jackson-jaxrs;1.9.13 in central
	found org.codehaus.jackson#jackson-xc;1.9.13 in central
	found com.sun.jersey.contribs#jersey-guice;1.9 in central
	found org.apache.hadoop#hadoop-yarn-client;2.2.0 in central
	found org.apache.hadoop#hadoop-mapreduce-client-core;2.2.0 in central
	found org.apache.hadoop#hadoop-yarn-server-common;2.2.0 in central
	found org.apache.hadoop#hadoop-mapreduce-client-shuffle;2.2.0 in central
	found org.apache.hadoop#hadoop-mapreduce-client-jobclient;2.2.0 in central
	found org.apache.spark#spark-launcher_2.10;1.5.0 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.spark#spark-network-common_2.10;1.5.0 in central
	found io.netty#netty-all;4.0.29.Final in central
	found org.apache.spark#spark-network-shuffle_2.10;1.5.0 in central
	found org.apache.spark#spark-unsafe_2.10;1.5.0 in central
	found net.java.dev.jets3t#jets3t;0.7.1 in central
	found org.apache.curator#curator-recipes;2.4.0 in central
	found org.apache.curator#curator-framework;2.4.0 in central
	found org.apache.curator#curator-client;2.4.0 in central
	found org.apache.zookeeper#zookeeper;3.4.5 in central
	found jline#jline;0.9.94 in central
	found org.eclipse.jetty.orbit#javax.servlet;3.0.0.v201112011016 in central
	found org.apache.commons#commons-lang3;3.3.2 in central
	found org.apache.commons#commons-math3;3.4.1 in central
	found org.slf4j#jul-to-slf4j;1.7.10 in central
	found org.slf4j#jcl-over-slf4j;1.7.10 in central
	found com.ning#compress-lzf;1.0.3 in central
	found net.jpountz.lz4#lz4;1.3.0 in central
	found org.roaringbitmap#RoaringBitmap;0.4.5 in central
	found com.typesafe.akka#akka-remote_2.10;2.3.11 in central
	found com.typesafe.akka#akka-actor_2.10;2.3.11 in central
	found com.typesafe#config;1.2.1 in central
	found io.netty#netty;3.8.0.Final in central
	found org.uncommons.maths#uncommons-maths;1.2.2a in central
	found com.typesafe.akka#akka-slf4j_2.10;2.3.11 in central
	found org.json4s#json4s-jackson_2.10;3.2.10 in central
	found org.json4s#json4s-core_2.10;3.2.10 in central
	found org.json4s#json4s-ast_2.10;3.2.10 in central
	found com.thoughtworks.paranamer#paranamer;2.6 in central
	found com.fasterxml.jackson.core#jackson-databind;2.4.4 in central
	found com.fasterxml.jackson.core#jackson-annotations;2.4.0 in central
	found com.fasterxml.jackson.core#jackson-core;2.4.4 in central
	found com.sun.jersey#jersey-core;1.9 in central
	found org.apache.mesos#mesos;0.21.1 in central
	found com.clearspring.analytics#stream;2.7.0 in central
	found io.dropwizard.metrics#metrics-core;3.1.2 in central
	found io.dropwizard.metrics#metrics-jvm;3.1.2 in central
	found io.dropwizard.metrics#metrics-json;3.1.2 in central
	found io.dropwizard.metrics#metrics-graphite;3.1.2 in central
	found com.fasterxml.jackson.module#jackson-module-scala_2.10;2.4.4 in central
	found com.fasterxml.jackson.core#jackson-annotations;2.4.4 in central
	found org.apache.ivy#ivy;2.4.0 in central
	found oro#oro;2.0.8 in central
	found org.tachyonproject#tachyon-client;0.7.1 in central
	found commons-io#commons-io;2.4 in central
	found org.tachyonproject#tachyon-underfs-hdfs;0.7.1 in central
	found org.tachyonproject#tachyon-underfs-local;0.7.1 in central
	found net.razorvine#pyrolite;4.4 in central
	found net.sf.py4j#py4j;0.8.2.1 in central
	found org.apache.hadoop#hadoop-yarn-server-nodemanager;2.2.0 in central
	found org.jboss.netty#netty;3.2.2.Final in central
	found org.apache.spark#spark-sql_2.10;1.5.0 in central
	found org.apache.spark#spark-catalyst_2.10;1.5.0 in central
	found org.codehaus.janino#janino;2.7.8 in central
	found org.codehaus.janino#commons-compiler;2.7.8 in central
	found org.apache.parquet#parquet-column;1.7.0 in central
	found org.apache.parquet#parquet-common;1.7.0 in central
	found org.apache.parquet#parquet-encoding;1.7.0 in central
	found org.apache.parquet#parquet-generator;1.7.0 in central
	found org.apache.parquet#parquet-hadoop;1.7.0 in central
	found org.apache.parquet#parquet-format;2.3.0-incubating in central
	found org.apache.parquet#parquet-jackson;1.7.0 in central
	found org.slf4j#slf4j-api;1.7.5 in central
downloading https://repo1.maven.org/maven2/com/databricks/spark-csv_2.10/1.3.0/spark-csv_2.10-1.3.0.jar ...
	[SUCCESSFUL ] com.databricks#spark-csv_2.10;1.3.0!spark-csv_2.10.jar (55355ms)
downloading https://repo1.maven.org/maven2/org/apache/commons/commons-csv/1.1/commons-csv-1.1.jar ...
	[SUCCESSFUL ] org.apache.commons#commons-csv;1.1!commons-csv.jar (8339ms)
downloading https://repo1.maven.org/maven2/com/univocity/univocity-parsers/1.5.1/univocity-parsers-1.5.1.jar ...
	[SUCCESSFUL ] com.univocity#univocity-parsers;1.5.1!univocity-parsers.jar (27398ms)
downloading https://repo1.maven.org/maven2/org/scoverage/scalac-scoverage-runtime_2.10/1.1.0/scalac-scoverage-runtime_2.10-1.1.0.jar ...
	[SUCCESSFUL ] org.scoverage#scalac-scoverage-runtime_2.10;1.1.0!scalac-scoverage-runtime_2.10.jar (661ms)
downloading https://repo1.maven.org/maven2/org/scoverage/scalac-scoverage-plugin_2.10/1.1.0/scalac-scoverage-plugin_2.10-1.1.0.jar ...
	[SUCCESSFUL ] org.scoverage#scalac-scoverage-plugin_2.10;1.1.0!scalac-scoverage-plugin_2.10.jar (20807ms)
downloading https://repo1.maven.org/maven2/org/apache/spark/spark-core_2.10/1.5.0/spark-core_2.10-1.5.0.jar ...
downloading https://repo1.maven.org/maven2/org/apache/spark/spark-sql_2.10/1.5.0/spark-sql_2.10-1.5.0.jar ...
downloading https://repo1.maven.org/maven2/org/slf4j/slf4j-api/1.7.5/slf4j-api-1.7.5.jar ...

In [ ]:
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
val bankScala = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("delimiter", ";")
  .option("inferSchema", "true")
  .load("/vagrant/zeppelin_notebooks/bank.csv")
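
Once loaded, the DataFrame can be queried with SQL after registering it as a temporary table (a sketch; the age column is assumed from the standard Zeppelin bank.csv sample):

bankScala.registerTempTable("bank")
sqlContext.sql("SELECT age, count(*) AS n FROM bank GROUP BY age ORDER BY age").show()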

In [ ]:
bankScala.show()

In [ ]:
bankScala.printSchema()

In [ ]:
bankScala.describe().show
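
describe() returns count, mean, stddev, min, and max for the numeric columns; it can also be restricted to specific columns (sketch, again assuming an age column):

bankScala.describe("age").show()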
