PySpark: slice a dataset by adding a column until a condition is met
I need to add a column with a starting date based on a certain condition:
STATUS.isin('CREATED', 'CREATED AGAIN')
For each ID, I want to start from the date where STATUS is CREATED or
CREATED AGAIN, and rebuild the dataset with a new START_DATE column
computed over a window ordered by TIME.
For example:
INPUT:
ID | $ | STATUS | TIME |
---|---|---|---|
1 | 050 | CREATED | 2021-11-01 |
1 | 120 | ATTEMPTING | 2021-11-01 |
1 | 130 | VALID | 2021-11-01 |
1 | 200 | OK | 2021-11-02 |
1 | 100 | CREATED AGAIN | 2021-11-03 |
1 | 160 | OK | 2021-11-03 |
1 | 200 | ONGOING | 2021-11-04 |
1 | 300 | OK | 2021-11-05 |
1 | 1000 | FINAL | 2021-11-06 |
2 | ... | ... | ... |
OUTPUT:
ID | $ | STATUS | TIME | START_DATE |
---|---|---|---|---|
1 | 050 | CREATED | 2021-11-01 | 2021-11-01 |
1 | 120 | ATTEMPTING | 2021-11-01 | 2021-11-01 |
1 | 130 | VALID | 2021-11-01 | 2021-11-01 |
1 | 200 | OK | 2021-11-02 | 2021-11-01 |
1 | 100 | CREATED AGAIN | 2021-11-03 | 2021-11-03 |
1 | 160 | OK | 2021-11-03 | 2021-11-03 |
1 | 200 | ONGOING | 2021-11-04 | 2021-11-03 |
1 | 300 | OK | 2021-11-05 | 2021-11-03 |
1 | 1000 | FINAL | 2021-11-06 | 2021-11-03 |
2 | ... | ... | ... | ... |
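For reference, a minimal sketch to build this input DataFrame (rows copied from the INPUT table above, ID 2 omitted; the SparkSession setup is an assumption):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# sample rows for ID 1, taken from the INPUT table
df = spark.createDataFrame(
    [
        (1, "050", "CREATED", "2021-11-01"),
        (1, "120", "ATTEMPTING", "2021-11-01"),
        (1, "130", "VALID", "2021-11-01"),
        (1, "200", "OK", "2021-11-02"),
        (1, "100", "CREATED AGAIN", "2021-11-03"),
        (1, "160", "OK", "2021-11-03"),
        (1, "200", "ONGOING", "2021-11-04"),
        (1, "300", "OK", "2021-11-05"),
        (1, "1000", "FINAL", "2021-11-06"),
    ],
    ["ID", "$", "STATUS", "TIME"],
)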
Solution 1:
You can add a column group using a cumulative conditional sum, then create the column START_DATE as first("TIME") over a Window partitioned by ID + group:
from pyspark.sql import functions as F, Window

w1 = Window.partitionBy("ID").orderBy("TIME")
w2 = Window.partitionBy("ID", "group").orderBy("TIME")

df1 = df.withColumn(
    "group",
    # cumulative count of CREATED / CREATED AGAIN rows: `when` without
    # `otherwise` yields null on non-matching rows, and sum ignores nulls,
    # so the counter only increments when a new period starts
    F.sum(F.when(F.col("STATUS").isin(["CREATED", "CREATED AGAIN"]), 1)).over(w1)
).withColumn(
    # first TIME within each (ID, group) partition = the period's start date
    "START_DATE",
    F.first("TIME").over(w2)
).drop("group")

df1.show()
#+---+----+-------------+----------+----------+
#| ID| $| STATUS| TIME|START_DATE|
#+---+----+-------------+----------+----------+
#| 1| 050| CREATED|2021-11-01|2021-11-01|
#| 1| 120| ATTEMPTING|2021-11-01|2021-11-01|
#| 1| 130| VALID|2021-11-01|2021-11-01|
#| 1| 200| OK|2021-11-02|2021-11-01|
#| 1| 100|CREATED AGAIN|2021-11-03|2021-11-03|
#| 1| 160| OK|2021-11-03|2021-11-03|
#| 1| 200| ONGOING|2021-11-04|2021-11-03|
#| 1| 300| OK|2021-11-05|2021-11-03|
#| 1|1000| FINAL|2021-11-06|2021-11-03|
#+---+----+-------------+----------+----------+
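As a variant, the helper group column can be skipped by carrying the most recent matching TIME forward with last(..., ignorenulls=True) over the same per-ID window. This is a sketch of that idea, reusing w1 and the imports from the solution above; note that any rows preceding the first CREATED row for an ID would get a null START_DATE:

df2 = df.withColumn(
    "START_DATE",
    # propagate the latest CREATED / CREATED AGAIN date down each ID:
    # `when` yields TIME only on matching rows, and `last` with
    # ignorenulls=True carries that value forward to the following rows
    F.last(
        F.when(F.col("STATUS").isin(["CREATED", "CREATED AGAIN"]), F.col("TIME")),
        ignorenulls=True,
    ).over(w1),
)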