In [1]:
!pip install pyspark


Collecting pyspark
  Downloading pyspark-2.2.0.post0.tar.gz (188.3MB)
Collecting py4j==0.10.4 (from pyspark)
  Downloading py4j-0.10.4-py2.py3-none-any.whl (186kB)
Building wheels for collected packages: pyspark
  Running setup.py bdist_wheel for pyspark: started
  Running setup.py bdist_wheel for pyspark: finished with status 'done'
  Stored in directory: C:\Users\Dell\AppData\Local\pip\Cache\wheels\5f\0b\b3\5cb16b15d28dcc32f8e7ec91a044829642874bb7586f6e6cbe
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.4 pyspark-2.2.0

In [3]:
# Create a bare SparkContext for the RDD examples below.
# NOTE(review): SparkConf is imported but never used, and a SparkSession is
# built later (In[16]) — getOrCreate() will reuse this existing context.
from pyspark import SparkContext,SparkConf
sc=SparkContext()

In [4]:
import os

In [5]:
# Show the current working directory (starting point before we chdir below).
os.getcwd()


Out[5]:
'C:\\Users\\Dell'

In [6]:
# Switch the working directory to where iris.csv lives.
# NOTE(review): hardcoded absolute path — breaks on any other machine;
# prefer a configurable base directory (e.g. a DATA_DIR constant).
os.chdir('C:\\Users\\Dell\\Desktop')

In [8]:
# List the contents of the new working directory to confirm iris.csv is present.
os.listdir()


Out[8]:
['desktop.ini',
 'dump 2582017',
 'Fusion Church.html',
 'Fusion Church_files',
 'iris.csv',
 'KOG',
 'NF22997109906610.ETicket.pdf',
 'R Packages',
 'Telegram.lnk',
 'twitter_share.jpg',
 'winutils.exe',
 '~$avel Reimbursements.docx',
 '~$thonajay.docx']

In [10]:
# Load the raw CSV as an RDD of text lines — no parsing happens here;
# each element of the RDD is one unsplit line of the file.
data=sc.textFile('C:\\Users\\Dell\\Desktop\\iris.csv')

In [11]:
# sc.textFile returns an RDD (of strings), not a DataFrame.
type(data)


Out[11]:
pyspark.rdd.RDD

In [12]:
# top(1) returns the single *largest* line by lexicographic string order —
# not the last row of the file.
data.top(1)


Out[12]:
['7.9,3.8,6.4,2,"virginica"']

In [13]:
# first() returns the first line of the file — here that is the CSV header row.
data.first()


Out[13]:
'"Sepal.Length","Sepal.Width","Petal.Length","Petal.Width","Species"'

In [14]:
from pyspark.sql import SparkSession

In [16]:
# Build (or reuse) a local SparkSession for the DataFrame API examples below.
spark = (
    SparkSession.builder
    .appName("Data Exploration")
    .master("local")
    .getOrCreate()
)

In [17]:
# Load the CSV as a Spark DataFrame. With only header=true every column is
# read as *string* (see printSchema below).
# NOTE(review): adding .option("inferSchema","true") would give numeric
# dtypes and make the later CAST cells unnecessary — confirm intent.
data2=spark.read.format("csv") \
    .option("header","true") \
    .option("mode","DROPMALFORMED") \
    .load('C:\\Users\\Dell\\Desktop\\iris.csv')

In [18]:
# spark.read yields a DataFrame (contrast with the RDD from sc.textFile).
type(data2)


Out[18]:
pyspark.sql.dataframe.DataFrame

In [19]:
# Every column is string because the schema was not inferred on load.
data2.printSchema()


root
 |-- Sepal.Length: string (nullable = true)
 |-- Sepal.Width: string (nullable = true)
 |-- Petal.Length: string (nullable = true)
 |-- Petal.Width: string (nullable = true)
 |-- Species: string (nullable = true)


In [25]:
# Column names as a plain Python list.
data2.columns


Out[25]:
['Sepal.Length', 'Sepal.Width', 'Petal.Length', 'Petal.Width', 'Species']

In [28]:
# Same names via the schema object — equivalent to data2.columns above.
data2.schema.names


Out[28]:
['Sepal.Length', 'Sepal.Width', 'Petal.Length', 'Petal.Width', 'Species']

In [27]:
# Target column names: the dotted originals ('Sepal.Length', ...) replaced
# with underscore versions, in the same order as data2.columns.
newColumns=['Sepal_Length', 'Sepal_Width', 'Petal_Length', 'Petal_Width', 'Species']

In [30]:
from functools import reduce

In [32]:
# Rename every column from its dotted original to the underscore version.
# BUG FIX: `oldColumns` was never defined in this notebook (it came from a
# since-deleted cell), so this cell raised NameError on Restart & Run All.
# Rebuild it from the DataFrame's own schema before the rename.
oldColumns = data2.schema.names
data2 = reduce(
    lambda acc, idx: acc.withColumnRenamed(oldColumns[idx], newColumns[idx]),
    range(len(oldColumns)),
    data2,
)
data2.printSchema()
data2.show()


root
 |-- Sepal_Length: string (nullable = true)
 |-- Sepal_Width: string (nullable = true)
 |-- Petal_Length: string (nullable = true)
 |-- Petal_Width: string (nullable = true)
 |-- Species: string (nullable = true)

+------------+-----------+------------+-----------+-------+
|Sepal_Length|Sepal_Width|Petal_Length|Petal_Width|Species|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| setosa|
|         4.9|          3|         1.4|        0.2| setosa|
|         4.7|        3.2|         1.3|        0.2| setosa|
|         4.6|        3.1|         1.5|        0.2| setosa|
|           5|        3.6|         1.4|        0.2| setosa|
|         5.4|        3.9|         1.7|        0.4| setosa|
|         4.6|        3.4|         1.4|        0.3| setosa|
|           5|        3.4|         1.5|        0.2| setosa|
|         4.4|        2.9|         1.4|        0.2| setosa|
|         4.9|        3.1|         1.5|        0.1| setosa|
|         5.4|        3.7|         1.5|        0.2| setosa|
|         4.8|        3.4|         1.6|        0.2| setosa|
|         4.8|          3|         1.4|        0.1| setosa|
|         4.3|          3|         1.1|        0.1| setosa|
|         5.8|          4|         1.2|        0.2| setosa|
|         5.7|        4.4|         1.5|        0.4| setosa|
|         5.4|        3.9|         1.3|        0.4| setosa|
|         5.1|        3.5|         1.4|        0.3| setosa|
|         5.7|        3.8|         1.7|        0.3| setosa|
|         5.1|        3.8|         1.5|        0.3| setosa|
+------------+-----------+------------+-----------+-------+
only showing top 20 rows


In [33]:
# Renaming did not change dtypes — all columns are still strings.
data2.dtypes


Out[33]:
[('Sepal_Length', 'string'),
 ('Sepal_Width', 'string'),
 ('Petal_Length', 'string'),
 ('Petal_Width', 'string'),
 ('Species', 'string')]

In [35]:
# Project three columns and cache the result.
data3 = data2.select('Sepal_Length', 'Sepal_Width', 'Species')
# cache() only marks the DataFrame for caching; nothing is computed yet.
data3.cache()
# count() is an action — it triggers the computation and populates the cache.
data3.count()


Out[35]:
150

In [36]:
# show() displays the top 20 rows by default.
data3.show()


+------------+-----------+-------+
|Sepal_Length|Sepal_Width|Species|
+------------+-----------+-------+
|         5.1|        3.5| setosa|
|         4.9|          3| setosa|
|         4.7|        3.2| setosa|
|         4.6|        3.1| setosa|
|           5|        3.6| setosa|
|         5.4|        3.9| setosa|
|         4.6|        3.4| setosa|
|           5|        3.4| setosa|
|         4.4|        2.9| setosa|
|         4.9|        3.1| setosa|
|         5.4|        3.7| setosa|
|         4.8|        3.4| setosa|
|         4.8|          3| setosa|
|         4.3|          3| setosa|
|         5.8|          4| setosa|
|         5.7|        4.4| setosa|
|         5.4|        3.9| setosa|
|         5.1|        3.5| setosa|
|         5.7|        3.8| setosa|
|         5.1|        3.8| setosa|
+------------+-----------+-------+
only showing top 20 rows


In [37]:
# limit() is a lazy transformation — without an action it returns a new
# DataFrame, so only its schema repr is displayed here (no rows).
data3.limit(5)


Out[37]:
DataFrame[Sepal_Length: string, Sepal_Width: string, Species: string]

In [50]:
# Take the first five rows and display them (show() is the action).
first_five = data3.limit(5)
first_five.show()


+------------+-----------+-------+
|Sepal_Length|Sepal_Width|Species|
+------------+-----------+-------+
|         5.1|        3.5| setosa|
|         4.9|          3| setosa|
|         4.7|        3.2| setosa|
|         4.6|        3.1| setosa|
|           5|        3.6| setosa|
+------------+-----------+-------+


In [45]:
# Chained limits collapse to the smaller one, so limit(5).limit(2) is
# equivalent to a single limit(2) — same two rows, one less operation.
data3.limit(2).show()


+------------+-----------+-------+
|Sepal_Length|Sepal_Width|Species|
+------------+-----------+-------+
|         5.1|        3.5| setosa|
|         4.9|          3| setosa|
+------------+-----------+-------+


In [61]:
# CAST ... AS INT truncates the fractional part (e.g. 5.1 -> 5), so any
# statistic computed downstream is over truncated values, not the raw data.
data4=data2.selectExpr('CAST(Sepal_Length AS INT) AS Sepal_Length')

In [62]:
# Bare expression displays only the DataFrame's schema (no action triggered).
data4


Out[62]:
DataFrame[Sepal_Length: int]

In [63]:
# Import only what the notebook actually uses (`mean`, below).
# `from pyspark.sql.functions import *` pollutes the namespace — it shadows
# builtins such as sum/min/max with Spark Column functions.
from pyspark.sql.functions import mean

In [65]:
# Mean of Sepal_Length — note these are the INT-cast (truncated) values,
# so this differs from the mean of the raw measurements.
data4.select('Sepal_Length').agg(mean('Sepal_Length')).show()


+-----------------+
|avg(Sepal_Length)|
+-----------------+
|5.386666666666667|
+-----------------+


In [66]:
# Cast all four measurement columns to INT (truncating) and keep Species.
data5 = data2.selectExpr(
    'CAST(Sepal_Length AS INT) AS Sepal_Length',
    'CAST(Petal_Width AS INT) AS Petal_Width',
    'CAST(Sepal_Width AS INT) AS Sepal_Width',
    'CAST(Petal_Length AS INT) AS Petal_Length',
    'Species',
)

In [67]:
# Schema repr only — confirms the four measurement columns are now int.
data5


Out[67]:
DataFrame[Sepal_Length: int, Petal_Width: int, Sepal_Width: int, Petal_Length: int, Species: string]

In [68]:
# Column names after the cast (order follows the selectExpr above).
data5.columns


Out[68]:
['Sepal_Length', 'Petal_Width', 'Sepal_Width', 'Petal_Length', 'Species']

In [76]:
# Average sepal length per species (computed on the INT-cast values).
per_species = data5.select('Sepal_Length','Species').groupBy('Species')
per_species.agg(mean("Sepal_Length")).show()


+----------+-----------------+
|   Species|avg(Sepal_Length)|
+----------+-----------------+
| virginica|             6.08|
|versicolor|             5.48|
|    setosa|              4.6|
+----------+-----------------+


In [ ]: