Flint is a time series library for Apache Spark. The ability to analyze time series data at scale is critical for the success of finance and IoT applications based on Spark. Flint is Two Sigma's implementation of highly optimized time series operations in Spark. It performs truly parallel and rich analyses on time series data by taking advantage of the natural ordering in time series data to provide locality-based optimizations.
Flint is an open source library for Spark based around the TimeSeriesRDD, a time series aware data structure, and a collection of time series utility and analysis functions that use TimeSeriesRDDs. Unlike DataFrame and Dataset, Flint's TimeSeriesRDDs can leverage the existing ordering properties of datasets at rest and the fact that almost all data manipulation and analysis over these datasets respects their temporal ordering properties. It differs from other time series efforts in Spark in its ability to efficiently compute across panel data or on large-scale, high-frequency data.
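For reference, a TimeSeriesRDD can also be constructed from an existing DataFrame that carries a time column. The snippet below is only a minimal sketch of that route and is not one of the cells in this notebook: df stands for a time-sorted DataFrame that you would supply yourself.
import com.twosigma.flint.timeseries.TimeSeriesRDD
import scala.concurrent.duration.MILLISECONDS
// df is a placeholder for a DataFrame with a "time" column, already sorted by time.
// isSorted = true lets Flint keep the existing ordering instead of re-sorting;
// timeUnit describes the units of the values in the time column.
val tsRdd = TimeSeriesRDD.fromDF(dataFrame = df)(isSorted = true, timeUnit = MILLISECONDS)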
This example uses the prices.csv file from Kaggle. To run it, download the file and place it at /tmp/prices.csv.
In [ ]:
%%classpath add mvn
org.apache.spark spark-sql_2.12 2.4.4
org.apache.spark spark-mllib_2.12 2.4.4
com.github.twosigma flint 6055a7a231
In [ ]:
val begin = "20150101"
val end = "20160101"
In [ ]:
%%spark -s
In [ ]:
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
//import com.twosigma.flint.timeseries.io.read
import com.twosigma.flint.timeseries.Windows
import com.twosigma.flint.timeseries.Summarizers
import scala.concurrent.duration._
//import com.twosigma.flint.timeseries.implicits._
import com.twosigma.flint.timeseries._
In [ ]:
import com.twosigma.flint.timeseries.CSV
// Load prices.csv from https://www.kaggle.com/dgawlik/nyse
// Create a TimeSeriesRDD from a CSV file
var pricesRdd = CSV.from(
spark.sqlContext,
"file:///tmp/prices.csv",
header = true,
timeColumnName = "date",
dateFormat = "dd/MM/yyyy HH:mm",
sorted = false
)
pricesRdd
In [ ]:
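// Display the first 5 rows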
pricesRdd.display(5)
In [ ]:
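// Cast the close price column from DoubleType to IntegerType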
val priceAsInteger = pricesRdd.cast("close" -> IntegerType)
preview(priceAsInteger)
In [ ]:
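// Keep only rows whose low price is greater than 4.0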
val filteredRowsByPrice = pricesRdd.keepRows { row: Row => row.getAs[Double]("low") > 4.0 }
preview(filteredRowsByPrice)
In [ ]:
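// Keep only the time column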
val timeColumnOnly = pricesRdd.keepColumns("time")
preview(timeColumnOnly)
In [ ]:
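// Drop the symbol column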
val withoutIdColumn = pricesRdd.deleteColumns("symbol")
preview(withoutIdColumn)
In [ ]:
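// Rename columns to more descriptive names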
val renamedColumns = pricesRdd.renameColumns("symbol" -> "ticker", "low" -> "lowPrice", "open" -> "openPrice", "close" -> "closePrice", "high" -> "highPrice")
preview(renamedColumns)
In [ ]:
// Calculate logarithm of a column
val logVolumeRdd = pricesRdd.addColumns("logVolume" -> DoubleType -> { row => scala.math.log(row.getAs[Double]("volume")) })
preview(logVolumeRdd)
In [ ]:
// Raise a column to an exponent
val squaredVolumeRdd = pricesRdd.addColumns("squaredVolume" -> DoubleType -> { row => scala.math.pow(row.getAs[Double]("volume"), 2) })
preview(squaredVolumeRdd)
In [ ]:
// Calculate difference between two columns
val priceChangeRdd = pricesRdd.addColumns("priceChange" -> DoubleType -> { row =>
row.getAs[Double]("close") - row.getAs[Double]("open")
})
preview(priceChangeRdd)
In [ ]:
val pricePercentChange = pricesRdd.addColumns("pricePercentChange" -> DoubleType -> { row =>
val openPrice = row.getAs[Double]("open")
val closePrice = row.getAs[Double]("close")
if (openPrice != 0) (closePrice - openPrice) / openPrice else null
})
preview(pricePercentChange)
In [ ]:
// Select rows where the price went up
val priceIncreasedRdd = pricesRdd.keepRows { row =>
row.getAs[Double]("close") > row.getAs[Double]("open")
}
preview(priceIncreasedRdd)
In [ ]:
// The keepRows and deleteRows functions take a function from Row to Boolean as the filtering criterion.
// Only get rows whose symbol starts with 'A'
val startsWithARdd = pricesRdd.keepRows { row =>
val symbol = row.getAs[String]("symbol")
symbol != null && symbol.startsWith("A")
}
preview(startsWithARdd)
In [ ]:
// Keep only rows whose volume is less than 2,000,000
val lowVolumeRdd = pricesRdd.keepRows { row =>
row.getAs[Double]("volume") < 2000000
}
preview(lowVolumeRdd)
In [ ]:
// Moving average over the last two weeks
val ibmPricesRdd = pricesRdd.keepRows { row =>
row.getAs[String]("symbol") == "IBM"
}
var windowedIbmPricesRdd = ibmPricesRdd.addWindows(Windows.pastAbsoluteTime("14days"))
windowedIbmPricesRdd = windowedIbmPricesRdd.addColumns("movingAverage" -> DoubleType -> { row =>
val pastRows = row.getAs[Seq[Row]]("window_past_14days")
pastRows.map(_.getAs[Double]("close")).sum / pastRows.size
})
preview(windowedIbmPricesRdd)
In [ ]:
// Moving average over the last two weeks for all symbols
var pastWindowRdd = pricesRdd.addWindows(Windows.pastAbsoluteTime("14days"), Seq("symbol"))
pastWindowRdd = pastWindowRdd.addColumns("movingAverage" -> DoubleType -> { row =>
val pastRows = row.getAs[Seq[Row]]("window_past_14days")
pastRows.map(_.getAs[Double]("close")).sum / pastRows.size
})
preview(pastWindowRdd)
In [ ]:
// addColumnsForCycle takes a closure that is applied to a list of rows and returns a map from row to result. The list contains all rows that share a timestamp.
// Add a column containing the number of instruments in the universe on each day
val cycleRdd = pricesRdd.addColumnsForCycle("universeSize" -> IntegerType -> { rows: Seq[Row] =>
rows.map { row => row -> rows.size }.toMap
})
preview(cycleRdd)
In [ ]:
// Compute the volume Z score within each cycle (all rows sharing a timestamp)
val zScoreRdd = pricesRdd.addColumnsForCycle("volumeZScore" -> DoubleType -> { rows: Seq[Row] =>
val mean = rows.map(_.getAs[Double]("volume")).sum / rows.size
val stddev = scala.math.sqrt(rows.map { row =>
scala.math.pow(row.getAs[Double]("volume") - mean, 2)
}.sum / (rows.size - 1))
rows.map { row =>
row -> (row.getAs[Double]("volume") - mean) / stddev
}.toMap
})
preview(zScoreRdd)
In [ ]:
// Add a column ranking rows that share the same timestamp by volume
import org.apache.commons.math3.stat.ranking.NaturalRanking
val rankedRdd = pricesRdd.addColumnsForCycle("r" -> DoubleType -> { rows: Seq[Row] =>
val ranking = new NaturalRanking()
val ranks = ranking.rank(rows.map(_.getAs[Double]("volume")).toArray)
(rows zip ranks).toMap
})
preview(rankedRdd)
In [ ]:
// Volume-weighted average price over 7-day intervals for IBM
val clock = Clocks.uniform(sc, "7d")
var ibmPricesRdd = pricesRdd.keepRows { row =>
row.getAs[String]("symbol") == "IBM"
}
var volumeWeightedRdd = ibmPricesRdd.groupByInterval(clock).addColumns("volumeWeightedPrice" -> DoubleType -> { row =>
val rows = row.getAs[Seq[Row]]("rows")
val weightedSum = rows.map { row =>
(row.getAs[Double]("open") + row.getAs[Double]("close")) / 2 * row.getAs[Double]("volume")
}.sum
weightedSum / rows.map (_.getAs[Double]("volume")).sum
}).deleteColumns("rows")
preview(volumeWeightedRdd)
In [ ]:
// Average daily volume for each symbol
val volumeRdd = pricesRdd.summarize(Summarizers.nthMoment("volume", 1), Seq("symbol"))
preview(volumeRdd)
In [ ]:
// Linear regression using Flint's stat.regression package
import breeze.linalg.DenseVector
import org.apache.spark.mllib.random.RandomRDDs
import com.twosigma.flint.math.stats.regression.WeightedLabeledPoint
import com.twosigma.flint.math.stats.regression.OLSMultipleLinearRegression
// Generate a random data set from a linear model with beta = [1.0, 2.0] and intercept = 3.0
val data = WeightedLabeledPoint.generateSampleData(spark.sparkContext, DenseVector(1.0, 2.0), 3.0)
// Fit the data using the OLS linear regression.
val model = OLSMultipleLinearRegression.regression(data)
// Retrieve the estimate beta and intercept.
val denseVector = model.estimateRegressionParameters
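// Collect the estimated parameters into a Map keyed by index for display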
Map(denseVector.activeIterator.toSeq.map { m => m._1 -> m._2} : _*)