I wanted to use machine learning to accurately predict buy and sell opportunities for stocks.
While building the original version, I realized how important a machine learning data store is to this type of intelligent application stack. Today's talk is about using Redis as a database for thousands of machine learning models. Using Redis as the data store lets my team focus on making better predictions through iterative improvements in an ever-changing market.
Once the API was defined, I realized this approach would work for any dataset, not just stock trading. (Please see Appendix D for how Red10 works with the IRIS dataset.)
An algorithm is the general approach you will take. The model is what you get when you run the algorithm over your training data and what you use to make predictions on new data. You can generate a new model with the same algorithm with different data, or a different model from the same data with a different algorithm.
Source: https://www.quora.com/What-is-the-difference-between-machine-learning-model-and-ML-algorithm
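To make the distinction concrete, here is a minimal sketch of "one algorithm, many models". scikit-learn and its two bundled datasets are illustration choices on my part, not part of the Red10 stack.

# Sketch: one algorithm (a depth-3 decision tree) trained on two datasets yields two models.
# scikit-learn is used here purely for illustration; it is not part of Red10.
from sklearn.tree import DecisionTreeClassifier
from sklearn.datasets import load_iris, load_wine

iris = load_iris()
wine = load_wine()

# same algorithm and hyperparameters both times, different training data each time
iris_model = DecisionTreeClassifier(max_depth=3, random_state=0).fit(iris.data, iris.target)
wine_model = DecisionTreeClassifier(max_depth=3, random_state=0).fit(wine.data, wine.target)

print(iris_model.predict(iris.data[:3]))   # predictions from the iris model
print(wine_model.predict(wine.data[:3]))   # predictions from the wine model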
Please refer to Appendix G for more definitions.
Redis is a scalable, in-memory storage solution that is a great fit for CRUD-style machine learning use cases.
After years of using Redis for caching, pub/sub, and auto-reloading data on restart, it was an obvious first choice as a scalable store for many pre-trained machine learning models. In my humble opinion, pulling gigabytes of pickled objects out of a relational or NoSQL database (MySQL/Postgres/Oracle/MongoDB) would take too long and is not an ideal use case for those systems.
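As a rough sketch of that pattern (not the exact Red10 code; the host, port, db index, and key naming below are assumptions for illustration), a pre-trained model can be pickled into a Redis key and reloaded on demand:

# Sketch: cache a pre-trained model in Redis as a pickled blob and reload it later.
# The host, port, db index and key naming are illustrative, not Red10's actual layout.
import pickle
import redis

r = redis.Redis(host="localhost", port=6379, db=0)

def save_model(key, model, ttl_seconds=None):
    # serialize the fitted model and store the bytes under a single key
    r.set(key, pickle.dumps(model), ex=ttl_seconds)

def load_model(key):
    # pull the bytes back and rebuild the model object, or return None if missing
    blob = r.get(key)
    return pickle.loads(blob) if blob is not None else None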
In [1]:
from __future__ import print_function
import sys, os, requests, json, datetime

# Load the environment and login the user
from src.common.load_redten_ipython_env import user_token, user_login, csv_file, run_job, core, api_urls, ppj, rt_url, rt_user, rt_pass, rt_email, lg, good, boom, anmt, mark, ppj, uni_key, rest_login_as_user, rest_full_login, wait_for_job_to_finish, wait_on_job, get_job_analysis, get_job_results, get_analysis_manifest, get_job_cache_manifest, build_prediction_results, build_forecast_results, get_job_cache_manifest, search_ml_jobs, show_logs, show_errors, ipyImage, ipyHTML, ipyDisplay, pd, np

# header
lg("")
good("Starting Redis Key Analysis: " + str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
lg("")

# store the model mappings by dataset name
models_iris = {}
models_spy = {}
models_xle = {}
models_xlf = {}
models_xli = {}
models_xlu = {}
total_models_in_redis = 0

# walk the redis dbs
for db_idx in range(0, 16):

    db_file = "/tmp/db-" + str(db_idx) + "-keys"
    os.system("echo \"select " + str(db_idx) + " \n keys *\" | redis-cli -p 6100 > " + str(db_file))

    database_keys = {}
    if os.path.exists(db_file):                 # if the file exists
        with open(db_file) as f:                # open it
            lines_to_parse = f.readlines()      # read all the lines
            for org_line in lines_to_parse:
                cur_line = org_line.strip()     # remove the newline and any other whitespace
                if str(cur_line) != "OK" and str(cur_line) != "":
                    database_keys[cur_line] = True
                # if it's not the redis OK response or an empty string
            # for all lines
        # with the db file open
    else:
        boom("Failed parsing Redis db_file=" + str(db_file))
    # end of parsing db_file

    if len(database_keys) > 1:

        for idx, k in enumerate(database_keys):
            # ignore the predictions and accuracies
            if "_PredictionsDF" not in k and "_Accuracy" not in k:
                num_underscores = len(str(k).split("_"))
                # ignore the analysis/manifest keys
                if num_underscores > 2:
                    if "_IRIS" in k:
                        models_iris[k] = True
                        total_models_in_redis += 1
                    elif "_SPY" in k:
                        models_spy[k] = True
                        total_models_in_redis += 1
                    elif "_XLE" in k:
                        models_xle[k] = True
                        total_models_in_redis += 1
                    elif "_XLF" in k:
                        models_xlf[k] = True
                        total_models_in_redis += 1
                    elif "_XLI" in k:
                        models_xli[k] = True
                        total_models_in_redis += 1
                    elif "_XLU" in k:
                        models_xlu[k] = True
                        total_models_in_redis += 1
                # end of checking it's not an analysis/manifest key
            # end of if it's not a prediction and not an accuracy key
        # for the large db keyset

        lg("IRIS models=" + str(len(models_iris)), 5)
        lg("SPY models=" + str(len(models_spy)), 5)
        lg("XLE models=" + str(len(models_xle)), 5)
        lg("XLF models=" + str(len(models_xlf)), 5)
        lg("XLI models=" + str(len(models_xli)), 5)
        lg("XLU models=" + str(len(models_xlu)), 5)
        lg("")
        lg("---------------------------------------------")
        anmt("Total Pre-trained Machine Learning Models in Redis:")
        boom(str(total_models_in_redis))
        lg("---------------------------------------------")
        lg("")
    # end of if there's database keys in the redis instance
# end for all db files
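For readers who would rather stay inside Python, here is a rough redis-py equivalent of the key walk above. It is a sketch, not the notebook's code, and it assumes the same port 6100 instance and key naming conventions:

# Sketch: walk the same 16 Redis dbs with the redis-py client instead of redis-cli.
import redis

total_models = 0
for db_idx in range(0, 16):
    r = redis.Redis(host="localhost", port=6100, db=db_idx)
    for raw_key in r.scan_iter("*"):          # SCAN avoids blocking the server the way KEYS * can
        k = raw_key.decode("utf-8")
        if "_PredictionsDF" in k or "_Accuracy" in k:
            continue                          # skip prediction and accuracy keys
        if len(k.split("_")) > 2:             # keys with 3+ underscore-separated parts are model keys
            total_models += 1
print("Total pre-trained models found=" + str(total_models))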
Additional reading
definitely a data problem...
Who wins the Kaggle competitions? eXtreme Gradient Boosting (XGBoost) won many of them in 2016
Many, many more choices
Start iterating in a notebook: http://nbviewer.jupyter.org/github/jmsteinw/Notebooks/blob/master/XG_Boost_NB.ipynb
def __init__(self, max_depth=3, learning_rate=0.1, n_estimators=100,
silent=True, objective="reg:linear",
nthread=-1, gamma=0, min_child_weight=1, max_delta_step=0,
subsample=1, colsample_bytree=1, colsample_bylevel=1,
reg_alpha=0, reg_lambda=1, scale_pos_weight=1,
base_score=0.5, seed=0, missing=None):
https://github.com/dmlc/xgboost/blob/master/python-package/xgboost/sklearn.py
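As a quick example of those defaults in use (the data is synthetic and this is not Red10 code, just a sketch of the scikit-learn wrapper), fitting an XGBoost regressor looks roughly like this:

# Sketch: train an XGBoost regressor via the sklearn wrapper using the defaults shown above.
# The data is synthetic and only for illustration.
import numpy as np
import xgboost as xgb

X = np.random.rand(500, 10)
y = 2.0 * X[:, 0] + 0.1 * np.random.rand(500)   # a simple target to learn

model = xgb.XGBRegressor(max_depth=3, learning_rate=0.1, n_estimators=100)
model.fit(X, y)
print(model.predict(X[:5]))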
Please refer to Appendix A for more details on Red10's architecture.
The GitHub repo https://github.com/jay-johnson/sci-pype is built around this workflow:
Using Red10 for price forecasting:
Please refer to the appendices for architecture slides and developer-centric tooling for reviewing offline.
In [2]:
from __future__ import print_function
import sys, os, requests, json, datetime

# Load the environment and login the user
from src.common.load_redten_ipython_env import user_token, user_login, csv_file, run_job, core, api_urls, ppj, rt_url, rt_user, rt_pass, rt_email, lg, good, boom, anmt, mark, ppj, uni_key, rest_login_as_user, rest_full_login, wait_for_job_to_finish, wait_on_job, get_job_analysis, get_job_results, get_analysis_manifest, get_job_cache_manifest, build_prediction_results, build_forecast_results, get_job_cache_manifest, search_ml_jobs, show_logs, show_errors, ipyImage, ipyHTML, ipyDisplay, pd, np

# header
lg("")
good("Starting Redis Key Analysis: " + str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
lg("")

# walk the redis dbs
for db_idx in range(0, 16):

    db_file = "/tmp/db-" + str(db_idx) + "-keys"
    os.system("echo \"select " + str(db_idx) + " \n keys *\" | redis-cli > " + str(db_file))

    database_keys = {}
    if os.path.exists(db_file):                 # if the file exists
        with open(db_file) as f:                # open it
            lines_to_parse = f.readlines()      # read all the lines
            for org_line in lines_to_parse:
                cur_line = org_line.strip()     # remove the newline and any other whitespace
                if str(cur_line) != "OK" and str(cur_line) != "":
                    database_keys[cur_line] = True
                # if it's not the redis OK response or an empty string
            # for all lines
        # with the db file open
    else:
        boom("Failed parsing Redis db_file=" + str(db_file))
    # end of parsing db_file

    anmt("Redis DB=" + str(db_idx) + " keys=" + str(len(database_keys)))

    # used for manual inspection
    sample_cache_record = {}

    if db_idx == 0:
        equity_ticker = "SPY"
        num_equities = 0
        detailed_keys = []
        for idx, k in enumerate(database_keys):
            if "EQTY_" in str(k) or "EQID_" in str(k):
                num_equities += 1
            else:
                lg(" - key=" + str(idx) + " name: " + str(k))
                detailed_keys.append(k)

                # Pull this record from the "STOCKS:EQ_DAILY_SPY"
                # redis location => <cache app name:redis key>
                if "EQ_DAILY_SPY" == k:
                    sample_cache_record = core.get_cache_from_redis(core.m_rds["STOCKS"], k, False, False)
                # end of pulling a sample
        # for the large db keyset

        if len(sample_cache_record) > 0:
            lg("")
            lg("Daily Sticks for Ticker(" + str(equity_ticker) + ") StartDate(" + str(sample_cache_record["Record"]["StartDate"]) + ") Sticks(" + str(len(sample_cache_record["Record"]["Sticks"])) + ")", 6)
            lg("Date, High, Low, Open, Close, Volume", 5)
            for idx, record in enumerate(reversed(sample_cache_record["Record"]["Sticks"])):
                lg(record["Date"] + ", " + record["High"] + ", " + record["Low"] + ", " + record["Open"] + ", " + record["Close"] + ", " + record["Volume"])
                # stop after a few
                if idx > 10:
                    break
            # end for all sticks in the cache
            lg("")
        # end of inspecting the sample record

        lg("DB=" + str(db_idx) + " detailed_keys=" + str(len(detailed_keys)) + " equities=" + str(num_equities))
    else:
        for k in database_keys:
            if "session:" in k:
                lg(" - key: session:<redacted>")
            else:
                lg(" - key: " + str(k))
        # end of for all keys
    # end of for the post processing keyset in this redis db

    lg("---------------------------------------------")
    lg("")
# end for all db files
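If the cached daily sticks need more than a quick print, they can be loaded straight into pandas for analysis. This is a sketch that assumes the Record/Sticks layout shown above; it is not part of the notebook:

# Sketch: convert the cached daily sticks into a pandas DataFrame.
# Assumes the sample_cache_record["Record"]["Sticks"] layout printed above.
if len(sample_cache_record) > 0:
    sticks_df = pd.DataFrame(sample_cache_record["Record"]["Sticks"])
    sticks_df["Date"] = pd.to_datetime(sticks_df["Date"])
    for col in ["High", "Low", "Open", "Close", "Volume"]:
        sticks_df[col] = pd.to_numeric(sticks_df[col])   # the cached values are strings
    lg(str(sticks_df.sort_values("Date").tail(5)))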
In [3]:
from __future__ import print_function
import sys, os, requests, json, datetime

# Load the environment and login the user
from src.common.load_redten_ipython_env import user_token, user_login, csv_file, run_job, core, api_urls, ppj, rt_url, rt_user, rt_pass, rt_email, lg, good, boom, anmt, mark, ppj, uni_key, rest_login_as_user, rest_full_login, wait_for_job_to_finish, wait_on_job, get_job_analysis, get_job_results, get_analysis_manifest, get_job_cache_manifest, build_prediction_results, build_forecast_results, get_job_cache_manifest, search_ml_jobs, show_logs, show_errors, ipyImage, ipyHTML, ipyDisplay, pd, np

# build the job search request
search_req = {
    "title"         : "",     # job title with completion
    "dsname"        : "SPY",  # dataset name with completion
    "desc"          : "",     # description with completion
    "features"      : "",     # feature search
    "target_column" : ""      # name of target column for this analysis
}

job_search = {}
job_res = {}
if len(search_req) == 0:
    boom("Please create a valid search request")
else:
    job_res = search_ml_jobs(search_req)
    if job_res["status"] != "SUCCESS":
        boom("Job search failed with status=" + str(job_res["status"]) + " err=" + str(job_res["error"]))
    else:
        job_search = job_res["record"]
        anmt("Job Matches=" + str(len(job_search)))
        lg(ppj(job_search), 5)
    # found jobs
# end of searching for job
In [4]:
boom("Finding latest error logs:")
show_errors(limit=50)
In [5]:
anmt("Finding latest logs:")
show_logs(limit=50)