Building a Jupyter + Redis Data Pipeline

This extends the SPY pricing demo (example-spy-downloader.ipynb) by publishing and retrieving the pricing data through a targeted CACHE redis server (running inside the Jupyter container). It stores the pandas DataFrame as JSON in the LATEST_SPY_DAILY_STICKS redis key.
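
Under the hood this is just a JSON payload sitting in a redis string key. As a rough sketch of what the publish side boils down to, here is a minimal, standalone version using a plain redis-py client (the redis.Redis wiring, the localhost host, and the sample DataFrame below are assumptions for illustration; in the notebook PyCore handles the connection):

In [ ]:
import json

import pandas as pd
import redis

# assumption: the in-container CACHE redis server listens on localhost:6000
r = redis.Redis(host='localhost', port=6000, db=0)

# stand-in DataFrame; the notebook builds this from the downloaded CSV
df = pd.DataFrame({'Date': ['04-Jan-21'], 'Close': [368.79]})

# serialize the frame to JSON and wrap it in a small record
cache_this = {
    'Ticker': 'SPY',
    'Data': df.to_json()
}

# store the record under the cache key
r.set('LATEST_SPY_DAILY_STICKS', json.dumps(cache_this))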


In [ ]:
import os, sys, json

sys.path.insert(0, os.getenv('ENV_PYTHON_SRC_DIR', '/opt/work/src'))

from pycore import PyCore

# Point the core at the redis server running inside
# the container and listening on port 6000
os.environ['ENV_DEPLOYMENT_TYPE'] = 'JustRedis'

print('Initializing Python Core')

core = PyCore()
core.lg('')

# the downloader is hardcoded for now to download to this shared volume + file location:
csv_file = '/opt/work/data/src/spy.csv'

# removing previous csv file if it exists
if os.path.exists(csv_file):
    core.lg(' - Removing Previous(' + str(csv_file) + ')', 4)
    os.system('rm -f ' + str(csv_file))
# end of removing previous

downloader_name    = 'download-spy-csv.py'
path_to_downloader = os.path.join(os.getenv('ENV_BINS', '/opt/work/bins/'), downloader_name)

core.lg('Downloading latest SPY Pricing with Download(' + str(path_to_downloader) + ')', 1)
os.system(path_to_downloader)

core.lg('')
core.lg('Checking CSV is Ready')
if os.path.exists(csv_file):
    core.lg('  SUCCESS - File Exists: ' + str(csv_file), 5)
    core.lg('')
    
    # Start loading the data
    import numpy as np
    import matplotlib.pyplot as plt
    import pandas as pd
    
    %matplotlib inline
    core.lg('Reading CSV with Pandas')
    # handle date formats and strip the byte-order mark (BOM) from the header row with utf-8-sig
    from datetime import datetime
    dateparse = lambda x: datetime.strptime(x, '%d-%b-%y')
    data = pd.read_csv(csv_file,
                       parse_dates=[0],
                       date_parser=dateparse,
                       encoding='utf-8-sig').sort_values(by='Date', ascending=False)
    
    core.lg('')
    core.lg('Data Head with Sticks(' + str(len(data)) + ')', 1)
    print(data.head())
    
    core.lg('')
    core.lg('-------------------------------------------------', 2)
    core.lg('')
    
    # cache the dataframe as JSON in the CACHE redis server under this key
    cache_key = 'LATEST_SPY_DAILY_STICKS'
    core.lg('Converting to JSON')
    json_data_rec = data.to_json()
    cache_this    = {
        'Ticker' : 'SPY',
        'Data'   : json_data_rec
    }
    core.lg('Caching the Latest SPY Candlesticks')
    results  = core.purge_and_cache_records_in_redis(core.m_rds['CACHE'], cache_key, cache_this)

    core.lg('')
    core.lg('-------------------------------------------------', 2)
    core.lg('')
    
    # confirm we can retrieve + remove the cached data from this key:
    core.lg('Retrieving Cached SPY Candlesticks')
    cache_record  = core.get_message_no_block(core.m_rds['CACHE'], cache_key)
    core.lg('')

    # the core returns a dictionary where 'Status' == 'SUCCESS' when it was able to pull
    # records out of redis. After checking 'Status', the dataset is stored under the 'Record' key.
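    # assumed envelope shape, inferred from the checks below:
    #   {'Status': 'SUCCESS', 'Record': {'Ticker': 'SPY', 'Data': '<DataFrame JSON>'}}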
    if cache_record['Status'] == 'SUCCESS':
        rec        = cache_record['Record']
        cache_data = rec['Data']
        sticks     = pd.read_json(cache_data).sort_values(by='Date', ascending=False)
        core.lg(' - SUCCESS found cached records for Ticker(' + str(rec['Ticker']) + ')', 5)
        core.lg('')
        core.lg(' - Data Head with Sticks(' + str(len(sticks)) + ')')
        print(sticks.head())
    else:
        core.lg('ERROR: Failed to retrieve Cached Candlesticks', 0)
    # end of retrieving cache example

    core.lg('')
else:
    core.lg('  ERROR: Failed to find CSV(' + str(csv_file) + ')', 0)
# end of if/else download was successful
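
Because the cached record is just a key in redis, any other notebook or process that can reach the same server can rehydrate the DataFrame. Here is a minimal consumer sketch, again using a plain redis-py client on localhost:6000 and the flat JSON record from the sketch above (note this is an assumed layout; PyCore's get_message_no_block wraps its result in the Status/Record envelope instead):

In [ ]:
import json

import pandas as pd
import redis

r = redis.Redis(host='localhost', port=6000, db=0)

# pull the raw record back out of the cache key
raw = r.get('LATEST_SPY_DAILY_STICKS')
if raw is not None:
    rec    = json.loads(raw)
    sticks = pd.read_json(rec['Data']).sort_values(by='Date', ascending=False)
    print(rec['Ticker'], len(sticks))
else:
    print('No cached record found')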