Getting started

First, install the dependencies. I've included a requirements.txt file, so pip install -r requirements.txt gets you everything. The examples below run in an IPython notebook attached to a pyspark shell that already has the Cassandra connector on its classpath, so sc is defined from the start.

Required imports

You'll probably want the SQLContext, since it gives you SQL queries and DataFrames. DataFrames are much more performant than plain RDDs in PySpark: the work is planned and executed inside the JVM, so rows aren't serialized back and forth over the Py4J gateway until you actually collect results.


In [4]:
from pyspark_cassandra import CassandraSparkContext, Row
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext # needed for toDF()
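
If you're running this as a plain script rather than a notebook where sc already exists, you'd build the context yourself. A minimal sketch, assuming Cassandra is listening on localhost (the app name and host here are placeholders, not from the original post):

conf = SparkConf() \
    .setAppName("demo") \
    .set("spark.cassandra.connection.host", "127.0.0.1")
sc = CassandraSparkContext(conf=conf)  # a SparkContext that also has cassandraTable()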

In [4]:
users = sc.cassandraTable("demo", "user").toDF()
food_count = users.select("favorite_food").groupBy("favorite_food").count()
food_count.collect()


---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-4-59ebad55f1e0> in <module>()
----> 1 users = sc.cassandraTable("demo", "user").toDF()
      2 food_count = users.select("favorite_food").groupBy("favorite_food").count()
      3 food_count.collect()

/usr/local/spark-1.3.1-bin-cdh4/python/pyspark/sql/context.pyc in toDF(self, schema, sampleRatio)
     52         [Row(name=u'Alice', age=1)]
     53         """
---> 54         return sqlContext.createDataFrame(self, schema, sampleRatio)
     55 
     56     RDD.toDF = toDF

/usr/local/spark-1.3.1-bin-cdh4/python/pyspark/sql/context.pyc in createDataFrame(self, data, schema, samplingRatio)
    282 
    283         if schema is None:
--> 284             schema = self._inferSchema(rdd, samplingRatio)
    285             converter = _create_converter(schema)
    286             rdd = rdd.map(converter)

/usr/local/spark-1.3.1-bin-cdh4/python/pyspark/sql/context.pyc in _inferSchema(self, rdd, samplingRatio)
    162 
    163     def _inferSchema(self, rdd, samplingRatio=None):
--> 164         first = rdd.first()
    165         if not first:
    166             raise ValueError("The first row in RDD is empty, "

/usr/local/spark-1.3.1-bin-cdh4/python/pyspark/rdd.pyc in first(self)
   1240         ValueError: RDD is empty
   1241         """
-> 1242         rs = self.take(1)
   1243         if rs:
   1244             return rs[0]

/usr/local/spark-1.3.1-bin-cdh4/python/pyspark/rdd.pyc in take(self, num)
   1192         """
   1193         items = []
-> 1194         totalParts = self._jrdd.partitions().size()
   1195         partsScanned = 0
   1196 

/usr/local/spark-1.3.1-bin-cdh4/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539 
    540         for temp_arg in temp_args:

/usr/local/spark-1.3.1-bin-cdh4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling o32.partitions.
: java.io.IOException: Couldn't find demo.user or any similarly named keyspace and table pairs
	at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.tableDef(CassandraTableRowReaderProvider.scala:57)
	at com.datastax.spark.connector.rdd.CassandraTableScanRDD.tableDef$lzycompute(CassandraTableScanRDD.scala:58)
	at com.datastax.spark.connector.rdd.CassandraTableScanRDD.tableDef(CassandraTableScanRDD.scala:58)
	at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.verify(CassandraTableRowReaderProvider.scala:161)
	at com.datastax.spark.connector.rdd.CassandraTableScanRDD.verify(CassandraTableScanRDD.scala:58)
	at com.datastax.spark.connector.rdd.CassandraTableScanRDD.getPartitions(CassandraTableScanRDD.scala:117)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
	at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:64)
	at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:46)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:207)
	at java.lang.Thread.run(Thread.java:745)

Don't let the wall of Java spew throw you: the part that matters is the last line. Spark couldn't find a demo.user table because the keyspace and table didn't exist yet; with them in place, the same calls work fine. First, counting favorite foods the plain RDD way:

In [3]:
# RDD counting example
u = sc.cassandraTable("demo", "user")
u.map(lambda x: (x['favorite_food'], 1)).\
      reduceByKey(lambda x, y: x + y).collect()


Out[3]:
[(u'bacon', 1), (u'pie', 1), (u'pizza', 2)]
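
The same tally is a little shorter with countByValue(), which ships the counts back to the driver as a plain Python mapping:

u.map(lambda x: x['favorite_food']).countByValue()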

In [4]:
users = sc.cassandraTable("demo", "user").toDF()

food_count = users.select("favorite_food").\
                   groupBy("favorite_food").count()

food_count.collect()


Out[4]:
[Row(favorite_food=u'bacon', count=1),
 Row(favorite_food=u'pizza', count=2),
 Row(favorite_food=u'pie', count=1)]
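
collect() hauls every Row back into Python over the gateway. For a quick look at a bigger result, you can have the JVM print a sample instead:

food_count.show()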

In [5]:
sql = SQLContext(sc)
users = sc.cassandraTable("demo", "user").toDF()
users.registerTempTable("users")
sql.sql("""select favorite_food, count(favorite_food) 
            from users group by favorite_food """).collect()


Out[5]:
[Row(favorite_food=u'bacon', c1=1),
 Row(favorite_food=u'pizza', c1=2),
 Row(favorite_food=u'pie', c1=1)]
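
The c1 column name is auto-generated because the aggregate isn't aliased. If you'd rather have a readable name, alias it in the query:

sql.sql("""select favorite_food, count(favorite_food) as cnt
            from users group by favorite_food """).collect()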

In [6]:
result = sql.sql("""select favorite_food, count(favorite_food) 
            from users group by favorite_food """).toPandas()

In [22]:
%matplotlib inline


---------------------------------------------------------------------------
ImportError                               Traceback (most recent call last)
<ipython-input-22-2b1da000a957> in <module>()
----> 1 get_ipython().magic(u'matplotlib inline')

/Users/jhaddad/.virtualenvs/pyspark2/lib/python2.7/site-packages/IPython/core/interactiveshell.pyc in magic(self, arg_s)
   2305         magic_name, _, magic_arg_s = arg_s.partition(' ')
   2306         magic_name = magic_name.lstrip(prefilter.ESC_MAGIC)
-> 2307         return self.run_line_magic(magic_name, magic_arg_s)
   2308 
   2309     #-------------------------------------------------------------------------

/Users/jhaddad/.virtualenvs/pyspark2/lib/python2.7/site-packages/IPython/core/interactiveshell.pyc in run_line_magic(self, magic_name, line)
   2226                 kwargs['local_ns'] = sys._getframe(stack_depth).f_locals
   2227             with self.builtin_trap:
-> 2228                 result = fn(*args,**kwargs)
   2229             return result
   2230 

/Users/jhaddad/.virtualenvs/pyspark2/lib/python2.7/site-packages/IPython/core/magics/pylab.pyc in matplotlib(self, line)

/Users/jhaddad/.virtualenvs/pyspark2/lib/python2.7/site-packages/IPython/core/magic.pyc in <lambda>(f, *a, **k)
    191     # but it's overkill for just that one bit of state.
    192     def magic_deco(arg):
--> 193         call = lambda f, *a, **k: f(*a, **k)
    194 
    195         if callable(arg):

/Users/jhaddad/.virtualenvs/pyspark2/lib/python2.7/site-packages/IPython/core/magics/pylab.pyc in matplotlib(self, line)
     86         """
     87         args = magic_arguments.parse_argstring(self.matplotlib, line)
---> 88         gui, backend = self.shell.enable_matplotlib(args.gui)
     89         self._show_matplotlib_backend(args.gui, backend)
     90 

/Users/jhaddad/.virtualenvs/pyspark2/lib/python2.7/site-packages/IPython/core/interactiveshell.pyc in enable_matplotlib(self, gui)
   3087         """
   3088         from IPython.core import pylabtools as pt
-> 3089         gui, backend = pt.find_gui_and_backend(gui, self.pylab_gui_select)
   3090 
   3091         if gui != 'inline':

/Users/jhaddad/.virtualenvs/pyspark2/lib/python2.7/site-packages/IPython/core/pylabtools.pyc in find_gui_and_backend(gui, gui_select)
    237     """
    238 
--> 239     import matplotlib
    240 
    241     if gui and gui != 'auto':

ImportError: No module named matplotlib

The ImportError is nothing exotic: matplotlib just isn't installed in this virtualenv, and pip install matplotlib fixes it. With the query result in pandas, index it by food:

In [14]:
result.set_index("favorite_food")


Out[14]:
               c1
favorite_food    
bacon           1
pizza           2
pie             1
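
Note that set_index returns a new DataFrame instead of modifying result in place; reassign if you want to keep the indexed version:

result = result.set_index("favorite_food")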

In [23]:
result.set_index("favorite_food").plot(kind="pie",y="c1", legend=False)


Out[23]:
<matplotlib.axes._subplots.AxesSubplot at 0x10bde9150>

[the notebook renders a pie chart of favorite_food counts here]

How to create JVM maps

Py4J exposes the entire JVM through sc._gateway.jvm, so you can instantiate Java classes directly from Python; handy when an API wants a real java.util.Map instead of a Python dict.


In [20]:
sc._gateway.jvm.java.util.HashMap()


Out[20]:
{}
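
Py4J lets you treat that HashMap like a Python dict, as the next cell shows. If you already have a dict, you can also convert it in one shot; a sketch using py4j's MapConverter (the map contents here are just an example):

from py4j.java_collections import MapConverter
options = MapConverter().convert({"keyspace": "labor"}, sc._gateway._gateway_client)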

In [5]:
sql = SQLContext(sc)
options = sc._gateway.jvm.java.util.HashMap()
options["keyspace"] = "labor"
options["c_table"] = "average_price_data"
sql.load(source="org.apache.spark.sql.cassandra", 
         options=options)


---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-5-d8c77945d7f5> in <module>()
      4 options["c_table"] = "average_price_data"
      5 sql.load(source="org.apache.spark.sql.cassandra", 
----> 6          options=options)

/usr/local/spark-1.3.1-bin-cdh4/python/pyspark/sql/context.pyc in load(self, path, source, schema, **options)
    440                                           self._sc._gateway._gateway_client)
    441         if schema is None:
--> 442             df = self._ssql_ctx.load(source, joptions)
    443         else:
    444             if not isinstance(schema, StructType):

/usr/local/spark-1.3.1-bin-cdh4/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539 
    540         for temp_arg in temp_args:

/usr/local/spark-1.3.1-bin-cdh4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling o37.load.
: java.lang.RuntimeException: Failed to load class for data source: org.apache.spark.sql.cassandra
	at scala.sys.package$.error(package.scala:27)
	at org.apache.spark.sql.sources.ResolvedDataSource$.lookupDataSource(ddl.scala:194)
	at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:205)
	at org.apache.spark.sql.SQLContext.load(SQLContext.scala:697)
	at org.apache.spark.sql.SQLContext.load(SQLContext.scala:685)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:207)
	at java.lang.Thread.run(Thread.java:745)

This one isn't a Python problem at all: "Failed to load class for data source" means the spark-cassandra-connector jar, which provides org.apache.spark.sql.cassandra, wasn't on the JVM classpath when pyspark started. If you want to poke around the JVM yourself, the gateway hangs right off the context:

In [14]:
sc._gateway


Out[14]:
<py4j.java_gateway.JavaGateway at 0x107d44450>

In [15]:
gw = sc._gateway
