I pulled the dataset from Reddit's archive site, which hosts a "Complete Public Reddit Comments Corpus".
I'll bring the dataset into my environment, perform an ETL pass over it, and run LDA to determine the topics of the comments.
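As a rough preview of that last step, a minimal Spark ML LDA pipeline over the comment body column might look like the sketch below. It assumes the per-year temp tables registered later in this notebook (e.g. reddit_2014); the sample fraction, vocabulary size, and topic count are arbitrary illustration values, not settings used elsewhere here.
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.clustering import LDA
# Sample a small slice of the 2014 comments for illustration
comments = spark.table("reddit_2014").select("body").sample(False, 0.001)
# Tokenize the raw comment text and drop common stop words
tokenizer = RegexTokenizer(inputCol="body", outputCol="tokens", pattern="\\W+")
remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
tokens = remover.transform(tokenizer.transform(comments))
# Build a bag-of-words representation capped at an arbitrary vocabulary size
cv_model = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5.0).fit(tokens)
vectorized = cv_model.transform(tokens)
# Fit LDA with an arbitrary topic count and inspect the top terms per topic
lda_model = LDA(k=10, maxIter=20).fit(vectorized)
display(lda_model.describeTopics(maxTermsPerTopic=5))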
In [3]:
spark.read.parquet("/mnt/mwc/reddit_year_p/year=2012/").count()
In [4]:
# List partitions and register as temp tables
import pyspark.sql.functions as F
from pyspark.sql.types import *
# To create the complete dataset, let's create temporary tables per year, then create a master union table
df = sc.parallelize(dbutils.fs.ls("/mnt/mwc/reddit_year_p")).toDF()
# Parse the year partition to get an array of years to register the tables by
years = df.select(F.regexp_extract('name', r'(\d+)', 1).alias('year')).collect()
year_partitions = [row['year'] for row in years if row['year']]
year_partitions
# Loop over and register a table per year
for y in year_partitions:
    df = sqlContext.read.parquet("/mnt/mwc/reddit_year_p/year=%s" % y)
    df.createOrReplaceTempView("reddit_%s" % y)
# Register the root directory for the complete dataset
df_complete = spark.read.parquet("/mnt/mwc/reddit_year_p/")
# df_complete.createGlobalTempView("reddit_all")
df_complete.createOrReplaceTempView("reddit_all")
In [5]:
df_complete.rdd.getNumPartitions()
In [6]:
spark.sql("select * from reddit_all where year = 2014").explain(True)
In [7]:
%sql
select * from reddit_all where year = $arg
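The $arg placeholder above is supplied by a notebook widget. One way to define it before running that cell (a sketch, assuming a text widget named arg with a default year) is:
# Hypothetical widget definition; pick any default year present in the dataset
dbutils.widgets.text("arg", "2014", "year")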
In [8]:
df_complete.printSchema()
In [9]:
display(df_complete)
A few ideas for what can be accomplished with this dataset:
In [11]:
%sh
ls -lh /dbfs/mnt/mwc/reddit_year_p/year=2014/
In [12]:
%sql
-- run 1
select count(1) as count, year from reddit_all group by year order by year asc ;
In [13]:
%sql
-- run 2
select count(1) as count, year from reddit_all group by year order by year asc ;
In [15]:
%sql
-- Find the number of comments per day of week for 2014
SELECT day, sum(comments) as counts from (
SELECT date_format(from_unixtime(created_utc), 'EEEE') day, COUNT(*) comments
FROM reddit_2014
GROUP BY created_utc
ORDER BY created_utc
) q2
GROUP BY day
ORDER BY counts;
In [16]:
%sql
-- Find the number of comments per day of week for 2014
SELECT day, sum(comments) as counts from (
SELECT date_format(from_unixtime(created_utc), 'EEEE') day, COUNT(*) comments
FROM reddit_2014
GROUP BY created_utc
ORDER BY created_utc
) q2
GROUP BY day
ORDER BY counts;
To view the SQL language manual, see the Databricks Docs.
In [18]:
%sql
-- Select best time to comment on posts
CREATE TABLE IF NOT EXISTS popular_posts_2014
USING parquet
OPTIONS (
path "/mnt/mwc/popular_posts_2014"
)
AS SELECT
day,
hour,
SUM(IF(score >= 1000, 1, 0)) as score_gt_1k
FROM
(SELECT
date_format(from_utc_timestamp(from_unixtime(created_utc), "PST"), 'EEEE') as day,
date_format(from_utc_timestamp(from_unixtime(created_utc), "PST"), 'h a') as hour,
score
FROM reddit_2014) q1
GROUP BY day, hour
ORDER BY day, hour
In [19]:
current_table = 'popular_posts_2014'
df = spark.read.parquet("/mnt/mwc/popular_posts_2014")
df.createOrReplaceTempView('popular_posts_2014')
display(spark.table(current_table))
In [21]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
In [22]:
# Define the labels sorted in my predefined order
column_labels = ["12 AM", "1 AM", "2 AM", "3 AM", "4 AM", "5 AM", "6 AM", "7 AM", "8 AM", "9 AM", "10 AM", "11 AM", "12 PM", "1 PM", "2 PM", "3 PM", "4 PM", "5 PM", "6 PM", "7 PM", "8 PM", "9 PM", "10 PM", "11 PM"]
# Zip up the 2 column names by predefined order
column2_name = ['Count of Comments > 1K Votes']*len(column_labels)
column_label_sorted = list(zip(column2_name, column_labels))
# Define the row labels to map the calendar week
row_labels = ["Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday"]
data = [[x.day, x.hour, x.score_gt_1k] for x in spark.table(current_table).collect()]
In [23]:
# Create the Pivot Table
colNames = ['Day of Week', 'Hour', 'Count of Comments > 1K Votes']
data_m = pd.DataFrame(data,columns = colNames)
pvt = pd.pivot_table(data_m, index=['Day of Week'], columns=['Hour'])
In [24]:
# Reindex the row axis to sort the days into my predefined order
# Reindex the columns to sort them by my ordered zipped (metric, hour) pairs
pvt_sorted = pvt.reindex(row_labels, axis=0).reindex(columns=column_label_sorted)
pvt_sorted
In [25]:
data_p = pvt_sorted.values.transpose()
fig, ax = plt.subplots()
heatmap = ax.pcolor(data_p, cmap=plt.cm.Blues)
# put the major ticks at the middle of each cell
ax.set_xticks(np.arange(data_p.shape[1])+0.5, minor=False)
ax.set_yticks(np.arange(data_p.shape[0])+0.5, minor=False)
# want a more natural, table-like display
ax.invert_yaxis()
ax.xaxis.tick_top()
ax.set_xticklabels(row_labels, minor=False)
ax.set_yticklabels(column_labels, minor=False)
display()
In [27]:
%r
# Install necessary packages to use ggplot2
install.packages("ggplot2")
install.packages("reshape")
library(plyr)
library(reshape2)
library(scales)
library(ggplot2)
In [28]:
%r
scores <- sql("FROM popular_posts_2014 SELECT *")
local_df <- collect(scores)
# We can pivot the data in 2 ways, option 1 commented out
# local_df$day <- factor(local_df$day)
# xtabs(score_gt_1k ~ hour+day, local_df)
heat_val <- with(local_df, tapply(score_gt_1k, list(hour, day) , I) )
# Define logical times
times <- c("12 AM", "1 AM", "2 AM", "3 AM", "4 AM", "5 AM", "6 AM", "7 AM", "8 AM", "9 AM", "10 AM", "11 AM", "12 PM", "1 PM", "2 PM", "3 PM", "4 PM", "5 PM", "6 PM", "7 PM", "8 PM", "9 PM", "10 PM", "11 PM")
heat_val[times, ]
In [29]:
%r
# Test out the factor API; the ordering it defines only matters once the data is plotted with ggplot
local_df.m <- melt(local_df)
local_df.m$hour <- factor(local_df.m$hour, levels=times)
local_df.m
In [30]:
%r
library(scales)
# melt() flattens the R data.frame into a long format that is friendly for ggplot
local_df.m <- melt(local_df)
# factor() specifies the exact ordering of the values within a column; this matters because the hour strings have no natural sort order
local_df.m$hour <- factor(local_df.m$hour, levels=rev(times))
local_df.m$day <- factor(local_df.m$day, levels=c("Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday"))
# This provides the heatmap of the comment posts
p <- ggplot(local_df.m, aes(day, hour)) + geom_tile(aes(fill = value), colour = "white") + scale_fill_gradient(low = "white", high = "steelblue")
p
In [32]:
df = table("reddit_2010").unionAll(table("reddit_2011")).unionAll(table("reddit_2012"))
df.registerTempTable("reddit_201x")
In [33]:
dfc = sqlContext.sql("""SELECT
score,
AVG(LENGTH(body)) as avg_comment_length,
STDDEV(LENGTH(body))/SQRT(COUNT(score)) as se_comment_length,
COUNT(score) as num_comments
FROM reddit_201x
GROUP BY score
ORDER BY score""")
df = dfc.filter("score >= -200 and score <=2000").select("score", "avg_comment_length", "se_comment_length").toPandas()
In [34]:
from ggplot import *
p = ggplot(df, aes(x='score', y='avg_comment_length')) + \
geom_line(size=0.25, color='red') + \
ylim(0, 1100) + \
xlim(-200, 2000) + \
xlab("(# Upvotes - # Downvotes)") + \
ylab("Avg. Length of Comment For Each Score") + \
ggtitle("Relationship between Reddit Comment Score and Comment Length for Comments")
display(p)
In [36]:
%sql
SELECT subreddit, num_comments
FROM (
SELECT count(*) as num_comments,
subreddit
FROM reddit_2014
GROUP BY subreddit
ORDER BY num_comments DESC
LIMIT 20
) t1
In [37]: