Querying 1.8 billion reddit comments

with Python


Daniel Rodriguez / PyData NYC / Nov 11, 2015

github.com/danielfrg/pydata-nyc-2015

About me

Daniel Rodriguez

Data Scientist and Software Developer at Continuum Analytics

Anaconda Cluster

Bogotá, Colombia [1]; now in Austin, TX




[1] Yes, it’s spelled like that, with two ‘o’s. Columbia with a ‘u’ is a university in New York City. It’s not that hard: ColOmbia is a cOuntry, ColUmbia is a University.

What is this talk about?

Querying 1.8 billion reddit comments with Python

And how you can do it

  • Data science "friendly" clusters
  • ETL plus a little bit on data formats
    • Parquet
  • Querying data with some new Python libraries that target remote engines
    • Impala

Assumes some basic knowledge of big data tools such as HDFS, MapReduce, or Spark

Data

Data

The front page of the Internet

Data

The dataset is available on S3: s3://blaze-data/reddit/json
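A minimal sketch for peeking at the files from Python, assuming boto3 is installed and the bucket still allows anonymous reads:

import boto3
from botocore import UNSIGNED
from botocore.config import Config

# Anonymous client: the bucket is public, so no AWS credentials are needed
s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))

# List the first few JSON objects under the reddit prefix
resp = s3.list_objects_v2(Bucket='blaze-data', Prefix='reddit/json/', MaxKeys=5)
for obj in resp.get('Contents', []):
    print(obj['Key'], obj['Size'])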

Clusters

Clusters

Big Data technologies (the Hadoop zoo) are here to stay

Cloudera and Hortonworks provide really good management tools for IT/DevOps people:

  • Automated deployment and configuration
  • Customizable monitoring and reporting
  • Effortless, robust troubleshooting
  • Zero downtime maintenance: rolling upgrades and rollbacks
  • Security: Kerberos, LDAP

Clusters

Data science "friendly" clusters

Some of the features above, plus:

  • Data analysis packages and environment management
  • Interactive access to the cluster (Jupyter Notebook)
  • Short-lived clusters (?)
  • CLI instead of UI (?)
  • More freedom (?)

Still requires you to know what you are doing: AWS, key pairs, security groups, SSH.

No need to hide stuff. No magic.

Clusters: Anaconda Cluster

  • Resource management tool that allows users to easily create, provision, and manage bare-metal or cloud-based clusters.
  • It enables management of Conda environments on clusters
  • Provides integration, configuration, and setup management for Hadoop
  • Supported platforms include Amazon Web Services, physical machines, or even a collection of virtual machines.

http://docs.continuum.io/anaconda-cluster

$ conda install anaconda-client
$ anaconda login
$ conda install anaconda-cluster -c anaconda-cluster

Not open source: free for up to 4 nodes

Soon: free for up to 16 nodes in the cloud and 4 in-house

Clusters: Anaconda Cluster

Provider:

aws:
  cloud_provider: ec2
  keyname: {{ keyname in aws }}
  location: us-east-1
  private_key: ~/.ssh/{{ keyname in aws }}.pem
  secret_id: {{ aws key }}
  secret_key: {{ aws secret }}

Profile:

name: impala-profile
provider: aws
user: ubuntu
num_nodes: 10
node_id: ami-08faa660  # Ubuntu 12.04
node_type: m3.2xlarge
root_size: 1000
plugins:
  - hdfs:
      namenode_dirs:
        - /data/dfs/nn
      datanode_dirs:
        - /data/dfs/dn
  - hive
  - impala
  - notebook

Launch cluster:

$ acluster create impala-cluster -p impala-profile

Clusters: Anaconda Cluster

acluster

$ acluster create name -p profile

$ acluster destroy

$ acluster ssh

$ acluster cmd 'date'

$ acluster cmd 'apt-get install build-essential' --sudo 

$ acluster conda install numpy

$ acluster conda install my_pkg -c channel

$ acluster submit script.py

$ acluster put script.py /tmp/script.py

$ acluster get /tmp/script.py script.py

Clusters: Anaconda Cluster

acluster install

$ acluster install hdfs
$ acluster install hive
$ acluster install impala

$ acluster install elasticsearch
$ acluster install kibana
$ acluster install logstash

$ acluster install notebook
$ acluster install spark-standalone
$ acluster install spark-yarn

$ acluster install storm

$ acluster install ganglia

Clusters: Anaconda Cluster

$ acluster conda install -c r r-essentials
Installing packages on cluster "impala": r-essentials

Node "ip-172-31-0-186.ec2.internal":
    Successful actions: 1/1
Node "ip-172-31-0-190.ec2.internal":
    Successful actions: 1/1
Node "ip-172-31-0-182.ec2.internal":
    Successful actions: 1/1
Node "ip-172-31-0-189.ec2.internal":
    Successful actions: 1/1
Node "ip-172-31-0-191.ec2.internal":
    Successful actions: 1/1
Node "ip-172-31-0-183.ec2.internal":
    Successful actions: 1/1
Node "ip-172-31-0-184.ec2.internal":
    Successful actions: 1/1
Node "ip-172-31-0-187.ec2.internal":
    Successful actions: 1/1
Node "ip-172-31-0-185.ec2.internal":
    Successful actions: 1/1
Node "ip-172-31-0-188.ec2.internal":
    Successful actions: 1/1

Clusters: DataScienceBox

Predates Anaconda Cluster: a command-line utility to create instances in the cloud ready for data science. Includes conda package management plus some big data frameworks (Spark).

https://github.com/danielfrg/datasciencebox

$ pip install datasciencebox

The CLI becomes available as:

$ datasciencebox
$ dsb

$ dsb up

$ dsb install miniconda
$ dsb install conda numpy

$ dsb install notebook
$ dsb install hdfs
$ dsb install spark
$ dsb install impala

Clusters: Under the Hood

Both DSB and Anaconda Cluster use a very similar approach: SSH for the basic steps, then Salt for everything else

Salt: https://github.com/saltstack/salt

  • 100% free and open source
  • Fast: ZMQ instead of SSH
  • Secure
  • Scalable to thousands of nodes
  • Declarative YAML language instead of bash scripts, e.g.:

numpy-install:
  conda.installed:
    - name: numpy
    - require:
        - file: config-file

  • A lot of free formulas online

Data

Data: Moving the data

Move data from S3 to our HDFS cluster

hadoop distcp -Dfs.s3n.awsAccessKeyId={{ }} -Dfs.s3n.awsSecretAccessKey={{ }} \
    s3n://blaze-data/reddit/json/*/*.json /user/ubuntu

Data: Parquet

Apache Parquet is a columnar storage format available to any project in the Hadoop ecosystem, regardless of the choice of data processing framework, data model or programming language.
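The payoff is that a query touching only a few columns reads only those columns from disk. A toy round trip, as a sketch using a modern pandas plus pyarrow (not part of the original pipeline):

import pandas as pd

# A tiny frame with the same flavor of columns as the reddit table
df = pd.DataFrame({'subreddit': ['soccer', 'IAmA', 'soccer'],
                   'ups': [10, 42, 7]})

# Columns are laid out contiguously on disk, so reading only 'ups'
# never touches the 'subreddit' strings
df.to_parquet('comments.parquet')
print(pd.read_parquet('comments.parquet', columns=['ups']))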

Data: Parquet

[Figure: row-oriented vs. columnar (Parquet) storage layout on disk [1]]

[1] http://docs.aws.amazon.com/redshift/latest/dg/c_columnar_storage_disk_mem_mgmnt.html

Data: Load

hive > CREATE TABLE reddit_json (
  archived                 boolean,
  author                   string,
  author_flair_css_class   string,
  author_flair_text        string,
  body                     string,
  controversiality         int,
  created_utc              string,
  distinguished            string,
  downs                    int,
  edited                   boolean,
  gilded                   int,
  id                       string,
  link_id                  string,
  name                     string,
  parent_id                string,
  removal_reason           string,
  retrieved_on             timestamp,
  score                    int,
  score_hidden             boolean,
  subreddit                string,
  subreddit_id             string,
  ups                      int
)
ROW FORMAT
    serde 'com.amazon.elasticmapreduce.JsonSerde'
    with serdeproperties ('paths'='archived,author,author_flair_css_class,author_flair_text,body,controversiality,created_utc,distinguished,downs,edited,gilded,id,link_id,name,parent_id,removal_reason,retrieved_on,score,score_hidden,subreddit,subreddit_id,ups');
hive > LOAD DATA INPATH '/user/ubuntu/*.json' INTO TABLE reddit_json;

Data: Transform

hive > CREATE TABLE reddit_parquet (
  archived                 boolean,
  author                   string,
  author_flair_css_class   string,
  author_flair_text        string,
  body                     string,
  controversiality         int,
  created_utc              string,
  distinguished            string,
  downs                    int,
  edited                   boolean,
  gilded                   int,
  id                       string,
  link_id                  string,
  name                     string,
  parent_id                string,
  removal_reason           string,
  retrieved_on             timestamp,
  score                    int,
  score_hidden             boolean,
  subreddit                string,
  subreddit_id             string,
  ups                      int,
  created_utc_t            timestamp
)
STORED AS PARQUET;
hive > SET dfs.block.size=1g;

hive > INSERT OVERWRITE TABLE reddit_parquet select *, cast(cast(created_utc as double) as timestamp) as created_utc_t FROM reddit_json;
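The double cast turns created_utc, a string holding epoch seconds, into a proper timestamp. A quick pandas sketch of the equivalent conversion:

import pandas as pd

# created_utc arrives as strings of epoch seconds
created_utc = pd.Series(['1430438400', '1430442000'])

# Equivalent of Hive's cast(cast(created_utc as double) as timestamp)
created_utc_t = pd.to_datetime(created_utc.astype(float), unit='s')
print(created_utc_t)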

Querying

Querying

Using the regular Hive/Impala shell

impala > invalidate metadata;

impala > SELECT count(*) FROM reddit_parquet;
Query: select count(*) FROM reddit_parquet
+------------+
| count(*)   |
+------------+
| 1830807828 |
+------------+

Fetched 1 row(s) in 4.88s
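The same query can also be issued straight from Python with impyla (a sketch; the host is the cluster's Impala daemon, the same one used in the Blaze examples below, and 21050 is Impala's default port):

from impala.dbapi import connect

conn = connect(host='54.209.0.148', port=21050)
cur = conn.cursor()
cur.execute('SELECT count(*) FROM reddit_parquet')
print(cur.fetchall())  # same count as the shell output above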

Querying with Python

A NumPy- and Pandas-like API that targets not local files but other engines

SQL:

  • Postgres
  • Impala
  • Hive
  • Spark SQL

NoSQL:

  • MongoDB
  • Plain local files still work too

Blaze

An interface to query data on different storage systems

Code at https://github.com/blaze/blaze

Blaze ecosystem: http://blaze.pydata.org


In [1]:
import blaze as bz
import pandas as pd

In [2]:
data = bz.Data('impala://54.209.0.148/default::reddit_parquet')

In [3]:
data.schema


Out[3]:
dshape("""{
  archived: ?bool,
  author: ?string,
  author_flair_css_class: ?string,
  author_flair_text: ?string,
  body: ?string,
  controversiality: ?int32,
  created_utc: ?string,
  distinguished: ?string,
  downs: ?int32,
  edited: ?bool,
  gilded: ?int32,
  id: ?string,
  link_id: ?string,
  name: ?string,
  parent_id: ?string,
  removal_reason: ?string,
  retrieved_on: ?datetime,
  score: ?int32,
  score_hidden: ?bool,
  subreddit: ?string,
  subreddit_id: ?string,
  ups: ?int32,
  created_utc_t: ?datetime
  }""")

Blaze

Number of comments


In [4]:
data.id.count()


Out[4]:
1830807828

In [5]:
print(bz.compute(data.id.count()))


SELECT count(reddit_parquet.id) AS id_count 
FROM reddit_parquet

Blaze

Total number of up votes


In [6]:
n_up_votes = data.ups.sum()

In [7]:
print(bz.compute(n_up_votes))


SELECT sum(reddit_parquet.ups) AS ups_sum 
FROM reddit_parquet

In [8]:
%time int(n_up_votes)


CPU times: user 22.4 ms, sys: 6.84 ms, total: 29.2 ms
Wall time: 3.69 s
Out[8]:
9696701385

Blaze

Counting the total number of posts in the /r/soccer subreddit


In [9]:
n_posts_in_r_soccer = data[data.subreddit == 'soccer'].id.count()

In [10]:
print(bz.compute(n_posts_in_r_soccer))


SELECT count(alias_1.id) AS id_count 
FROM (SELECT reddit_parquet.id AS id 
FROM reddit_parquet 
WHERE reddit_parquet.subreddit = %(subreddit_1)s) AS alias_1

In [11]:
%time int(n_posts_in_r_soccer)


CPU times: user 28.6 ms, sys: 8.61 ms, total: 37.2 ms
Wall time: 5 s
Out[11]:
13078620

Blaze

Counting the number of comments before a specific hour


In [12]:
before_1pm = data.id[bz.hour(data.created_utc_t) < 13].count()

In [13]:
print(bz.compute(before_1pm))


SELECT count(alias_3.id) AS id_count 
FROM (SELECT reddit_parquet.id AS id 
FROM reddit_parquet 
WHERE EXTRACT(hour FROM reddit_parquet.created_utc_t) < %(param_1)s) AS alias_3

In [14]:
%time int(before_1pm)


CPU times: user 32.7 ms, sys: 9.88 ms, total: 42.6 ms
Wall time: 5.54 s
Out[14]:
812870494

Blaze

Plotting the daily frequency of comments in the /r/IAmA subreddit


In [15]:
iama = data[(data.subreddit == 'IAmA')]

In [16]:
# Approximate day number since 2007: treat every year as 365 days and every month as 31
days = (bz.year(iama.created_utc_t) - 2007) * 365 + (bz.month(iama.created_utc_t) - 1) * 31 + bz.day(iama.created_utc_t)

In [17]:
iama_with_day = bz.transform(iama, day=days)

In [18]:
by_day = bz.by(iama_with_day.day, posts=iama_with_day.created_utc_t.count())

Blaze

Plotting the daily frequency of comments in the /r/IAmA subreddit

Pandas


In [19]:
by_day_result = bz.odo(by_day, pd.DataFrame)  # Actually triggers the computation

In [20]:
by_day_result.head()


Out[20]:
    day  posts
0  2405  16202
1  2978   2361
2  1418   5444
3  1874   8833
4  1257   4480

In [21]:
by_day_result = by_day_result.sort_values(by=['day'])

In [22]:
rng = pd.date_range('5/28/2009', periods=len(by_day_result), freq='D')
by_day_result.index = rng

Blaze

Plotting the daily frequency of comments in the /r/IAmA subreddit


In [23]:
from bokeh._legacy_charts import TimeSeries, output_notebook, show

In [24]:
output_notebook()