PySpark window functions (lead, lag) in Synapse Workspace

Scenario:

  • Each ticket has StartTime and EndTime rows. Where a StartTime is followed by an EndTime for the same ticket, combine the two into a single row of a new dataframe, as shown in the desired output below.

The PySpark dataset looks like this:

#base Schema for Testing purpose
#Dataset

from pyspark.sql.types import StructType,StructField, StringType, IntegerType

#Create User defined Custom Schema using StructType
schema = StructType([ StructField('CaseNumber', StringType(), True)\
                       ,StructField('StartTime', StringType(), True)\
                       ,StructField('EndTime', StringType(), True)])

data = [
        {"CaseNumber": 'Ticket1', "StartTime": '1/22/19 10:00', "EndTime": ''},
        {"CaseNumber": 'Ticket1', "StartTime": '', "EndTime": '1/23/19 11:00'},
        {"CaseNumber": 'Ticket1', "StartTime": '1/25/19 7:00', "EndTime": ''},
        {"CaseNumber": 'Ticket1', "StartTime": '1/27/19 3:00', "EndTime": ''},
        {"CaseNumber": 'Ticket2', "StartTime": '1/29/19 10:00', "EndTime": ''},
        {"CaseNumber": 'Ticket2', "StartTime": '', "EndTime": '2/23/19 2:00'},
        {"CaseNumber": 'Ticket2', "StartTime": '3/25/19 7:00', "EndTime": ''},
        {"CaseNumber": 'Ticket2', "StartTime": '', "EndTime": '3/27/19 8:00'},
        {"CaseNumber": 'Ticket2', "StartTime": '', "EndTime": '3/27/19 10:00'},
        {"CaseNumber": 'Ticket3', "StartTime": '4/25/19 1:00', "EndTime": ''}
        ]

from pyspark.sql import SparkSession
#Create PySpark SparkSession
spark = SparkSession.builder \
    .master('local[1]') \
    .appName('SparkByExamples.com') \
    .getOrCreate()

# Creation of a dummy dataframe:
df1 = spark.createDataFrame(data,schema=schema)

df1.show()

Dataset created:

+----------+-------------+-------------+
|CaseNumber|    StartTime|      EndTime|
+----------+-------------+-------------+
|   Ticket1|1/22/19 10:00|          NaN|
|   Ticket1|          NaN|1/23/19 11:00|
|   Ticket1| 1/25/19 7:00|          NaN|
|   Ticket1| 1/27/19 3:00|          NaN|
|   Ticket2|1/29/19 10:00|          NaN|
|   Ticket2|          NaN| 2/23/19 2:00|
|   Ticket2| 3/25/19 7:00|          NaN|
|   Ticket2|          NaN| 3/27/19 8:00|
|   Ticket2|          NaN|3/27/19 10:00|
|   Ticket3| 4/25/19 1:00|          NaN|
+----------+-------------+-------------+

The desired output should be:

+----------+-------------+-------------+
|CaseNumber|    StartTime|      EndTime|
+----------+-------------+-------------+
|   Ticket1|1/22/19 10:00|1/23/19 11:00|
|   Ticket2|1/29/19 10:00| 2/23/19 2:00|
|   Ticket2| 3/25/19 7:00| 3/27/19 8:00|
+----------+-------------+-------------+

Applying the lead function to check whether an EndTime exists for the ticket:

from pyspark.sql.window import Window
import pyspark.sql.functions as psf

windowSpec = Window.partitionBy("CaseNumber").orderBy("CaseNumber")
# lead is referenced through the psf alias imported above
df = df1.withColumn("lead", psf.lead("EndTime", 1).over(windowSpec))
df.show()

pysparkdf = df.toPandas()

import pandas as pd 
tickets = pysparkdf.groupby('CaseNumber')

def isLeadnull(e): 
    return e['lead'] != None

my_list = []
for i,ticket in tickets:
    for j,e in ticket.iterrows() :
        if  isLeadnull(e):
            my_list.append({'CaseNumber': e['CaseNumber'] ,'Start': e['StartTime'], 'EndTime': e['lead']})
        else:
            print(e['lead'],'Do nothing as condition not met')

The output after this function is:

[{'CaseNumber': 'Ticket1',
  'Start': '1/22/19 10:00',
  'EndTime': '1/23/19 11:00'},
 {'CaseNumber': 'Ticket1', 'Start': 'NaN', 'EndTime': 'NaN'},
 {'CaseNumber': 'Ticket1', 'Start': '1/25/19 7:00', 'EndTime': 'NaN'},
 {'CaseNumber': 'Ticket2',
  'Start': '1/29/19 10:00',
  'EndTime': '2/23/19 2:00'},
 {'CaseNumber': 'Ticket2', 'Start': 'NaN', 'EndTime': 'NaN'},
 {'CaseNumber': 'Ticket2', 'Start': '3/25/19 7:00', 'EndTime': '3/27/19 8:00'},
 {'CaseNumber': 'Ticket2', 'Start': 'NaN', 'EndTime': '3/27/19 10:00'}]

Solution 1:

This is a variant of the gaps-and-islands problem. You can identify each "island" with a conditional cumulative sum: create a group column that increments at every row with a non-null StartTime, then group by CaseNumber and group, and take the max StartTime and min EndTime of each group. Groups that have no EndTime are filtered out:

from pyspark.sql import functions as F, Window

# first, replace empty strings with nulls, then convert the strings to timestamps
df1 = df1.replace("", None) \
    .withColumn("StartTime", F.to_timestamp("StartTime", "M/dd/yy H:mm")) \
    .withColumn("EndTime", F.to_timestamp("EndTime", "M/dd/yy H:mm"))

w = Window.partitionBy("CaseNumber").orderBy(F.coalesce("StartTime", "EndTime"))

df2 = df1.withColumn("group", F.sum(F.when(F.col("StartTime").isNotNull(), 1)).over(w)) \
    .groupBy("CaseNumber", "group") \
    .agg(F.max("StartTime").alias("StartTime"), F.min("EndTime").alias("EndTime")) \
    .filter(F.col("EndTime").isNotNull()) \
    .drop("group")

df2.show()
#+----------+-------------------+-------------------+
#|CaseNumber|          StartTime|            EndTime|
#+----------+-------------------+-------------------+
#|   Ticket1|2019-01-22 10:00:00|2019-01-23 11:00:00|
#|   Ticket2|2019-01-29 10:00:00|2019-02-23 02:00:00|
#|   Ticket2|2019-03-25 07:00:00|2019-03-27 08:00:00|
#+----------+-------------------+-------------------+

To understand the logic, you can show the intermediate group column before the groupBy step:

df1.withColumn("group", F.sum(F.when(F.col("StartTime").isNotNull(), 1)).over(w)).show()

#+----------+-------------------+-------------------+-----+
#|CaseNumber|          StartTime|            EndTime|group|
#+----------+-------------------+-------------------+-----+
#|   Ticket1|2019-01-22 10:00:00|               null|    1|
#|   Ticket1|               null|2019-01-23 11:00:00|    1|
#|   Ticket1|2019-01-25 07:00:00|               null|    2|
#|   Ticket1|2019-01-27 03:00:00|               null|    3|
#|   Ticket2|2019-01-29 10:00:00|               null|    1|
#|   Ticket2|               null|2019-02-23 02:00:00|    1|
#|   Ticket2|2019-03-25 07:00:00|               null|    2|
#|   Ticket2|               null|2019-03-27 08:00:00|    2|
#|   Ticket2|               null|2019-03-27 10:00:00|    2|
#|   Ticket3|2019-04-25 01:00:00|               null|    1|
#+----------+-------------------+-------------------+-----+
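
For intuition only, the same grouping idea can be sketched in plain Python without Spark. This is not the code the answer runs; the helper names parse and pair_intervals are made up for this illustration, and the rows are the sample data from the question with '' standing for a missing value. Each row with a start opens a new group, and a group is kept only if it collected at least one end:

```python
from datetime import datetime
from itertools import groupby

# Sample rows from the question: (CaseNumber, StartTime, EndTime); "" means missing.
rows = [
    ("Ticket1", "1/22/19 10:00", ""),
    ("Ticket1", "", "1/23/19 11:00"),
    ("Ticket1", "1/25/19 7:00", ""),
    ("Ticket1", "1/27/19 3:00", ""),
    ("Ticket2", "1/29/19 10:00", ""),
    ("Ticket2", "", "2/23/19 2:00"),
    ("Ticket2", "3/25/19 7:00", ""),
    ("Ticket2", "", "3/27/19 8:00"),
    ("Ticket2", "", "3/27/19 10:00"),
    ("Ticket3", "4/25/19 1:00", ""),
]


def parse(ts):
    """Hypothetical helper: parse the question's date text; '' becomes None."""
    return datetime.strptime(ts, "%m/%d/%y %H:%M") if ts else None


def pair_intervals(rows):
    """Hypothetical helper mirroring the conditional running count + group-by."""
    parsed = [(case, parse(s), parse(e)) for case, s, e in rows]
    # Order like the window: per ticket, by whichever timestamp is present.
    parsed.sort(key=lambda r: (r[0], r[1] or r[2]))
    result = []
    for case, ticket_rows in groupby(parsed, key=lambda r: r[0]):
        group = 0
        islands = {}  # group number -> (list of starts, list of ends)
        for _, start, end in ticket_rows:
            if start is not None:
                group += 1  # a row with a start opens a new island
            starts, ends = islands.setdefault(group, ([], []))
            if start is not None:
                starts.append(start)
            if end is not None:
                ends.append(end)
        for starts, ends in islands.values():
            if ends:  # keep only islands that have at least one end
                result.append((case, max(starts) if starts else None, min(ends)))
    return result


for case, start, end in pair_intervals(rows):
    print(case, start, end)
```

Taking the minimum end per group is what drops the second end row of Ticket2 (3/27/19 10:00), just as F.min("EndTime") does in the Spark version.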