In [1]:
import pandas as pd
import numpy as np

print "Spark version:",sc.version
print "Pandas version:",pd.__version__

from pandas import Series, DataFrame

from os import getenv
DATADIR = getenv("DATADIR")
SUBDIR = '/PUBLIC/movielens/ml-1m'
DATADIR += SUBDIR


Spark version: 2.2.0
Pandas version: 0.20.2

In [2]:
#--------------------------------------------------
import plotly as plotly
print "Plotly version", plotly.__version__  # version >1.9.4 required
import plotly.graph_objs as go
from plotly import tools

# plotly.offline.init_notebook_mode() # run at the start of every notebook
from plotly.offline import download_plotlyjs, init_notebook_mode, iplot #, plot  # Difference .plot / .iplot ???
init_notebook_mode() # run at the start of every ipython notebook to use plotly.offline
                     # this injects the plotly.js source files into the notebook
#--------------------------------------------------
# %matplotlib inline
# import matplotlib.pyplot as plt
# import seaborn as sns
#--------------------------------------------------


Plotly version 2.0.12

Spark DataFrames


In [3]:
usersDF = spark.read.csv("%s/users.csv" % DATADIR, sep=',', header=False, inferSchema=True)
usersDF =  usersDF.withColumnRenamed('_c0', 'UserID') \
									.withColumnRenamed('_c1', 'Gender') \
									.withColumnRenamed('_c2', 'Age') \
									.withColumnRenamed('_c3', 'Occupation') \
									.withColumnRenamed('_c4', 'ZipCode')

In [4]:
usersDF.show(5)


+------+------+---+----------+-------+
|UserID|Gender|Age|Occupation|ZipCode|
+------+------+---+----------+-------+
|     1|     F|  1|        10|  48067|
|     2|     M| 56|        16|  70072|
|     3|     M| 25|        15|  55117|
|     4|     M| 45|         7|  02460|
|     5|     M| 25|        20|  55455|
+------+------+---+----------+-------+
only showing top 5 rows


In [5]:
ratingsDF = spark.read.csv("%s/ratings.csv" % DATADIR, sep=',', header=False, inferSchema=True)
ratingsDF =  ratingsDF.withColumnRenamed('_c0', 'UserID') \
											.withColumnRenamed('_c1', 'MovieID') \
											.withColumnRenamed('_c2', 'Rating') \
											.withColumnRenamed('_c3', 'Timestamp')

In [6]:
# Compute Ratings Histogram:
# ratingsHistogram = ratingsDF.groupBy("Rating").agg({'Rating': 'count'})
ratingsHistogram = ratingsDF.groupBy("Rating").count().withColumnRenamed('count','Cnt')
ratingsHistogram.show()


+------+------+
|Rating|   Cnt|
+------+------+
|     1| 56174|
|     3|261197|
|     5|226310|
|     4|348971|
|     2|107557|
+------+------+


Join DataFrames


In [7]:
ratingsWithUserDataDF = ratingsDF.join(usersDF, on='UserID', how='inner')

In [8]:
ratingsWithUserDataDF.show(5)


+------+-------+------+---------+------+---+----------+-------+
|UserID|MovieID|Rating|Timestamp|Gender|Age|Occupation|ZipCode|
+------+-------+------+---------+------+---+----------+-------+
|     1|   1193|     5|978300760|     F|  1|        10|  48067|
|     1|    661|     3|978302109|     F|  1|        10|  48067|
|     1|    914|     3|978301968|     F|  1|        10|  48067|
|     1|   3408|     4|978300275|     F|  1|        10|  48067|
|     1|   2355|     5|978824291|     F|  1|        10|  48067|
+------+-------+------+---------+------+---+----------+-------+
only showing top 5 rows


In [9]:
# Compute Ratings Histogram by Gender:
ratingsHistogram = (
	ratingsWithUserDataDF
	.groupby(['Rating','Gender']).count().withColumnRenamed('count','Cnt')
	.orderBy(["Rating", "Gender"], ascending=[1, 1])
)

In [10]:
ratingsHistogram.show(100)


+------+------+------+
|Rating|Gender|   Cnt|
+------+------+------+
|     1|     F| 13347|
|     1|     M| 42827|
|     2|     F| 24548|
|     2|     M| 83009|
|     3|     F| 62966|
|     3|     M|198231|
|     4|     F| 87033|
|     4|     M|261938|
|     5|     F| 58546|
|     5|     M|167764|
+------+------+------+


In [11]:
fRatingsNr=ratingsWithUserDataDF.filter("Gender = 'F'").count()
mRatingsNr=ratingsWithUserDataDF.filter(ratingsWithUserDataDF['Gender'] == 'M').count()
print "Nr. of ratings by female users:",fRatingsNr
print "Nr. of ratings by male users:  ",mRatingsNr


Nr. of ratings by female users: 246440
Nr. of ratings by male users:   753769

In [12]:
from pyspark.sql.types import IntegerType, FloatType, DoubleType
from pyspark.sql.functions import udf

normalize_udf = udf(lambda cnt, gender: 1.*cnt/fRatingsNr if gender=='F' else 1.*cnt/mRatingsNr, DoubleType())

ratingsHistogram=(
	ratingsHistogram.withColumn("CntNormalized", normalize_udf(ratingsHistogram.Cnt, ratingsHistogram.Gender))
)

In [13]:
ratingsHistogram.show(20)


+------+------+------+-------------------+
|Rating|Gender|   Cnt|      CntNormalized|
+------+------+------+-------------------+
|     1|     F| 13347|0.05415922739814965|
|     1|     M| 42827|0.05681714159112407|
|     2|     F| 24548|0.09961045284856354|
|     2|     M| 83009|0.11012525057411487|
|     3|     F| 62966|0.25550235351403994|
|     3|     M|198231| 0.2629864056494762|
|     4|     F| 87033| 0.3531610128225937|
|     4|     M|261938| 0.3475043415157694|
|     5|     F| 58546|0.23756695341665315|
|     5|     M|167764|0.22256686066951545|
+------+------+------+-------------------+


In [14]:
ratingsHistogram.groupby('Gender').sum('CntNormalized').show()


+------+------------------+
|Gender|sum(CntNormalized)|
+------+------------------+
|     F|               1.0|
|     M|               1.0|
+------+------------------+


Read movies data


In [15]:
from pyspark.sql import types as T
from pyspark.sql import functions as F

In [16]:
moviesDF = spark.read.csv("%s/movies.csv" % DATADIR, sep='+', header=False, inferSchema=True)
moviesDF =  moviesDF.withColumnRenamed('_c0', 'MovieID') \
									.withColumnRenamed('_c1', 'Title') \
									.withColumnRenamed('_c2', 'Genres')

In [17]:
moviesDF.show(3, truncate=50)


+-------+-----------------------+----------------------------+
|MovieID|                  Title|                      Genres|
+-------+-----------------------+----------------------------+
|      1|       Toy Story (1995)| Animation|Children's|Comedy|
|      2|         Jumanji (1995)|Adventure|Children's|Fantasy|
|      3|Grumpier Old Men (1995)|              Comedy|Romance|
+-------+-----------------------+----------------------------+
only showing top 3 rows


In [18]:
split_udf = udf(lambda s: s.split("|"), T.ArrayType(T.StringType()))
moviesDF=moviesDF.withColumn("Genres", split_udf(moviesDF['Genres']))

In [19]:
moviesDF.show(3, truncate=50)


+-------+-----------------------+--------------------------------+
|MovieID|                  Title|                          Genres|
+-------+-----------------------+--------------------------------+
|      1|       Toy Story (1995)| [Animation, Children's, Comedy]|
|      2|         Jumanji (1995)|[Adventure, Children's, Fantasy]|
|      3|Grumpier Old Men (1995)|               [Comedy, Romance]|
+-------+-----------------------+--------------------------------+
only showing top 3 rows


In [20]:
ratingsWithUserAndMovieDataDF = ratingsWithUserDataDF.join(moviesDF, how='inner', on='MovieID')

In [21]:
print "Nr. of rows:", ratingsWithUserAndMovieDataDF.count()
ratingsWithUserAndMovieDataDF.sort(['MovieID','UserID']).show(3, truncate=50)


Nr. of rows: 1000209
+-------+------+------+---------+------+---+----------+-------+----------------+-------------------------------+
|MovieID|UserID|Rating|Timestamp|Gender|Age|Occupation|ZipCode|           Title|                         Genres|
+-------+------+------+---------+------+---+----------+-------+----------------+-------------------------------+
|      1|     1|     5|978824268|     F|  1|        10|  48067|Toy Story (1995)|[Animation, Children's, Comedy]|
|      1|     6|     4|978237008|     F| 50|         9|  55117|Toy Story (1995)|[Animation, Children's, Comedy]|
|      1|     8|     4|978233496|     M| 25|        12|  11413|Toy Story (1995)|[Animation, Children's, Comedy]|
+-------+------+------+---------+------+---+----------+-------+----------------+-------------------------------+
only showing top 3 rows


In [22]:
ratingsByGenderAndGenreSF = (
	ratingsWithUserAndMovieDataDF
	.withColumn('Genre', F.explode(ratingsWithUserAndMovieDataDF.Genres))
	.drop('Genres')
	.groupBy(['Gender','Rating','Genre'])
	.agg(
		F.count('*').alias('Cnt'),
		F.mean('Age').alias('AvgAge')
	)
# 	.agg({"*":'count', "Age":'mean'})
	.sort(['Genre','Gender','Rating'])
)

In [23]:
ratingsByGenderAndGenreSF.show(20)


+------+------+---------+-----+------------------+
|Gender|Rating|    Genre|  Cnt|            AvgAge|
+------+------+---------+-----+------------------+
|     F|     1|   Action| 3087|27.126660187884678|
|     F|     2|   Action| 5446|28.806096217407273|
|     F|     3|   Action|12412| 29.63116339026748|
|     F|     4|   Action|15410|30.180856586632057|
|     F|     5|   Action| 9295|29.960731576116192|
|     M|     1|   Action|13444| 27.07371318060101|
|     M|     2|   Action|25986|28.363849765258216|
|     M|     3|   Action|58316|29.262723780780576|
|     M|     4|   Action|71169|29.408590819036377|
|     M|     5|   Action|42892|29.018814697379465|
|     F|     1|Adventure| 1696|26.340801886792452|
|     F|     2|Adventure| 3178|28.382630585273755|
|     F|     3|Adventure| 7592|29.033192834562698|
|     F|     4|Adventure| 9144|29.722331583552055|
|     F|     5|Adventure| 5722|29.570604683677036|
|     M|     1|Adventure| 6798| 26.75684024713151|
|     M|     2|Adventure|13463|28.504790908415657|
|     M|     3|Adventure|30275|29.490305532617672|
|     M|     4|Adventure|35199| 29.62527344526833|
|     M|     5|Adventure|20886|29.638657473905965|
+------+------+---------+-----+------------------+
only showing top 20 rows


In [24]:
avgRatingsByGenderAndGenreDF = (
	ratingsWithUserAndMovieDataDF
	.withColumn('Genre', F.explode(ratingsWithUserAndMovieDataDF.Genres))
	.drop('Genres')
	.groupBy(['Gender','Genre'])
	.agg(
		F.count('*').alias('Cnt'),
		F.mean('Rating').alias('AvgRating')
	)
# 	.agg({"*":'count', "Rating":'mean'})
	.sort(['Genre','Gender'])
)

In [25]:
avgRatingsByGenderAndGenreDF.show(20)


+------+-----------+------+------------------+
|Gender|      Genre|   Cnt|         AvgRating|
+------+-----------+------+------------------+
|     F|     Action| 45650| 3.490251916757941|
|     M|     Action|211807|3.4913860259575933|
|     F|  Adventure| 27332|3.5128786770086347|
|     M|  Adventure|106621| 3.468125416193808|
|     F|  Animation| 12221| 3.744701742901563|
|     M|  Animation| 31072| 3.661334963954686|
|     F| Children's| 21317| 3.572547731857203|
|     M| Children's| 50869|3.3589612534156363|
|     F|     Comedy| 96271|3.5719375512875113|
|     M|     Comedy|260309| 3.503666796000138|
|     F|      Crime| 16442|3.6893321980294367|
|     M|      Crime| 63099|3.7137197102965183|
|     F|Documentary|  1940|  3.94639175257732|
|     M|Documentary|  5970|3.9288107202680065|
|     F|      Drama| 98153|3.7656617729463187|
|     M|      Drama|256376|3.7665889162792148|
|     F|    Fantasy|  8718|3.5130763936682725|
|     M|    Fantasy| 27583|3.4266033426385816|
|     F|  Film-Noir|  4202| 4.018086625416468|
|     M|  Film-Noir| 14059| 4.092254072124618|
+------+-----------+------+------------------+
only showing top 20 rows


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]: