Updating a JSON column using a cumulative window in PySpark
Here's one way using higher-order functions on array and map columns:

- get the previous `Properties` for each row using `lag`, and convert both the previous and the current row's `Properties` into map type
- using the `collect_list` function over the window, get the cumulative array of previous rows' `Properties`
- add the current row's `Properties` to the resulting array and use `aggregate` to concatenate the inner maps with `map_concat`. From your example, it seems that the update operation consists of simply adding the new keys, so before concatenating we filter out the already-existing keys using the `map_filter` function
- use `to_json` to get a JSON string from the aggregated map, and drop the intermediary columns
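Since the input isn't shown, here's a minimal DataFrame reconstructed from the expected output further below (the types are assumptions: `Timestamp` as an integer, `Properties` as a JSON string):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample rows taken from the expected output table (an assumption, not from the question)
df = spark.createDataFrame(
    [
        ("a", 5, '{"a1": 3, "a2": 12, "a4": "r"}'),
        ("a", 7, '{"a1": 5, "a2": 8}'),
        ("a", 8, '{"a2": 4}'),
        ("a", 10, '{"a3": "z", "a4": "t"}'),
        ("b", 12, '{"b1": 36, "b2": "u", "b3": 17, "b8": "c"}'),
        ("b", 14, '{"b8": "y", "b3": 2}'),
        ("b", 20, '{"b2": "k", "b3": 9}'),
    ],
    ["ID", "Timestamp", "Properties"],
)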
from pyspark.sql import functions as F, Window

w = Window.partitionBy("ID").orderBy("Timestamp")

df1 = df.withColumn("rownum", F.row_number().over(w)) \
    .withColumn("prev_prop_map", F.from_json(F.lag("Properties").over(w), "map<string,string>")) \
    .withColumn("current_prop_map", F.from_json("Properties", "map<string,string>")) \
    .withColumn("cumulative_prev_props", F.collect_list("prev_prop_map").over(w)) \
    .withColumn(
        "New_Props",
        F.to_json(F.aggregate(
            # current row's map first, then the previous maps from most recent to oldest
            F.concat(F.array("current_prop_map"), F.reverse(F.col("cumulative_prev_props"))),
            F.expr("cast(map() as map<string,string>)"),
            # merge only the keys not already in the accumulator, so the most recent value wins
            lambda acc, x: F.map_concat(
                acc,
                F.map_filter(x, lambda k, _: ~F.array_contains(F.map_keys(acc), k))
            )
        ))
    ).drop("prev_prop_map", "current_prop_map", "cumulative_prev_props")

df1.show(truncate=False)
#+---+---------+------------------------------------------+------+---------------------------------------+
#|ID |Timestamp|Properties |rownum|New_Props |
#+---+---------+------------------------------------------+------+---------------------------------------+
#|a |5 |{"a1": 3, "a2": 12, "a4": "r"} |1 |{"a1":"3","a2":"12","a4":"r"} |
#|a |7 |{"a1": 5, "a2": 8} |2 |{"a1":"5","a2":"8","a4":"r"} |
#|a |8 |{"a2": 4} |3 |{"a2":"4","a1":"5","a4":"r"} |
#|a |10 |{"a3": "z", "a4": "t"} |4 |{"a3":"z","a4":"t","a2":"4","a1":"5"} |
#|b |12 |{"b1": 36, "b2": "u", "b3": 17, "b8": "c"}|1 |{"b1":"36","b2":"u","b3":"17","b8":"c"}|
#|b |14 |{"b8": "y", "b3": 2} |2 |{"b8":"y","b3":"2","b1":"36","b2":"u"} |
#|b |20 |{"b2": "k", "b3": 9} |3 |{"b2":"k","b3":"9","b8":"y","b1":"36"} |
#+---+---------+------------------------------------------+------+---------------------------------------+
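Note that `F.aggregate` and `F.map_filter` with Python lambdas were only added to the PySpark API in Spark 3.1; the underlying SQL higher-order functions have existed since Spark 2.4. On an older version, a sketch of the same step via `F.expr` (run in place of the `F.aggregate` call above, before dropping the intermediary columns) would look like:

# Fallback for Spark < 3.1: express the aggregate as a SQL expression string
# (assumes prev_prop_map/current_prop_map/cumulative_prev_props still exist).
df1 = df1.withColumn(
    "New_Props",
    F.expr("""
        to_json(aggregate(
            concat(array(current_prop_map), reverse(cumulative_prev_props)),
            cast(map() as map<string,string>),
            (acc, x) -> map_concat(acc, map_filter(x, (k, v) -> ! array_contains(map_keys(acc), k)))
        ))
    """)
)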
If you prefer SQL, here's the equivalent Spark SQL query:
WITH props AS (
    SELECT  *,
            row_number() OVER (PARTITION BY ID ORDER BY Timestamp) AS rownum,
            from_json(lag(Properties) OVER (PARTITION BY ID ORDER BY Timestamp), 'map<string,string>') AS prev_prop_map,
            from_json(Properties, 'map<string,string>') AS current_prop_map
    FROM    props_tb
), cumulative_props AS (
    SELECT  *,
            collect_list(prev_prop_map) OVER (PARTITION BY ID ORDER BY Timestamp) AS cumulative_prev_props
    FROM    props
)
SELECT  ID,
        Timestamp,
        Properties,
        aggregate(
            concat(array(current_prop_map), reverse(cumulative_prev_props)),
            cast(map() as map<string,string>),
            (acc, x) -> map_concat(acc, map_filter(x, (k, v) -> ! array_contains(map_keys(acc), k)))
        ) AS New_Props,
        rownum
FROM    cumulative_props
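To run it from PySpark, register the source DataFrame as the `props_tb` temp view first; a minimal sketch, where `sql_query` is a placeholder name for the query string above:

# Expose the input DataFrame under the name referenced in the query
df.createOrReplaceTempView("props_tb")

df1 = spark.sql(sql_query)  # sql_query holds the WITH ... SELECT statement above
df1.show(truncate=False)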