How to transform data with sliding window over time series data in Pyspark

I am trying to extract features based on sliding window over time series data. In Scala, it seems like there is a sliding function based on this post and the documentation

import org.apache.spark.mllib.rdd.RDDFunctions._

sc.parallelize(1 to 100, 10)
  .sliding(3)
  .map(curSlice => (curSlice.sum / curSlice.size))
  .collect()

My questions is there similar functions in PySpark? Or how do we achieve similar sliding window transformations if there is no such function yet?


Solution 1:

As far as I can tell sliding function is not available from Python and SlidingRDD is a private class and cannot be accessed outside MLlib.

If you to use sliding on an existing RDD you can create poor man's sliding like this:

def sliding(rdd, n):
    assert n > 0
    def gen_window(xi, n):
        x, i = xi
        return [(i - offset, (i, x)) for offset in xrange(n)]

    return (
        rdd.
        zipWithIndex(). # Add index
        flatMap(lambda xi: gen_window(xi, n)). # Generate pairs with offset
        groupByKey(). # Group to create windows
        # Sort values to ensure order inside window and drop indices
        mapValues(lambda vals: [x for (i, x) in sorted(vals)]).
        sortByKey(). # Sort to makes sure we keep original order
        values(). # Get values
        filter(lambda x: len(x) == n)) # Drop beginning and end

Alternatively you can try something like this (with a small help of toolz)

from toolz.itertoolz import sliding_window, concat

def sliding2(rdd, n):
    assert n > 1

    def get_last_el(i, iter):
        """Return last n - 1 elements from the partition"""
        return  [(i, [x for x in iter][(-n + 1):])]

    def slide(i, iter):
        """Prepend previous items and return sliding window"""
        return sliding_window(n, concat([last_items.value[i - 1], iter]))

    def clean_last_items(last_items):
        """Adjust for empty or to small partitions"""
        clean = {-1: [None] * (n - 1)}
        for i in range(rdd.getNumPartitions()):
            clean[i] = (clean[i - 1] + list(last_items[i]))[(-n + 1):]
        return {k: tuple(v) for k, v in clean.items()}

    last_items = sc.broadcast(clean_last_items(
        rdd.mapPartitionsWithIndex(get_last_el).collectAsMap()))

    return rdd.mapPartitionsWithIndex(slide)

Solution 2:

To add to venuktan's answer, here is how to create a time-based sliding window using Spark SQL and retain the full contents of the window, rather than taking an aggregate of it. This was needed in my use case of preprocessing time series data into sliding windows for input into Spark ML.

One limitation of this approach is that we assume you want to take sliding windows over time.

Firstly, you may create your Spark DataFrame, for example by reading in a CSV file:

df = spark.read.csv('foo.csv')

We assume that your CSV file has two columns: one of which is a unix timestamp and the other which is a column you want to extract sliding windows from.

from pyspark.sql import functions as f

window_duration = '1000 millisecond'
slide_duration = '500 millisecond'

df.withColumn("_c0", f.from_unixtime(f.col("_c0"))) \
    .groupBy(f.window("_c0", window_duration, slide_duration)) \
    .agg(f.collect_list(f.array('_c1'))) \
    .withColumnRenamed('collect_list(array(_c1))', 'sliding_window')

Bonus: to convert this array column to the DenseVector format required for Spark ML, see the UDF approach here.

Extra Bonus: to un-nest the resulting column, such that each element of your sliding window has its own column, try this approach here.

I hope this helps, please let me know if I can clarify anything.