In [1]:
from quantopian.pipeline import Pipeline
from quantopian.research import run_pipeline
from quantopian.pipeline.data.builtin import USEquityPricing
from quantopian.pipeline.factors import SimpleMovingAverage, AverageDollarVolume
When we first looked at factors, we explored the set of built-in factors. Frequently, however, a desired computation isn't available as a built-in. One of the most powerful features of the Pipeline API is that it lets us define our own custom factors for exactly these cases.
Conceptually, a custom factor is identical to a built-in factor. It accepts `inputs`, `window_length`, and `mask` as constructor arguments, and returns a `Factor` object each day.
Let's take an example of a computation that doesn't exist as a built-in: standard deviation. To create a factor that computes the standard deviation over a trailing window, we can subclass `quantopian.pipeline.CustomFactor` and implement a compute method whose signature is:
def compute(self, today, asset_ids, out, *inputs):
    ...
- `*inputs` are M x N numpy arrays, where M is the `window_length` and N is the number of securities (usually around 8000 unless a `mask` is provided). `*inputs` are trailing data windows. Note that there will be one M x N array for each `BoundColumn` provided in the factor's `inputs` list. The data type of each array will be the `dtype` of the corresponding `BoundColumn`.
- `out` is an empty array of length N. `out` will be the output of our custom factor each day. The job of compute is to write output values into `out`.
- `asset_ids` will be an integer array of length N containing security ids corresponding to the columns in our `*inputs` arrays.
- `today` will be a pandas Timestamp representing the day for which `compute` is being called.

Of these, `*inputs` and `out` are most commonly used.
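To make the shapes described above concrete, here is a minimal numpy-only sketch that mimics a single `compute` call outside of Pipeline. The free function, the asset ids, and the price values are all made up for illustration, and `today` is passed as a plain string rather than a pandas Timestamp; this is not how the Pipeline engine itself is implemented.

```python
import numpy

def compute(today, asset_ids, out, *inputs):
    # Hypothetical stand-in for CustomFactor.compute (no `self`).
    # With a single input column, *inputs holds one M x N array.
    (close,) = inputs
    # Write one value per asset: here, the most recent row of the window.
    out[:] = close[-1]

window_length = 3                                # M
asset_ids = numpy.array([101, 102, 103, 104])    # N = 4, made-up ids
# M x N trailing window: rows are days (oldest first), columns are assets.
close = numpy.arange(12, dtype=float).reshape(window_length, len(asset_ids))
# Output array of length N that compute is expected to fill in.
out = numpy.full(len(asset_ids), numpy.nan)

compute("2015-05-05", asset_ids, out, close)
print(close.shape)  # (3, 4)
print(out)          # one value per asset
```

The key point is that `compute` returns nothing; it communicates its result only by writing into `out`.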
An instance of `CustomFactor` that’s been added to a pipeline will have its compute method called every day. For example, let's define a custom factor that computes the standard deviation of the close price over the last 5 days. To start, let's add `CustomFactor` and `numpy` to our import statements.
In [2]:
from quantopian.pipeline import CustomFactor
import numpy
Next, let's define our custom factor to calculate the standard deviation over a trailing window using numpy.nanstd:
In [3]:
class StdDev(CustomFactor):
    def compute(self, today, asset_ids, out, values):
        # Calculates the column-wise standard deviation, ignoring NaNs
        out[:] = numpy.nanstd(values, axis=0)
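As a quick illustration of what the body of `compute` does, the sketch below applies `numpy.nanstd` with `axis=0` to a small made-up 3 x 2 window. The values are hypothetical; the point is only that each column (one per security) is reduced to a single number and that NaN entries are skipped.

```python
import numpy

# Made-up 3-day window for 2 securities; one close price is missing.
values = numpy.array([
    [1.0, 10.0],
    [2.0, numpy.nan],
    [3.0, 14.0],
])

out = numpy.empty(values.shape[1])
# axis=0 collapses the day axis, leaving one value per column (security).
out[:] = numpy.nanstd(values, axis=0)
print(out)
```

Note that `numpy.nanstd` computes the population standard deviation by default (`ddof=0`), so the second column is computed from its two non-NaN values only.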
Finally, let's instantiate our factor in `make_pipeline()`:
In [4]:
def make_pipeline():
    std_dev = StdDev(inputs=[USEquityPricing.close], window_length=5)

    return Pipeline(
        columns={
            'std_dev': std_dev
        }
    )
When this pipeline is run, `StdDev.compute()` will be called every day with data as follows:
- `values`: An M x N numpy array, where M is 5 (`window_length`), and N is ~8000 (the number of securities in our database on the day in question).
- `out`: An empty array of length N (~8000). In this example, the job of `compute` is to populate `out` with the 5-day close price standard deviations.
In [5]:
result = run_pipeline(make_pipeline(), '2015-05-05', '2015-05-05')
result
Out[5]:
When writing a custom factor, we can set default `inputs` and `window_length` in our `CustomFactor` subclass. For example, let's define the `TenDayMeanDifference` custom factor to compute the mean difference between two data columns over a trailing window using numpy.nanmean. Let's set the default `inputs` to `[USEquityPricing.close, USEquityPricing.open]` and the default `window_length` to 10:
In [6]:
class TenDayMeanDifference(CustomFactor):
    # Default inputs.
    inputs = [USEquityPricing.close, USEquityPricing.open]
    window_length = 10

    def compute(self, today, asset_ids, out, close, open):
        # Calculates the column-wise mean difference, ignoring NaNs
        out[:] = numpy.nanmean(close - open, axis=0)
Remember that in this case `close` and `open` are each 10 x ~8000 2D [numpy arrays](http://docs.scipy.org/doc/numpy-1.10.1/reference/arrays.ndarray.html).
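The element-wise subtraction followed by a column-wise `numpy.nanmean` can be sketched on a tiny made-up window. The arrays below are hypothetical 3 x 2 stand-ins for the 10 x ~8000 arrays, and the variable is named `open_` here only to avoid shadowing Python's built-in `open` in a standalone script.

```python
import numpy

# Made-up 3-day windows for 2 securities (rows: days, columns: securities).
close = numpy.array([
    [10.0, 20.0],
    [11.0, numpy.nan],   # missing close for the second security
    [12.0, 22.0],
])
open_ = numpy.array([
    [9.0, 21.0],
    [11.0, 20.0],
    [10.0, 21.0],
])

# Element-wise difference; a NaN in either input yields NaN in the result.
diff = close - open_
# Column-wise mean that skips the NaN entries.
out = numpy.nanmean(diff, axis=0)
print(out)
```

Because the NaN propagates through the subtraction and is then ignored by `nanmean`, the second security's mean is taken over two days instead of three.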
If we call `TenDayMeanDifference` without providing any arguments, it will use the defaults.
In [7]:
# Computes the 10-day mean difference between the daily open and close prices.
close_open_diff = TenDayMeanDifference()
The defaults can be manually overridden by specifying arguments in the constructor call.
In [8]:
# Computes the 10-day mean difference between the daily high and low prices.
high_low_diff = TenDayMeanDifference(inputs=[USEquityPricing.high, USEquityPricing.low])
Let's take another example where we build a momentum custom factor and use it to create a filter. We will then use that filter as a `screen` for our pipeline.
Let's start by defining a `Momentum` factor as the most recent close price divided by the oldest close price in the trailing window, where the length of the window is set by `window_length`.
In [9]:
class Momentum(CustomFactor):
    # Default inputs
    inputs = [USEquityPricing.close]

    # Compute momentum
    def compute(self, today, assets, out, close):
        out[:] = close[-1] / close[0]
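The indexing in `compute` can be checked on a small made-up window. In the sketch below, the prices are hypothetical; `close[-1]` selects the most recent row of the window and `close[0]` the oldest, so the division produces one ratio per security.

```python
import numpy

# Made-up 3-day window for 2 securities (oldest row first).
close = numpy.array([
    [10.0, 50.0],
    [11.0, 48.0],
    [12.0, 45.0],
])

# Latest close divided by the oldest close in the window, per column.
momentum = close[-1] / close[0]
print(momentum)
```

A ratio above 1 means the price rose over the window, and a ratio below 1 means it fell.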
Now, let's instantiate our `Momentum` factor (twice) to create a 10-day momentum factor and a 20-day momentum factor. Let's also create a `positive_momentum` filter returning `True` for securities with both a positive 10-day momentum and a positive 20-day momentum, that is, where both ratios are greater than 1.
In [10]:
ten_day_momentum = Momentum(window_length=10)
twenty_day_momentum = Momentum(window_length=20)
positive_momentum = ((ten_day_momentum > 1) & (twenty_day_momentum > 1))
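As a rough analogy for what this filter evaluates to on a given day, the sketch below applies the same comparison and `&` combination to plain numpy arrays. The momentum values are made up, and Pipeline filters are expressions evaluated later by the engine rather than eager numpy arrays, so this only illustrates the element-wise logic.

```python
import numpy

# Made-up momentum ratios for 4 securities.
ten_day = numpy.array([1.05, 0.98, 1.10, 1.01])
twenty_day = numpy.array([1.02, 1.04, 0.95, 1.20])

# Element-wise AND: True only where both ratios exceed 1.
positive = (ten_day > 1) & (twenty_day > 1)
print(positive)
```

The parentheses around each comparison matter because `&` binds more tightly than `>` in Python.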
Next, let's add our momentum factors and our `positive_momentum` filter to `make_pipeline`. Let's also pass `positive_momentum` as a `screen` to our pipeline.
In [11]:
def make_pipeline():
    ten_day_momentum = Momentum(window_length=10)
    twenty_day_momentum = Momentum(window_length=20)

    positive_momentum = ((ten_day_momentum > 1) & (twenty_day_momentum > 1))

    std_dev = StdDev(inputs=[USEquityPricing.close], window_length=5)

    return Pipeline(
        columns={
            'std_dev': std_dev,
            'ten_day_momentum': ten_day_momentum,
            'twenty_day_momentum': twenty_day_momentum
        },
        screen=positive_momentum
    )
Running this pipeline outputs the standard deviation and each of our momentum computations for securities with positive 10-day and 20-day momentum.
In [12]:
result = run_pipeline(make_pipeline(), '2015-05-05', '2015-05-05')
result
Out[12]:
Custom factors allow us to define custom computations in a pipeline. They are frequently the best way to perform computations on partner datasets or on multiple data columns. The full documentation for CustomFactors is available here.
In the next lesson, we'll use everything we've learned so far to create a pipeline for an algorithm.