<img src="images/continuum_analytics_logo.png" alt="Continuum Logo" align="right" width="30%">

Remote Data

We've seen how into and blaze provide intuitive access to various forms of local data and computation. Now we extend these to large remote systems.

  • into will help us move data between local, remote, and HDFS locations as well as help us to register that data with the Hive metastore
  • blaze will help us query those same computational systems and bring results back for local analysis

Cluster on EC2

You have access to a small cluster on EC2. It consists of a few individual machines connected with HDFS. The cluster also runs various computational services like Hive, Impala, and Spark.

Data can live in a few places

  • Locally (myfile.csv) on the notebook computer in front of you
  • Remotely (ssh://hostname:myfile.csv) on the file system of one of the remote machines
  • On HDFS (hdfs://hostname:myfile.csv) on the parallel file system shared by all of the machines
  • Registered in the Hive Metastore (hive://hostname::tablename) as a SQL table

Traditionally we interact with each of these systems through different means. Individually each provides a simple interface, but taken together they place a significant burden on new developers.
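
With into the call shape stays the same regardless of where the data lives. Roughly (a sketch with placeholder hostnames and paths; credentials are passed as keyword arguments, as shown below):

import pandas as pd
from into import into

# Same function, same call shape -- only the URI changes
into(pd.DataFrame, 'myfile.csv')                          # local file
into(pd.DataFrame, 'ssh://hostname:myfile.csv')           # remote file over ssh
into(pd.DataFrame, 'hdfs://hostname:myfile.csv')          # file on HDFS
into('hive://hostname/default::tablename', 'myfile.csv')  # load a local file into Hive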

Hostnames:

  • 54.159.103.12 - Hive, Spark, HDFS
  • 54.145.126.122 - Impala

In [ ]:
# Fill in the username and password you were given for the cluster
auth = {
    'username': '',
    'password': ''
}

into

We connect to the individual machines through ssh using the paramiko library and to HDFS as a whole through WebHDFS using pywebhdfs.

conda install paramiko
pip install pywebhdfs
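
If you're unsure whether these are installed, a quick import check works:

import paramiko
import pywebhdfs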

We pull down a remote csv file into a Pandas DataFrame and push it up to HDFS as a JSON file.

Warning: You are all on the same cluster sharing the same resources. You are likely to overwrite each other's work. Nothing that you put on the cluster is secure.
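
One way to reduce collisions (a suggestion, not part of the original exercise) is to include something unique, such as your username, in any filename you write to the cluster:

import getpass

# e.g. 'alice-iris.json' rather than a shared filename
my_filename = '%s-iris.json' % getpass.getuser()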


In [ ]:
import pandas as pd
from into import into

# Load data from ssh://54.159.103.12:data/iris.csv into a DataFrame
# Use the auth dictionary above to supply authentication

into(..., ..., **auth)
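
If you get stuck, the call has roughly this shape (a sketch: the target is a pandas DataFrame and the source is the ssh URI from the comment above):

df = into(pd.DataFrame, 'ssh://54.159.103.12:data/iris.csv', **auth)
df.head()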

In [ ]:
# Modify that data somehow (or just leave it alone) and upload your results to HDFS 
# hdfs://54.159.103.12:your-filename.json

into(..., ..., **auth)
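
Again, roughly (a sketch: df is the DataFrame from the previous cell and your-filename.json is whatever unique name you chose):

into('hdfs://54.159.103.12:your-filename.json', df, **auth)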

blaze

We now interact with common HDFS database technologies. Part of this involves using the Hive Metastore. Hive has some overhead and was not really intended for rapid, interactive feedback. If we all start hammering the Hive server it might become unhappy. The tutorial leader should have done one of the following:

into('hive://hostname/default::tablename', 'local-file.csv')
into('hive://hostname/default::tablename', 'ssh://hostname:remote-file.csv')
into('hive://hostname/default::tablename', 'hdfs://hostname:remote-directory/*.csv')

You now have the opportunity to play with this data (and others) using your choice of database.

You should quickly verify that you have the following libraries installed:

  • For Impala: impyla

      pip install -U impyla 
  • For Hive: pyhive

      conda install -c blaze pyhive  # Mac/Linux
      Instructions at https://github.com/dropbox/PyHive  # Windows
  • For Redshift: redshift-sqlalchemy

      pip install redshift-sqlalchemy

Install whichever of these you plan to use before continuing.
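
A quick import check for the two drivers used below (these are the standard import names for impyla and pyhive):

from impala.dbapi import connect   # impyla
from pyhive import hive            # pyhive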

Loading data into Hive

We uploaded the NYC taxicab dataset to Hive in the following way:

(Please, don't everyone do this)

In [1]: from into import into

In [2]: into('hive://54.159.103.12::taxi', 'ssh://54.159.103.12:/mnt/all.csv', **auth)

INFO:paramiko.transport:Connected (version 2.0, client OpenSSH_6.6.1p1)
INFO:paramiko.transport:Authentication (publickey) failed.
INFO:paramiko.transport:Authentication (publickey) failed.
INFO:paramiko.transport:Authentication (publickey) failed.
INFO:paramiko.transport:Authentication (publickey) failed.
INFO:paramiko.transport:Authentication (password) successful!
INFO:paramiko.transport.sftp:[chan 0] Opened sftp connection (server version 3)
INFO:pyhive.hive:USE `default`
INFO:pyhive.hive:SHOW TABLES
INFO:pyhive.hive:DESCRIBE foo
INFO:pyhive.hive:DESCRIBE foo
INFO:pyhive.hive:        CREATE  TABLE default.taxi (
            medallion  STRING,
            hack_license  STRING,
               vendor_id  STRING,
               rate_code  BIGINT,
      store_and_fwd_flag  STRING,
         pickup_datetime  TIMESTAMP,
        dropoff_datetime  TIMESTAMP,
         passenger_count  BIGINT,
       trip_time_in_secs  BIGINT,
           trip_distance  DOUBLE,
        pickup_longitude  DOUBLE,
         pickup_latitude  DOUBLE,
       dropoff_longitude  DOUBLE,
        dropoff_latitude  DOUBLE,
            tolls_amount  DOUBLE,
              tip_amount  DOUBLE,
            total_amount  DOUBLE,
                 mta_tax  DOUBLE,
             fare_amount  DOUBLE,
            payment_type  STRING,
               surcharge  DOUBLE
        )
        ROW FORMAT DELIMITED
            FIELDS TERMINATED BY ','
        STORED AS TEXTFILE

        TBLPROPERTIES ("skip.header.line.count"="1")
INFO:pyhive.hive:DESCRIBE taxi
INFO:pyhive.hive:DESCRIBE taxi
INFO:pyhive.hive:LOAD DATA LOCAL INPATH "/mnt/all.csv" INTO TABLE taxi

Out[2]: Table('taxi', MetaData(bind=Engine(hive://hdfs@54.159.103.12:10000/default)), Column('medallion', String(), table=<taxi>), Column('hack_license', String(), table=<taxi>), Column('vendor_id', String(), table=<taxi>), Column('rate_code', BigInteger(), table=<taxi>), Column('store_and_fwd_flag', String(), table=<taxi>), Column('pickup_datetime', HiveTimestamp(), table=<taxi>), Column('dropoff_datetime', HiveTimestamp(), table=<taxi>), Column('passenger_count', BigInteger(), table=<taxi>), Column('trip_time_in_secs', BigInteger(), table=<taxi>), Column('trip_distance', Float(), table=<taxi>), Column('pickup_longitude', Float(), table=<taxi>), Column('pickup_latitude', Float(), table=<taxi>), Column('dropoff_longitude', Float(), table=<taxi>), Column('dropoff_latitude', Float(), table=<taxi>), Column('tolls_amount', Float(), table=<taxi>), Column('tip_amount', Float(), table=<taxi>), Column('total_amount', Float(), table=<taxi>), Column('mta_tax', Float(), table=<taxi>), Column('fare_amount', Float(), table=<taxi>), Column('payment_type', String(), table=<taxi>), Column('surcharge', Float(), table=<taxi>), schema=None)

In [ ]:
# Hive - needs pyhive  (conda install -c blaze pyhive  # if Linux/Mac)
import blaze as bz

hive = bz.Data('hive://54.159.103.12')
hive.dshape
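
Once connected you can drill into a table and build queries against it, for example (a sketch; the taxi table and its columns come from the load shown above):

taxi = hive.taxi
taxi.passenger_count.mean()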

In [ ]:


In [ ]:
# Impala - needs impyla  (pip install -U impyla)
imp = bz.Data('impala://54.145.126.122')
imp.dshape

In [ ]:


In [ ]:
# Postgres - needs psycopg2  (conda install psycopg2)
pg = bz.Data('postgresql://postgres:postgres@ec2-54-159-160-163.compute-1.amazonaws.com')
pg.dshape

In [ ]:


In [ ]:
# Redshift - needs ssh tunneling
red = bz.Data('postgresql://cio:Foobar23@localhost:5439/dev')
red.dshape

Interact with the rest of the ecosystem

Because Blaze expressions implement standard Python protocols like __iter__ and __array__, they can sometimes interact with the rest of the ecosystem with no extra work.


In [ ]:
from matplotlib import pyplot as plt
%matplotlib inline

# Keep only trips with plausible distances
taxi = imp.taxi[(imp.taxi.trip_distance > 0.1) & (imp.taxi.trip_distance < 100)]

# Average trip distance per medallion, computed on the Impala side
query = bz.by(taxi.medallion, avg=taxi.trip_distance.mean())

# matplotlib consumes the blaze expression directly
plt.semilogy(query.avg.sort())
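
The same protocols let you hand the result to NumPy or plain Python directly, for example (a sketch building on the query expression above):

import numpy as np

np.array(query.avg)   # materialize the result as a NumPy array via __array__
list(query.avg)       # or as a plain Python list via __iter__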