This notebook is from the official Quantopian Guide on Pipelines. Make sure to visit their documentation for many more great resources!
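Many trading algorithms have the following structure: (1) for each asset in a known (large) set, compute N scalar values based on a trailing window of data; (2) select a smaller, tradeable set of assets based on the values computed in step 1; (3) calculate desired portfolio weights for the assets selected in step 2; and (4) place orders to move the algorithm's current portfolio allocations to the weights computed in step 3.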
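There are several technical challenges with doing this robustly. These include efficiently querying large sets of assets, performing computations over large sets of assets, handling adjustments such as splits and dividends, and handling asset delistings.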
Pipeline exists to solve these challenges by providing a uniform API for expressing computations on a diverse collection of datasets.
A factor is a function from an asset and a moment in time to a numerical value.
A simple example of a factor is the most recent price of a security. Given a security and a specific point in time, the most recent price is a number. Another example is the 10-day average trading volume of a security. Factors are most commonly used to assign values to securities, which can then be used in a number of ways. A factor can be used in each of the following procedures: computing target portfolio weights, generating an alpha signal, constructing other, more complex factors, and constructing filters.
A filter is a function from an asset and a moment in time to a boolean. An example of a filter is a function indicating whether a security's price is below $10. Given a security and a point in time, this evaluates to either True or False. Filters are most commonly used for describing sets of assets to include or exclude for some particular purpose.
A classifier is a function from an asset and a moment in time to a categorical output. More specifically, a classifier produces a string or an int that doesn't represent a numerical value (e.g. an integer label such as a sector code). Classifiers are most commonly used for grouping assets for complex transformations on Factor outputs. An example of a classifier is the exchange on which an asset is currently being traded.
In [ ]:
    
from quantopian.pipeline import Pipeline
    
In [ ]:
    
def make_pipeline():
    """Return a Pipeline with no columns and no screen."""
    empty_pipeline = Pipeline()
    return empty_pipeline
    
In [ ]:
    
# Instantiate the pipeline returned by the make_pipeline() defined in the previous cell.
pipe = make_pipeline()
    
In [ ]:
    
from quantopian.research import run_pipeline
    
In [ ]:
    
# Run the pipeline over a single day (start and end dates are identical).
# The result is presumably a pandas DataFrame; .head()/.info() are used on it below.
result = run_pipeline(pipe, '2017-01-01', '2017-01-01')
    
In [ ]:
    
# Preview the first ten rows of the pipeline output.
result.head(n=10)
    
In [ ]:
    
# Print a summary of the output (index, columns, dtypes) via DataFrame.info().
result.info()
    
In [ ]:
    
from quantopian.pipeline.data.builtin import USEquityPricing
    
In [ ]:
    
from quantopian.pipeline.factors import BollingerBands,SimpleMovingAverage,EWMA
    
In [ ]:
    
# Constructing a factor only defines the computation; the cell displays the term itself.
SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=30)
    
In [ ]:
    
def make_pipeline():
    """Build a pipeline with one column: each asset's 30-day mean closing price."""
    sma_30 = SimpleMovingAverage(
        inputs=[USEquityPricing.close],
        window_length=30,
    )
    output_columns = {'30 Day Mean Close': sma_30}
    return Pipeline(columns=output_columns)
    
In [ ]:
    
# Run a freshly built pipeline for the single date 2017-01-01.
results = run_pipeline(make_pipeline(), '2017-01-01', '2017-01-01')
    
In [ ]:
    
# Preview the first twenty rows.
results.head(n=20)
    
In [ ]:
    
def make_pipeline():
    """Build a pipeline with each asset's 30-day mean close and its latest close."""
    sma_30 = SimpleMovingAverage(
        inputs=[USEquityPricing.close],
        window_length=30,
    )
    last_close = USEquityPricing.close.latest

    output_columns = {
        '30 Day Mean Close': sma_30,
        'Latest Close': last_close,
    }
    return Pipeline(columns=output_columns)
    
In [ ]:
    
# Run the updated pipeline for the single date 2017-01-01.
results = run_pipeline(make_pipeline(), '2017-01-01', '2017-01-01')
    
In [ ]:
    
# Preview the first ten rows.
results.head(n=10)
    
In [ ]:
    
def make_pipeline():
    """Build a pipeline comparing the 10-day and 30-day moving averages of close.

    Columns:
        'Percent Difference': (SMA10 - SMA30) / SMA30.
        '30 Day Mean Close':  SMA30.
        'Latest Close':       most recent close.
    """
    close = USEquityPricing.close
    sma_10 = SimpleMovingAverage(inputs=[close], window_length=10)
    sma_30 = SimpleMovingAverage(inputs=[close], window_length=30)
    last_close = close.latest

    # Relative gap between the short and long averages.
    pct_diff = (sma_10 - sma_30) / sma_30

    output_columns = {
        'Percent Difference': pct_diff,
        '30 Day Mean Close': sma_30,
        'Latest Close': last_close,
    }
    return Pipeline(columns=output_columns)
    
In [ ]:
    
# Run the percent-difference pipeline for the single date 2017-01-01.
results = run_pipeline(make_pipeline(), '2017-01-01', '2017-01-01')
    
In [ ]:
    
# Preview the first five rows (same as the pandas default).
results.head(n=5)
    
In [ ]:
    
# Comparing a factor with a number produces a filter (a boolean per asset):
# True where the latest close is above 20.
last_close_price = USEquityPricing.close.latest
close_price_filter = last_close_price > 20
    
In [ ]:
    
# Display the filter term's repr; no values are computed here.
close_price_filter
    
In [ ]:
    
def make_pipeline():
    """Build the moving-average pipeline plus a boolean 'Positive Percent Diff' column.

    'Percent Difference' is (10-day SMA - 30-day SMA) / 30-day SMA of close.
    'Positive Percent Diff' is True where that value is > 0. No screen is
    applied, so every asset appears in the output.
    """
    close = USEquityPricing.close
    sma_10 = SimpleMovingAverage(inputs=[close], window_length=10)
    sma_30 = SimpleMovingAverage(inputs=[close], window_length=30)
    last_close = close.latest

    pct_diff = (sma_10 - sma_30) / sma_30
    # Comparing a factor to a number yields a Filter.
    is_positive = pct_diff > 0

    output_columns = {
        'Percent Difference': pct_diff,
        '30 Day Mean Close': sma_30,
        'Latest Close': last_close,
        'Positive Percent Diff': is_positive,
    }
    return Pipeline(columns=output_columns)
    
In [ ]:
    
# Run for the single date 2017-01-01 and preview the first rows.
results = run_pipeline(make_pipeline(), '2017-01-01', '2017-01-01')
results.head()
    
In [ ]:
    
def make_pipeline():
    """Build the moving-average pipeline, keeping only rows with a positive percent difference.

    The screen is the same filter as the 'Positive Percent Diff' column, so
    that column is True for every row that survives the screen.
    """
    close = USEquityPricing.close
    sma_10 = SimpleMovingAverage(inputs=[close], window_length=10)
    sma_30 = SimpleMovingAverage(inputs=[close], window_length=30)
    last_close = close.latest

    pct_diff = (sma_10 - sma_30) / sma_30
    is_positive = pct_diff > 0

    output_columns = {
        'Percent Difference': pct_diff,
        '30 Day Mean Close': sma_30,
        'Latest Close': last_close,
        'Positive Percent Diff': is_positive,
    }
    return Pipeline(columns=output_columns, screen=is_positive)
    
In [ ]:
    
# Run the screened pipeline for 2017-01-01 and preview the first rows.
results = run_pipeline(make_pipeline(), '2017-01-01', '2017-01-01')
results.head()
    
In [ ]:
    
def make_pipeline():
    """Build the moving-average pipeline screened by the NEGATION of the positive-difference filter.

    `~` inverts a Filter, so only rows where 'Positive Percent Diff' is False
    are kept. Rows with an undefined percent difference are presumably kept
    as well; confirm against the pipeline output.
    """
    close = USEquityPricing.close
    sma_10 = SimpleMovingAverage(inputs=[close], window_length=10)
    sma_30 = SimpleMovingAverage(inputs=[close], window_length=30)
    last_close = close.latest

    pct_diff = (sma_10 - sma_30) / sma_30
    is_positive = pct_diff > 0

    output_columns = {
        'Percent Difference': pct_diff,
        '30 Day Mean Close': sma_30,
        'Latest Close': last_close,
        'Positive Percent Diff': is_positive,
    }
    return Pipeline(columns=output_columns, screen=~is_positive)
    
In [ ]:
    
# Run the inverted-screen pipeline for 2017-01-01 and preview the first rows.
results = run_pipeline(make_pipeline(), '2017-01-01', '2017-01-01')
results.head()
    
In [ ]:
    
def make_pipeline():
    """Build the moving-average pipeline, keeping rows with a positive percent difference AND a latest close under 5."""
    close = USEquityPricing.close
    sma_10 = SimpleMovingAverage(inputs=[close], window_length=10)
    sma_30 = SimpleMovingAverage(inputs=[close], window_length=30)
    last_close = close.latest

    pct_diff = (sma_10 - sma_30) / sma_30

    is_positive = pct_diff > 0
    under_five = last_close < 5
    # Filters combine with `&` (logical AND, evaluated per asset).
    keep = is_positive & under_five

    output_columns = {
        'Percent Difference': pct_diff,
        '30 Day Mean Close': sma_30,
        'Latest Close': last_close,
        'Positive Percent Diff': is_positive,
    }
    return Pipeline(columns=output_columns, screen=keep)
    
In [ ]:
    
# Run the combined-filter pipeline for 2017-01-01 and preview the first rows.
results = run_pipeline(make_pipeline(), '2017-01-01', '2017-01-01')
results.head()
    
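Sometimes we want to ignore certain assets when computing pipeline expressions. There are two common cases where ignoring assets is useful: (1) we want to compute an expression that is computationally expensive, and we know we only care about the results for certain assets; and (2) we want to compute an expression that compares assets against each other, but we only want those comparisons made within a subset of all assets. The `mask` parameter used below handles both cases.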
In [ ]:
    
def make_pipeline():
    """Build the moving-average pipeline with both SMAs masked to stocks whose latest close is under 5.

    The screen keeps rows whose percent difference is positive. Assets outside
    the mask get no SMA value; their percent-difference check is presumably
    False, so they are presumably screened out too. Confirm against the output.
    """
    # Build the mask first so it can be passed to the factors below.
    last_close = USEquityPricing.close.latest
    under_five = last_close < 5

    close = USEquityPricing.close
    sma_10 = SimpleMovingAverage(inputs=[close], window_length=10, mask=under_five)
    sma_30 = SimpleMovingAverage(inputs=[close], window_length=30, mask=under_five)

    pct_diff = (sma_10 - sma_30) / sma_30
    is_positive = pct_diff > 0

    output_columns = {
        'Percent Difference': pct_diff,
        '30 Day Mean Close': sma_30,
        'Latest Close': last_close,
        'Positive Percent Diff': is_positive,
    }
    return Pipeline(columns=output_columns, screen=is_positive)
    
In [ ]:
    
# Run the masked pipeline for 2017-01-01 and preview the first rows.
results = run_pipeline(make_pipeline(), '2017-01-01', '2017-01-01')
results.head()
    
In [ ]:
    
# Number of rows that passed the screen.
results.shape[0]
    
In [ ]:
    
from quantopian.pipeline.data import morningstar
from quantopian.pipeline.classifiers.morningstar import Sector
    
In [ ]:
    
# Morningstar sector classifier; not referenced by the pipelines in this notebook section.
morningstar_sector = Sector()
    
In [ ]:
    
# Latest exchange ID of each asset's share class; this term is a classifier (categorical output).
exchange = morningstar.share_class_reference.exchange_id.latest
    
In [ ]:
    
# Display the classifier term's repr; no values are computed here.
exchange
    
In [ ]:
    
# .eq() on a classifier yields a Filter: True where the exchange ID equals 'NYS'
# (presumably the code for the New York Stock Exchange).
nyse_filter = exchange.eq('NYS')
    
In [ ]:
    
def make_pipeline():
    """Build the masked moving-average pipeline, screened to NYSE-listed assets with a positive percent difference.

    Columns:
        'Percent Difference':    (10-day SMA - 30-day SMA) / 30-day SMA of close.
        '30 Day Mean Close':     30-day SMA of close.
        'Latest Close':          most recent close.
        'Positive Percent Diff': True where 'Percent Difference' > 0.

    Both SMAs are masked to assets whose latest close is under 5. The screen is
    (positive percent difference) AND (exchange ID == 'NYS').
    """
    # Filter used as a mask: compute the moving averages only for stocks under 5.
    latest_close = USEquityPricing.close.latest
    small_price = latest_close < 5

    # Build the exchange classifier here instead of reading the global
    # `exchange` from an earlier cell, so this function does not depend on
    # notebook execution order. `.eq()` turns the classifier into a Filter.
    exchange = morningstar.share_class_reference.exchange_id.latest
    nyse_filter = exchange.eq('NYS')

    # Pass in the mask.
    mean_close_10 = SimpleMovingAverage(inputs=[USEquityPricing.close],
                                        window_length=10,
                                        mask=small_price)
    mean_close_30 = SimpleMovingAverage(inputs=[USEquityPricing.close],
                                        window_length=30,
                                        mask=small_price)

    percent_difference = (mean_close_10 - mean_close_30) / mean_close_30

    # Assets outside `small_price` get no SMA value, so this check is
    # presumably False for them. Confirm against the pipeline output.
    perc_diff_check = percent_difference > 0

    final_filter = perc_diff_check & nyse_filter

    return Pipeline(columns={
                            'Percent Difference': percent_difference,
                            '30 Day Mean Close': mean_close_30,
                            'Latest Close': latest_close,
                            'Positive Percent Diff': perc_diff_check},
                    screen=final_filter)
    
In [ ]:
    
# Run the NYSE-screened pipeline for 2017-01-01 and preview the first rows.
results = run_pipeline(make_pipeline(), '2017-01-01', '2017-01-01')
results.head()
    
In [ ]:
    
# Number of rows that passed the combined screen.
results.shape[0]
    
In [ ]:
    
from quantopian.pipeline import Pipeline
from quantopian.algorithm import attach_pipeline, pipeline_output
def initialize(context):
    """Attach the pipeline built by make_pipeline() under the name 'my_pipeline'."""
    attach_pipeline(make_pipeline(), 'my_pipeline')
def make_pipeline():
    """Return a Pipeline with no columns and no screen (placeholder for the algorithm)."""
    placeholder = Pipeline()
    return placeholder
def before_trading_start(context, data):
    """Fetch the output of the attached 'my_pipeline' and store it on context.output."""
    pipeline_frame = pipeline_output('my_pipeline')
    context.output = pipeline_frame
    
In [ ]: