Getting Started

This tutorial describes how to use Pandas-TD in Jupyter to explore data interactively.

Set the environment variable TD_API_KEY to your API key (and, optionally, TD_API_SERVER to the API server endpoint), then run "jupyter notebook":

$ export TD_API_SERVER="https://api.treasuredata.com/"
$ export TD_API_KEY="1234/abcd..."
$ jupyter notebook

You can connect to your database with create_engine:


In [1]:
%matplotlib inline

import os
import pandas_td as td

# Set engine type and database, using the default connection
engine = td.create_engine('presto:sample_datasets')

# Alternatively, initialize a connection explicitly
con = td.connect(apikey=os.environ['TD_API_KEY'], endpoint=os.environ['TD_API_SERVER'])
engine = td.create_engine('presto:sample_datasets', con=con)

You can run a query with read_td:


In [2]:
query = '''
select * from nasdaq limit 3
'''
td.read_td(query, engine)


Out[2]:
  symbol  open  volume    high     low   close       time
0   MOCO     0   33699  0.6673  0.4032  0.4032  311356800
1   NSEC     0    2902  3.5767  3.2989  3.2989  311356800
2    MYL     0       0  0.0000  0.0000  0.0464  311356800

Or you can read the result of an existing job with read_td_job:


In [3]:
td.read_td_job(35809747, engine)


Out[3]:
  symbol   open  volume     high    low  close        time
0   AXTI   2.17  108675   2.2099   2.14   2.17  1399564800
1   APEI  33.90  144500  35.0000  33.18  33.43  1399564800
2   ARQL   1.42  671424   1.4900   1.40   1.46  1399564800

You can read a table into a DataFrame with read_td_table, optionally with a specific time range and row limit:


In [4]:
# Read from a table with time range
df = td.read_td_table('nasdaq', engine,
                      time_range=('2000-01-01', '2010-01-01'),
                      limit=10000)
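
If you really need every row, read_td_table's help (see below) notes that limit=None disables the cap; be careful, because a large table may not fit into memory. A minimal sketch:

# Read all rows of the table (the result may be very large!)
df = td.read_td_table('nasdaq', engine, limit=None)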

You can also import a DataFrame into a table with to_td:


In [5]:
import numpy as np
import pandas as pd

# Create a DataFrame with random values
df = pd.DataFrame(np.random.rand(3, 3), columns=['x', 'y', 'z'])

# Import it into 'tutorial.import1'
con = td.connect()
td.to_td(df, 'tutorial.import1', con, if_exists='replace', index=False)

Note that to_td currently uses the streaming import API. It can take more than a few seconds before your data becomes visible, so be patient.
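
If the check below comes back empty at first, a short polling loop helps in scripts. A minimal sketch (the retry count and sleep interval are arbitrary):

import time

# Poll until the imported rows become visible
for _ in range(10):
    if len(td.read_td_table('tutorial.import1', engine)) >= len(df):
        break
    time.sleep(5)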


In [6]:
# Check the result
td.read_td_table('tutorial.import1', engine)


Out[6]:
          z         y         x        time
0  0.320127  0.395376  0.278403  1448174786
1  0.458859  0.880002  0.110724  1448174786
2  0.310495  0.052658  0.804152  1448174786

create_engine


In [7]:
help(td.create_engine)


Help on function create_engine in module pandas_td.td:

create_engine(url, con=None, header=True, show_progress=5.0, clear_progress=True)
    Create a handler for query engine based on a URL.
    
    The following environment variables are used for default connection:
    
      TD_API_KEY     API key
      TD_API_SERVER  API server (default: api.treasuredata.com)
      HTTP_PROXY     HTTP proxy (optional)
    
    Parameters
    ----------
    url : string
        Engine descriptor in the form "type://apikey@host/database?params..."
        Use shorthand notation "type:database?params..." for the default connection.
    con : Connection, optional
        Handler returned by connect. If not given, default connection is used.
    header : string or boolean, default True
        Prepend comment strings, in the form "-- comment", as a header of queries.
        Set False to disable header.
    show_progress : double or boolean, default 5.0
        Number of seconds to wait before printing progress.
        Set False to disable progress entirely.
    clear_progress : boolean, default True
        If True, clear progress when query completed.
    
    Returns
    -------
    QueryEngine

Examples


In [8]:
# presto
engine = td.create_engine('presto://APIKEY@api.treasuredata.com/sample_datasets')

# hive
engine = td.create_engine('hive://APIKEY@api.treasuredata.com/sample_datasets')

create_engine uses the default connection if the API key and host are omitted. In this case, the environment variables TD_API_KEY and TD_API_SERVER are used to initialize the connection:


In [9]:
# use default connection (TD_API_KEY is used)
engine = td.create_engine('presto:sample_datasets')

If you prefer initializing a connection with custom parameters, you can use connect:


In [10]:
# create a connection with detailed parameters (via tdclient.Client)
# See https://github.com/treasure-data/td-client-python/blob/master/tdclient/api.py
con = td.connect(apikey=os.environ['TD_API_KEY'],
                 endpoint=os.environ['TD_API_SERVER'],
                 retry_post_requests=True)
engine = td.create_engine('presto:sample_datasets', con=con)
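
The show_progress and clear_progress options documented above can be set the same way; for example, to silence the progress display in non-interactive scripts:

# Disable progress output entirely
engine = td.create_engine('presto:sample_datasets', con=con, show_progress=False)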

read_td_query


In [11]:
help(td.read_td_query)


Help on function read_td_query in module pandas_td.td:

read_td_query(query, engine, index_col=None, parse_dates=None, distributed_join=False, params=None)
    Read Treasure Data query into a DataFrame.
    
    Returns a DataFrame corresponding to the result set of the query string.
    Optionally provide an index_col parameter to use one of the columns as
    the index, otherwise default integer index will be used.
    
    Parameters
    ----------
    query : string
        Query string to be executed.
    engine : QueryEngine
        Handler returned by create_engine.
    index_col : string, optional
        Column name to use as index for the returned DataFrame object.
    parse_dates : list or dict, optional
        - List of column names to parse as dates
        - Dict of {column_name: format string} where format string is strftime
          compatible in case of parsing string times or is one of (D, s, ns, ms, us)
          in case of parsing integer timestamps
    distributed_join : boolean, default False
        (Presto only) If True, distributed join is enabled. If False, broadcast join is used.
        See https://prestodb.io/docs/current/release/release-0.77.html
    params : dict, optional
        Parameters to pass to execute method.
        Available parameters:
        - result_url (str): result output URL
        - priority (int or str): priority (e.g. "NORMAL", "HIGH", etc.)
        - retry_limit (int): retry limit
    
    Returns
    -------
    DataFrame

Examples


In [12]:
query = '''
select time, close from nasdaq where symbol='AAPL'
'''

# Run a query, converting "time" to a time series index
df = td.read_td_query(query, engine, index_col='time', parse_dates={'time': 's'})
df.plot()


Out[12]:
<matplotlib.axes._subplots.AxesSubplot at 0x106968e48>
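
For Presto queries that join large tables, the distributed_join flag described in the help text above switches from the default broadcast join to a distributed join. A sketch:

# (Presto only) enable distributed join instead of the default broadcast join
df = td.read_td_query(query, engine, distributed_join=True)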

read_td_job


In [13]:
help(td.read_td_job)


Help on function read_td_job in module pandas_td.td:

read_td_job(job_id, engine, index_col=None, parse_dates=None)
    Read Treasure Data job result into a DataFrame.
    
    Returns a DataFrame corresponding to the result set of the job.
    This method waits for job completion if the specified job is still running.
    Optionally provide an index_col parameter to use one of the columns as
    the index, otherwise default integer index will be used.
    
    Parameters
    ----------
    job_id : integer
        Job ID.
    engine : QueryEngine
        Handler returned by create_engine.
    index_col : string, optional
        Column name to use as index for the returned DataFrame object.
    parse_dates : list or dict, optional
        - List of column names to parse as dates
        - Dict of {column_name: format string} where format string is strftime
          compatible in case of parsing string times or is one of (D, s, ns, ms, us)
          in case of parsing integer timestamps
    
    Returns
    -------
    DataFrame

Examples


In [14]:
import tdclient

# Before using read_td_job, you need to issue a job separately
client = tdclient.Client()
job = client.query("sample_datasets",
                   "select time, close from nasdaq where symbol='AAPL'",
                   type="presto")

In [15]:
# Get result and convert it to dataframe
df = td.read_td_job(job.id, engine, index_col='time', parse_dates={'time': 's'})
df.plot()


Out[15]:
<matplotlib.axes._subplots.AxesSubplot at 0x10918d320>

read_td_table


In [16]:
help(td.read_td_table)


Help on function read_td_table in module pandas_td.td:

read_td_table(table_name, engine, index_col=None, parse_dates=None, columns=None, time_range=None, limit=10000)
    Read Treasure Data table into a DataFrame.
    
    The number of returned rows is limited by "limit" (default 10,000).
    Setting limit=None means all rows. Be careful when you set limit=None
    because your table might be very large and the result does not fit into memory.
    
    Parameters
    ----------
    table_name : string
        Name of Treasure Data table in database.
    engine : QueryEngine
        Handler returned by create_engine.
    index_col : string, optional
        Column name to use as index for the returned DataFrame object.
    parse_dates : list or dict, optional
        - List of column names to parse as dates
        - Dict of {column_name: format string} where format string is strftime
          compatible in case of parsing string times or is one of (D, s, ns, ms, us)
          in case of parsing integer timestamps
    columns : list, optional
        List of column names to select from table.
    time_range : tuple (start, end), optional
        Limit time range to select. "start" and "end" are one of None, integers,
        strings or datetime objects. "end" is exclusive, not included in the result.
    limit : int, default 10,000
        Maximum number of rows to select.
    
    Returns
    -------
    DataFrame

Examples


In [17]:
# Read all records (up to 10,000 rows by default)
df = td.read_td_table("www_access", engine)
df.head(3)


Out[17]:
   user            host              path                  referer  code                                               agent  size method        time
0  None   116.93.24.135   /item/toys/3282  /search/?c=Jewelry+Toys   200  Mozilla/5.0 (compatible; MSIE 9.0; Windows NT ...   111    GET  1412359189
1  None  40.186.149.189  /category/health         /item/books/3216   200  Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKi...    48    GET  1412359176
2  None   228.195.75.58  /item/games/2968                        -   200  Mozilla/4.0 (compatible; MSIE 8.0; Windows NT ...    86    GET  1412359162
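
The columns and time_range parameters combine naturally. A sketch using the documented parameters (the dates here are arbitrary; "end" is exclusive):

import datetime

# Select two columns within an explicit datetime range
df = td.read_td_table('www_access', engine,
                      columns=['time', 'path'],
                      time_range=(datetime.datetime(2014, 10, 1),
                                  datetime.datetime(2014, 11, 1)))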

to_td


In [20]:
help(td.to_td)


Help on function to_td in module pandas_td.td:

to_td(frame, name, con, if_exists='fail', time_col=None, time_index=None, index=True, index_label=None, chunksize=10000, date_format=None)
    Write a DataFrame to a Treasure Data table.
    
    This method converts the dataframe into a series of key-value pairs
    and send them using the Treasure Data streaming API. The data is divided
    into chunks of rows (default 10,000) and uploaded separately. If upload
    failed, the client retries the process for a certain amount of time
    (max_cumul_retry_delay; default 600 secs). This method may fail and
    raise an exception when retries did not succeed, in which case the data
    may be partially inserted. Use the bulk import utility if you cannot
    accept partial inserts.
    
    Parameters
    ----------
    frame : DataFrame
        DataFrame to be written.
    name : string
        Name of table to be written, in the form 'database.table'.
    con : Connection
        Connection to a Treasure Data account.
    if_exists: {'fail', 'replace', 'append'}, default 'fail'
        - fail: If table exists, do nothing.
        - replace: If table exists, drop it, recreate it, and insert data.
        - append: If table exists, insert data. Create if does not exist.
    time_col : string, optional
        Column name to use as "time" column for the table. Column type must be
        integer (unixtime), datetime, or string. If None is given (default),
        then the current time is used as time values.
    time_index : int, optional
        Level of index to use as "time" column for the table. Set 0 for a single index.
        This parameter implies index=False.
    index : boolean, default True
        Write DataFrame index as a column.
    index_label : string or sequence, default None
        Column label for index column(s). If None is given (default) and index is True,
        then the index names are used. A sequence should be given if the DataFrame uses
        MultiIndex.
    chunksize : int, default 10,000
        Number of rows to be inserted in each chunk from the dataframe.
    date_format : string, default None
        Format string for datetime objects

Examples


In [21]:
# Create DataFrame with random values
df = pd.DataFrame(np.random.rand(3, 3), columns=['x', 'y', 'z'])

to_td fails if the table already exists:


In [22]:
td.to_td(df, 'tutorial.import1', con)


# session started at 2015-11-22T06:48:07Z
Streaming import into: tutorial.import1
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-22-4cab8eb5396a> in <module>()
----> 1 td.to_td(df, 'tutorial.import1', con)

/Users/knishida/projects/pandas-td/pandas_td/td.py in to_td(frame, name, con, if_exists, time_col, time_index, index, index_label, chunksize, date_format)
    689             con.client.create_log_table(database, table)
    690         else:
--> 691             raise RuntimeError('table "%s" already exists' % name)
    692     elif if_exists == 'replace':
    693         try:

RuntimeError: table "tutorial.import1" already exists

In [23]:
# Set "if_exists" to 'replace' or 'append'
td.to_td(df, 'tutorial.import1', con, if_exists='replace')

Use index=False if you don't need to insert the DataFrame index:


In [24]:
td.to_td(df, 'tutorial.import1', con, if_exists='replace', index=False)

By default, to_td inserts the current time as the "time" column. You can pass the "time" column explicitly with time_col:


In [25]:
import datetime

df = pd.DataFrame(np.random.rand(3, 3), columns=['x', 'y', 'z'])

# Set "time" column explicitly
df['time'] = datetime.datetime.now()

# Use "time" as the time column in Treasure Data
td.to_td(df, 'tutorial.import1', con, if_exists='replace', index=False, time_col='time')

If you are using a time series index, set time_index=0:


In [26]:
df = pd.DataFrame(np.random.rand(3, 3), columns=['x', 'y', 'z'])

# Set time series index
df.index = pd.date_range('2001-01-01', periods=3)

# Use the index as the time column in Treasure Data (time_index implies index=False)
td.to_td(df, 'tutorial.import1', con, if_exists='replace', time_index=0)
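
The docstring also documents index_label for naming the index column. A minimal sketch, assuming you want the default integer index stored under a custom column name ('row_id' is arbitrary):

df = pd.DataFrame(np.random.rand(3, 3), columns=['x', 'y', 'z'])

# Write the index as a column named 'row_id' (the current time is used for "time")
td.to_td(df, 'tutorial.import1', con, if_exists='replace', index_label='row_id')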
