Yelp review dataset: http://www.yelp.com/dataset_challenge

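Note: on Spark 1.x, reading a CSV file directly into a DataFrame requires the spark-csv package (Spark 2.0 and later read CSV natively). This notebook avoids that dependency by loading the CSV with pandas and converting the result to a Spark DataFrame.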


In [1]:
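from pyspark import SparkContext
# Run Spark locally with a single worker thread; 'example' is the application name
sc = SparkContext('local','example')

from pyspark.sql import SQLContext
# SQLContext is the Spark 1.x entry point for DataFrames (superseded by SparkSession in 2.0)
sqlContext = SQLContext(sc)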

In [2]:
import pandas as pd

# Load the CSV with pandas, then convert it into a Spark DataFrame
pandas_df = pd.read_csv('data/yelp_data.csv')

yelp_df = sqlContext.createDataFrame(pandas_df)

In [3]:
yelp_df.printSchema()


root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- id: string (nullable = true)
 |-- stars: long (nullable = true)
 |-- text: string (nullable = true)
 |-- type: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- full_address: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- neighborhoods: string (nullable = true)
 |-- open: boolean (nullable = true)
 |-- review_count: long (nullable = true)
 |-- state: string (nullable = true)


In [4]:
yelp_df.count()


Out[4]:
1000

Referencing a Column

  • As an attribute: df.column_name
  • As a key: df["column_name"]
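Both forms work because of how Python resolves attributes. The sketch below is a simplified, plain-Python model of that mechanism. ToyDataFrame and ToyColumn are hypothetical classes, not PySpark's code, although PySpark's DataFrame resolves df.column_name in a similar way.

Python calls __getattr__ only when normal attribute lookup fails. As a result, some columns can only be reached with the key form:

- a column whose name matches a DataFrame method (such as count);
- a column whose name is not a valid Python identifier (such as one containing a space).

```python
class ToyColumn:
    """Hypothetical stand-in for a column reference."""
    def __init__(self, name):
        self.name = name

    def __repr__(self):
        return f"ToyColumn<{self.name}>"


class ToyDataFrame:
    """Hypothetical, simplified model of column lookup on a DataFrame."""
    def __init__(self, columns):
        self.columns = list(columns)

    def count(self):
        # A real method, so it shadows any column that is also called "count".
        return 0  # placeholder: a real DataFrame would count its rows

    def __getattr__(self, name):
        # Python calls this only when normal attribute lookup fails.
        if name in self.columns:
            return ToyColumn(name)
        raise AttributeError(name)

    def __getitem__(self, name):
        # Key lookup goes straight to the column list.
        if name in self.columns:
            return ToyColumn(name)
        raise KeyError(name)


df = ToyDataFrame(["useful", "stars", "count", "review count"])
print(df.useful)           # ToyColumn<useful>
print(df["useful"])        # ToyColumn<useful>
print(callable(df.count))  # True: the method wins over the column
print(df["count"])         # ToyColumn<count>
print(df["review count"])  # ToyColumn<review count> (no attribute form possible)
```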

In [6]:
yelp_df.useful


Out[6]:
Column<b'useful'>

In [7]:
yelp_df["useful"]


Out[7]:
Column<b'useful'>

Filtering

  • df.filter(expression); df.where(expression) is an alias
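In df.filter(yelp_df.stars > 3), the comparison does not produce True or False. Instead, the Column overloads > to build an expression, which is then evaluated for every row.

The sketch below models that idea in plain Python on invented rows. Expr, col() and toy_filter() are hypothetical stand-ins, not PySpark's API.

The sketch also shows how conditions combine. Python does not let classes overload and/or, so PySpark uses & and | instead. Because & binds more tightly than > in Python, each comparison needs its own parentheses, e.g. yelp_df.filter((yelp_df.stars > 3) & (yelp_df.useful >= 1)).

```python
class Expr:
    """Hypothetical stand-in for a Column expression: a function of one row."""
    def __init__(self, fn):
        self.fn = fn

    def _combine(self, other, op):
        # Treat plain values (like 3) as constants.
        other_fn = other.fn if isinstance(other, Expr) else (lambda row: other)
        return Expr(lambda row: op(self.fn(row), other_fn(row)))

    # Comparisons build new expressions instead of returning True/False.
    def __gt__(self, other):
        return self._combine(other, lambda a, b: a > b)

    def __ge__(self, other):
        return self._combine(other, lambda a, b: a >= b)

    # & and | stand in for "and"/"or", which Python does not let classes overload.
    def __and__(self, other):
        return self._combine(other, lambda a, b: a and b)

    def __or__(self, other):
        return self._combine(other, lambda a, b: a or b)


def col(name):
    """Hypothetical helper: an expression that reads one field of a row."""
    return Expr(lambda row: row[name])


def toy_filter(rows, predicate):
    """Keep the rows for which the expression evaluates to True."""
    return [row for row in rows if predicate.fn(row)]


# Invented rows, not the Yelp data.
rows = [
    {"id": "a", "stars": 5, "useful": 2},
    {"id": "b", "stars": 4, "useful": 0},
    {"id": "c", "stars": 2, "useful": 3},
    {"id": "d", "stars": 1, "useful": 0},
]

print(len(toy_filter(rows, col("stars") > 3)))  # 2
both = toy_filter(rows, (col("stars") > 3) & (col("useful") >= 1))
print([r["id"] for r in both])  # ['a']
```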

In [8]:
yelp_df.filter(yelp_df.stars > 3).count()


Out[8]:
586

Aggregate Functions

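  • Column objects have no aggregate methods: a call such as yelp_df.stars.mean()
    fails with "TypeError: 'Column' object is not callable", because attribute
    access on a Column is treated as a lookup of a nested field
  • Instead, aggregate with agg(), passing a dict that maps a column name to the
    name of an aggregate function ('mean', 'max', 'min', ...):
    df.select('column_name').agg({'column_name': 'function_name'})
    The select() is optional; df.agg({...}) also works on the whole DataFrame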
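The sketch below shows, in plain Python, what the dict form of agg() computes. AGG_FUNCS and toy_agg() are hypothetical helpers run on invented rows. Two details follow the notebook and Spark's behaviour:

- 'mean' is reported in a column named avg(...), matching Out[9].
- Missing values are skipped, mirroring how Spark's aggregates ignore nulls.

```python
def _avg(values):
    return sum(values) / len(values)

# Hypothetical lookup: function name -> (output-column prefix, Python callable).
# "mean" is reported as avg(...), matching Out[9] above.
AGG_FUNCS = {
    "avg": ("avg", _avg),
    "mean": ("avg", _avg),
    "max": ("max", max),
    "min": ("min", min),
    "sum": ("sum", sum),
    "count": ("count", len),
}

def toy_agg(rows, spec):
    """Mimic df.agg({column: function_name}) on a list of dict rows.
    Missing values (None) are skipped; assumes at least one value remains."""
    result = {}
    for column, func_name in spec.items():
        prefix, func = AGG_FUNCS[func_name]
        values = [row[column] for row in rows if row[column] is not None]
        result[f"{prefix}({column})"] = func(values)
    return result

# Invented rows, not the Yelp data; the None plays the role of a null.
rows = [{"stars": 4}, {"stars": 5}, {"stars": 1}, {"stars": 2}, {"stars": None}]

print(toy_agg(rows, {"stars": "mean"}))  # {'avg(stars)': 3.0}
print(toy_agg(rows, {"stars": "max"}))   # {'max(stars)': 5}
```

A dict holds one entry per key, so this form allows only one function per column. To get several aggregates of stars at once in PySpark, pass column expressions instead: from pyspark.sql import functions as F, then yelp_df.agg(F.min('stars'), F.max('stars')).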


In [9]:
yelp_df.select("stars").agg({"stars":"mean"}).collect()


Out[9]:
[Row(avg(stars)=3.56)]

In [10]:
yelp_df.select("stars").agg({"stars":"max"}).collect()


Out[10]:
[Row(max(stars)=5)]

In [11]:
yelp_df.select("stars").agg({"stars":"min"}).collect()


Out[11]:
[Row(min(stars)=1)]

In [12]:
yelp_df.select("stars").agg({"stars":"min"}).show()


+----------+
|min(stars)|
+----------+
|         1|
+----------+


In [13]:
yelp_df.select("id", "useful", "stars").take(5)


Out[13]:
[Row(id='fWKvX83p0-ka4JS3dc6E5A', useful=5, stars=4),
 Row(id='IjZ33sJrzXqU-0X6U8NwyA', useful=0, stars=4),
 Row(id='IESLBzqUCLdSzSqm0eCSxQ', useful=1, stars=4),
 Row(id='G-WvGaISbqqaMHlNnByodA', useful=2, stars=4),
 Row(id='1uJFq2r5QfJG_6ExMRCaGw', useful=0, stars=4)]

Modify a column

  • Pass the new expression to select():
    df.select('column_name', df.column_name * 4 - 30)
  • To change the type of the resulting column, call .cast(NEW_TYPE) on the expression
  • To rename the resulting column, call .alias(NEW_NAME)
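The cast in In [15] drops the fractional part rather than rounding. The snippet below reproduces that arithmetic in plain Python on the five possible star values.

It assumes that Spark's double-to-int cast truncates toward zero like Python's int(), which is consistent with the 9.2 → 9 output. toy_select() is a hypothetical helper that also mimics .alias() by storing the result under a new key.

```python
import math

def toy_select(rows):
    """Hypothetical stand-in for
    df.select('stars', (df.stars * 2.3).cast("int").alias('new_stars'))."""
    return [{"stars": r["stars"], "new_stars": int(r["stars"] * 2.3)} for r in rows]

rows = [{"stars": s} for s in [1, 2, 3, 4, 5]]
print([r["new_stars"] for r in toy_select(rows)])  # [2, 4, 6, 9, 11]

# Truncation toward zero differs from floor for negative values.
print(int(-9.2), math.floor(-9.2))  # -9 -10
```

If rounding is wanted instead, apply a rounding function before the cast rather than relying on the cast itself.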

In [14]:
yelp_df.select('stars', yelp_df.stars*2.3).show(5)


+-----+-------------+
|stars|(stars * 2.3)|
+-----+-------------+
|    4|          9.2|
|    4|          9.2|
|    4|          9.2|
|    4|          9.2|
|    4|          9.2|
+-----+-------------+
only showing top 5 rows


In [15]:
yelp_df.select('stars', (yelp_df.stars*2.3).cast("int")).show(5)


+-----+--------------------------+
|stars|cast((stars * 2.3) as int)|
+-----+--------------------------+
|    4|                         9|
|    4|                         9|
|    4|                         9|
|    4|                         9|
|    4|                         9|
+-----+--------------------------+
only showing top 5 rows


In [16]:
yelp_df.select('stars', (yelp_df.stars*2.3).cast("int").alias('new_stars')).show(5)


+-----+---------+
|stars|new_stars|
+-----+---------+
|    4|        9|
|    4|        9|
|    4|        9|
|    4|        9|
|    4|        9|
+-----+---------+
only showing top 5 rows

Ordering by a column

  • Import the sort helpers: from pyspark.sql.functions import asc, desc
  • df.orderBy(desc("column_name")) sorts in descending order; asc("column_name") sorts in ascending order
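In plain Python, orderBy(asc(...)) and orderBy(desc(...)) correspond to sorted() with reverse=False and reverse=True. The rows below are invented.

Spark does not promise any particular order among tied rows (note the run of 1-star reviews in Out[17]). When a repeatable order matters, add a tiebreaker column. In PySpark that would be yelp_df.orderBy(desc("stars"), asc("id")), modelled here with a composite sort key.

```python
# Invented rows, not the Yelp data.
rows = [
    {"id": "c", "stars": 4},
    {"id": "a", "stars": 1},
    {"id": "d", "stars": 4},
    {"id": "b", "stars": 1},
]

# orderBy(asc("stars")): smallest first
by_stars_asc = sorted(rows, key=lambda r: r["stars"])

# orderBy(desc("stars")): largest first
by_stars_desc = sorted(rows, key=lambda r: r["stars"], reverse=True)

# orderBy(desc("stars"), asc("id")): negate the numeric key for descending,
# then break ties on id so the result is fully determined
by_stars_then_id = sorted(rows, key=lambda r: (-r["stars"], r["id"]))

print([r["id"] for r in by_stars_then_id])  # ['c', 'd', 'a', 'b']
```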

In [17]:
from pyspark.sql.functions import asc, desc

yelp_df.select("id", "stars").orderBy(asc("stars")).show(10)


+--------------------+-----+
|                  id|stars|
+--------------------+-----+
|2k57IbygwTWDvu4R1...|    1|
|4FlODbbswl1eq5-z9...|    1|
|7pDu262vEGm7Avr9Q...|    1|
|i4kZYaGypxV6LPCro...|    1|
|pmOExg5ab56RZjG0K...|    1|
|ex4pODOWrfzx1k89F...|    2|
|J2Ig5cV9fJU-KGewd...|    2|
|LN4l4wklQB0IVBjm-...|    2|
|xCEvHEszA3-CmFqtH...|    2|
|OUhKpg-1LG7bpyxHL...|    2|
+--------------------+-----+
only showing top 10 rows

Grouping

  • df.groupBy('column_name') followed by an aggregate such as .count() or .avg('column_name')


In [18]:
yelp_df.groupBy('state').count().show()


+-----+-----+
|state|count|
+-----+-----+
|   TX|   27|
|   AZ|  694|
|   NY|   31|
|   OR|   29|
|   ID|    3|
|   CA|   24|
|   CO|   52|
|   WA|    5|
|   LA|   65|
|   MN|   28|
|   GA|   42|
+-----+-----+


In [19]:
yelp_df.groupBy('state').avg('stars').show()


+-----+------------------+
|state|        avg(stars)|
+-----+------------------+
|   TX|3.6296296296296298|
|   AZ|3.5605187319884726|
|   NY|3.6451612903225805|
|   OR|3.4482758620689653|
|   ID|3.6666666666666665|
|   CA|3.4166666666666665|
|   CO|3.5576923076923075|
|   WA|               4.0|
|   LA|3.5076923076923077|
|   MN| 3.607142857142857|
|   GA|3.5952380952380953|
+-----+------------------+
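The sketch below computes, in a single plain-Python pass over invented rows, what groupBy('state').count() and groupBy('state').avg('stars') return. toy_group_stats() is a hypothetical helper and assumes no missing star values (Spark's avg skips nulls, while count() counts every row in the group).

In PySpark, the two results can likewise be produced together:

- from pyspark.sql import functions as F
- yelp_df.groupBy('state').agg(F.count('*').alias('count'), F.avg('stars').alias('avg_stars'))

The grouped rows in the outputs above come back in no particular order; appending .orderBy('state') sorts them.

```python
from collections import defaultdict

def toy_group_stats(rows, key, value):
    """Hypothetical one-pass equivalent of groupBy(key).count() plus
    groupBy(key).avg(value); assumes every row has a non-null value."""
    totals = defaultdict(lambda: [0, 0])  # group key -> [row count, running sum]
    for row in rows:
        acc = totals[row[key]]
        acc[0] += 1
        acc[1] += row[value]
    return {
        group: {"count": n, f"avg({value})": total / n}
        for group, (n, total) in totals.items()
    }

# Invented rows, not the Yelp data.
rows = [
    {"state": "AZ", "stars": 5},
    {"state": "NY", "stars": 4},
    {"state": "AZ", "stars": 3},
    {"state": "AZ", "stars": 1},
    {"state": "NY", "stars": 5},
]

for state, stats in sorted(toy_group_stats(rows, "state", "stars").items()):
    print(state, stats)
# AZ {'count': 3, 'avg(stars)': 3.0}
# NY {'count': 2, 'avg(stars)': 4.5}
```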

