This extends the SPY pricing demo (example-spy-downloader.ipynb) and publishes + retrieves the pricing data using a targeted CACHE redis server (which runs inside the Jupyter container). It stores the Pandas dataframe as JSON in the LATEST_SPY_DAILY_STICKS redis key.
In [ ]:
import os, sys, json
sys.path.insert(0, os.getenv('ENV_PYTHON_SRC_DIR', '/opt/work/src'))
from pycore import PyCore
# Point the core at the redis server running inside
# the container and listening on port 6000
os.environ['ENV_DEPLOYMENT_TYPE'] = 'JustRedis'
print('Initializing Python Core')
core = PyCore()
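# PyCore keeps its redis connection handles in core.m_rds (assuming it
# behaves like a dict); the 'CACHE' entry is the one this demo publishes
# to, so listing the keys is a quick sanity check that it is wired up:
print(sorted(core.m_rds.keys()))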
core.lg('')
# the downloader is hardcoded for now to download to this shared volume + file location:
csv_file = '/opt/work/data/src/spy.csv'
# removing the previous csv file if it exists
if os.path.exists(csv_file):
    core.lg(' - Removing Previous(' + str(csv_file) + ')', 4)
    os.system('rm -f ' + str(csv_file))
# end of removing previous
downloader_name = 'download-spy-csv.py'
path_to_downloader = os.path.join(os.getenv('ENV_BINS', '/opt/work/bins'), downloader_name)
core.lg('Downloading latest SPY Pricing with Download(' + str(path_to_downloader) + ')', 1)
rc = os.system(path_to_downloader)
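# os.system returns the shell exit status (0 means success), so a
# non-zero status flags a failed download before the file check below:
if rc != 0:
    core.lg('ERROR: Downloader exited with status(' + str(rc) + ')', 0)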
core.lg('')
core.lg('Checking CSV is Ready')
if os.path.exists(csv_file):
    core.lg(' SUCCESS - File Exists: ' + str(csv_file), 5)
    core.lg('')
    # Start loading the data
    import numpy as np
    import matplotlib.pyplot as plt
    import pandas as pd
    %matplotlib inline
    core.lg('Reading CSV with Pandas')
    # handle the date format, and the stray byte-order mark on the header row with utf-8-sig
    from datetime import datetime
    dateparse = lambda x: datetime.strptime(x, '%d-%b-%y')
    data = pd.read_csv(csv_file, parse_dates=[0], date_parser=dateparse, encoding='utf-8-sig').sort_values(by='Date', ascending=False)
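    # sanity check that the '%d-%b-%y' pattern matches the CSV's date strings
    # ('17-Nov-17' is a hypothetical sample value, not read from the file):
    print(datetime.strptime('17-Nov-17', '%d-%b-%y'))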
    core.lg('')
    core.lg('Data Head with Sticks(' + str(len(data)) + ')', 1)
    print(data.head())
    core.lg('')
    core.lg('-------------------------------------------------', 2)
    core.lg('')
    # publish the dataframe to the CACHE redis server under this key:
    cache_key = 'LATEST_SPY_DAILY_STICKS'
    core.lg('Converting to JSON')
    json_data_rec = data.to_json()
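    # to_json defaults to orient='columns', which pd.read_json also assumes
    # below, so the dataframe round-trips cleanly; a quick local check:
    assert len(pd.read_json(json_data_rec)) == len(data)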
    cache_this = {
        'Ticker' : 'SPY',
        'Data' : json_data_rec
    }
    core.lg('Caching the Latest SPY Candlesticks')
    results = core.purge_and_cache_records_in_redis(core.m_rds['CACHE'], cache_key, cache_this)
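    # assuming the cache call returns the same {'Status': ...} dictionary
    # that get_message_no_block returns below (an assumption, the demo does
    # not confirm it), a failed publish can be flagged here:
    if isinstance(results, dict) and results.get('Status') != 'SUCCESS':
        core.lg('ERROR: Failed to cache the Candlesticks', 0)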
    core.lg('')
    core.lg('-------------------------------------------------', 2)
    core.lg('')
    # confirm we can retrieve + remove the cached data from this key:
    core.lg('Retrieving Cached SPY Candlesticks')
    cache_record = core.get_message_no_block(core.m_rds['CACHE'], cache_key)
    core.lg('')
    # the core returns a dictionary where 'Status' == 'SUCCESS' when it was
    # able to pull records out of redis; after checking the 'Status', the
    # dataset is stored under the 'Record' key.
    if cache_record['Status'] == 'SUCCESS':
        rec = cache_record['Record']
        cache_data = rec['Data']
        sticks = pd.read_json(cache_data).sort_values(by='Date', ascending=False)
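        # read_json converts the epoch-millisecond timestamps that to_json
        # wrote for the 'Date' column back into datetimes, so the cached
        # frame should line up with the original row-for-row:
        assert len(sticks) == len(data)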
        core.lg(' - SUCCESS found cached records for Ticker(' + str(rec['Ticker']) + ')', 5)
        core.lg('')
        core.lg(' - Data Head with Sticks(' + str(len(sticks)) + ')')
        print(sticks.head())
    else:
        core.lg('ERROR: Failed to retrieve Cached Candlesticks', 0)
    # end of retrieving cache example
    core.lg('')
else:
    core.lg(' ERROR: Failed to find CSV(' + str(csv_file) + ')', 0)
# end of if/else download was successful
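To double-check the published key from outside of PyCore, you can talk to the same redis server directly. This is a minimal sketch, assuming the container's redis listens on localhost port 6000 (per the comment above) and that the redis-py client is installed; the key name is the same LATEST_SPY_DAILY_STICKS used above.
In [ ]:
import redis
# connect to the demo's redis server (assumed host/port)
r = redis.StrictRedis(host='localhost', port=6000, db=0)
print(r.exists('LATEST_SPY_DAILY_STICKS'))
print(r.type('LATEST_SPY_DAILY_STICKS'))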