PySpark window functions (lead, lag) in Synapse Workspace
Scenario:
- A ticket has a StartTime and an EndTime. If both a StartTime and an EndTime exist, then build a new dataframe as shown in the desired output below.
The PySpark dataset looks like the one shown below.
# Base schema for the test dataset
from pyspark.sql.types import StructType, StructField, StringType

# Create a user-defined schema using StructType
schema = StructType([
    StructField('CaseNumber', StringType(), True),
    StructField('StartTime', StringType(), True),
    StructField('EndTime', StringType(), True)
])
data = [
{"CaseNumber": 'Ticket1', "StartTime": '1/22/19 10:00', "EndTime": ''},
{"CaseNumber": 'Ticket1', "StartTime": '', "EndTime": '1/23/19 11:00'},
{"CaseNumber": 'Ticket1', "StartTime": '1/25/19 7:00', "EndTime": ''},
{"CaseNumber": 'Ticket1', "StartTime": '1/27/19 3:00', "EndTime": ''},
{"CaseNumber": 'Ticket2', "StartTime": '1/29/19 10:00', "EndTime": ''},
{"CaseNumber": 'Ticket2', "StartTime": '', "EndTime": '2/23/19 2:00'},
{"CaseNumber": 'Ticket2', "StartTime": '3/25/19 7:00', "EndTime": ''},
{"CaseNumber": 'Ticket2', "StartTime": '', "EndTime": '3/27/19 8:00'},
{"CaseNumber": 'Ticket2', "StartTime": '', "EndTime": '3/27/19 10:00'},
{"CaseNumber": 'Ticket3', "StartTime": '4/25/19 1:00', "EndTime": ''}
]
from pyspark.sql import SparkSession

# Create the PySpark SparkSession
spark = SparkSession.builder \
    .master('local[1]') \
    .appName('SparkByExamples.com') \
    .getOrCreate()

# Create a dummy dataframe from the data and schema
df1 = spark.createDataFrame(data, schema=schema)
df1.show()
Dataset created:
+----------+-------------+-------------+
|CaseNumber| StartTime| EndTime|
+----------+-------------+-------------+
| Ticket1|1/22/19 10:00| NaN|
| Ticket1| NaN|1/23/19 11:00|
| Ticket1| 1/25/19 7:00| NaN|
| Ticket1| 1/27/19 3:00| NaN|
| Ticket2|1/29/19 10:00| NaN|
| Ticket2| NaN| 2/23/19 2:00|
| Ticket2| 3/25/19 7:00| NaN|
| Ticket2| NaN| 3/27/19 8:00|
| Ticket2| NaN|3/27/19 10:00|
| Ticket3| 4/25/19 1:00| NaN|
+----------+-------------+-------------+
The desired output should be:
+----------+-------------+-------------+
|CaseNumber| StartTime| EndTime|
+----------+-------------+-------------+
| Ticket1|1/22/19 10:00|1/23/19 11:00|
| Ticket2|1/29/19 10:00| 2/23/19 2:00|
| Ticket2| 3/25/19 7:00| 3/27/19 8:00|
+----------+-------------+-------------+
Applying the lead function to check whether an EndTime exists for the ticket:
from pyspark.sql.window import Window
import pyspark.sql.functions as psf

windowSpec = Window.partitionBy("CaseNumber").orderBy("CaseNumber")
df = df1.withColumn("lead", psf.lead("EndTime", 1).over(windowSpec))
df.show()
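One thing to note about the snippet above: the window both partitions and orders by CaseNumber, so every row in a partition has the same ordering key and the row that lead picks is not guaranteed to be the chronologically next one. Below is a minimal sketch of a window ordered by the parsed event time instead (the name windowSpecByTime is just illustrative, and it assumes the M/dd/yy H:mm format used in the sample data):
import pyspark.sql.functions as psf
from pyspark.sql.window import Window

# Order each ticket's rows by whichever timestamp the row actually carries;
# with default (non-ANSI) settings, to_timestamp returns null for empty strings,
# so coalesce picks the non-null value.
event_time = psf.coalesce(
    psf.to_timestamp("StartTime", "M/dd/yy H:mm"),
    psf.to_timestamp("EndTime", "M/dd/yy H:mm"),
)
windowSpecByTime = Window.partitionBy("CaseNumber").orderBy(event_time)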
import pandas as pd

pysparkdf = df.toPandas()
tickets = pysparkdf.groupby('CaseNumber')

def isLeadnull(e):
    return e['lead'] != None

my_list = []
for i, ticket in tickets:
    for j, e in ticket.iterrows():
        if isLeadnull(e):
            my_list.append({'CaseNumber': e['CaseNumber'], 'Start': e['StartTime'], 'EndTime': e['lead']})
        else:
            print(e['lead'], 'Do nothing as condition not met')
The contents of my_list after this loop are:
[{'CaseNumber': 'Ticket1',
'Start': '1/22/19 10:00',
'EndTime': '1/23/19 11:00'},
{'CaseNumber': 'Ticket1', 'Start': 'NaN', 'EndTime': 'NaN'},
{'CaseNumber': 'Ticket1', 'Start': '1/25/19 7:00', 'EndTime': 'NaN'},
{'CaseNumber': 'Ticket2',
'Start': '1/29/19 10:00',
'EndTime': '2/23/19 2:00'},
{'CaseNumber': 'Ticket2', 'Start': 'NaN', 'EndTime': 'NaN'},
{'CaseNumber': 'Ticket2', 'Start': '3/25/19 7:00', 'EndTime': '3/27/19 8:00'},
{'CaseNumber': 'Ticket2', 'Start': 'NaN', 'EndTime': '3/27/19 10:00'}]
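The 'NaN' entries survive because isLeadnull only rejects a true Python None (the last row of each partition after toPandas()); a lead value that comes back as an empty or 'NaN' string is still != None and is therefore kept. A minimal sketch of a check that would also skip those rows (has_usable_lead is a hypothetical helper name, and it assumes the missing values arrive as NaN or as empty/'NaN' strings, as the output above suggests):
import pandas as pd

def has_usable_lead(e):
    # Reject real nulls (None/NaN) as well as empty or placeholder strings.
    value = e['lead']
    return pd.notna(value) and str(value).strip() not in ('', 'NaN')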
Solution 1:
This is a sort of Gaps and Islands problem. You can identify the "islands" using a conditional cumulative sum that creates a group column, then group by CaseNumber + group and aggregate the max StartTime and the min EndTime for each group:
from pyspark.sql import functions as F, Window

# First, convert the strings to timestamps and replace empty strings with nulls
df1 = df1.withColumn("StartTime", F.to_timestamp("StartTime", "M/dd/yy H:mm")) \
    .withColumn("EndTime", F.to_timestamp("EndTime", "M/dd/yy H:mm")) \
    .replace("", None)

w = Window.partitionBy("CaseNumber").orderBy(F.coalesce("StartTime", "EndTime"))

df2 = df1.withColumn("group", F.sum(F.when(F.col("StartTime").isNotNull(), 1)).over(w)) \
    .groupBy("CaseNumber", "group") \
    .agg(F.max("StartTime").alias("StartTime"), F.min("EndTime").alias("EndTime")) \
    .filter(F.col("EndTime").isNotNull()) \
    .drop("group")

df2.show()
#+----------+-------------------+-------------------+
#|CaseNumber| StartTime| EndTime|
#+----------+-------------------+-------------------+
#| Ticket1|2019-01-22 10:00:00|2019-01-23 11:00:00|
#| Ticket2|2019-01-29 10:00:00|2019-02-23 02:00:00|
#| Ticket2|2019-03-25 07:00:00|2019-03-27 08:00:00|
#+----------+-------------------+-------------------+
To understand the logic, you can show the intermediate columns before the groupBy step:
df1.withColumn("group", F.sum(F.when(F.col("StartTime").isNotNull(), 1)).over(w)).show()
#+----------+-------------------+-------------------+-----+
#|CaseNumber| StartTime| EndTime|group|
#+----------+-------------------+-------------------+-----+
#| Ticket1|2019-01-22 10:00:00| null| 1|
#| Ticket1| null|2019-01-23 11:00:00| 1|
#| Ticket1|2019-01-25 07:00:00| null| 2|
#| Ticket1|2019-01-27 03:00:00| null| 3|
#| Ticket2|2019-01-29 10:00:00| null| 1|
#| Ticket2| null|2019-02-23 02:00:00| 1|
#| Ticket2|2019-03-25 07:00:00| null| 2|
#| Ticket2| null|2019-03-27 08:00:00| 2|
#| Ticket2| null|2019-03-27 10:00:00| 2|
#| Ticket3|2019-04-25 01:00:00| null| 1|
#+----------+-------------------+-------------------+-----+
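For completeness, since the question started from lead, here is a hedged sketch of how the same pairing could be done with lead over the parsed timestamps in df1 from the conversion step above (the names w2, NextEnd and df3 are just illustrative). It reproduces the desired output for this sample data, though it may behave differently from Solution 1 on edge cases such as an EndTime row that appears before any StartTime in a ticket:
from pyspark.sql import functions as F, Window

# Pair each StartTime with the EndTime of the chronologically next row
# in the same ticket, then keep only complete (start, end) pairs.
w2 = Window.partitionBy("CaseNumber").orderBy(F.coalesce("StartTime", "EndTime"))

df3 = df1.withColumn("NextEnd", F.lead("EndTime", 1).over(w2)) \
    .filter(F.col("StartTime").isNotNull() & F.col("NextEnd").isNotNull()) \
    .select("CaseNumber", "StartTime", F.col("NextEnd").alias("EndTime"))

df3.show()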