This notebook is from the official Quantopian Guide on Pipelines. Make sure to visit their documentation for many more great resources!
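Many trading algorithms have the following structure:
1. For each asset in a known (large) set, compute N scalar values for the asset based on a trailing window of data.
2. Select a smaller tradeable set of assets based on the values computed in (1).
3. Calculate desired portfolio weights on the set of assets selected in (2).
4. Place orders to move the algorithm's current portfolio allocations to the desired weights computed in (3).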
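There are several technical challenges with doing this robustly. These include:
- efficiently querying large sets of assets;
- performing computations on large sets of assets;
- handling adjustments (splits and dividends);
- handling asset delistings.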
Pipeline exists to solve these challenges by providing a uniform API for expressing computations on a diverse collection of datasets.
A factor is a function from an asset and a moment in time to a numerical value.
A simple example of a factor is the most recent price of a security. Given a security and a specific point in time, the most recent price is a number. Another example is the 10-day average trading volume of a security. Factors are most commonly used to assign values to securities, which can then be used in a number of ways. A factor can be used in each of the following procedures:
1. computing target weights;
2. generating an alpha signal;
3. constructing other, more complex factors;
4. constructing filters.
A filter is a function from an asset and a moment in time to a boolean. An example of a filter is a function indicating whether a security's price is below $10. Given a security and a point in time, this evaluates to either True or False. Filters are most commonly used for describing sets of assets to include or exclude for some particular purpose.
A classifier is a function from an asset and a moment in time to a categorical output. More specifically, a classifier produces a string or an int that doesn't represent a numerical value (e.g. an integer label such as a sector code). Classifiers are most commonly used for grouping assets for complex transformations on Factor outputs. An example of a classifier is the exchange on which an asset is currently being traded.
In [ ]:
from quantopian.pipeline import Pipeline
In [ ]:
def make_pipeline():
    """Build and return a Pipeline constructed with no arguments (no columns, no screen)."""
    empty_pipeline = Pipeline()
    return empty_pipeline
In [ ]:
# Instantiate the pipeline returned by the make_pipeline() defined above.
pipe = make_pipeline()
In [ ]:
from quantopian.research import run_pipeline
In [ ]:
# Run the pipeline over a single-day range (start date == end date).
# NOTE(review): 2017-01-01 is a Sunday; confirm how run_pipeline maps
# non-trading dates before relying on the date in the output index.
result = run_pipeline(pipe, start_date='2017-01-01', end_date='2017-01-01')
In [ ]:
# Preview the first ten rows of the pipeline output.
result.head(n=10)
In [ ]:
# Summarize the output DataFrame (index, columns, dtypes); buf=None is
# pandas' default and prints to stdout.
result.info(buf=None)
In [ ]:
from quantopian.pipeline.data.builtin import USEquityPricing
In [ ]:
from quantopian.pipeline.factors import BollingerBands,SimpleMovingAverage,EWMA
In [ ]:
# Build (and display) a 30-day simple moving average of daily closes.
SimpleMovingAverage(window_length=30, inputs=[USEquityPricing.close])
In [ ]:
def make_pipeline():
    """Return a Pipeline with one column: the 30-day mean of daily closes."""
    columns = {
        '30 Day Mean Close': SimpleMovingAverage(
            window_length=30,
            inputs=[USEquityPricing.close],
        ),
    }
    return Pipeline(columns=columns)
In [ ]:
# Run the freshly defined pipeline for the single date 2017-01-01.
results = run_pipeline(make_pipeline(),
                       start_date='2017-01-01',
                       end_date='2017-01-01')
In [ ]:
# Preview the first twenty rows.
results.head(n=20)
In [ ]:
def make_pipeline():
    """Return a Pipeline with the 30-day mean close and the latest close."""
    columns = {}
    columns['30 Day Mean Close'] = SimpleMovingAverage(
        window_length=30, inputs=[USEquityPricing.close])
    # `.latest` yields the most recent value of the close column.
    columns['Latest Close'] = USEquityPricing.close.latest
    return Pipeline(columns=columns)
In [ ]:
# Re-run with the two-column pipeline for 2017-01-01.
results = run_pipeline(make_pipeline(),
                       start_date='2017-01-01',
                       end_date='2017-01-01')
In [ ]:
# Preview the first ten rows.
results.head(n=10)
In [ ]:
def make_pipeline():
    """Return a Pipeline comparing the 10-day and 30-day mean closes.

    Columns:
        'Percent Difference': (10-day mean - 30-day mean) / 30-day mean
        '30 Day Mean Close':  30-day mean of closes
        'Latest Close':       most recent close
    """
    close = USEquityPricing.close
    fast_mean = SimpleMovingAverage(inputs=[close], window_length=10)
    slow_mean = SimpleMovingAverage(inputs=[close], window_length=30)
    return Pipeline(columns={
        'Percent Difference': (fast_mean - slow_mean) / slow_mean,
        '30 Day Mean Close': slow_mean,
        'Latest Close': close.latest,
    })
In [ ]:
# Re-run with the percent-difference pipeline for 2017-01-01.
results = run_pipeline(make_pipeline(),
                       start_date='2017-01-01',
                       end_date='2017-01-01')
In [ ]:
# Preview the first five rows (pandas' default for head).
results.head(n=5)
In [ ]:
# A Filter that is True where the most recent close is above 20.
last_close_price = USEquityPricing.close.latest
# Written with the constant on the left; Python falls back to the
# factor's reflected comparison, yielding the same Filter as `> 20`.
close_price_filter = 20 < last_close_price
In [ ]:
# Display the Filter object itself (a pipeline term, not computed values).
close_price_filter
In [ ]:
def make_pipeline():
    """Return the percent-difference Pipeline plus a boolean column.

    'Positive Percent Diff' is True where the percent difference between the
    10-day and 30-day mean closes is greater than 0. No screen is applied,
    so every asset appears in the output.
    """
    close = USEquityPricing.close
    fast_mean = SimpleMovingAverage(inputs=[close], window_length=10)
    slow_mean = SimpleMovingAverage(inputs=[close], window_length=30)
    pct_diff = (fast_mean - slow_mean) / slow_mean
    is_rising = pct_diff > 0
    return Pipeline(columns={
        'Percent Difference': pct_diff,
        '30 Day Mean Close': slow_mean,
        'Latest Close': close.latest,
        'Positive Percent Diff': is_rising,
    })
In [ ]:
# Run the pipeline with the boolean column and preview it.
results = run_pipeline(make_pipeline(),
                       start_date='2017-01-01',
                       end_date='2017-01-01')
results.head(n=5)
In [ ]:
def make_pipeline():
    """Return the percent-difference Pipeline screened to positive values.

    The same Filter that fills 'Positive Percent Diff' is used as the screen,
    so only rows where that column is True are kept.
    """
    close = USEquityPricing.close
    fast_mean = SimpleMovingAverage(inputs=[close], window_length=10)
    slow_mean = SimpleMovingAverage(inputs=[close], window_length=30)
    pct_diff = (fast_mean - slow_mean) / slow_mean
    is_rising = pct_diff > 0
    columns = {
        'Percent Difference': pct_diff,
        '30 Day Mean Close': slow_mean,
        'Latest Close': close.latest,
        'Positive Percent Diff': is_rising,
    }
    return Pipeline(columns=columns, screen=is_rising)
In [ ]:
# Run the screened pipeline and preview the surviving rows.
results = run_pipeline(make_pipeline(),
                       start_date='2017-01-01',
                       end_date='2017-01-01')
results.head(n=5)
In [ ]:
def make_pipeline():
    """Return the percent-difference Pipeline screened with the inverted Filter.

    `~` negates the Filter, so the screen keeps rows where
    'Positive Percent Diff' is False.
    NOTE(review): that is "not positive" rather than "negative"; rows whose
    percent difference is missing presumably compare False and so are also
    kept. Confirm if only strictly negative values are wanted.
    """
    close = USEquityPricing.close
    fast_mean = SimpleMovingAverage(inputs=[close], window_length=10)
    slow_mean = SimpleMovingAverage(inputs=[close], window_length=30)
    pct_diff = (fast_mean - slow_mean) / slow_mean
    is_rising = pct_diff > 0
    columns = {
        'Percent Difference': pct_diff,
        '30 Day Mean Close': slow_mean,
        'Latest Close': close.latest,
        'Positive Percent Diff': is_rising,
    }
    return Pipeline(columns=columns, screen=~is_rising)
In [ ]:
# Run the inverted-screen pipeline and preview it.
results = run_pipeline(make_pipeline(),
                       start_date='2017-01-01',
                       end_date='2017-01-01')
results.head(n=5)
In [ ]:
def make_pipeline():
    """Return the percent-difference Pipeline screened by two combined Filters.

    The screen is the `&` of two Filters: the percent difference is positive,
    and the latest close is below 5.
    """
    close = USEquityPricing.close
    last_close = close.latest
    fast_mean = SimpleMovingAverage(inputs=[close], window_length=10)
    slow_mean = SimpleMovingAverage(inputs=[close], window_length=30)
    pct_diff = (fast_mean - slow_mean) / slow_mean
    is_rising = pct_diff > 0
    is_cheap = last_close < 5
    return Pipeline(
        columns={
            'Percent Difference': pct_diff,
            '30 Day Mean Close': slow_mean,
            'Latest Close': last_close,
            'Positive Percent Diff': is_rising,
        },
        screen=is_rising & is_cheap,
    )
In [ ]:
# Run the combined-filter pipeline and preview it.
results = run_pipeline(make_pipeline(),
                       start_date='2017-01-01',
                       end_date='2017-01-01')
results.head(n=5)
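Sometimes we want to ignore certain assets when computing pipeline expressions. There are two common cases where ignoring assets is useful:
1. We want to compute an expression that is computationally expensive, and we know we only care about results for certain assets.
2. We want to compute an expression that performs comparisons between assets (for example, a ranking), but we only want those comparisons made against a subset of all assets.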
In [ ]:
def make_pipeline():
    """Return the percent-difference Pipeline with moving averages masked.

    Both moving averages are computed only where the latest close is below 5
    (``mask=``). The screen is the positive-percent-difference Filter.
    """
    # Build the mask Filter first, because the factors below take it as input.
    last_close = USEquityPricing.close.latest
    is_cheap = last_close < 5

    def masked_mean(days):
        # Moving average of closes, restricted to assets passing `is_cheap`.
        return SimpleMovingAverage(inputs=[USEquityPricing.close],
                                   window_length=days,
                                   mask=is_cheap)

    fast_mean = masked_mean(10)
    slow_mean = masked_mean(30)
    pct_diff = (fast_mean - slow_mean) / slow_mean
    is_rising = pct_diff > 0
    return Pipeline(
        columns={
            'Percent Difference': pct_diff,
            '30 Day Mean Close': slow_mean,
            'Latest Close': last_close,
            'Positive Percent Diff': is_rising,
        },
        screen=is_rising,
    )
In [ ]:
# Run the masked pipeline and preview it.
results = run_pipeline(make_pipeline(),
                       start_date='2017-01-01',
                       end_date='2017-01-01')
results.head(n=5)
In [ ]:
# Row count of the output (number of index entries that passed the screen).
results.shape[0]
In [ ]:
from quantopian.pipeline.data import morningstar
from quantopian.pipeline.classifiers.morningstar import Sector
In [ ]:
# Morningstar sector classifier. Not referenced again in the cells shown here.
morningstar_sector = Sector()
In [ ]:
# Latest exchange id per asset; used below as a classifier (via .eq()).
exchange = morningstar.share_class_reference.exchange_id.latest
In [ ]:
# Display the exchange term object.
exchange
In [ ]:
# Filter: True where the exchange id equals 'NYS'.
nyse_filter = exchange.eq('NYS')
In [ ]:
def make_pipeline():
    """Return a masked percent-difference Pipeline restricted to 'NYS' assets.

    The two moving averages are computed only for assets whose latest close
    is below 5 (``mask=small_price``). The screen keeps rows where the percent
    difference is positive AND the exchange id equals 'NYS'. Assets outside
    the mask get no moving average, so their percent-difference check
    presumably evaluates False and they are screened out as well.

    The exchange term is built inside this function. The previous version
    read the notebook-level ``exchange`` variable, which silently tied the
    pipeline to an earlier cell having been run.
    """
    # Create the mask Filter first; the factors below consume it.
    latest_close = USEquityPricing.close.latest
    small_price = latest_close < 5

    # Exchange classifier; .eq() turns it into a Filter.
    exchange = morningstar.share_class_reference.exchange_id.latest
    nyse_filter = exchange.eq('NYS')

    # Moving averages are only computed where small_price is True.
    mean_close_10 = SimpleMovingAverage(inputs=[USEquityPricing.close],
                                        window_length=10,
                                        mask=small_price)
    mean_close_30 = SimpleMovingAverage(inputs=[USEquityPricing.close],
                                        window_length=30,
                                        mask=small_price)

    percent_difference = (mean_close_10 - mean_close_30) / mean_close_30
    perc_diff_check = percent_difference > 0
    final_filter = perc_diff_check & nyse_filter

    return Pipeline(columns={
        'Percent Difference': percent_difference,
        '30 Day Mean Close': mean_close_30,
        'Latest Close': latest_close,
        'Positive Percent Diff': perc_diff_check},
        screen=final_filter)
In [ ]:
# Run the NYSE-restricted pipeline and preview it.
results = run_pipeline(make_pipeline(),
                       start_date='2017-01-01',
                       end_date='2017-01-01')
results.head(n=5)
In [ ]:
# Row count after adding the 'NYS' restriction.
results.shape[0]
In [ ]:
from quantopian.pipeline import Pipeline
from quantopian.algorithm import attach_pipeline, pipeline_output
def initialize(context):
    """Attach the pipeline from make_pipeline() under the name 'my_pipeline'."""
    attach_pipeline(make_pipeline(), 'my_pipeline')
def make_pipeline():
    """Return a Pipeline constructed with no arguments, as a template to extend."""
    empty_pipeline = Pipeline()
    return empty_pipeline
def before_trading_start(context, data):
    """Store the output of the pipeline attached as 'my_pipeline' on context.output."""
    pipeline_frame = pipeline_output('my_pipeline')
    context.output = pipeline_frame
In [ ]: