Flint: A Time Series Library for Apache Spark

The ability to analyze time series data at scale is critical for the success of finance and IoT applications based on Spark. Flint is Two Sigma's implementation of highly optimized time series operations in Spark. It performs truly parallel and rich analyses on time series data by taking advantage of the natural ordering in time series data to provide locality-based optimizations.

Flint is an open source library for Spark based around the TimeSeriesRDD, a time series aware data structure, and a collection of time series utility and analysis functions that use TimeSeriesRDDs. Unlike DataFrame and Dataset, Flint's TimeSeriesRDDs can leverage the existing ordering properties of datasets at rest and the fact that almost all data manipulations and analysis over these datasets respect their temporal ordering properties. It differs from other time series efforts in Spark in its ability to efficiently compute across panel data or on large scale high frequency data.

Requirements

Dependency      Version
Spark Version   2.3 and 2.4
Scala Version   2.12
Python Version  3.5 and above

How to install

The Scala artifact is published on Maven Central:

https://mvnrepository.com/artifact/com.twosigma/flint

The Python artifact is published on PyPI:

https://pypi.org/project/ts-flint

Note that you need both the Scala and Python artifacts to use Flint with PySpark.

How to build

To build from source:

Scala (in top-level dir):

sbt assemblyNoTest

Python (in python subdir):

python setup.py install

or

pip install .

Python bindings

The Python bindings for Flint, including quickstart instructions, are documented at python/README.md. API documentation is available at http://ts-flint.readthedocs.io/en/latest/.

Getting Started

Starting Point: TimeSeriesRDD and TimeSeriesDataFrame

The entry point to all time series functionality in Flint is TimeSeriesRDD (for Scala) and TimeSeriesDataFrame (for Python). At a high level, a TimeSeriesRDD contains an OrderedRDD, which represents a sequence of ordered key-value pairs. To represent a time series data set, a TimeSeriesRDD uses Long timestamps (nanoseconds since the epoch) as keys and InternalRows as values.
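To make this representation concrete, here is a minimal pure-Python sketch (not Flint code) that models a time series as a sorted list of (timestamp, row) pairs, with timestamps stored as integer nanoseconds since the epoch. The helper name to_epoch_nanos and the sample rows are hypothetical and exist only for illustration.

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_nanos(dt):
    """Convert a timezone-aware datetime to integer nanoseconds since the epoch."""
    delta = dt - EPOCH
    # Integer arithmetic only, to avoid floating-point rounding.
    whole_seconds = delta.days * 86400 + delta.seconds
    return whole_seconds * 10**9 + delta.microseconds * 1000


# Hypothetical unsorted input rows: (datetime, payload).
rows = [
    (datetime(2020, 1, 1, 0, 0, 1, tzinfo=timezone.utc), {"price": 2.0}),
    (datetime(2020, 1, 1, 0, 0, 0, tzinfo=timezone.utc), {"price": 1.0}),
]

# Key each row by its nanosecond timestamp and keep the pairs ordered by key.
series = sorted(
    ((to_epoch_nanos(dt), row) for dt, row in rows),
    key=lambda kv: kv[0],
)
```

Keeping the pairs ordered by an integer key is what lets later operations (grouping, windowing, temporal joins) work on neighboring rows instead of shuffling the whole data set.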

Create TimeSeriesRDD

Applications can create a TimeSeriesRDD from an existing RDD, from an OrderedRDD, from a DataFrame, or from a single CSV file.

As an example, the following creates a TimeSeriesRDD from a gzipped CSV file with a header and a specific datetime format.

import com.twosigma.flint.timeseries.CSV
val tsRdd = CSV.from(
  sqlContext,
  "file://foo/bar/data.csv",
  header = true,
  dateFormat = "yyyyMMdd HH:mm:ss.SSS",
  codec = "gzip",
  sorted = true
)

To create a TimeSeriesRDD from a DataFrame, you have to make sure the DataFrame contains a column named "time" of type LongType.

import com.twosigma.flint.timeseries.TimeSeriesRDD
import scala.concurrent.duration._
val df = ... // A DataFrame whose rows have been sorted by their timestamps under "time" column
val tsRdd = TimeSeriesRDD.fromDF(dataFrame = df)(isSorted = true, timeUnit = MILLISECONDS)

You can also create a TimeSeriesRDD from an RDD[Row] or an OrderedRDD[Long, Row] by providing a schema, e.g.

import com.twosigma.flint.timeseries._
import org.apache.spark.sql.types.{DoubleType, LongType}
import scala.concurrent.duration._
val rdd = ... // An RDD whose rows have been sorted by their timestamps
val tsRdd = TimeSeriesRDD.fromRDD(
  rdd,
  schema = Schema("time" -> LongType, "price" -> DoubleType)
)(isSorted = true,
  timeUnit = MILLISECONDS
)

It is also possible to create a TimeSeriesRDD from a dataset stored as Parquet file(s). The TimeSeriesRDD.fromParquet() function lets you specify which columns and/or which time range you are interested in, e.g.

import com.twosigma.flint.timeseries._
import scala.concurrent.duration._
val tsRdd = TimeSeriesRDD.fromParquet(
  sqlContext,
  path = "hdfs://foo/bar/"
)(isSorted = true,
  timeUnit = MILLISECONDS,
  columns = Seq("time", "id", "price"),  // By default, null for all columns
  begin = "20100101",                    // By default, null for no boundary at begin
  end = "20150101"                       // By default, null for no boundary at end
)

Group functions

A group function groups rows with nearby (or exactly the same) timestamps.

  • groupByCycle: groups rows within a cycle, i.e. rows with exactly the same timestamp. For example:
val priceTSRdd = ...
// A TimeSeriesRDD with columns "time" and "price"
// time  price
// -----------
// 1000L 1.0
// 1000L 2.0
// 2000L 3.0
// 2000L 4.0
// 2000L 5.0

val results = priceTSRdd.groupByCycle()
// time  rows
// ------------------------------------------------
// 1000L [[1000L, 1.0], [1000L, 2.0]]
// 2000L [[2000L, 3.0], [2000L, 4.0], [2000L, 5.0]]
  • groupByInterval: groups rows whose timestamps fall into the same interval. Intervals can be defined by another TimeSeriesRDD, whose timestamps are used to define them: each pair of consecutive timestamps defines one interval. For example:
val priceTSRdd = ...
// A TimeSeriesRDD with columns "time" and "price"
// time  price
// -----------
// 1000L 1.0
// 1500L 2.0
// 2000L 3.0
// 2500L 4.0

val clockTSRdd = ...
// A TimeSeriesRDD with only column "time"
// time
// -----
// 1000L
// 2000L
// 3000L

val results = priceTSRdd.groupByInterval(clockTSRdd)
// time  rows
// ----------------------------------
// 1000L [[1000L, 1.0], [1500L, 2.0]]
// 2000L [[2000L, 3.0], [2500L, 4.0]]
  • addWindows: adds a new column whose value, for each row, is the list of rows within that row's window. For example:
val priceTSRdd = ...
// A TimeSeriesRDD with columns "time" and "price"
// time  price
// -----------
// 1000L 1.0
// 1500L 2.0
// 2000L 3.0
// 2500L 4.0

val result = priceTSRdd.addWindows(Window.pastAbsoluteTime("1000ns"))
// time  price window_past_1000ns
// ------------------------------------------------------
// 1000L 1.0   [[1000L, 1.0]]
// 1500L 2.0   [[1000L, 1.0], [1500L, 2.0]]
// 2000L 3.0   [[1000L, 1.0], [1500L, 2.0], [2000L, 3.0]]
// 2500L 4.0   [[1500L, 2.0], [2000L, 3.0], [2500L, 4.0]]
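The semantics of the three group functions above can be sketched in plain Python. This is an illustration only, not Flint's implementation: the function names are hypothetical, rows are modeled as (time, price) tuples already sorted by time, and the interval convention (begin inclusive, end exclusive, group labeled by the interval's begin) and the inclusive window bounds are assumptions inferred from the example outputs above.

```python
from bisect import bisect_right
from itertools import groupby


def group_by_cycle(rows):
    """Group time-sorted rows that share exactly the same timestamp."""
    return [(t, list(group)) for t, group in groupby(rows, key=lambda r: r[0])]


def group_by_interval(rows, clock):
    """Group rows into intervals [clock[i], clock[i + 1]), keyed by interval begin.

    Assumption: begin-inclusive, end-exclusive intervals.
    """
    groups = {}
    for row in rows:
        i = bisect_right(clock, row[0]) - 1  # index of the last tick <= row time
        if 0 <= i < len(clock) - 1:          # skip rows outside every interval
            groups.setdefault(clock[i], []).append(row)
    return sorted(groups.items())


def add_past_window(rows, length):
    """Append to each row the rows whose time lies in [t - length, t].

    Assumption: both window bounds are inclusive.
    """
    return [
        row + ([r for r in rows if row[0] - length <= r[0] <= row[0]],)
        for row in rows
    ]


# The sample data from the examples above, with times as plain integers.
cycle_prices = [(1000, 1.0), (1000, 2.0), (2000, 3.0), (2000, 4.0), (2000, 5.0)]
prices = [(1000, 1.0), (1500, 2.0), (2000, 3.0), (2500, 4.0)]
clock = [1000, 2000, 3000]

cycles = group_by_cycle(cycle_prices)
intervals = group_by_interval(prices, clock)
windows = add_past_window(prices, 1000)
```

The window sketch scans all rows for every row, which is quadratic. It is written for readability; a library that knows the data is time-ordered can instead slide over neighboring rows.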

Temporal Join Functions

A temporal join function is a join function defined by matching criteria over time. The tolerance in the matching criteria specifies how far the join should look into the past or the future.

  • leftJoin: performs a temporal left join against the right TimeSeriesRDD, i.e. a left join using inexact timestamp matches. For each row in the left, it appends the most recent row from the right at or before the same time. For example, to join two TimeSeriesRDDs:
val leftTSRdd = ...
val rightTSRdd = ...
val result = leftTSRdd.leftJoin(rightTSRdd, tolerance = "1day")
  • futureLeftJoin: performs a temporal future left join against the right TimeSeriesRDD, i.e. a left join using inexact timestamp matches. For each row in the left, it appends the closest future row from the right at or after the same time.
val result = leftTSRdd.futureLeftJoin(rightTSRdd, tolerance = "1day")
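As a rough illustration of the matching rule, the following pure-Python sketch mimics both joins on time-sorted lists of (time, value) tuples. It is not Flint's implementation. The function names and sample data are hypothetical, the tolerance is a plain integer in the same unit as the timestamps, and two details are assumptions: the tolerance bound is treated as inclusive, and an unmatched left row is paired with None.

```python
from bisect import bisect_left, bisect_right


def left_join(left, right, tolerance):
    """Pair each left row with the latest right row in [t - tolerance, t]."""
    right_times = [r[0] for r in right]
    joined = []
    for row in left:
        t = row[0]
        i = bisect_right(right_times, t) - 1  # last right row at or before t
        if i >= 0 and t - right_times[i] <= tolerance:
            joined.append((row, right[i]))
        else:
            joined.append((row, None))        # no match within tolerance
    return joined


def future_left_join(left, right, tolerance):
    """Pair each left row with the earliest right row in [t, t + tolerance]."""
    right_times = [r[0] for r in right]
    joined = []
    for row in left:
        t = row[0]
        i = bisect_left(right_times, t)       # first right row at or after t
        if i < len(right) and right_times[i] - t <= tolerance:
            joined.append((row, right[i]))
        else:
            joined.append((row, None))        # no match within tolerance
    return joined


# Hypothetical sample data, sorted by time.
left = [(1000, "a"), (2000, "b"), (5000, "c")]
right = [(900, 1.0), (2100, 2.0)]

past = left_join(left, right, tolerance=200)
future = future_left_join(left, right, tolerance=200)
```

Because both inputs are ordered by time, each lookup is a binary search rather than a scan, which is the kind of locality a time-ordered data structure makes possible.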

Summarize Functions

Summarize functions apply one or more summarizers to rows within a certain period, such as a cycle, an interval, or a window.

  • summarizeCycles: computes aggregate statistics over the rows within each cycle, i.e. rows that share a timestamp.
val volTSRdd = ...
// A TimeSeriesRDD with columns "time", "id", and "volume"
// time  id volume
// ------------
// 1000L 1  100
// 1000L 2  200
// 2000L 1  300
// 2000L 2  400

val result = volTSRdd.summarizeCycles(Summary.sum("volume"))
// time  volume_sum
// ----------------
// 1000L 300
// 2000L 700
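The example above can be mirrored in plain Python to show what summarizing over cycles computes. This is a sketch of the semantics only, not Flint code: the function name summarize_cycles is hypothetical, rows are modeled as dictionaries sorted by "time", and any callable that reduces an iterable of values stands in for a summarizer.

```python
from itertools import groupby


def summarize_cycles(rows, column, summarizer=sum):
    """Apply `summarizer` to `column` over each run of rows sharing a timestamp."""
    return [
        (t, summarizer(r[column] for r in group))
        for t, group in groupby(rows, key=lambda r: r["time"])
    ]


# The sample data from the example above.
vol = [
    {"time": 1000, "id": 1, "volume": 100},
    {"time": 1000, "id": 2, "volume": 200},
    {"time": 2000, "id": 1, "volume": 300},
    {"time": 2000, "id": 2, "volume": 400},
]

volume_sum = summarize_cycles(vol, "volume")       # one total per cycle
volume_max = summarize_cycles(vol, "volume", max)  # a different summarizer
```

Swapping the grouping step for an interval or window grouping gives the corresponding interval and window summaries.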

Similarly, we could summarize over intervals, windows, or the whole time series data set. See

  • summarizeIntervals
  • summarizeWindows
  • addSummaryColumns

See timeseries.summarize.summarizer for the different kinds of summarizers, such as ZScoreSummarizer, CorrelationSummarizer, and NthCentralMomentSummarizer.

Contributing

In order to accept your code contributions, please fill out the appropriate Contributor License Agreement in the cla folder and submit it to [email protected].

Disclaimer

Apache Spark is a trademark of The Apache Software Foundation. The Apache Software Foundation is not affiliated, endorsed, connected, sponsored or otherwise associated in any way to Two Sigma, Flint, or this website in any manner.

© Two Sigma Open Source, LLC
