Flint is a time series library for Apache Spark. The ability to analyze time series data at scale is critical for the success of finance and IoT applications built on Spark. Flint is Two Sigma's implementation of highly optimized time series operations in Spark. It performs rich, truly parallel analyses by taking advantage of the natural ordering of time series data to provide locality-based optimizations.
Flint is an open source library for Spark built around the TimeSeriesRDD, a time series aware data structure, and a collection of time series utility and analysis functions that use TimeSeriesRDDs.
Unlike DataFrame and Dataset, Flint's TimeSeriesRDDs can leverage the existing ordering of datasets at rest, along with the fact that almost all data manipulation and analysis over these datasets respects their temporal ordering.
It differs from other time series efforts in Spark in its ability to efficiently compute across panel data or on large scale high frequency data.
In [ ]:
%classpath config resolver jitpack.io https://jitpack.io
In [ ]:
%%classpath add mvn
com.github.twosigma flint master-SNAPSHOT
org.apache.spark spark-sql_2.11 2.2.1
org.apache.spark spark-mllib_2.11 2.2.1
org.scalanlp breeze_2.11 0.13.2
In [ ]:
// Create a local Spark session
import com.twosigma.flint.timeseries.CSV
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Simple Application")
  .master("local[4]")
  .config("spark.ui.enabled", "false")
  .getOrCreate()
In [ ]:
// Create a TimeSeriesRDD from a CSV file
val tsRdd = CSV.from(
  spark.sqlContext,
  "../resources/data/flint-demo.csv",
  header = true,
  dateFormat = "yyyyMMdd HH:mm:ss.SSS",
  sorted = true // the file is already ordered by time, so no sort is performed
)
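A TimeSeriesRDD can also be wrapped around an existing DataFrame with TimeSeriesRDD.fromDF. The sketch below is illustrative rather than part of the demo: the generated df (a long "time" column plus a random "price") stands in for real data, and isSorted = true asserts that the input is already ordered by time.
In [ ]:
// Alternative construction: wrap an existing, time-sorted DataFrame.
import java.util.concurrent.TimeUnit
import com.twosigma.flint.timeseries.TimeSeriesRDD

// Illustrative input: 100 rows with a long "time" column and a random "price".
val df = spark.range(100).selectExpr("id * 100 as time", "rand() as price")
val fromDf = TimeSeriesRDD.fromDF(df)(isSorted = true, timeUnit = TimeUnit.NANOSECONDS)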
In [ ]:
// Basic operations
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.Row
def changeTimeFunction(id: Int, time: Long): Long =
  if (id == 3) time + 25 else time
val priceAsInteger = tsRdd.cast("price" -> IntegerType)
val filteredRowsByPrice = tsRdd.keepRows { row: Row => row.getAs[Double]("price") > 4.0 }
val timeColumnOnly = tsRdd.keepColumns("time")
val withoutIdColumn = tsRdd.deleteColumns("id")
val renamedColumns = tsRdd.renameColumns("id" -> "ticker", "price" -> "highPrice")
val updatedTimeColumn = tsRdd.setTime { row: Row =>
  changeTimeFunction(row.getAs[Int]("id"), row.getAs[Long]("time"))
}
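Each of these operations leaves the original tsRdd untouched and returns a new TimeSeriesRDD. To eyeball a result, one option (a minimal usage sketch) is converting back to a DataFrame with toDF:
In [ ]:
// Inspect a result by converting it back to a Spark DataFrame.
renamedColumns.toDF.show(5)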
In [ ]:
// Create columns
val newHighPriceColumn = tsRdd.addColumns(
  "highPrice" -> DoubleType -> { r: Row =>
    r.getAs[Double]("price") + 1.5
  }
)
// A cycle is the set of rows that share a timestamp; here each price is
// scaled by the number of rows in its cycle.
val results = tsRdd.addColumnsForCycle(
  "adjustedPrice" -> DoubleType -> { rows: Seq[Row] =>
    rows.map { row => (row, row.getAs[Double]("price") * rows.size) }.toMap
  }
)
In [ ]:
// Group functions
import com.twosigma.flint.timeseries.Windows

val groupedByCycle = tsRdd.groupByCycle()
// Build a clock of interval boundaries: the timestamps divisible by 100 for id 3.
val intervals = tsRdd
  .keepRows { row: Row => row.getAs[Long]("time") % 100 == 0 }
  .keepRows { row: Row => row.getAs[Int]("id") == 3 }
  .keepColumns("time")
val groupedByInterval = tsRdd.groupByInterval(intervals)
val groupedByWindows = tsRdd.addWindows(Windows.pastAbsoluteTime("1000ns"))
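Windows become more useful when paired with a summarizer: summarizeWindows computes a statistic over each row's window. A minimal sketch, reusing the 1000ns lookback from above (Summarizers is introduced properly in the summarize cell below):
In [ ]:
// For each row, average "price" over the window [t - 1000ns, t].
import com.twosigma.flint.timeseries.Summarizers

val windowedMean = tsRdd.summarizeWindows(
  Windows.pastAbsoluteTime("1000ns"),
  Summarizers.mean("price")
)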
In [ ]:
// Temporal join functions
val leftTSRdd = tsRdd
  .keepRows { row: Row => row.getAs[Long]("time") % 100 == 0 }
  .keepColumns("time", "price")
val rightTSRdd = tsRdd
  .keepRows { row: Row => row.getAs[Long]("time") % 100 != 0 }
  .keepColumns("time", "id")
// leftJoin matches each left row with the closest right row at or before it
// within the tolerance; futureLeftJoin looks forward in time instead.
val leftJoin = leftTSRdd.leftJoin(rightTSRdd, tolerance = "50ns")
val futureLeftJoin = leftTSRdd.futureLeftJoin(rightTSRdd, tolerance = "50ns")
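Both joins also take a key argument so that rows match only when their keys are equal in addition to falling within the time tolerance. A minimal sketch joining tsRdd against a renamed copy of itself on id (the otherPrice name is purely illustrative):
In [ ]:
// Per-id as-of join: each left row matches the closest right row within
// 50ns that carries the same "id".
val rightById = tsRdd.renameColumns("price" -> "otherPrice")
val keyedJoin = tsRdd.leftJoin(rightById, tolerance = "50ns", key = Seq("id"))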
In [ ]:
// Summarize functions
import com.twosigma.flint.timeseries.Summarizers

// Sum "price" within each cycle (rows sharing the same timestamp).
val summarizedCycles = tsRdd.summarizeCycles(Summarizers.sum("price"))
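Summarizers also run over the whole RDD and as running aggregates. A minimal sketch of both, reusing the same sum summarizer (the optional key argument groups the global summary per id):
In [ ]:
// One summary row per "id" with the total price, plus a running total per row.
val summarized = tsRdd.summarize(Summarizers.sum("price"), key = Seq("id"))
val cumulative = tsRdd.addSummaryColumns(Summarizers.sum("price"))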
In [ ]:
// stat.regression
import breeze.linalg.DenseVector
import org.apache.spark.mllib.random.RandomRDDs
import com.twosigma.flint.math.stats.regression.WeightedLabeledPoint
import com.twosigma.flint.math.stats.regression.OLSMultipleLinearRegression
// Generate a random data set from a linear model with beta = [1.0, 2.0] and intercept = 3.0
val data = WeightedLabeledPoint.generateSampleData(spark.sparkContext, DenseVector(1.0, 2.0), 3.0)
// Fit the data using the OLS linear regression.
val model = OLSMultipleLinearRegression.regression(data)
// Retrieve the estimated beta and intercept.
val denseVector = model.estimateRegressionParameters
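Because the sample data comes from a known model, printing the fitted parameters gives a quick sanity check; they should land close to the intercept (3.0) and beta ([1.0, 2.0]) used to generate the data.
In [ ]:
// The fitted parameters should be close to the generating model's values.
println(denseVector)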
In [ ]:
// Shut down the Spark session
spark.stop()