<img src="images/continuum_analytics_logo.png" alt="Continuum Logo" align="right" width="30%">
We've seen how `into` and `blaze` provide intuitive access to various forms of local data and computation. Now we extend these to large remote systems.

- `into` will help us move data between local, remote, and HDFS locations, as well as register that data with the Hive metastore
- `blaze` will help us query those same computational systems and bring results back for local analysis

You have access to a small cluster on EC2. It consists of a few individual machines connected with HDFS. The cluster also runs various computational services like Hive, Impala, and Spark.
Data can live in a few places:

- Locally (e.g. `myfile.csv`) on the notebook computer in front of you
- Remotely (`ssh://hostname:myfile.csv`) on the file system of one of the remote machines
- On HDFS (`hdfs://hostname:myfile.csv`) on the parallel file system shared by all of the machines
- In Hive (`hive://hostname::tablename`) as a SQL table

Traditionally we interact with each of these systems through different means. Individually each provides a simple interface, but taken as a collection the burden on new developers can be significant.
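`into` collapses these interfaces into one: each location is just a URI string, and the same call moves data between any pair of them. A minimal sketch of the idea, reusing the URI forms above (hostnames, paths, and table names are placeholders, not real endpoints):

    from into import into

    # One function, many backends: URI strings name both sources and targets
    into('hdfs://hostname:myfile.csv', 'myfile.csv')                  # local file -> HDFS
    into('hive://hostname::tablename', 'hdfs://hostname:myfile.csv')  # HDFS -> Hive table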
Hostnames:

- `54.159.103.12` - Hive, Spark, HDFS
- `54.145.126.122` - Impala
In [ ]:
auth = {
    'username': '',  # fill in the credentials provided for the tutorial cluster
    'password': ''
}
We pull down a remote CSV file into a pandas DataFrame and push it up to HDFS as a JSON file.

Warning: You are all on the same cluster sharing the same resources. You are likely to overwrite each other's work. Nothing that you put on the cluster is secure.
In [ ]:
import pandas as pd
from into import into
# Load data from ssh://54.159.103.12:data/iris.csv into a DataFrame
# Use the auth dictionary above to supply authentication
into(..., ..., **auth)
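One possible solution, as a sketch (the `data/iris.csv` path comes from the comment above; your credentials go in `auth`):

    # Possible solution: download the remote CSV into a local DataFrame
    df = into(pd.DataFrame, 'ssh://54.159.103.12:data/iris.csv', **auth)
    df.head()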
In [ ]:
# Modify that data somehow (or just leave it alone) and upload your results to HDFS
# hdfs://54.159.103.12:your-filename.json
into(..., ..., **auth)
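And a matching sketch for the upload; `your-filename.json` is a placeholder, so pick a name unlikely to collide with your neighbors':

    # Possible solution: push the DataFrame up to HDFS as JSON
    into('hdfs://54.159.103.12:your-filename.json', df, **auth)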
## blaze
We now interact with common HDFS database technologies. Part of this involves using the Hive Metastore. Hive has some overhead and was not really intended for rapid, interactive feedback. If we all start hammering the Hive server it might become unhappy. The tutorial leader should have done one of the following:
into('hive://hostname/default::tablename', 'local-file.csv')
into('hive://hostname/default::tablename', 'ssh://hostname:remote-file.csv')
into('hive://hostname/default::tablename', 'hdfs://hostname:remote-directory/*.csv')
You now have the opportunity to play with this data (and others) using your choice of database. You should quickly verify that you have the following libraries installed:

- For Impala: `impyla` (`pip install -U impyla`)
- For Hive: `pyhive` (`conda install -c blaze pyhive` on Mac/Linux; Windows instructions at https://github.com/dropbox/PyHive)
- For Redshift: `redshift-sqlalchemy` (`pip install redshift-sqlalchemy`)
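A quick, hedged way to confirm the drivers import cleanly (the import names below are my assumed mapping from the PyPI package names):

    # The name on PyPI differs from the import name for some of these
    for mod in ['impala', 'pyhive', 'redshift_sqlalchemy']:
        try:
            __import__(mod)
            print('%s: OK' % mod)
        except ImportError as e:
            print('%s: missing (%s)' % (mod, e))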
We uploaded the NYC Taxicab dataset to Hive in the following way (please don't everyone do this):
In [1]: from into import into
In [2]: into('hive://54.159.103.12::taxi', 'ssh://54.159.103.12:/mnt/all.csv', **auth)
INFO:paramiko.transport:Connected (version 2.0, client OpenSSH_6.6.1p1)
INFO:paramiko.transport:Authentication (publickey) failed.
INFO:paramiko.transport:Authentication (publickey) failed.
INFO:paramiko.transport:Authentication (publickey) failed.
INFO:paramiko.transport:Authentication (publickey) failed.
INFO:paramiko.transport:Authentication (password) successful!
INFO:paramiko.transport.sftp:[chan 0] Opened sftp connection (server version 3)
INFO:pyhive.hive:USE `default`
INFO:pyhive.hive:SHOW TABLES
INFO:pyhive.hive:DESCRIBE foo
INFO:pyhive.hive:DESCRIBE foo
INFO:pyhive.hive: CREATE TABLE default.taxi (
medallion STRING,
hack_license STRING,
vendor_id STRING,
rate_code BIGINT,
store_and_fwd_flag STRING,
pickup_datetime TIMESTAMP,
dropoff_datetime TIMESTAMP,
passenger_count BIGINT,
trip_time_in_secs BIGINT,
trip_distance DOUBLE,
pickup_longitude DOUBLE,
pickup_latitude DOUBLE,
dropoff_longitude DOUBLE,
dropoff_latitude DOUBLE,
tolls_amount DOUBLE,
tip_amount DOUBLE,
total_amount DOUBLE,
mta_tax DOUBLE,
fare_amount DOUBLE,
payment_type STRING,
surcharge DOUBLE
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
TBLPROPERTIES ("skip.header.line.count"="1")
INFO:pyhive.hive:DESCRIBE taxi
INFO:pyhive.hive:DESCRIBE taxi
INFO:pyhive.hive:LOAD DATA LOCAL INPATH "/mnt/all.csv" INTO TABLE taxi
Out[2]: Table('taxi', MetaData(bind=Engine(hive://hdfs@54.159.103.12:10000/default)), Column('medallion', String(), table=<taxi>), Column('hack_license', String(), table=<taxi>), Column('vendor_id', String(), table=<taxi>), Column('rate_code', BigInteger(), table=<taxi>), Column('store_and_fwd_flag', String(), table=<taxi>), Column('pickup_datetime', HiveTimestamp(), table=<taxi>), Column('dropoff_datetime', HiveTimestamp(), table=<taxi>), Column('passenger_count', BigInteger(), table=<taxi>), Column('trip_time_in_secs', BigInteger(), table=<taxi>), Column('trip_distance', Float(), table=<taxi>), Column('pickup_longitude', Float(), table=<taxi>), Column('pickup_latitude', Float(), table=<taxi>), Column('dropoff_longitude', Float(), table=<taxi>), Column('dropoff_latitude', Float(), table=<taxi>), Column('tolls_amount', Float(), table=<taxi>), Column('tip_amount', Float(), table=<taxi>), Column('total_amount', Float(), table=<taxi>), Column('mta_tax', Float(), table=<taxi>), Column('fare_amount', Float(), table=<taxi>), Column('payment_type', String(), table=<taxi>), Column('surcharge', Float(), table=<taxi>), schema=None)
In [ ]:
# Hive - needs pyhive (conda install -c blaze pyhive  # Mac/Linux)
import blaze as bz
hive = bz.Data('hive://54.159.103.12')
hive.dshape
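Once connected, tables hang off the connection object as attributes. A small sketch, assuming the `taxi` table created above exists in the `default` database:

    taxi = hive.taxi                          # select the table
    taxi.head(5)                              # a small sample, computed by Hive
    bz.compute(taxi.passenger_count.mean())   # force a reduction to run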
In [ ]:
In [ ]:
# Impala - needs impyla (pip install -U impyla)
imp = bz.Data('impala://54.145.126.122')
imp.dshape
In [ ]:
In [ ]:
# Postgres - needs psycopg2 (conda install psycopg2)
pg = bz.Data('postgresql://postgres:postgres@ec2-54-159-160-163.compute-1.amazonaws.com')
pg.dshape
In [ ]:
In [ ]:
# Redshift - needs ssh tunneling
red = bz.Data('postgresql://cio:Foobar23@localhost:5439/dev')
red.dshape
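The connection string above points at `localhost:5439` because a tunnel forwards that local port to the Redshift cluster; a hypothetical example is `ssh -L 5439:your-redshift-endpoint:5439 user@bastion-host`, where the endpoint, user, and host are placeholders rather than real values.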
In [ ]:
from matplotlib import pyplot as plt
%matplotlib inline

# Keep only plausible trips: more than 0.1 and less than 100 miles
taxi = imp.taxi[(imp.taxi.trip_distance > 0.1) & (imp.taxi.trip_distance < 100)]

# Average trip distance per medallion (i.e. per cab)
query = bz.by(taxi.medallion, avg=taxi.trip_distance.mean())

# Plot the sorted per-cab averages on a logarithmic y-axis
plt.semilogy(query.avg.sort())
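If you'd rather keep working with the result locally, one hedged option is to materialize it into pandas first with the same `into` function:

    # Compute on Impala, land the aggregated result in a local DataFrame
    result = into(pd.DataFrame, query)
    result.head()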