Pyspark : forward fill with last observation for a DataFrame

Using Spark 1.5.1,

I've been trying to forward fill null values with the last known observation for one column of my DataFrame.

It is possible to start with a null value and for this case I would to backward fill this null value with the first knwn observation. However, If that too complicates the code, this point can be skipped.

In this post, a solution in Scala was provided for a very similar problem by zero323.

But, I don't know Scala and I don't succeed to ''translate'' it in Pyspark API code. It's possible to do it with Pyspark ?

Thanks for your help.

Below, a simple example sample input:

| cookie_ID     | Time       | User_ID   
| ------------- | --------   |------------- 
| 1             | 2015-12-01 | null 
| 1             | 2015-12-02 | U1
| 1             | 2015-12-03 | U1
| 1             | 2015-12-04 | null   
| 1             | 2015-12-05 | null     
| 1             | 2015-12-06 | U2
| 1             | 2015-12-07 | null
| 1             | 2015-12-08 | U1
| 1             | 2015-12-09 | null      
| 2             | 2015-12-03 | null     
| 2             | 2015-12-04 | U3
| 2             | 2015-12-05 | null   
| 2             | 2015-12-06 | U4

And the expected output:

| cookie_ID     | Time       | User_ID   
| ------------- | --------   |------------- 
| 1             | 2015-12-01 | U1
| 1             | 2015-12-02 | U1
| 1             | 2015-12-03 | U1
| 1             | 2015-12-04 | U1
| 1             | 2015-12-05 | U1
| 1             | 2015-12-06 | U2
| 1             | 2015-12-07 | U2
| 1             | 2015-12-08 | U1
| 1             | 2015-12-09 | U1
| 2             | 2015-12-03 | U3
| 2             | 2015-12-04 | U3
| 2             | 2015-12-05 | U3
| 2             | 2015-12-06 | U4

Solution 1:

Another workaround to get this working, is to try something like this:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

window = (
    Window
    .partitionBy('cookie_id')
    .orderBy('Time')
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

final = (
    joined
    .withColumn('UserIDFilled', F.last('User_ID', ignorenulls=True).over(window))
)

So what this is doing is that it constructs your window based on the partition key and the order column. It also tells the window to look back all rows within the window up to the current row. Finally, at each row, you return the last value that is not null (which remember, according to your window, it includes your current row)

Solution 2:

The partitioned example code from Spark / Scala: forward fill with last observation in pyspark is shown. This only works for data that can be partitioned.

Load the data

values = [
    (1, "2015-12-01", None),
    (1, "2015-12-02", "U1"),
    (1, "2015-12-02", "U1"),
    (1, "2015-12-03", "U2"),
    (1, "2015-12-04", None),
    (1, "2015-12-05", None),
    (2, "2015-12-04", None),
    (2, "2015-12-03", None),
    (2, "2015-12-02", "U3"),
    (2, "2015-12-05", None),
]
rdd = sc.parallelize(values)
df = rdd.toDF(["cookie_id", "c_date", "user_id"])
df = df.withColumn("c_date", df.c_date.cast("date"))
df.show()

The DataFrame is

+---------+----------+-------+
|cookie_id|    c_date|user_id|
+---------+----------+-------+
|        1|2015-12-01|   null|
|        1|2015-12-02|     U1|
|        1|2015-12-02|     U1|
|        1|2015-12-03|     U2|
|        1|2015-12-04|   null|
|        1|2015-12-05|   null|
|        2|2015-12-04|   null|
|        2|2015-12-03|   null|
|        2|2015-12-02|     U3|
|        2|2015-12-05|   null|
+---------+----------+-------+

Column used to sort the partitions

# get the sort key
def getKey(item):
    return item.c_date

The fill function. Can be used to fill in multiple columns if necessary.

# fill function
def fill(x):
    out = []
    last_val = None
    for v in x:
        if v["user_id"] is None:
            data = [v["cookie_id"], v["c_date"], last_val]
        else:
            data = [v["cookie_id"], v["c_date"], v["user_id"]]
            last_val = v["user_id"]
        out.append(data)
    return out

Convert to rdd, partition, sort and fill the missing values

# Partition the data
rdd = df.rdd.groupBy(lambda x: x.cookie_id).mapValues(list)
# Sort the data by date
rdd = rdd.mapValues(lambda x: sorted(x, key=getKey))
# fill missing value and flatten
rdd = rdd.mapValues(fill).flatMapValues(lambda x: x)
# discard the key
rdd = rdd.map(lambda v: v[1])

Convert back to DataFrame

df_out = sqlContext.createDataFrame(rdd)
df_out.show()

The output is

+---+----------+----+
| _1|        _2|  _3|
+---+----------+----+
|  1|2015-12-01|null|
|  1|2015-12-02|  U1|
|  1|2015-12-02|  U1|
|  1|2015-12-03|  U2|
|  1|2015-12-04|  U2|
|  1|2015-12-05|  U2|
|  2|2015-12-02|  U3|
|  2|2015-12-03|  U3|
|  2|2015-12-04|  U3|
|  2|2015-12-05|  U3|
+---+----------+----+

Solution 3:

Hope you find this forward fill function useful. It is written using native pyspark function. Neither udf nor rdd being used (both of them are very slow, especially UDF!).

Let's use example provided by @Sid.

values = [
    (1, "2015-12-01", None),
    (1, "2015-12-02", "U1"),
    (1, "2015-12-02", "U1"),
    (1, "2015-12-03", "U2"),
    (1, "2015-12-04", None),
    (1, "2015-12-05", None),
    (2, "2015-12-04", None),
    (2, "2015-12-03", None),
    (2, "2015-12-02", "U3"),
    (2, "2015-12-05", None),
] 

df = spark.createDataFrame(values, ['cookie_ID', 'Time', 'User_ID'])

enter image description here

Functions:

def cum_sum(df, sum_col , order_col, cum_sum_col_nm='cum_sum'):  
    '''Find cumulative sum of a column. 
    Parameters 
    -----------
    sum_col : String 
        Column to perform cumulative sum. 
    order_col : List 
        Column/columns to sort for cumulative sum. 
    cum_sum_col_nm : String
        The name of the resulting cum_sum column. 

    Return
    -------
    df : DataFrame
        Dataframe with additional "cum_sum_col_nm". 

    '''
    df = df.withColumn('tmp', lit('tmp')) 

    windowval = (Window.partitionBy('tmp') 
                 .orderBy(order_col)
                 .rangeBetween(Window.unboundedPreceding, 0)) 

    df = df.withColumn('cum_sum', sum(sum_col).over(windowval).alias('cumsum').cast(StringType()))
    df = df.drop('tmp') 
    return df   


def forward_fill(df, order_col, fill_col, fill_col_name=None):
    '''Forward fill a column by a column/set of columns (order_col).  
    Parameters:
    ------------
    df: Dataframe 
    order_col: String or List of string
    fill_col: String (Only work for a column for this version.) 

    Return:
    ---------
    df: Dataframe 
        Return df with the filled_cols. 
    '''

    # "value" and "constant" are tmp columns created ton enable forward fill. 
    df = df.withColumn('value', when(col(fill_col).isNull(), 0).otherwise(1))
    df = cum_sum(df, 'value', order_col).drop('value')  
    df = df.withColumn(fill_col, 
                when(col(fill_col).isNull(), 'constant').otherwise(col(fill_col))) 

    win = (Window.partitionBy('cum_sum') 
              .orderBy(order_col)) 

    if not fill_col_name:
        fill_col_name = 'ffill_{}'.format(fill_col)

    df = df.withColumn(fill_col_name, collect_list(fill_col).over(win)[0])
    df = df.drop('cum_sum')
    df = df.withColumn(fill_col_name, when(col(fill_col_name)=='constant', None).otherwise(col(fill_col_name)))
    df = df.withColumn(fill_col, when(col(fill_col)=='constant', None).otherwise(col(fill_col)))
    return df   

Let's see the results.

ffilled_df = forward_fill(df, 
                          order_col=['cookie_ID', 'Time'], 
                          fill_col='User_ID', 
                          fill_col_name = 'User_ID_ffil')
ffilled_df.sort(['cookie_ID', 'Time']).show()   

enter image description here

Solution 4:

// Forward filling
w1 = Window.partitionBy('cookie_id').orderBy('c_date').rowsBetween(Window.unboundedPreceding,0)
w2 = w1.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

//Backward filling
final_df = df.withColumn('UserIDFilled', F.coalesce(F.last('user_id', True).over(w1),
                                                    F.first('user_id',True).over(w2)))

final_df.orderBy('cookie_id', 'c_date').show(truncate=False)

   +---------+----------+-------+------------+
|cookie_id|c_date    |user_id|UserIDFilled|
+---------+----------+-------+------------+
|1        |2015-12-01|null   |U1          |
|1        |2015-12-02|U1     |U1          |
|1        |2015-12-02|U1     |U1          |
|1        |2015-12-03|U2     |U2          |
|1        |2015-12-04|null   |U2          |
|1        |2015-12-05|null   |U2          |
|2        |2015-12-02|U3     |U3          |
|2        |2015-12-03|null   |U3          |
|2        |2015-12-04|null   |U3          |
|2        |2015-12-05|null   |U3          |
+---------+----------+-------+------------+