DataFrames for Data Scientists

  1. SparkContext()
  2. Read/Write
  3. Convert
  4. Columns & Rows
  5. DataFrame : RDD-like Operations
  6. DataFrame : Action
  7. DataFrame : Scientific Functions
  8. DataFrame : Statistical Functions
  9. DataFrame : Aggregate Functions
  10. DataFrame : na
  11. DataFrame : Joins, Set Operations
  12. DataFrame : Tables & SQL

1. SparkContext()


In [1]:
import datetime
from pytz import timezone
print "Last run @%s" % (datetime.datetime.now(timezone('US/Pacific')))


Last run @2015-12-17 21:23:50.063441-08:00

In [2]:
from pyspark.context import SparkContext
print "Running Spark Version %s" % (sc.version)


Running Spark Version 1.6.0

In [3]:
from pyspark.conf import SparkConf
conf = SparkConf()
print conf.toDebugString()


spark.app.name=PySparkShell
spark.files=file:/Users/ksankar/.ivy2/jars/com.databricks_spark-csv_2.10-1.3.0.jar,file:/Users/ksankar/.ivy2/jars/org.apache.commons_commons-csv-1.1.jar,file:/Users/ksankar/.ivy2/jars/com.univocity_univocity-parsers-1.5.1.jar
spark.jars=file:/Users/ksankar/.ivy2/jars/com.databricks_spark-csv_2.10-1.3.0.jar,file:/Users/ksankar/.ivy2/jars/org.apache.commons_commons-csv-1.1.jar,file:/Users/ksankar/.ivy2/jars/com.univocity_univocity-parsers-1.5.1.jar
spark.master=local[*]
spark.submit.deployMode=client
spark.submit.pyFiles=/Users/ksankar/.ivy2/jars/com.databricks_spark-csv_2.10-1.3.0.jar,/Users/ksankar/.ivy2/jars/org.apache.commons_commons-csv-1.1.jar,/Users/ksankar/.ivy2/jars/com.univocity_univocity-parsers-1.5.1.jar
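
Note: the spark.jars entries above suggest this shell was launched with the spark-csv package (e.g. pyspark --packages com.databricks:spark-csv_2.10:1.3.0; this is an inference from the listed jars, not part of the transcript).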

In [4]:
import pyspark.sql
sqlCxt = pyspark.sql.SQLContext(sc)

2. Read/Write


In [5]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('spark-csv/cars.csv')
df.coalesce(1).select('year', 'model').write.format('com.databricks.spark.csv').save('newcars.csv')


---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-5-ddb83e559a4e> in <module>()
      3 
      4 df = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('spark-csv/cars.csv')
----> 5 df.coalesce(1).select('year', 'model').write.format('com.databricks.spark.csv').save('newcars.csv')

/Users/ksankar/Downloads/spark-1.6.0/python/pyspark/sql/readwriter.pyc in save(self, path, format, mode, partitionBy, **options)
    385             self._jwrite.save()
    386         else:
--> 387             self._jwrite.save(path)
    388 
    389     @since(1.4)

/Users/ksankar/Downloads/spark-1.6.0/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
    811         answer = self.gateway_client.send_command(command)
    812         return_value = get_return_value(
--> 813             answer, self.gateway_client, self.target_id, self.name)
    814 
    815         for temp_arg in temp_args:

/Users/ksankar/Downloads/spark-1.6.0/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     43     def deco(*a, **kw):
     44         try:
---> 45             return f(*a, **kw)
     46         except py4j.protocol.Py4JJavaError as e:
     47             s = e.java_exception.toString()

/Users/ksankar/Downloads/spark-1.6.0/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    306                 raise Py4JJavaError(
    307                     "An error occurred while calling {0}{1}{2}.\n".
--> 308                     format(target_id, ".", name), value)
    309             else:
    310                 raise Py4JError(

Py4JJavaError: An error occurred while calling o33.save.
: java.lang.RuntimeException: path newcars.csv already exists.
	at scala.sys.package$.error(package.scala:27)
	at com.databricks.spark.csv.DefaultSource.createRelation(DefaultSource.scala:176)
	at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:208)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:209)
	at java.lang.Thread.run(Thread.java:745)
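
The failure above is expected: newcars.csv already exists and the writer's default save mode refuses to overwrite. A minimal sketch of the fix, using one of the standard save modes (overwrite, append, ignore, error):

df.coalesce(1).select('year', 'model') \
  .write.format('com.databricks.spark.csv') \
  .mode('overwrite') \
  .save('newcars.csv')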

In [6]:
df.show()


+----+-----+-----+--------------------+-----+
|year| make|model|             comment|blank|
+----+-----+-----+--------------------+-----+
|2012|Tesla|    S|          No comment|     |
|1997| Ford| E350|Go get one now th...|     |
|2015|Chevy| Volt|                null| null|
+----+-----+-----+--------------------+-----+


In [7]:
df_cars = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('car-data/car-milage.csv')

In [8]:
df_cars_x = sqlContext.read.load('cars_1.parquet')
df_cars_x.dtypes


Out[8]:
[('mpg', 'double'),
 ('displacement', 'double'),
 ('hp', 'int'),
 ('torque', 'int'),
 ('CRatio', 'float'),
 ('RARatio', 'float'),
 ('CarbBarrells', 'int'),
 ('NoOfSpeed', 'int'),
 ('length', 'float'),
 ('width', 'float'),
 ('weight', 'int'),
 ('automatic', 'int')]

In [9]:
df_cars.show(40)


+-----+------------+---+------+------+-------+------------+---------+------+-----+------+---------+
|  mpg|displacement| hp|torque|CRatio|RARatio|CarbBarrells|NoOfSpeed|length|width|weight|automatic|
+-----+------------+---+------+------+-------+------------+---------+------+-----+------+---------+
| 18.9|         350|165|   260|     8|   2.56|           4|        3| 200.3| 69.9|  3910|        1|
|   17|         350|170|   275|   8.5|   2.56|           4|        3| 199.6| 72.9|  3860|        1|
|   20|         250|105|   185|  8.25|   2.73|           1|        3| 196.7| 72.2|  3510|        1|
|18.25|         351|143|   255|     8|      3|           2|        3| 199.9|   74|  3890|        1|
|20.07|         225| 95|   170|   8.4|   2.76|           1|        3| 194.1| 71.8|  3365|        0|
| 11.2|         440|215|   330|   8.2|   2.88|           4|        3| 184.5|   69|  4215|        1|
|22.12|         231|110|   175|     8|   2.56|           2|        3| 179.3| 65.4|  3020|        1|
|21.47|         262|110|   200|   8.5|   2.56|           2|        3| 179.3| 65.4|  3180|        1|
| 34.7|        89.7| 70|    81|   8.2|    3.9|           2|        4| 155.7|   64|  1905|        0|
| 30.4|        96.9| 75|    83|     9|    4.3|           2|        5| 165.2|   65|  2320|        0|
| 16.5|         350|155|   250|   8.5|   3.08|           4|        3| 195.4| 74.4|  3885|        1|
| 36.5|        85.3| 80|    83|   8.5|   3.89|           2|        4| 160.6| 62.2|  2009|        0|
| 21.5|         171|109|   146|   8.2|   3.22|           2|        4| 170.4| 66.9|  2655|        0|
| 19.7|         258|110|   195|     8|   3.08|           1|        3| 171.5|   77|  3375|        1|
| 20.3|         140| 83|   109|   8.4|    3.4|           2|        4| 168.8| 69.4|  2700|        0|
| 17.8|         302|129|   220|     8|      3|           2|        3| 199.9|   74|  3890|        1|
|14.39|         500|190|   360|   8.5|   2.73|           4|        3| 224.1| 79.8|  5290|        1|
|14.89|         440|215|   330|   8.2|   2.71|           4|        3|   231| 79.7|  5185|        1|
| 17.8|         350|155|   250|   8.5|   3.08|           4|        3| 196.7| 72.2|  3910|        1|
|16.41|         318|145|   255|   8.5|   2.45|           2|        3| 197.6|   71|  3660|        1|
|23.54|         231|110|   175|     8|   2.56|           2|        3| 179.3| 65.4|  3050|        1|
|21.47|         360|180|   290|   8.4|   2.45|           2|        3| 214.2| 76.3|  4250|        1|
|16.59|         400|185|      |   7.6|   3.08|           4|        3|   196|   73|  3850|        1|
| 31.9|        96.9| 75|    83|     9|    4.3|           2|        5| 165.2| 61.8|  2275|        0|
| 29.4|         140| 86|      |     8|   2.92|           2|        4| 176.4| 65.4|  2150|        0|
|13.27|         460|223|   366|     8|      3|           4|        3|   228| 79.8|  5430|        1|
| 23.9|       133.6| 96|   120|   8.4|   3.91|           2|        5| 171.5| 63.4|  2535|        0|
|19.73|         318|140|   255|   8.5|   2.71|           2|        3| 215.3| 76.3|  4370|        1|
| 13.9|         351|148|   243|     8|   3.25|           2|        3| 215.5| 78.5|  4540|        1|
|13.27|         351|148|   243|     8|   3.26|           2|        3| 216.1| 78.5|  4715|        1|
|13.77|         360|195|   295|  8.25|   3.15|           4|        3| 209.3| 77.4|  4215|        1|
| 16.5|         360|165|   255|   8.5|   2.73|           4|        3| 185.2|   69|  3660|        1|
+-----+------------+---+------+------+-------+------------+---------+------+-----+------+---------+


In [10]:
df_cars.describe().show()


+-------+
|summary|
+-------+
|  count|
|   mean|
| stddev|
|    min|
|    max|
+-------+
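
describe() with no arguments summarizes only numeric columns, and spark-csv has loaded every column as a string, so only the summary column shows up. In the next cell, naming string columns explicitly makes Spark compute mean and stddev after an implicit cast, but min and max still compare the strings lexicographically, which is why max(hp) comes out as 96. One way around this is to cast the columns (as we do below); another sketch, assuming the inferSchema option of the spark-csv package:

df_cars_typed = sqlContext.read.format('com.databricks.spark.csv') \
    .options(header='true', inferSchema='true') \
    .load('car-data/car-milage.csv')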


In [11]:
df_cars.describe(["mpg",'hp']).show()


+-------+-----------------+------------------+
|summary|              mpg|                hp|
+-------+-----------------+------------------+
|  count|               32|                32|
|   mean|        20.223125|           136.875|
| stddev|6.318289089312789|44.980820285410395|
|    min|             11.2|               105|
|    max|             36.5|                96|
+-------+-----------------+------------------+


In [12]:
df_cars.groupby("automatic").avg("mpg")


---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
<ipython-input-12-a0e8ae310b4c> in <module>()
----> 1 df_cars.groupby("automatic").avg("mpg")

/Users/ksankar/Downloads/spark-1.6.0/python/pyspark/sql/group.pyc in _api(self, *args)
     38     def _api(self, *args):
     39         name = f.__name__
---> 40         jdf = getattr(self._jdf, name)(_to_seq(self.sql_ctx._sc, args))
     41         return DataFrame(jdf, self.sql_ctx)
     42     _api.__name__ = f.__name__

/Users/ksankar/Downloads/spark-1.6.0/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
    811         answer = self.gateway_client.send_command(command)
    812         return_value = get_return_value(
--> 813             answer, self.gateway_client, self.target_id, self.name)
    814 
    815         for temp_arg in temp_args:

/Users/ksankar/Downloads/spark-1.6.0/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     49                                              e.java_exception.getStackTrace()))
     50             if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 51                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
     52             if s.startswith('java.lang.IllegalArgumentException: '):
     53                 raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)

AnalysisException: u'"mpg" is not a numeric column. Aggregation function can only be applied on a numeric column.;'

In [13]:
# Nothing drops here: spark-csv read every column as a string, and the
# missing torque values came in as empty strings, not nulls
df_cars.na.drop('any').count()


Out[13]:
32

In [14]:
df_cars.count()


Out[14]:
32

In [15]:
df_cars.dtypes


Out[15]:
[('mpg', 'string'),
 ('displacement', 'string'),
 ('hp', 'string'),
 ('torque', 'string'),
 ('CRatio', 'string'),
 ('RARatio', 'string'),
 ('CarbBarrells', 'string'),
 ('NoOfSpeed', 'string'),
 ('length', 'string'),
 ('width', 'string'),
 ('weight', 'string'),
 ('automatic', 'string')]

In [16]:
# Cast the string columns to numeric types so that aggregations work
df_2 = df_cars.select(df_cars.mpg.cast("double").alias('mpg'),
                      df_cars.torque.cast("double").alias('torque'),
                      df_cars.automatic.cast("integer").alias('automatic'))

In [17]:
df_2.show(40)


+-----+------+---------+
|  mpg|torque|automatic|
+-----+------+---------+
| 18.9| 260.0|        1|
| 17.0| 275.0|        1|
| 20.0| 185.0|        1|
|18.25| 255.0|        1|
|20.07| 170.0|        0|
| 11.2| 330.0|        1|
|22.12| 175.0|        1|
|21.47| 200.0|        1|
| 34.7|  81.0|        0|
| 30.4|  83.0|        0|
| 16.5| 250.0|        1|
| 36.5|  83.0|        0|
| 21.5| 146.0|        0|
| 19.7| 195.0|        1|
| 20.3| 109.0|        0|
| 17.8| 220.0|        1|
|14.39| 360.0|        1|
|14.89| 330.0|        1|
| 17.8| 250.0|        1|
|16.41| 255.0|        1|
|23.54| 175.0|        1|
|21.47| 290.0|        1|
|16.59|  null|        1|
| 31.9|  83.0|        0|
| 29.4|  null|        0|
|13.27| 366.0|        1|
| 23.9| 120.0|        0|
|19.73| 255.0|        1|
| 13.9| 243.0|        1|
|13.27| 243.0|        1|
|13.77| 295.0|        1|
| 16.5| 255.0|        1|
+-----+------+---------+


In [18]:
df_2.dtypes


Out[18]:
[('mpg', 'double'), ('torque', 'double'), ('automatic', 'int')]

In [19]:
df_2.describe().show()


+-------+-----------------+-----------------+-------------------+
|summary|              mpg|           torque|          automatic|
+-------+-----------------+-----------------+-------------------+
|  count|               32|               30|                 32|
|   mean|        20.223125|            217.9|            0.71875|
| stddev|6.318289089312789|83.06970483918289|0.45680340939917435|
|    min|             11.2|             81.0|                  0|
|    max|             36.5|            366.0|                  1|
+-------+-----------------+-----------------+-------------------+

9. DataFrame : Aggregate Functions


In [20]:
df_2.groupby("automatic").avg("mpg","torque").show()


+---------+------------------+-----------------+
|automatic|          avg(mpg)|      avg(torque)|
+---------+------------------+-----------------+
|        0|27.630000000000003|          109.375|
|        1|17.324782608695653|257.3636363636364|
+---------+------------------+-----------------+


In [21]:
df_2.groupBy().avg("mpg","torque").show()


+---------+-----------+
| avg(mpg)|avg(torque)|
+---------+-----------+
|20.223125|      217.9|
+---------+-----------+


In [22]:
df_2.agg({"*":"count"}).show()


+--------+
|count(1)|
+--------+
|      32|
+--------+


In [23]:
import pyspark.sql.functions as F
df_2.agg(F.min(df_2.mpg)).show()


+--------+
|min(mpg)|
+--------+
|    11.2|
+--------+


In [24]:
import pyspark.sql.functions as F
df_2.agg(F.mean(df_2.mpg)).show()


+---------+
| avg(mpg)|
+---------+
|20.223125|
+---------+


In [25]:
gdf_2 = df_2.groupBy("automatic")
gdf_2.agg({'mpg':'min'}).collect()
gdf_2.agg({'mpg':'min'}).show()


+---------+--------+
|automatic|min(mpg)|
+---------+--------+
|        0|   20.07|
|        1|    11.2|
+---------+--------+
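
agg also accepts Column expressions from pyspark.sql.functions, which lets us compute several aggregates in one pass and alias the result columns. A sketch:

import pyspark.sql.functions as F
df_2.groupBy('automatic').agg(F.min('mpg').alias('min_mpg'),
                              F.max('mpg').alias('max_mpg'),
                              F.avg('torque').alias('avg_torque')).show()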


In [26]:
df_cars_1 = df_cars.select(df_cars.mpg.cast("double").alias('mpg'),
                           df_cars.displacement.cast("double").alias('displacement'),
                           df_cars.hp.cast("integer").alias('hp'),
                           df_cars.torque.cast("integer").alias('torque'),
                           df_cars.CRatio.cast("float").alias('CRatio'),
                           df_cars.RARatio.cast("float").alias('RARatio'),
                           df_cars.CarbBarrells.cast("integer").alias('CarbBarrells'),
                           df_cars.NoOfSpeed.cast("integer").alias('NoOfSpeed'),
                           df_cars.length.cast("float").alias('length'),
                           df_cars.width.cast("float").alias('width'),
                           df_cars.weight.cast("integer").alias('weight'),
                           df_cars.automatic.cast("integer").alias('automatic'))
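
The cell above spells out each cast; an equivalent, more compact sketch drives the select from a list of (name, type) pairs:

col_types = [('mpg', 'double'), ('displacement', 'double'), ('hp', 'integer'),
             ('torque', 'integer'), ('CRatio', 'float'), ('RARatio', 'float'),
             ('CarbBarrells', 'integer'), ('NoOfSpeed', 'integer'),
             ('length', 'float'), ('width', 'float'), ('weight', 'integer'),
             ('automatic', 'integer')]
df_cars_1 = df_cars.select(*[df_cars[c].cast(t).alias(c) for c, t in col_types])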

In [27]:
gdf_3 = df_cars_1.groupBy("automatic")
gdf_3.agg({'mpg':'mean'}).show()


+---------+------------------+
|automatic|          avg(mpg)|
+---------+------------------+
|        0|27.630000000000003|
|        1|17.324782608695653|
+---------+------------------+


In [28]:
df_cars_1.avg("mpg","torque").show()


---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-28-2407b3220340> in <module>()
----> 1 df_cars_1.avg("mpg","torque").show()

/Users/ksankar/Downloads/spark-1.6.0/python/pyspark/sql/dataframe.pyc in __getattr__(self, name)
    837         if name not in self.columns:
    838             raise AttributeError(
--> 839                 "'%s' object has no attribute '%s'" % (self.__class__.__name__, name))
    840         jc = self._jdf.apply(name)
    841         return Column(jc)

AttributeError: 'DataFrame' object has no attribute 'avg'

In [29]:
df_cars_1.groupBy().avg("mpg","torque").show()


+---------+-----------+
| avg(mpg)|avg(torque)|
+---------+-----------+
|20.223125|      217.9|
+---------+-----------+


In [30]:
df_cars_1.groupby("automatic").avg("mpg","torque").show()


+---------+------------------+-----------------+
|automatic|          avg(mpg)|      avg(torque)|
+---------+------------------+-----------------+
|        0|27.630000000000003|          109.375|
|        1|17.324782608695653|257.3636363636364|
+---------+------------------+-----------------+


In [31]:
df_cars_1.groupby("automatic").avg("mpg","torque","hp","weight").show()


+---------+------------------+-----------------+-----------------+------------------+
|automatic|          avg(mpg)|      avg(torque)|          avg(hp)|       avg(weight)|
+---------+------------------+-----------------+-----------------+------------------+
|        0|27.630000000000003|          109.375|85.44444444444444|2434.8888888888887|
|        1|17.324782608695653|257.3636363636364|            157.0| 4037.391304347826|
+---------+------------------+-----------------+-----------------+------------------+


In [32]:
df_cars_1.printSchema()


root
 |-- mpg: double (nullable = true)
 |-- displacement: double (nullable = true)
 |-- hp: integer (nullable = true)
 |-- torque: integer (nullable = true)
 |-- CRatio: float (nullable = true)
 |-- RARatio: float (nullable = true)
 |-- CarbBarrells: integer (nullable = true)
 |-- NoOfSpeed: integer (nullable = true)
 |-- length: float (nullable = true)
 |-- width: float (nullable = true)
 |-- weight: integer (nullable = true)
 |-- automatic: integer (nullable = true)


In [33]:
df_cars_1.show(5)


+-----+------------+---+------+------+-------+------------+---------+------+-----+------+---------+
|  mpg|displacement| hp|torque|CRatio|RARatio|CarbBarrells|NoOfSpeed|length|width|weight|automatic|
+-----+------------+---+------+------+-------+------------+---------+------+-----+------+---------+
| 18.9|       350.0|165|   260|   8.0|   2.56|           4|        3| 200.3| 69.9|  3910|        1|
| 17.0|       350.0|170|   275|   8.5|   2.56|           4|        3| 199.6| 72.9|  3860|        1|
| 20.0|       250.0|105|   185|  8.25|   2.73|           1|        3| 196.7| 72.2|  3510|        1|
|18.25|       351.0|143|   255|   8.0|    3.0|           2|        3| 199.9| 74.0|  3890|        1|
|20.07|       225.0| 95|   170|   8.4|   2.76|           1|        3| 194.1| 71.8|  3365|        0|
+-----+------------+---+------+------+-------+------------+---------+------+-----+------+---------+
only showing top 5 rows


In [34]:
df_cars_1.describe().show()


+-------+-----------------+------------------+------------------+-----------------+-------------------+------------------+-----------------+------------------+------------------+-----------------+-----------------+-------------------+
|summary|              mpg|      displacement|                hp|           torque|             CRatio|           RARatio|     CarbBarrells|         NoOfSpeed|            length|            width|           weight|          automatic|
+-------+-----------------+------------------+------------------+-----------------+-------------------+------------------+-----------------+------------------+------------------+-----------------+-----------------+-------------------+
|  count|               32|                32|                32|               30|                 32|                32|               32|                32|                32|               32|               32|                 32|
|   mean|        20.223125|         285.04375|           136.875|            217.9|  8.281249925494194|3.0553125217556953|          2.59375|           3.34375|191.95625019073486|71.28125071525574|        3586.6875|            0.71875|
| stddev|6.318289089312789|117.24021367096235|44.980820285410395|83.06970483918289|0.30073833292040675|0.5123380437957931|1.073414058942533|0.6530017536902797| 20.54857328178095|5.603077410154348|947.9431872693228|0.45680340939917435|
|    min|             11.2|              85.3|                70|               81|                7.6|              2.45|                1|                 3|             155.7|             61.8|             1905|                  0|
|    max|             36.5|             500.0|               223|              366|                9.0|               4.3|                4|                 5|             231.0|             79.8|             5430|                  1|
+-------+-----------------+------------------+------------------+-----------------+-------------------+------------------+-----------------+------------------+------------------+-----------------+-----------------+-------------------+


In [35]:
df_cars_1.groupBy().agg({"mpg":"mean"}).show()


+---------+
| avg(mpg)|
+---------+
|20.223125|
+---------+


In [36]:
df_cars_1.show(40)


+-----+------------+---+------+------+-------+------------+---------+------+-----+------+---------+
|  mpg|displacement| hp|torque|CRatio|RARatio|CarbBarrells|NoOfSpeed|length|width|weight|automatic|
+-----+------------+---+------+------+-------+------------+---------+------+-----+------+---------+
| 18.9|       350.0|165|   260|   8.0|   2.56|           4|        3| 200.3| 69.9|  3910|        1|
| 17.0|       350.0|170|   275|   8.5|   2.56|           4|        3| 199.6| 72.9|  3860|        1|
| 20.0|       250.0|105|   185|  8.25|   2.73|           1|        3| 196.7| 72.2|  3510|        1|
|18.25|       351.0|143|   255|   8.0|    3.0|           2|        3| 199.9| 74.0|  3890|        1|
|20.07|       225.0| 95|   170|   8.4|   2.76|           1|        3| 194.1| 71.8|  3365|        0|
| 11.2|       440.0|215|   330|   8.2|   2.88|           4|        3| 184.5| 69.0|  4215|        1|
|22.12|       231.0|110|   175|   8.0|   2.56|           2|        3| 179.3| 65.4|  3020|        1|
|21.47|       262.0|110|   200|   8.5|   2.56|           2|        3| 179.3| 65.4|  3180|        1|
| 34.7|        89.7| 70|    81|   8.2|    3.9|           2|        4| 155.7| 64.0|  1905|        0|
| 30.4|        96.9| 75|    83|   9.0|    4.3|           2|        5| 165.2| 65.0|  2320|        0|
| 16.5|       350.0|155|   250|   8.5|   3.08|           4|        3| 195.4| 74.4|  3885|        1|
| 36.5|        85.3| 80|    83|   8.5|   3.89|           2|        4| 160.6| 62.2|  2009|        0|
| 21.5|       171.0|109|   146|   8.2|   3.22|           2|        4| 170.4| 66.9|  2655|        0|
| 19.7|       258.0|110|   195|   8.0|   3.08|           1|        3| 171.5| 77.0|  3375|        1|
| 20.3|       140.0| 83|   109|   8.4|    3.4|           2|        4| 168.8| 69.4|  2700|        0|
| 17.8|       302.0|129|   220|   8.0|    3.0|           2|        3| 199.9| 74.0|  3890|        1|
|14.39|       500.0|190|   360|   8.5|   2.73|           4|        3| 224.1| 79.8|  5290|        1|
|14.89|       440.0|215|   330|   8.2|   2.71|           4|        3| 231.0| 79.7|  5185|        1|
| 17.8|       350.0|155|   250|   8.5|   3.08|           4|        3| 196.7| 72.2|  3910|        1|
|16.41|       318.0|145|   255|   8.5|   2.45|           2|        3| 197.6| 71.0|  3660|        1|
|23.54|       231.0|110|   175|   8.0|   2.56|           2|        3| 179.3| 65.4|  3050|        1|
|21.47|       360.0|180|   290|   8.4|   2.45|           2|        3| 214.2| 76.3|  4250|        1|
|16.59|       400.0|185|  null|   7.6|   3.08|           4|        3| 196.0| 73.0|  3850|        1|
| 31.9|        96.9| 75|    83|   9.0|    4.3|           2|        5| 165.2| 61.8|  2275|        0|
| 29.4|       140.0| 86|  null|   8.0|   2.92|           2|        4| 176.4| 65.4|  2150|        0|
|13.27|       460.0|223|   366|   8.0|    3.0|           4|        3| 228.0| 79.8|  5430|        1|
| 23.9|       133.6| 96|   120|   8.4|   3.91|           2|        5| 171.5| 63.4|  2535|        0|
|19.73|       318.0|140|   255|   8.5|   2.71|           2|        3| 215.3| 76.3|  4370|        1|
| 13.9|       351.0|148|   243|   8.0|   3.25|           2|        3| 215.5| 78.5|  4540|        1|
|13.27|       351.0|148|   243|   8.0|   3.26|           2|        3| 216.1| 78.5|  4715|        1|
|13.77|       360.0|195|   295|  8.25|   3.15|           4|        3| 209.3| 77.4|  4215|        1|
| 16.5|       360.0|165|   255|   8.5|   2.73|           4|        3| 185.2| 69.0|  3660|        1|
+-----+------------+---+------+------+-------+------------+---------+------+-----+------+---------+

8. DataFrame : Statistical Functions


In [37]:
df_cars_1.corr('hp','weight')


Out[37]:
0.8834003785623672

In [38]:
df_cars_1.corr('RARatio','width')


Out[38]:
-0.43435769959911846

In [39]:
df_cars_1.crosstab('automatic','NoOfSpeed').show()


+-------------------+---+---+---+
|automatic_NoOfSpeed|  3|  4|  5|
+-------------------+---+---+---+
|                  1| 23|  0|  0|
|                  0|  1|  5|  3|
+-------------------+---+---+---+


In [40]:
df_cars_1.crosstab('NoOfSpeed','CarbBarrells').show()


+----------------------+---+---+---+
|NoOfSpeed_CarbBarrells|  2|  1|  4|
+----------------------+---+---+---+
|                     5|  3|  0|  0|
|                     4|  5|  0|  0|
|                     3| 10|  3| 11|
+----------------------+---+---+---+


In [41]:
df_cars_1.crosstab('automatic','CarbBarrells').show()


+----------------------+---+---+---+
|automatic_CarbBarrells|  1|  2|  4|
+----------------------+---+---+---+
|                     1|  2| 10| 11|
|                     0|  1|  8|  0|
+----------------------+---+---+---+
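
Besides corr and crosstab, the DataFrame stat functions include covariance and frequent-item search. A couple of sketches:

print df_cars_1.cov('hp', 'weight')                # sample covariance
df_cars_1.freqItems(['CarbBarrells'], 0.5).show()  # items with frequency >= 0.5 (approximate, may report false positives)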

10. DataFrame : na


In [42]:
# We can see if a column has null values
df_cars_1.select(df_cars_1.torque.isNull()).show()


+--------------+
|isnull(torque)|
+--------------+
|         false|
|         false|
|         false|
|         false|
|         false|
|         false|
|         false|
|         false|
|         false|
|         false|
|         false|
|         false|
|         false|
|         false|
|         false|
|         false|
|         false|
|         false|
|         false|
|         false|
+--------------+
only showing top 20 rows


In [43]:
# We can filter null and non null rows
df_cars_1.filter(df_cars_1.torque.isNull()).show(40) # You can also use isNotNull


+-----+------------+---+------+------+-------+------------+---------+------+-----+------+---------+
|  mpg|displacement| hp|torque|CRatio|RARatio|CarbBarrells|NoOfSpeed|length|width|weight|automatic|
+-----+------------+---+------+------+-------+------------+---------+------+-----+------+---------+
|16.59|       400.0|185|  null|   7.6|   3.08|           4|        3| 196.0| 73.0|  3850|        1|
| 29.4|       140.0| 86|  null|   8.0|   2.92|           2|        4| 176.4| 65.4|  2150|        0|
+-----+------------+---+------+------+-------+------------+---------+------+-----+------+---------+


In [44]:
df_cars_1.na.drop().count()


Out[44]:
30

In [45]:
df_cars_1.fillna(9999).show(50)
# This is not what we would normally do; it just shows the effect of fillna.
# You can also use df_cars_1.na.fill(9999)


+-----+------------+---+------+-----------------+------------------+------------+---------+------------------+------------------+------+---------+
|  mpg|displacement| hp|torque|           CRatio|           RARatio|CarbBarrells|NoOfSpeed|            length|             width|weight|automatic|
+-----+------------+---+------+-----------------+------------------+------------+---------+------------------+------------------+------+---------+
| 18.9|       350.0|165|   260|              8.0| 2.559999942779541|           4|        3| 200.3000030517578|  69.9000015258789|  3910|        1|
| 17.0|       350.0|170|   275|              8.5| 2.559999942779541|           4|        3|199.60000610351562|  72.9000015258789|  3860|        1|
| 20.0|       250.0|105|   185|             8.25|2.7300000190734863|           1|        3| 196.6999969482422| 72.19999694824219|  3510|        1|
|18.25|       351.0|143|   255|              8.0|               3.0|           2|        3|199.89999389648438|              74.0|  3890|        1|
|20.07|       225.0| 95|   170|8.399999618530273| 2.759999990463257|           1|        3|194.10000610351562| 71.80000305175781|  3365|        0|
| 11.2|       440.0|215|   330|8.199999809265137| 2.880000114440918|           4|        3|             184.5|              69.0|  4215|        1|
|22.12|       231.0|110|   175|              8.0| 2.559999942779541|           2|        3| 179.3000030517578|  65.4000015258789|  3020|        1|
|21.47|       262.0|110|   200|              8.5| 2.559999942779541|           2|        3| 179.3000030517578|  65.4000015258789|  3180|        1|
| 34.7|        89.7| 70|    81|8.199999809265137|3.9000000953674316|           2|        4| 155.6999969482422|              64.0|  1905|        0|
| 30.4|        96.9| 75|    83|              9.0| 4.300000190734863|           2|        5| 165.1999969482422|              65.0|  2320|        0|
| 16.5|       350.0|155|   250|              8.5|3.0799999237060547|           4|        3|195.39999389648438|  74.4000015258789|  3885|        1|
| 36.5|        85.3| 80|    83|              8.5| 3.890000104904175|           2|        4|160.60000610351562| 62.20000076293945|  2009|        0|
| 21.5|       171.0|109|   146|8.199999809265137|3.2200000286102295|           2|        4|170.39999389648438|  66.9000015258789|  2655|        0|
| 19.7|       258.0|110|   195|              8.0|3.0799999237060547|           1|        3|             171.5|              77.0|  3375|        1|
| 20.3|       140.0| 83|   109|8.399999618530273|3.4000000953674316|           2|        4| 168.8000030517578|  69.4000015258789|  2700|        0|
| 17.8|       302.0|129|   220|              8.0|               3.0|           2|        3|199.89999389648438|              74.0|  3890|        1|
|14.39|       500.0|190|   360|              8.5|2.7300000190734863|           4|        3|224.10000610351562| 79.80000305175781|  5290|        1|
|14.89|       440.0|215|   330|8.199999809265137|2.7100000381469727|           4|        3|             231.0| 79.69999694824219|  5185|        1|
| 17.8|       350.0|155|   250|              8.5|3.0799999237060547|           4|        3| 196.6999969482422| 72.19999694824219|  3910|        1|
|16.41|       318.0|145|   255|              8.5| 2.450000047683716|           2|        3|197.60000610351562|              71.0|  3660|        1|
|23.54|       231.0|110|   175|              8.0| 2.559999942779541|           2|        3| 179.3000030517578|  65.4000015258789|  3050|        1|
|21.47|       360.0|180|   290|8.399999618530273| 2.450000047683716|           2|        3| 214.1999969482422| 76.30000305175781|  4250|        1|
|16.59|       400.0|185|  9999|7.599999904632568|3.0799999237060547|           4|        3|             196.0|              73.0|  3850|        1|
| 31.9|        96.9| 75|    83|              9.0| 4.300000190734863|           2|        5| 165.1999969482422| 61.79999923706055|  2275|        0|
| 29.4|       140.0| 86|  9999|              8.0|2.9200000762939453|           2|        4|176.39999389648438|  65.4000015258789|  2150|        0|
|13.27|       460.0|223|   366|              8.0|               3.0|           4|        3|             228.0| 79.80000305175781|  5430|        1|
| 23.9|       133.6| 96|   120|8.399999618530273|3.9100000858306885|           2|        5|             171.5|63.400001525878906|  2535|        0|
|19.73|       318.0|140|   255|              8.5|2.7100000381469727|           2|        3| 215.3000030517578| 76.30000305175781|  4370|        1|
| 13.9|       351.0|148|   243|              8.0|              3.25|           2|        3|             215.5|              78.5|  4540|        1|
|13.27|       351.0|148|   243|              8.0| 3.259999990463257|           2|        3|216.10000610351562|              78.5|  4715|        1|
|13.77|       360.0|195|   295|             8.25|3.1500000953674316|           4|        3| 209.3000030517578|  77.4000015258789|  4215|        1|
| 16.5|       360.0|165|   255|              8.5|2.7300000190734863|           4|        3| 185.1999969482422|              69.0|  3660|        1|
+-----+------------+---+------+-----------------+------------------+------------+---------+------------------+------------------+------+---------+
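
Rather than a blanket sentinel, na.fill also accepts a per-column dict. A sketch that imputes the missing torque values with the (rounded) column mean:

mean_torque = df_cars_1.groupBy().avg('torque').first()[0]
df_cars_1.na.fill({'torque': int(round(mean_torque))}).show(3)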


In [46]:
# Let us try the interesting when syntax on the HP column
# 0-100=1,101-200=2,201-300=3,others=4
df_cars_1.select(df_cars_1.hp, F.when(df_cars_1.hp <= 100, 1).when(df_cars_1.hp <= 200, 2)
                 .when(df_cars_1.hp <= 300, 3).otherwise(4).alias("hpCode")).show(40)


+---+------+
| hp|hpCode|
+---+------+
|165|     2|
|170|     2|
|105|     2|
|143|     2|
| 95|     1|
|215|     3|
|110|     2|
|110|     2|
| 70|     1|
| 75|     1|
|155|     2|
| 80|     1|
|109|     2|
|110|     2|
| 83|     1|
|129|     2|
|190|     2|
|215|     3|
|155|     2|
|145|     2|
|110|     2|
|180|     2|
|185|     2|
| 75|     1|
| 86|     1|
|223|     3|
| 96|     1|
|140|     2|
|148|     2|
|148|     2|
|195|     2|
|165|     2|
+---+------+


In [47]:
df_cars_1.dtypes


Out[47]:
[('mpg', 'double'),
 ('displacement', 'double'),
 ('hp', 'int'),
 ('torque', 'int'),
 ('CRatio', 'float'),
 ('RARatio', 'float'),
 ('CarbBarrells', 'int'),
 ('NoOfSpeed', 'int'),
 ('length', 'float'),
 ('width', 'float'),
 ('weight', 'int'),
 ('automatic', 'int')]

In [48]:
df_cars_1.groupBy('CarbBarrells').count().show()


+------------+-----+
|CarbBarrells|count|
+------------+-----+
|           1|    3|
|           2|   18|
|           4|   11|
+------------+-----+


In [49]:
# If the file already exists, this will give an error:
# java.lang.RuntimeException: path file:.. /cars_1.parquet already exists.
#
df_cars_1.repartition(1).write.save("cars_1.parquet", format="parquet")


---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
<ipython-input-49-cc10f3e861cd> in <module>()
      2 # java.lang.RuntimeException: path file:.. /cars_1.parquet already exists.
      3 #
----> 4 df_cars_1.repartition(1).write.save("cars_1.parquet", format="parquet")

/Users/ksankar/Downloads/spark-1.6.0/python/pyspark/sql/readwriter.pyc in save(self, path, format, mode, partitionBy, **options)
    385             self._jwrite.save()
    386         else:
--> 387             self._jwrite.save(path)
    388 
    389     @since(1.4)

/Users/ksankar/Downloads/spark-1.6.0/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
    811         answer = self.gateway_client.send_command(command)
    812         return_value = get_return_value(
--> 813             answer, self.gateway_client, self.target_id, self.name)
    814 
    815         for temp_arg in temp_args:

/Users/ksankar/Downloads/spark-1.6.0/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     49                                              e.java_exception.getStackTrace()))
     50             if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 51                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
     52             if s.startswith('java.lang.IllegalArgumentException: '):
     53                 raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)

AnalysisException: u'path file:/Users/ksankar/global-bd-conf/cars_1.parquet already exists.;'

In [50]:
# No error even if the file exists
df_cars_1.repartition(1).write.mode("overwrite").format("parquet").save("cars_1.parquet")
# Use repartition if you want all data in one (or more) file

In [51]:
# Appends to existing file
df_cars_1.repartition(1).write.mode("append").format("parquet").save("cars_1_a.parquet")
# Even with repartition, you will see more files, because each append adds new ones

In [52]:
df_append = sqlContext.load("cars_1_a.parquet")
# sqlContext.load is deprecated


/Users/ksankar/Downloads/spark-1.6.0/python/pyspark/sql/context.py:538: UserWarning: load is deprecated. Use read.load() instead.
  warnings.warn("load is deprecated. Use read.load() instead.")

In [53]:
df_append.count()


Out[53]:
320

In [54]:
# Even though parquet is the default format, an explicit format("parquet") is clearer
df_append = sqlContext.read.format("parquet").load("cars_1_a.parquet")
df_append.count()


Out[54]:
320

In [55]:
# For reading parquet files, read.parquet is more elegant
df_append = sqlContext.read.parquet("cars_1_a.parquet")
df_append.count()


Out[55]:
320

In [56]:
# Let us read another file
df_orders = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('NW/NW-Orders.csv')

In [57]:
df_orders.head()


Out[57]:
Row(OrderID=u'10248', CustomerID=u'VINET', EmpliyeeID=u'5', OrderDate=u'7/2/96', ShipCuntry=u'France')

In [58]:
df_orders.dtypes


Out[58]:
[('OrderID', 'string'),
 ('CustomerID', 'string'),
 ('EmpliyeeID', 'string'),
 ('OrderDate', 'string'),
 ('ShipCuntry', 'string')]

In [59]:
from pyspark.sql.types import StringType, IntegerType,DateType
getYear = F.udf(lambda s: s[-2:], StringType())  # could also return IntegerType()
from datetime import datetime
convertToDate = F.udf(lambda s: datetime.strptime(s, '%m/%d/%y'),DateType())

In [60]:
# You could register the function for SQL as follows. We won't use it here
sqlContext.registerFunction("getYear", lambda s: s[-2:])
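
For completeness, a sketch of how the registered function could then be called from SQL (the temp table name orders is an assumption; temp tables are covered in section 12):

df_orders.registerTempTable("orders")
sqlContext.sql("select OrderID, getYear(OrderDate) as Year from orders").show(3)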

In [61]:
# Let us add a year column
df_orders.select(df_orders['OrderID'], 
                 df_orders['CustomerID'],
                 df_orders['EmpliyeeID'], 
                 df_orders['OrderDate'],
                 df_orders['ShipCuntry'].alias('ShipCountry'),
                 getYear(df_orders['OrderDate'])).show()


+-------+----------+----------+---------+-----------+-----------------------------+
|OrderID|CustomerID|EmpliyeeID|OrderDate|ShipCountry|PythonUDF#<lambda>(OrderDate)|
+-------+----------+----------+---------+-----------+-----------------------------+
|  10248|     VINET|         5|   7/2/96|     France|                           96|
|  10249|     TOMSP|         6|   7/3/96|    Germany|                           96|
|  10250|     HANAR|         4|   7/6/96|     Brazil|                           96|
|  10251|     VICTE|         3|   7/6/96|     France|                           96|
|  10252|     SUPRD|         4|   7/7/96|    Belgium|                           96|
|  10253|     HANAR|         3|   7/8/96|     Brazil|                           96|
|  10254|     CHOPS|         5|   7/9/96|Switzerland|                           96|
|  10255|     RICSU|         9|  7/10/96|Switzerland|                           96|
|  10256|     WELLI|         3|  7/13/96|     Brazil|                           96|
|  10257|     HILAA|         4|  7/14/96|  Venezuela|                           96|
|  10258|     ERNSH|         1|  7/15/96|    Austria|                           96|
|  10259|     CENTC|         4|  7/16/96|     Mexico|                           96|
|  10260|     OTTIK|         4|  7/17/96|    Germany|                           96|
|  10261|     QUEDE|         4|  7/17/96|     Brazil|                           96|
|  10262|     RATTC|         8|  7/20/96|        USA|                           96|
|  10263|     ERNSH|         9|  7/21/96|    Austria|                           96|
|  10264|     FOLKO|         6|  7/22/96|     Sweden|                           96|
|  10265|     BLONP|         2|  7/23/96|     France|                           96|
|  10266|     WARTH|         3|  7/24/96|    Finland|                           96|
|  10267|     FRANK|         4|  7/27/96|    Germany|                           96|
+-------+----------+----------+---------+-----------+-----------------------------+
only showing top 20 rows


In [62]:
# Let us add a year column
# Need the alias; otherwise the UDF column is named PythonUDF#<lambda>(OrderDate) as above
df_orders_1 = df_orders.select(df_orders['OrderID'], 
                 df_orders['CustomerID'],
                 df_orders['EmpliyeeID'], 
                 convertToDate(df_orders['OrderDate']).alias('OrderDate'),
                 df_orders['ShipCuntry'].alias('ShipCountry'),
                 getYear(df_orders['OrderDate']).alias('Year'))
# df_orders_1 = df_orders_x.withColumn('Year',getYear(df_orders_x['OrderDate'])) # doesn't work. Gives error
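
The commented-out withColumn presumably fails because it was tried on the converted DataFrame, where OrderDate is already a date and the getYear lambda can no longer slice a string. A sketch that applies withColumn to the original string column instead:

df_orders_y = df_orders.withColumn('Year', getYear(df_orders['OrderDate']))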

In [63]:
df_orders_1.show(1)


+-------+----------+----------+----------+-----------+----+
|OrderID|CustomerID|EmpliyeeID| OrderDate|ShipCountry|Year|
+-------+----------+----------+----------+-----------+----+
|  10248|     VINET|         5|1996-07-02|     France|  96|
+-------+----------+----------+----------+-----------+----+
only showing top 1 row


In [64]:
df_orders_1.dtypes


Out[64]:
[('OrderID', 'string'),
 ('CustomerID', 'string'),
 ('EmpliyeeID', 'string'),
 ('OrderDate', 'date'),
 ('ShipCountry', 'string'),
 ('Year', 'string')]

In [65]:
df_orders_1.show()


+-------+----------+----------+----------+-----------+----+
|OrderID|CustomerID|EmpliyeeID| OrderDate|ShipCountry|Year|
+-------+----------+----------+----------+-----------+----+
|  10248|     VINET|         5|1996-07-02|     France|  96|
|  10249|     TOMSP|         6|1996-07-03|    Germany|  96|
|  10250|     HANAR|         4|1996-07-06|     Brazil|  96|
|  10251|     VICTE|         3|1996-07-06|     France|  96|
|  10252|     SUPRD|         4|1996-07-07|    Belgium|  96|
|  10253|     HANAR|         3|1996-07-08|     Brazil|  96|
|  10254|     CHOPS|         5|1996-07-09|Switzerland|  96|
|  10255|     RICSU|         9|1996-07-10|Switzerland|  96|
|  10256|     WELLI|         3|1996-07-13|     Brazil|  96|
|  10257|     HILAA|         4|1996-07-14|  Venezuela|  96|
|  10258|     ERNSH|         1|1996-07-15|    Austria|  96|
|  10259|     CENTC|         4|1996-07-16|     Mexico|  96|
|  10260|     OTTIK|         4|1996-07-17|    Germany|  96|
|  10261|     QUEDE|         4|1996-07-17|     Brazil|  96|
|  10262|     RATTC|         8|1996-07-20|        USA|  96|
|  10263|     ERNSH|         9|1996-07-21|    Austria|  96|
|  10264|     FOLKO|         6|1996-07-22|     Sweden|  96|
|  10265|     BLONP|         2|1996-07-23|     France|  96|
|  10266|     WARTH|         3|1996-07-24|    Finland|  96|
|  10267|     FRANK|         4|1996-07-27|    Germany|  96|
+-------+----------+----------+----------+-----------+----+
only showing top 20 rows


In [66]:
df_orders_1.where(df_orders_1['ShipCountry'] == 'France').show()


+-------+----------+----------+----------+-----------+----+
|OrderID|CustomerID|EmpliyeeID| OrderDate|ShipCountry|Year|
+-------+----------+----------+----------+-----------+----+
|  10248|     VINET|         5|1996-07-02|     France|  96|
|  10251|     VICTE|         3|1996-07-06|     France|  96|
|  10265|     BLONP|         2|1996-07-23|     France|  96|
|  10274|     VINET|         6|1996-08-04|     France|  96|
|  10295|     VINET|         2|1996-08-31|     France|  96|
|  10297|     BLONP|         5|1996-09-02|     France|  96|
|  10311|     DUMON|         1|1996-09-18|     France|  96|
|  10331|     BONAP|         9|1996-10-14|     France|  96|
|  10334|     VICTE|         8|1996-10-19|     France|  96|
|  10340|     BONAP|         1|1996-10-27|     France|  96|
|  10350|     LAMAI|         6|1996-11-09|     France|  96|
|  10358|     LAMAI|         5|1996-11-18|     France|  96|
|  10360|     BLONP|         4|1996-11-20|     France|  96|
|  10362|     BONAP|         3|1996-11-23|     France|  96|
|  10371|     LAMAI|         1|1996-12-01|     France|  96|
|  10408|     FOLIG|         8|1997-01-06|     France|  97|
|  10413|     LAMAI|         3|1997-01-12|     France|  97|
|  10425|     LAMAI|         6|1997-01-22|     France|  97|
|  10436|     BLONP|         3|1997-02-03|     France|  97|
|  10449|     BLONP|         3|1997-02-16|     France|  97|
+-------+----------+----------+----------+-----------+----+
only showing top 20 rows


In [67]:
df_orders_1.groupBy("CustomerID","Year").count().orderBy('count',ascending=False).show()


+----------+----+-----+
|CustomerID|Year|count|
+----------+----+-----+
|     SAVEA|  97|   17|
|     ERNSH|  97|   14|
|     QUICK|  97|   14|
|     SAVEA|  98|   11|
|     BERGS|  97|   10|
|     WARTH|  97|   10|
|     MEREP|  97|   10|
|     HILAA|  97|   10|
|     HUNGO|  97|   10|
|     ERNSH|  98|    9|
|     FOLKO|  98|    9|
|     KOENE|  97|    8|
|     BOTTM|  98|    8|
|     WHITC|  97|    8|
|     LEHMS|  97|    8|
|     BONAP|  97|    8|
|     FRANK|  97|    8|
|     QUICK|  98|    8|
|     LAMAI|  97|    8|
|     RATTC|  96|    7|
+----------+----+-----+
only showing top 20 rows


In [69]:
# save by partition (year)
df_orders_1.write.mode("overwrite").partitionBy("Year").format("parquet").save("orders_1.parquet")
# load defaults to parquet

In [70]:
df_orders_2 = sqlContext.read.parquet("orders_1.parquet")
df_orders_2.explain(True)
df_orders_3 = df_orders_2.filter(df_orders_2.Year=='96')
df_orders_3.explain(True)


== Parsed Logical Plan ==
Relation[OrderID#2075,CustomerID#2076,EmpliyeeID#2077,OrderDate#2078,ShipCountry#2079,Year#2080] ParquetRelation

== Analyzed Logical Plan ==
OrderID: string, CustomerID: string, EmpliyeeID: string, OrderDate: date, ShipCountry: string, Year: int
Relation[OrderID#2075,CustomerID#2076,EmpliyeeID#2077,OrderDate#2078,ShipCountry#2079,Year#2080] ParquetRelation

== Optimized Logical Plan ==
Relation[OrderID#2075,CustomerID#2076,EmpliyeeID#2077,OrderDate#2078,ShipCountry#2079,Year#2080] ParquetRelation

== Physical Plan ==
Scan ParquetRelation[OrderID#2075,CustomerID#2076,EmpliyeeID#2077,OrderDate#2078,ShipCountry#2079,Year#2080] InputPaths: file:/Users/ksankar/global-bd-conf/orders_1.parquet
== Parsed Logical Plan ==
'Filter (Year#2080 = 96)
+- Relation[OrderID#2075,CustomerID#2076,EmpliyeeID#2077,OrderDate#2078,ShipCountry#2079,Year#2080] ParquetRelation

== Analyzed Logical Plan ==
OrderID: string, CustomerID: string, EmpliyeeID: string, OrderDate: date, ShipCountry: string, Year: int
Filter (cast(Year#2080 as double) = cast(96 as double))
+- Relation[OrderID#2075,CustomerID#2076,EmpliyeeID#2077,OrderDate#2078,ShipCountry#2079,Year#2080] ParquetRelation

== Optimized Logical Plan ==
Filter (cast(Year#2080 as double) = 96.0)
+- Relation[OrderID#2075,CustomerID#2076,EmpliyeeID#2077,OrderDate#2078,ShipCountry#2079,Year#2080] ParquetRelation

== Physical Plan ==
Scan ParquetRelation[OrderID#2075,CustomerID#2076,EmpliyeeID#2077,OrderDate#2078,ShipCountry#2079,Year#2080] InputPaths: file:/Users/ksankar/global-bd-conf/orders_1.parquet
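
Notice that the plans cast Year to double on both sides: the partition column was read back as int (see the analyzed plan) while '96' is a string. A sketch of a typed comparison that avoids the casts:

df_orders_3 = df_orders_2.filter(df_orders_2.Year == 96)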

In [71]:
df_orders_3.count()


Out[71]:
155

In [72]:
df_orders_3.explain(True)


== Parsed Logical Plan ==
'Filter (Year#2080 = 96)
+- Relation[OrderID#2075,CustomerID#2076,EmpliyeeID#2077,OrderDate#2078,ShipCountry#2079,Year#2080] ParquetRelation

== Analyzed Logical Plan ==
OrderID: string, CustomerID: string, EmpliyeeID: string, OrderDate: date, ShipCountry: string, Year: int
Filter (cast(Year#2080 as double) = cast(96 as double))
+- Relation[OrderID#2075,CustomerID#2076,EmpliyeeID#2077,OrderDate#2078,ShipCountry#2079,Year#2080] ParquetRelation

== Optimized Logical Plan ==
Filter (cast(Year#2080 as double) = 96.0)
+- Relation[OrderID#2075,CustomerID#2076,EmpliyeeID#2077,OrderDate#2078,ShipCountry#2079,Year#2080] ParquetRelation

== Physical Plan ==
Scan ParquetRelation[OrderID#2075,CustomerID#2076,EmpliyeeID#2077,OrderDate#2078,ShipCountry#2079,Year#2080] InputPaths: file:/Users/ksankar/global-bd-conf/orders_1.parquet

In [73]:
df_orders_2.count()


Out[73]:
830

In [74]:
df_orders_1.printSchema()


root
 |-- OrderID: string (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- EmpliyeeID: string (nullable = true)
 |-- OrderDate: date (nullable = true)
 |-- ShipCountry: string (nullable = true)
 |-- Year: string (nullable = true)

7. DataFrame : Scientific Functions


In [75]:
# from pyspark.sql import Row
df = sc.parallelize([10,100,1000]).map(lambda x: {"num":x}).toDF()


/Users/ksankar/Downloads/spark-1.6.0/python/pyspark/sql/context.py:262: UserWarning: Using RDD of dict to inferSchema is deprecated. Use pyspark.sql.Row instead
  warnings.warn("Using RDD of dict to inferSchema is deprecated. "

In [76]:
df.show()


+----+
| num|
+----+
|  10|
| 100|
|1000|
+----+


In [77]:
import pyspark.sql.functions as F
df.select(F.log(df.num)).show()


+-----------------+
|         LOG(num)|
+-----------------+
|2.302585092994046|
|4.605170185988092|
|6.907755278982137|
+-----------------+


In [78]:
df.select(F.log10(df.num)).show()


+----------+
|LOG10(num)|
+----------+
|       1.0|
|       2.0|
|       3.0|
+----------+


In [79]:
df = sc.parallelize([0,10,100,1000]).map(lambda x: {"num":x}).toDF()

In [80]:
df.show()


+----+
| num|
+----+
|   0|
|  10|
| 100|
|1000|
+----+


In [81]:
df.select(F.log(df.num)).show()


+-----------------+
|         LOG(num)|
+-----------------+
|             null|
|2.302585092994046|
|4.605170185988092|
|6.907755278982137|
+-----------------+


In [82]:
df.select(F.log1p(df.num)).show()


+------------------+
|        LOG1P(num)|
+------------------+
|               0.0|
|2.3978952727983707|
|  4.61512051684126|
|  6.90875477931522|
+------------------+


In [83]:
df_cars_1.select(df_cars_1['CarbBarrells'], F.sqrt(df_cars_1['mpg'])).show()


+------------+------------------+
|CarbBarrells|         SQRT(mpg)|
+------------+------------------+
|           4| 4.347413023856832|
|           4| 4.123105625617661|
|           1|  4.47213595499958|
|           2| 4.272001872658765|
|           1| 4.479955356920423|
|           4|3.3466401061363023|
|           2| 4.703190406521939|
|           2| 4.633573135281238|
|           2| 5.890670590009257|
|           2| 5.513619500836088|
|           4|  4.06201920231798|
|           2| 6.041522986797286|
|           2| 4.636809247747852|
|           1|  4.43846820423443|
|           2| 4.505552130427524|
|           2| 4.219004621945797|
|           4|3.7934153476781316|
|           4|3.8587562763149474|
|           4| 4.219004621945797|
|           2| 4.050925820105819|
+------------+------------------+
only showing top 20 rows


In [84]:
df = sc.parallelize([(3,4),(5,12),(7,24),(9,40),(11,60),(13,84)]).map(lambda x: {"a":x[0],"b":x[1]}).toDF()

In [85]:
df.show()


+---+---+
|  a|  b|
+---+---+
|  3|  4|
|  5| 12|
|  7| 24|
|  9| 40|
| 11| 60|
| 13| 84|
+---+---+


In [86]:
df.select(df['a'],df['b'],F.hypot(df['a'],df['b']).alias('hypot')).show()


+---+---+-----+
|  a|  b|hypot|
+---+---+-----+
|  3|  4|  5.0|
|  5| 12| 13.0|
|  7| 24| 25.0|
|  9| 40| 41.0|
| 11| 60| 61.0|
| 13| 84| 85.0|
+---+---+-----+
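
pyspark.sql.functions offers many more math functions in the same vein. A couple of sketches on the same DataFrame:

import pyspark.sql.functions as F
df.select(df['a'], F.pow(df['a'], 2).alias('a_squared'),
          F.exp(df['a']).alias('exp_a')).show()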

11. DataFrame : Joins, Set Operations


In [87]:
df_a = sc.parallelize( [{"X1":"A","X2":1},{"X1":"B","X2":2},{"X1":"C","X2":3}] ).toDF()
df_b = sc.parallelize( [{"X1":"A","X3":True},{"X1":"B","X3":False},{"X1":"D","X3":True}] ).toDF()

In [88]:
df_a.show()


+---+---+
| X1| X2|
+---+---+
|  A|  1|
|  B|  2|
|  C|  3|
+---+---+


In [89]:
df_b.show()


+---+-----+
| X1|   X3|
+---+-----+
|  A| true|
|  B|false|
|  D| true|
+---+-----+


In [90]:
df_a.join(df_b, df_a['X1'] == df_b['X1'], 'inner')\
.select(df_a['X1'].alias('X1-a'),df_b['X1'].alias('X1-b'),'X2','X3').show()


+----+----+---+-----+
|X1-a|X1-b| X2|   X3|
+----+----+---+-----+
|   A|   A|  1| true|
|   B|   B|  2|false|
+----+----+---+-----+


In [91]:
df_a.join(df_b, df_a['X1'] == df_b['X1'], 'outer')\
.select(df_a['X1'].alias('X1-a'),df_b['X1'].alias('X1-b'),'X2','X3').show() # same as 'full' or 'fullouter'
# Spark doesn't merge the key columns, so we need to alias them to distinguish between the two X1 columns


+----+----+----+-----+
|X1-a|X1-b|  X2|   X3|
+----+----+----+-----+
|   A|   A|   1| true|
|   B|   B|   2|false|
|   C|null|   3| null|
|null|   D|null| true|
+----+----+----+-----+


In [92]:
df_a.join(df_b, df_a['X1'] == df_b['X1'], 'left_outer')\
.select(df_a['X1'].alias('X1-a'),df_b['X1'].alias('X1-b'),'X2','X3').show() # same as 'left'


+----+----+---+-----+
|X1-a|X1-b| X2|   X3|
+----+----+---+-----+
|   A|   A|  1| true|
|   B|   B|  2|false|
|   C|null|  3| null|
+----+----+---+-----+


In [93]:
df_a.join(df_b, df_a['X1'] == df_b['X1'], 'right_outer')\
.select(df_a['X1'].alias('X1-a'),df_b['X1'].alias('X1-b'),'X2','X3').show() # same as 'right'


+----+----+----+-----+
|X1-a|X1-b|  X2|   X3|
+----+----+----+-----+
|   A|   A|   1| true|
|   B|   B|   2|false|
|null|   D|null| true|
+----+----+----+-----+


In [94]:
df_a.join(df_b, df_a['X1'] == df_b['X1'], 'right')\
.select(df_a['X1'].alias('X1-a'),df_b['X1'].alias('X1-b'),'X2','X3').show()


+----+----+----+-----+
|X1-a|X1-b|  X2|   X3|
+----+----+----+-----+
|   A|   A|   1| true|
|   B|   B|   2|false|
|null|   D|null| true|
+----+----+----+-----+


In [95]:
df_a.join(df_b, df_a['X1'] == df_b['X1'], 'full')\
.select(df_a['X1'].alias('X1-a'),df_b['X1'].alias('X1-b'),'X2','X3').show()# same as 'fullouter'


+----+----+----+-----+
|X1-a|X1-b|  X2|   X3|
+----+----+----+-----+
|   A|   A|   1| true|
|   B|   B|   2|false|
|   C|null|   3| null|
|null|   D|null| true|
+----+----+----+-----+


In [96]:
df_a.join(df_b, df_a['X1'] == df_b['X1'], 'leftsemi').show() # same as semijoin
#.select(df_a['X1'].alias('X1-a'),df_b['X1'].alias('X1-b'),'X2','X3').show()


+---+---+
| X1| X2|
+---+---+
|  A|  1|
|  B|  2|
+---+---+


In [97]:
# anti-join: subtract the result of the leftsemi join from df_a
df_a.subtract(df_a.join(df_b, df_a['X1'] == df_b['X1'], 'leftsemi')).show() 
#.select(df_a['X1'].alias('X1-a'),df_b['X1'].alias('X1-b'),'X2','X3').show()


+---+---+
| X1| X2|
+---+---+
|  C|  3|
+---+---+
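
When one side of a join is small, as df_b is here, Spark 1.6 lets you hint a map-side join with pyspark.sql.functions.broadcast. A sketch:

import pyspark.sql.functions as F
df_a.join(F.broadcast(df_b), df_a['X1'] == df_b['X1'], 'inner').show()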


In [98]:
c = [{"X1":"A","X2":1},{"X1":"B","X2":2},{"X1":"C","X2":3}]
d = [{"X1":"A","X2":1},{"X1":"B","X2":2},{"X1":"D","X2":4}]
df_c = sc.parallelize(c).toDF()
df_d = sc.parallelize(d).toDF()

In [99]:
df_c.show()


+---+---+
| X1| X2|
+---+---+
|  A|  1|
|  B|  2|
|  C|  3|
+---+---+


In [100]:
df_d.show()


+---+---+
| X1| X2|
+---+---+
|  A|  1|
|  B|  2|
|  D|  4|
+---+---+


In [101]:
df_c.intersect(df_d).show()


+---+---+
| X1| X2|
+---+---+
|  B|  2|
|  A|  1|
+---+---+


In [102]:
df_c.subtract(df_d).show()


+---+---+
| X1| X2|
+---+---+
|  C|  3|
+---+---+


In [103]:
df_d.subtract(df_c).show()


+---+---+
| X1| X2|
+---+---+
|  D|  4|
+---+---+


In [104]:
e = [{"X1":"A","X2":1},{"X1":"B","X2":2},{"X1":"C","X2":3}]
f = [{"X1":"D","X2":4},{"X1":"E","X2":5},{"X1":"F","X2":6}]
df_e = sc.parallelize(e).toDF()
df_f = sc.parallelize(f).toDF()

In [105]:
df_e.unionAll(df_f).show()


+---+---+
| X1| X2|
+---+---+
|  A|  1|
|  B|  2|
|  C|  3|
|  D|  4|
|  E|  5|
|  F|  6|
+---+---+
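
Two cautions on unionAll, which matches its SQL namesake: it aligns columns by position rather than by name, and it keeps duplicates. A sketch of a deduplicating union:

df_e.unionAll(df_f).distinct().show()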


In [106]:
# df_a.join(df_b, df_a['X1'] == df_b['X1'], 'semijoin')\
# .select(df_a['X1'].alias('X1-a'),df_b['X1'].alias('X1-b'),'X2','X3').show()
# Gives error Unsupported join type 'semijoin'.
# Supported join types include: 'inner', 'outer', 'full', 'fullouter', 'leftouter', 'left', 'rightouter', 
# 'right', 'leftsemi'

12. DataFrame : Tables & SQL


In [107]:
# SQL on tables

In [108]:
df_a.registerTempTable("tableA")
sqlContext.sql("select * from tableA").show()


+---+---+
| X1| X2|
+---+---+
|  A|  1|
|  B|  2|
|  C|  3|
+---+---+


In [109]:
df_b.registerTempTable("tableB")
sqlContext.sql("select * from tableB").show()


+---+-----+
| X1|   X3|
+---+-----+
|  A| true|
|  B|false|
|  D| true|
+---+-----+


In [110]:
sqlContext.sql("select * from tableA JOIN tableB on tableA.X1 = tableB.X1").show()


+---+---+---+-----+
| X1| X2| X1|   X3|
+---+---+---+-----+
|  A|  1|  A| true|
|  B|  2|  B|false|
+---+---+---+-----+


In [111]:
sqlContext.sql("select * from tableA LEFT JOIN tableB on tableA.X1 = tableB.X1").show()


+---+---+----+-----+
| X1| X2|  X1|   X3|
+---+---+----+-----+
|  A|  1|   A| true|
|  B|  2|   B|false|
|  C|  3|null| null|
+---+---+----+-----+


In [112]:
sqlContext.sql("select * from tableA FULL JOIN tableB on tableA.X1 = tableB.X1").show()


+----+----+----+-----+
|  X1|  X2|  X1|   X3|
+----+----+----+-----+
|   A|   1|   A| true|
|   B|   2|   B|false|
|   C|   3|null| null|
|null|null|   D| true|
+----+----+----+-----+
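
As with the DataFrame API, SQL keeps both X1 columns; a sketch that aliases them in the projection:

sqlContext.sql("""select a.X1 as X1_a, b.X1 as X1_b, a.X2, b.X3
                  from tableA a FULL JOIN tableB b on a.X1 = b.X1""").show()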

That's All, Folks!

