Analysis of Reddit Comments

I pulled the dataset from the Internet Archive, which hosts a "Complete Public Reddit Comments Corpus".
The original Reddit post has more details on the dataset.

  • Generated via 20 million API calls over a period of months
  • 1TB of compressed JSON
  • 1.7 billion comments

I'll bring the dataset into my environment, perform an ETL on it, and run LDA to determine the topics of the comments.


In [2]:
# Install an init script that installs boto3 on each node so the executors can push data to an S3 bucket
dbutils.fs.put("/databricks/init/mwc/install_boto3.sh", """
#!/bin/bash
pip install boto3
""", True)

Parse Metadata

The following cells pull the dataset's metadata file, which we will use to decide which files to analyze and load into our own S3 bucket for processing.


In [4]:
from pyspark.sql.functions import col
import pyspark.sql.functions as F
import xml.etree.ElementTree as ET
import requests
import shutil

reddit_meta_url = 'https://ia801005.us.archive.org/19/items/2015_reddit_comments_corpus/2015_reddit_comments_corpus_files.xml'
r = requests.get(reddit_meta_url, stream=True)
if r.status_code == 200:
    with open('/tmp/reddit_meta.xml', 'wb') as f:
        r.raw.decode_content = True
        shutil.copyfileobj(r.raw, f)

In [5]:
%sh
head -n 10 /tmp/reddit_meta.xml

In [6]:
# Load the Python XML parser to parse the filenames / URL from the metadata
import xml.etree.ElementTree as etree
with open('/tmp/reddit_meta.xml', 'r') as xml_file:
    xml_tree = etree.parse(xml_file)
root = xml_tree.getroot()

# Collect the file names from elements with a 'name' attribute, and the file sizes from the <size> tags
file_names = [elem.attrib['name'] for elem in root.iter() if 'name' in elem.attrib]
file_sizes = [elem.text for elem in root.iter() if 'size' in elem.tag]

In [7]:
# Find the indices of the files that use bzip2 compression
indx = [i for i, f in enumerate(file_names) if "bz2" in f]

# Parse out the names and sizes
fnames = [file_names[i] for i in indx]
fsizes = [file_sizes[i] for i in indx]

# Zip the values up to create an RDD and convert it to a DataFrame
df = sc.parallelize(zip(fnames, fsizes)).toDF(["fname", "fsize"])

In [8]:
from pyspark.sql.functions import col
display(df.select(col("fsize").alias("file size"), col("fname").alias("name")))

In [9]:
import pyspark.sql.functions as F

df_mb = df.withColumn('size_mb', df.fsize / 1000000)
display(df_mb.select(F.split(df_mb.fname, '/')[1].alias('year'), df_mb.size_mb.alias("Size in MB")))

In [10]:
# 26 data files are excluded here because they're > 3GB compressed
# A single S3 PUT is limited to 5GB, so I filtered out files over 3GB to test the multi-part upload API on them
df_mb.filter(df_mb.size_mb >= 3000.0).count()

In [11]:
display(df_mb.filter(df_mb.size_mb < 3000.0))

Parallel Data Ingest

This section defines the functions used to pull the dataset files into my S3 bucket in parallel.


In [13]:
%run "/Users/mwc@databricks.com/_helper.py"

In [14]:
(ACCESS_KEY, SECRET_KEY) = get_creds()

Boto File Upload APIs

Boto implementation to support multi-part uploads for S3.


In [16]:
import requests

def download_file(url):
  local_filename = url.split('/')[-1]
  local_fq_filename = "/tmp/" + local_filename
  # NOTE the stream=True parameter
  r = requests.get(url, stream=True)
  with open(local_fq_filename, 'wb') as f:
    for chunk in r.iter_content(chunk_size=2048): 
      if chunk: # filter out keep-alive new chunks
        f.write(chunk)
  return local_fq_filename

def upload_file_multipart(s3, bucketname, filepath, filename):
  import os
  import math

  bkt = s3.get_bucket(bucketname)
  mp = bkt.initiate_multipart_upload(filename)

  source_size = os.stat(filepath).st_size
  bytes_per_chunk = 5000 * 1024 * 1024
  chunks_count = int(math.ceil(source_size / float(bytes_per_chunk)))

  for i in range(chunks_count):
    offset = i * bytes_per_chunk
    remaining_bytes = source_size - offset
    part_size = min([bytes_per_chunk, remaining_bytes])
    part_num = i + 1

    print "uploading part " + str(part_num) + " of " + str(chunks_count)

    # Open in binary mode and seek to this part's offset before streaming it to S3
    with open(filepath, 'rb') as fp:
      fp.seek(offset)
      mp.upload_part_from_file(fp=fp, part_num=part_num, size=part_size)

  if len(mp.get_all_parts()) == chunks_count:
    mp.complete_upload()
    print "upload_file done"
  else:
    mp.cancel_upload()
    print "upload_file failed"

In [17]:
def get_files_to_s3_multipart(files):
  # REF: http://codeinpython.blogspot.com/2015/08/s3-upload-large-files-to-amazon-using.html 
  # Import the necessary modules for the executors
  import boto
  import requests
  import os
  debug = True
  bucket_name = get_bucket_name()
  # Base URL to fetch the files
  base_url = 'https://ia801005.us.archive.org/19/items/2015_reddit_comments_corpus/'
  
  # Create the S3 client using the older boto library (needed here for multi-part uploads)
  s3client = boto.connect_s3(
    aws_access_key_id=ACCESS_KEY,
    aws_secret_access_key=SECRET_KEY)
  
  bkt = s3client.get_bucket(bucket_name)

  for r in files:
    # Step 1: Get the initial file
    # Build the full URL
    file_url = base_url + r.fname
    # Download file to local /tmp
    download_file(file_url)
    
    # Step 2: Initiate multipart upload for large files 
    file_name = r.fname.split('/')[-1]
    file_path = "/tmp/" + file_name
    if debug:
      print "filepath: " + file_path
      print "file_name: reddit/" + file_name
    upload_file_multipart(s3client, bucket_name, file_path, "reddit/" + file_name)
    
    # Delete local file and continue
    os.remove(file_path)

Boto3 File Upload API

A single put_object call in boto3 is limited to 5GB, so the multi-part path above uses the older boto library; boto3 is used below for the smaller files.
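As an aside, newer boto3 releases can also perform multi-part uploads transparently through upload_file with a TransferConfig. The following is a hedged sketch of that alternative (the threshold/chunk sizes and the example file name are illustrative assumptions), kept here for reference:

import boto3
from boto3.s3.transfer import TransferConfig

# Sketch only: boto3's managed transfer splits files above the threshold into parts automatically.
# The 100MB threshold/chunk size and the example file name are illustrative assumptions.
config = TransferConfig(multipart_threshold=100 * 1024 * 1024,
                        multipart_chunksize=100 * 1024 * 1024)
s3 = boto3.client('s3',
                  aws_access_key_id=ACCESS_KEY,
                  aws_secret_access_key=SECRET_KEY)
s3.upload_file('/tmp/RC_2015-01.bz2', get_bucket_name(), 'reddit/RC_2015-01.bz2', Config=config)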


In [19]:
# Print statements are seen in stderr on cluster executors
# This process takes 2079.28s to complete the download and push to S3
def get_files_to_s3_boto3(files):
  # Import the necessary modules for the executors
  import boto3
  import requests
  import os
  debug = True
  bucketname = get_bucket_name()
  # Register the S3 boto3 client for AWS
  s3client = boto3.client('s3',
    aws_access_key_id=ACCESS_KEY,
    aws_secret_access_key=SECRET_KEY)
  
  # Base URL to fetch the files
  base_url = 'https://ia801005.us.archive.org/19/items/2015_reddit_comments_corpus/'
  for r in files:
    # Build the full URL
    file_url = base_url + r.fname
    # Download file to local /tmp
    download_file(file_url)
    # Put file to S3 bucket
    file_name = r.fname.split('/')[-1]
    s3_obj_key = "reddit/" + file_name
    s3client.put_object(Bucket=bucketname, Key=s3_obj_key, Body=open('/tmp/' + file_name, 'rb'))

    # Delete local file and continue
    os.remove("/tmp/" + file_name)

Upload the Files to S3


In [21]:
# Split the files into two DataFrames by compressed size (< 3GB and >= 3GB) to drive the parallel ingestion
df_lt_3gb = df_mb.filter(df_mb.size_mb < 3000.0)
df_gt_3gb = df_mb.filter(df_mb.size_mb >= 3000.0)

In [22]:
# Two implementations to work around the S3 upload limits; multi-part upload is used for the larger objects
df_lt_3gb.foreachPartition(get_files_to_s3_boto3)
df_gt_3gb.foreachPartition(get_files_to_s3_multipart)
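The cells that follow read the uploaded data through /mnt/mwc/... paths, which assumes the S3 bucket has been mounted into DBFS. A minimal sketch of such a mount (the /mnt/mwc mount point is an assumption; the secret key must be URL-encoded in the source URI):

# Hedged sketch: mount the ingest bucket so its contents appear under /mnt/mwc in DBFS.
# The mount point is an assumption; the objects were uploaded under the "reddit/" prefix above.
ENCODED_SECRET_KEY = SECRET_KEY.replace("/", "%2F")
dbutils.fs.mount("s3a://%s:%s@%s" % (ACCESS_KEY, ENCODED_SECRET_KEY, get_bucket_name()), "/mnt/mwc")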

Calculate Total Size of Compressed Data


In [24]:
%scala
// Calculate the size of the compressed dataset
import org.apache.spark.sql.functions._

val df = dbutils.fs.ls("/mnt/mwc/reddit").toDF()
val c = df.agg(sum("size")).collect()(0)
val p = dbutils.fs.ls("/mnt/mwc/reddit_parquet").toDF().agg(sum("size")).collect()(0)

Read Reddit Parquet Data For Analysis

We've pulled the dataset into an S3 bucket to allow Spark to process it further.

  • Input: JSON + bzip2 compression (50,687,364,160 bytes ≈ 50.7GB)
  • Output: Parquet + gzip (52,395,455,189 bytes ≈ 52.4GB)

In [26]:
# Takes 4836.81 seconds to do the ETL of the compressed JSON dataset into Parquet
dfr = sqlContext.read.json("/mnt/mwc/reddit/")
dfr.write.parquet("/mnt/mwc/reddit_parquet")

In [27]:
dfp = sqlContext.read.parquet("/mnt/mwc/reddit_parquet")
dfp.registerTempTable("mwc_reddit")
dfp.printSchema()

In [28]:
display(dfp)

In [29]:
# Preview deriving a 'year' column from created_utc; the result isn't persisted here (see the next section)
dfp.withColumn('year', F.year(F.from_unixtime(dfp.created_utc)))

Partitioned Datasets

Loading the complete dataset isn't always necessary, so I partitioned it by the year of each post. With the data partitioned by year, we can easily create logical samples of the dataset using unions or views over a subset of the years (sketched below).


In [31]:
# Create the new 'year' column and write the dataset partitioned by it
df_p = dfp.withColumn('year', F.year(F.from_unixtime(dfp.created_utc)))
df_p.write.partitionBy('year').parquet('/mnt/mwc/reddit_year')
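Once the partitioned copy is written, a subset of years can be read back with a simple partition filter, which Spark prunes down to only the matching year directories. A minimal sketch (the 2014 cutoff is an illustrative assumption):

# Sketch: read only the most recent years via partition pruning (the year cutoff is illustrative)
df_recent = sqlContext.read.parquet('/mnt/mwc/reddit_year').where(F.col('year') >= 2014)
df_recent.registerTempTable("reddit_recent")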

Working with Partitioned Datasets

Here I create temp tables that reference the partitioned datasets. We can read the partitions directly and register them as temp tables without the overhead of the Hive metastore: I list the partitions, then loop through them to create reddit_$year tables.


In [33]:
import pyspark.sql.functions as F
from pyspark.sql.types import *

# To cover the complete dataset, create a temporary table per year and then a master table over all years
df = sc.parallelize(dbutils.fs.ls("/mnt/mwc/reddit_year")).toDF()

# Parse the year partition to get an array of years to register the tables by
years = df.select(F.regexp_extract('name', '(\d+)', 1).alias('year')).collect()
year_partitions = [x.year for x in years if x.year]
year_partitions

In [34]:
# Loop over and register a table per year 
for y in year_partitions:
  df = sqlContext.read.parquet("/mnt/mwc/reddit_year/year=%s" % y)
  df.registerTempTable("reddit_%s" % y)

# Register the root directory for the complete dataset 
df_complete = sqlContext.read.parquet("/mnt/mwc/reddit_year/")
df_complete.registerTempTable("reddit_all")
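With the per-year temp tables registered, a logical sample over a subset of years is just a union of the corresponding tables. A minimal sketch, assuming the 2014 and 2015 partitions are present:

# Sketch: build a two-year sample from the per-year temp tables registered above.
# The specific years are assumptions; any subset of year_partitions works the same way.
df_sample = sqlContext.table("reddit_2014").unionAll(sqlContext.table("reddit_2015"))
df_sample.registerTempTable("reddit_sample")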

In [35]: