Daniel Rodriguez
Data Scientist and Software Developer at Continuum Analytics
Anaconda Cluster
Bogotá, Colombia [1] now in Austin, TX
[1] Yes, it’s spelled like that, with two ‘o’s. Columbia with a ‘u’ is a university in New York City. It’s not that hard: ColOmbia is a cOuntry, ColUmbia is a University.
Querying 1.8 billion reddit comments with python
And how you can do it
Assumes some basic knowledge of big data tools such as HDFS, MapReduce, Spark, or similar
More info at: http://blaze.pydata.org/blog/2015/09/16/reddit-impala/
New monthly dumps at: http://pan.whatbox.ca:36975/reddit/comments/monthly/
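If you want to grab a monthly dump yourself, something like this works (the file name below is an assumption; check the directory listing at the URL above for the exact names):

import urllib.request  # urllib2 on Python 2

base = "http://pan.whatbox.ca:36975/reddit/comments/monthly/"
fname = "RC_2015-05.bz2"  # assumed naming; see the listing above
urllib.request.urlretrieve(base + fname, fname)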
Cloudera and Hortonworks provide really good management tools for IT/DevOps people
Some of the features from before, plus:
Still requires knowing what you are doing: AWS, key pairs, security groups, SSH.
No need to hide stuff. No magic.
$ conda install anaconda-client
$ anaconda login
$ conda install anaconda-cluster -c anaconda-cluster
Not open source: 4 free nodes
Soon: 16 free nodes in the cloud, 4 in-house
Profile:
name: impala-profile
provider: aws
user: ubuntu
num_nodes: 10
node_id: ami-08faa660 # Ubuntu 12.04
node_type: m3.2xlarge
root_size: 1000
plugins:
  - hdfs:
      namenode_dirs:
        - /data/dfs/nn
      datanode_dirs:
        - /data/dfs/dn
  - hive
  - impala
  - notebook
Launch cluster:
$ acluster create impala-cluster -p impala-profile   # launch a cluster from the profile above
$ acluster create name -p profile                    # general form: cluster name plus profile
$ acluster destroy                                   # tear the cluster down
$ acluster ssh                                       # SSH into the head node
$ acluster cmd 'date'                                # run a command across the cluster
$ acluster cmd 'apt-get install build-essential' --sudo
$ acluster conda install numpy                       # conda install a package on the cluster nodes
$ acluster conda install my_pkg -c channel           # same, from a specific channel
$ acluster submit script.py                          # run a script on the cluster
$ acluster put script.py /tmp/script.py              # copy a file to the cluster
$ acluster get /tmp/script.py script.py              # fetch a file back from the cluster
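For example, a minimal script.py (hypothetical) that could be shipped with acluster put and run with acluster submit to check the environment on the cluster:

# script.py -- trivial sanity check to run on the cluster
import sys
import numpy as np  # installed above with `acluster conda install numpy`

print("Python:", sys.version)
print("NumPy:", np.__version__)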
acluster install
$ acluster install hdfs
$ acluster install hive
$ acluster install impala
$ acluster install elasticsearch
$ acluster install kibana
$ acluster install logstash
$ acluster install notebook
$ acluster install spark-standalone
$ acluster install spark-yarn
$ acluster install storm
$ acluster install ganglia
$ acluster conda install -c r r-essentials
Installing packages on cluster "impala": r-essentials
Node "ip-172-31-0-186.ec2.internal":
Successful actions: 1/1
Node "ip-172-31-0-190.ec2.internal":
Successful actions: 1/1
Node "ip-172-31-0-182.ec2.internal":
Successful actions: 1/1
Node "ip-172-31-0-189.ec2.internal":
Successful actions: 1/1
Node "ip-172-31-0-191.ec2.internal":
Successful actions: 1/1
Node "ip-172-31-0-183.ec2.internal":
Successful actions: 1/1
Node "ip-172-31-0-184.ec2.internal":
Successful actions: 1/1
Node "ip-172-31-0-187.ec2.internal":
Successful actions: 1/1
Node "ip-172-31-0-185.ec2.internal":
Successful actions: 1/1
Node "ip-172-31-0-188.ec2.internal":
Successful actions: 1/1
Pre-Anaconda Cluster: a command-line utility to create instances in the cloud, ready for data science. Includes conda package management plus some big data frameworks (Spark).
https://github.com/danielfrg/datasciencebox
$ pip install datasciencebox
The CLI will be available as:
$ datasciencebox
$ dsb
$ dsb up
$ dsb install miniconda
$ dsb install conda numpy
$ dsb install notebook
$ dsb install hdfs
$ dsb install spark
$ dsb install impala
Salt: https://github.com/saltstack/salt
numpy-install:
  conda.installed:
    - name: numpy
    - require:
      - file: config-file
hadoop distcp -Dfs.s3n.awsAccessKeyId={{ }} -Dfs.s3n.awsSecretAccessKey={{ }} \
    s3n://blaze-data/reddit/json/*/*.json /user/ubuntu
hive > CREATE TABLE reddit_json (
           archived boolean, author string, author_flair_css_class string,
           author_flair_text string, body string, controversiality int,
           created_utc string, distinguished string, downs int, edited boolean,
           gilded int, id string, link_id string, name string, parent_id string,
           removal_reason string, retrieved_on timestamp, score int,
           score_hidden boolean, subreddit string, subreddit_id string, ups int
       )
       ROW FORMAT SERDE 'com.amazon.elasticmapreduce.JsonSerde'
       WITH SERDEPROPERTIES ('paths'='archived,author,author_flair_css_class,author_flair_text,body,controversiality,created_utc,distinguished,downs,edited,gilded,id,link_id,name,parent_id,removal_reason,retrieved_on,score,score_hidden,subreddit,subreddit_id,ups');
hive > LOAD DATA INPATH '/user/ubuntu/*.json' INTO TABLE reddit_json;
hive > CREATE TABLE reddit_parquet (
           archived boolean, author string, author_flair_css_class string,
           author_flair_text string, body string, controversiality int,
           created_utc string, distinguished string, downs int, edited boolean,
           gilded int, id string, link_id string, name string, parent_id string,
           removal_reason string, retrieved_on timestamp, score int,
           score_hidden boolean, subreddit string, subreddit_id string, ups int,
           created_utc_t timestamp
       )
       STORED AS PARQUET;
hive > SET dfs.block.size=1g;
hive > INSERT OVERWRITE TABLE reddit_parquet
       SELECT *, cast(cast(created_utc as double) as timestamp) AS created_utc_t
       FROM reddit_json;
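Once the Parquet table is in place you can sanity-check it straight from Python with impyla (a quick sketch; the host is a placeholder for any impalad in the cluster, 21050 is Impala's default HiveServer2-compatible port):

from impala.dbapi import connect

conn = connect(host='54.208.255.126', port=21050)  # placeholder host
cur = conn.cursor()
cur.execute('SELECT count(*) FROM reddit_parquet')
print(cur.fetchall())  # should be roughly 1.8 billion rows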
SQL:
NoSQL:
Projects:
An interface to query data on different storage systems
Code at https://github.com/blaze/blaze
Blaze ecosystem: http://blaze.pydata.org
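The same Blaze expressions work against local data, which makes it easy to prototype before pointing at Impala; a minimal sketch with an in-memory pandas DataFrame (toy data, not the reddit dumps):

import blaze as bz
import pandas as pd

df = pd.DataFrame({'subreddit': ['soccer', 'IAmA', 'soccer'], 'ups': [10, 250, 3]})
t = bz.Data(df)                                          # same interface as the Impala-backed table below
print(bz.compute(t[t.subreddit == 'soccer'].ups.sum()))  # -> 13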
In [1]:
import blaze as bz
import pandas as pd
In [2]:
data = bz.Data('impala://54.209.0.148/default::reddit_parquet')
In [3]:
data.schema
Out[3]:
In [4]:
data.id.count()
Out[4]:
In [5]:
print(bz.compute(data.id.count()))
In [6]:
n_up_votes = data.ups.sum()
In [7]:
print(bz.compute(n_up_votes))
In [8]:
%time int(n_up_votes)
Out[8]:
In [9]:
n_posts_in_r_soccer = data[data.subreddit == 'soccer'].id.count()
In [10]:
print(bz.compute(n_posts_in_r_soccer))
In [11]:
%time int(n_posts_in_r_soccer)
Out[11]:
In [12]:
before_1pm = data.id[bz.hour(data.created_utc_t) < 13].count()
In [13]:
print(bz.compute(before_1pm))
In [14]:
%time int(before_1pm)
Out[14]:
In [15]:
iama = data[(data.subreddit == 'IAmA')]
In [16]:
days = (bz.year(iama.created_utc_t) - 2007) * 365 + (bz.month(iama.created_utc_t) - 1) * 31 + bz.day(iama.created_utc_t)
In [17]:
iama_with_day = bz.transform(iama, day=days)
In [18]:
by_day = bz.by(iama_with_day.day, posts=iama_with_day.created_utc_t.count())
Pandas
In [19]:
by_day_result = bz.odo(by_day, pd.DataFrame) # Actually triggers the computation
In [20]:
by_day_result.head()
Out[20]:
In [21]:
by_day_result = by_day_result.sort_values(by=['day'])
In [22]:
rng = pd.date_range('5/28/2009', periods=len(by_day_result), freq='D')
by_day_result.index = rng
In [23]:
from bokeh._legacy_charts import TimeSeries, output_notebook, show
In [24]:
output_notebook()
In [26]:
f = TimeSeries(by_day_result.posts, by_day_result.index,
title='Comments in /r/IAmA subreddit',
xlabel='Date', ylabel='Comments',
tools='pan,reset,save,box_zoom',
width=600, height=500)
show(f)
Ibis is a new Python data analysis framework with the goal of enabling data scientists and data engineers to be as productive working with big data as they are working with small and medium data today. In doing so, we will enable Python to become a true first-class language for Apache Hadoop, without compromises in functionality, usability, or performance.
Code at:
More info: http://www.ibis-project.org
In [1]:
import ibis
from ibis.impala.compiler import to_sql
import pandas as pd
In [2]:
ibis.options.interactive = True
ibis.options.sql.default_limit = 20000
In [3]:
hdfs = ibis.hdfs_connect(host='52.91.39.64')
con = ibis.impala.connect(host='54.208.255.126', hdfs_client=hdfs)
In [4]:
data = con.table('reddit_parquet')
In [5]:
data.schema()
Out[5]:
In [6]:
more_than_1k = data[data.ups >= 1000]
In [7]:
month = (more_than_1k.created_utc_t.year() - 2007) * 12 + more_than_1k.created_utc_t.month()
month = month.name('month')
In [8]:
with_month = more_than_1k['id', month]
In [9]:
posts = with_month.count()
groups = with_month.aggregate([posts], by='month')
In [10]:
month_df = groups.execute()
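The to_sql helper imported above can be used to peek at the query Ibis compiles for Impala (a quick sketch):

print(to_sql(groups))  # the SELECT ... GROUP BY month that Impala actually runs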
Pandas
In [11]:
month_df = month_df.set_index('month')
month_df.sort_index(inplace=True)
rng = pd.date_range('10/01/2007', periods=len(month_df), freq='M')
month_df.index = rng
In [12]:
from bokeh._legacy_charts import TimeSeries, output_notebook, show
output_notebook()
In [14]:
f = TimeSeries(month_df['count'], month_df.index,
title='# posts with more than 1k up votes',
xlabel='Date', ylabel='Comments',
tools='pan,reset,save,box_zoom',
width=600, height=500)
show(f)
Data is on HDFS in Parquet, Spark is happy with that
In [ ]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
In [ ]:
# Read in the Parquet file
parquetFile = sqlContext.read.parquet("people.parquet")
# Parquet files can also be registered as tables and then used in SQL statements
parquetFile.registerTempTable("parquetFile")
teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
# These are spark DataFrames so you can do other stuff like map
teenNames = teenagers.map(lambda p: "Name: " + p.name)
for teenName in teenNames.collect():
print(teenName)
Taken from: http://spark.apache.org/docs/latest/sql-programming-guide.html#parquet-files
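A sketch adapting the snippet above to the reddit table (the HDFS path is an assumption: it is wherever Hive wrote the reddit_parquet table, /user/hive/warehouse/reddit_parquet by default):

# Point Spark at the Parquet files that back the reddit_parquet table
reddit = sqlContext.read.parquet("hdfs:///user/hive/warehouse/reddit_parquet")
reddit.registerTempTable("reddit_parquet")

top_subreddits = sqlContext.sql("""
    SELECT subreddit, COUNT(*) AS n_comments
    FROM reddit_parquet
    GROUP BY subreddit
    ORDER BY n_comments DESC
    LIMIT 10
""")
top_subreddits.show()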