The Flint Time Series Library

Flint is a time series library for Apache Spark. The ability to analyze time series data at scale is critical for the success of finance and IoT applications based on Spark. Flint is Two Sigma's implementation of highly optimized time series operations in Spark. It takes advantage of the natural ordering of time series data to provide locality-based optimizations, which lets it run rich analyses on that data in parallel.

Flint is an open source library for Spark built around the TimeSeriesRDD, a time series aware data structure, together with a collection of utility and analysis functions that operate on TimeSeriesRDDs. Unlike DataFrame and Dataset, a TimeSeriesRDD can exploit two facts: datasets at rest are often already ordered by time, and almost all manipulations and analyses of these datasets preserve that temporal ordering. Flint differs from other time series efforts in Spark in its ability to compute efficiently across panel data and on large-scale, high-frequency data.
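
To make the benefit of existing ordering concrete, here is a minimal sketch in plain Scala collections. It is an illustration only and not Flint's internals; `lowerBound` and `rangeSketch` are hypothetical names. When timestamps are already sorted, a time range can be located with two binary searches instead of a full scan.

```scala
// Illustration only: plain Scala, not Flint's implementation.
// Index of the first element >= target in an ascending-sorted array.
def lowerBound(times: Array[Long], target: Long): Int = {
  var lo = 0
  var hi = times.length
  while (lo < hi) {
    val mid = lo + (hi - lo) / 2
    if (times(mid) < target) lo = mid + 1 else hi = mid
  }
  lo
}

// Timestamps in the half-open range [begin, end), found without scanning every row.
def rangeSketch(times: Array[Long], begin: Long, end: Long): Array[Long] =
  times.slice(lowerBound(times, begin), lowerBound(times, end))
```

The same idea is why a sorted dataset can be split into time-ranged partitions and processed locally, which is harder to guarantee for an unordered DataFrame.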


In [ ]:
%classpath config resolver jitpack.io https://jitpack.io

In [ ]:
%%classpath add mvn
com.github.twosigma flint master-SNAPSHOT
org.apache.spark spark-sql_2.11 2.2.1
org.apache.spark spark-mllib_2.11 2.2.1
org.scalanlp breeze_2.11 0.13.2

In [ ]:
//Creates spark session

import com.twosigma.flint.timeseries.CSV
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
                        .appName("Simple Application")
                        .master("local[4]")
                        .config("spark.ui.enabled", "false")
                        .getOrCreate()

In [ ]:
//Creates a TimeSeriesRDD from a CSV file

val tsRdd = CSV.from(
  spark.sqlContext,
  "../resources/data/flint-demo.csv",
  header = true,
  dateFormat = "yyyyMMdd HH:mm:ss.SSS",
  sorted = true
)

In [ ]:
//Basic operations

import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.Row

def changeTimeFunction(id: Int, time: Long): Long =
  if (id == 3) time + 25 else time

val priceAsInteger = tsRdd.cast("price" -> IntegerType)
val filteredRowsByPrice = tsRdd.keepRows { row: Row => row.getAs[Double]("price") > 4.0 }
val timeColumnOnly = tsRdd.keepColumns("time")
val withoutIdColumn = tsRdd.deleteColumns("id")
val renamedColumns = tsRdd.renameColumns("id" -> "ticker", "price" -> "highPrice")
val updatedTimeColumn = tsRdd.setTime {
  row: Row =>
    changeTimeFunction(row.getAs[Int]("id"), row.getAs[Long]("time"))
}

In [ ]:
//Create columns
val newHighPriceColumn = tsRdd.addColumns(
  "highPrice" -> DoubleType -> {
    r: Row => r.getAs[Double]("price") + 1.5
  }
)

val results = tsRdd.addColumnsForCycle(
  "adjustedPrice" -> DoubleType -> { rows: Seq[Row] =>
    rows.map { row => (row, row.getAs[Double]("price") * rows.size) }.toMap
  }
)
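
As a rough picture of what the per-cycle column above computes, the sketch below reproduces it with plain Scala collections. It assumes a cycle means the rows that share exactly the same timestamp; `CycleTick` and `adjustedPriceSketch` are hypothetical names, and this is not how Flint evaluates the operation on a cluster.

```scala
// Illustration only: plain Scala, not Flint's implementation.
case class CycleTick(time: Long, id: Int, price: Double)

// Pairs each row with price * (number of rows sharing its timestamp).
def adjustedPriceSketch(rows: Seq[CycleTick]): Seq[(CycleTick, Double)] = {
  val cycleSizes: Map[Long, Int] =
    rows.groupBy(_.time).map { case (time, cycle) => time -> cycle.size }
  rows.map(row => (row, row.price * cycleSizes(row.time)))
}
```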

In [ ]:
//Group functions
import com.twosigma.flint.timeseries.Windows
val groupedByCycle = tsRdd.groupByCycle()

val intervals = tsRdd
  .keepRows { row: Row => row.getAs[Long]("time") % 100 == 0 }
  .keepRows { row: Row => row.getAs[Int]("id") == 3 }
  .keepColumns("time")

val groupedByInterval = tsRdd.groupByInterval(intervals)
val groupedByWindows = tsRdd.addWindows(Windows.pastAbsoluteTime("1000ns"))
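
The past-time window used above can be pictured with the following sketch in plain Scala. It assumes the window for a row at time t covers [t - length, t] with both ends inclusive, which should be checked against the Flint documentation; `WindowObs` and `pastWindowSketch` are hypothetical names.

```scala
// Illustration only: plain Scala, not Flint's implementation.
case class WindowObs(time: Long, price: Double)

// For each row, collect every row whose time falls in [row.time - lengthNs, row.time].
// Boundary inclusivity is an assumption of this sketch.
def pastWindowSketch(rows: Seq[WindowObs], lengthNs: Long): Seq[(WindowObs, Seq[WindowObs])] =
  rows.map { row =>
    row -> rows.filter(o => o.time >= row.time - lengthNs && o.time <= row.time)
  }
```

This quadratic scan is only for readability; on sorted data the window bounds can slide forward instead of rescanning.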

In [ ]:
//Temporal join functions
val leftTSRdd = tsRdd
  .keepRows { row: Row => row.getAs[Long]("time") % 100 == 0 }
  .keepColumns("time", "price")
val rightTSRdd = tsRdd
  .keepRows { row: Row => row.getAs[Long]("time") % 100 != 0 }
  .keepColumns("time", "id")

val leftJoin = leftTSRdd.leftJoin(rightTSRdd, tolerance = "50ns")
val futureLeftJoin = leftTSRdd.futureLeftJoin(rightTSRdd, tolerance = "50ns")
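
The two temporal joins above differ only in the direction they look. The sketch below shows the idea with plain Scala collections, assuming a past join picks the most recent right row within the tolerance and a future join picks the nearest upcoming one. The names `JoinLeft`, `JoinRight`, `leftJoinSketch` and `futureLeftJoinSketch` are hypothetical, and the inclusive boundaries are an assumption rather than Flint's documented behavior.

```scala
// Illustration only: plain Scala, not Flint's implementation.
case class JoinLeft(time: Long, price: Double)
case class JoinRight(time: Long, id: Int)

// Past join: latest right row with time in [l.time - toleranceNs, l.time].
def leftJoinSketch(left: Seq[JoinLeft], right: Seq[JoinRight], toleranceNs: Long): Seq[(JoinLeft, Option[JoinRight])] =
  left.map { l =>
    val candidates = right.filter(r => r.time <= l.time && r.time >= l.time - toleranceNs)
    l -> (if (candidates.isEmpty) None else Some(candidates.maxBy(_.time)))
  }

// Future join: earliest right row with time in [l.time, l.time + toleranceNs].
def futureLeftJoinSketch(left: Seq[JoinLeft], right: Seq[JoinRight], toleranceNs: Long): Seq[(JoinLeft, Option[JoinRight])] =
  left.map { l =>
    val candidates = right.filter(r => r.time >= l.time && r.time <= l.time + toleranceNs)
    l -> (if (candidates.isEmpty) None else Some(candidates.minBy(_.time)))
  }
```

Left rows with no match inside the tolerance keep a `None` on the right side, mirroring the outer-join behavior of a left join.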

In [ ]:
//Summarize functions
import com.twosigma.flint.timeseries.Summarizers
val summarizedCycles = tsRdd.summarizeCycles(Summarizers.sum("price"))
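
For intuition, summing a column per cycle amounts to the following plain Scala sketch, again assuming a cycle is the set of rows with an identical timestamp. `CycleQuote` and `sumPerCycleSketch` are hypothetical names, not part of Flint.

```scala
// Illustration only: plain Scala, not Flint's implementation.
case class CycleQuote(time: Long, price: Double)

// One (time, summed price) pair per distinct timestamp, in ascending time order.
def sumPerCycleSketch(rows: Seq[CycleQuote]): Seq[(Long, Double)] =
  rows
    .groupBy(_.time)
    .map { case (time, cycle) => time -> cycle.map(_.price).sum }
    .toSeq
    .sortBy(_._1)
```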

In [ ]:
//stat.regression

import breeze.linalg.DenseVector
import org.apache.spark.mllib.random.RandomRDDs
import com.twosigma.flint.math.stats.regression.WeightedLabeledPoint
import com.twosigma.flint.math.stats.regression.OLSMultipleLinearRegression

// Generate a random data set from a linear model with beta = [1.0, 2.0] and intercept = 3.0
val data = WeightedLabeledPoint.generateSampleData(spark.sparkContext, DenseVector(1.0, 2.0), 3.0)

// Fit the data using the OLS linear regression.
val model = OLSMultipleLinearRegression.regression(data)

// Retrieve the estimated beta and intercept.
val denseVector = model.estimateRegressionParameters
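
To show what an ordinary least squares fit computes, here is a small single-machine sketch that solves the normal equations (X^T X) b = X^T y directly. It is unweighted, uses no Spark or Breeze, and is not Flint's distributed implementation; `olsSketch` is a hypothetical name, and returning the intercept as the first coefficient is a choice of this sketch only.

```scala
// Illustration only: plain Scala, not Flint's implementation.
// Returns Array(intercept, beta_1, ..., beta_k) for an unweighted fit.
def olsSketch(xs: Seq[Array[Double]], ys: Seq[Double]): Array[Double] = {
  val design = xs.map(x => 1.0 +: x) // prepend a constant column for the intercept
  val p = design.head.length
  // Augmented matrix [X^T X | X^T y].
  val a = Array.tabulate(p, p + 1) { (i, j) =>
    if (j < p) design.map(r => r(i) * r(j)).sum
    else design.zip(ys).map { case (r, y) => r(i) * y }.sum
  }
  // Gauss-Jordan elimination with partial pivoting.
  for (col <- 0 until p) {
    val pivot = (col until p).maxBy(r => math.abs(a(r)(col)))
    val tmp = a(col); a(col) = a(pivot); a(pivot) = tmp
    for (row <- 0 until p if row != col) {
      val factor = a(row)(col) / a(col)(col)
      for (k <- col to p) a(row)(k) -= factor * a(col)(k)
    }
  }
  Array.tabulate(p)(i => a(i)(p) / a(i)(i))
}
```

Feeding it noiseless points generated from intercept 3.0 and beta = [1.0, 2.0], the same model as the cell above, should recover those values up to floating-point error.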

In [ ]:
spark.close()