In [1]:
import os

In [2]:
from pyspark import SparkContext

In [3]:
SparkContext.setSystemProperty("spark.executor.uri", os.environ["SPARK_EXECUTOR_URI"])

In [4]:
sc = SparkContext(os.environ.get("CLUSTER_URL"), 'pyspark-demo')

In [5]:
from pyspark.sql import SQLContext

In [6]:
sqlContext = SQLContext(sc)

In [7]:
!wget http://files.figshare.com/1315364/iris.json


--2015-08-24 19:46:19--  http://files.figshare.com/1315364/iris.json
Resolving files.figshare.com (files.figshare.com)... 54.231.129.44
Connecting to files.figshare.com (files.figshare.com)|54.231.129.44|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 15802 (15K) [application/json]
Saving to: 'iris.json'

100%[======================================>] 15,802      --.-K/s   in 0.08s   

2015-08-24 19:46:19 (204 KB/s) - 'iris.json' saved [15802/15802]


In [8]:
!hadoop fs -put iris.json /tmp


SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

In [9]:
!hadoop fs -lsr /tmp


lsr: DEPRECATED: Please use 'ls -R' instead.
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
-rw-r--r--   3 dsb supergroup      15802 2015-08-24 19:46 /tmp/iris.json

In [10]:
iris = sqlContext.read.load('hdfs://54.159.244.205:8020/tmp/iris.json', 'json')

In [11]:
iris.show()


+---------------+-----------+----------+-----------+----------+-------+
|_corrupt_record|petalLength|petalWidth|sepalLength|sepalWidth|species|
+---------------+-----------+----------+-----------+----------+-------+
|              [|       null|      null|       null|      null|   null|
|           null|        1.4|       0.2|        5.1|       3.5| setosa|
|           null|        1.4|       0.2|        4.9|       3.0| setosa|
|           null|        1.3|       0.2|        4.7|       3.2| setosa|
|           null|        1.5|       0.2|        4.6|       3.1| setosa|
|           null|        1.4|       0.2|        5.0|       3.6| setosa|
|           null|        1.7|       0.4|        5.4|       3.9| setosa|
|           null|        1.4|       0.3|        4.6|       3.4| setosa|
|           null|        1.5|       0.2|        5.0|       3.4| setosa|
|           null|        1.4|       0.2|        4.4|       2.9| setosa|
|           null|        1.5|       0.1|        4.9|       3.1| setosa|
|           null|        1.5|       0.2|        5.4|       3.7| setosa|
|           null|        1.6|       0.2|        4.8|       3.4| setosa|
|           null|        1.4|       0.1|        4.8|       3.0| setosa|
|           null|        1.1|       0.1|        4.3|       3.0| setosa|
|           null|        1.2|       0.2|        5.8|       4.0| setosa|
|           null|        1.5|       0.4|        5.7|       4.4| setosa|
|           null|        1.3|       0.4|        5.4|       3.9| setosa|
|           null|        1.4|       0.3|        5.1|       3.5| setosa|
|           null|        1.7|       0.3|        5.7|       3.8| setosa|
+---------------+-----------+----------+-----------+----------+-------+


In [12]:
iris.printSchema()


root
 |-- _corrupt_record: string (nullable = true)
 |-- petalLength: double (nullable = true)
 |-- petalWidth: double (nullable = true)
 |-- sepalLength: double (nullable = true)
 |-- sepalWidth: double (nullable = true)
 |-- species: string (nullable = true)


In [14]:
iris.filter(iris.petalLength < 1.2).show()


+---------------+-----------+----------+-----------+----------+-------+
|_corrupt_record|petalLength|petalWidth|sepalLength|sepalWidth|species|
+---------------+-----------+----------+-----------+----------+-------+
|           null|        1.1|       0.1|        4.3|       3.0| setosa|
|           null|        1.0|       0.2|        4.6|       3.6| setosa|
+---------------+-----------+----------+-----------+----------+-------+


In [15]:
iris.groupBy("species").count().show()


+----------+-----+
|   species|count|
+----------+-----+
|versicolor|   50|
|    setosa|   50|
| virginica|   50|
|      null|    2|
+----------+-----+


In [ ]: