Updating json column using window cumulative via pyspark

Here's one way using higher-order functions on array and map columns:

  1. get previous Properties for each row using lag and convert both the previous and current row Properties into map type
  2. using collect_list function over window, get cumulative array of previous row Properties
  3. add the current row Properties to the front of the reversed array (so it is ordered from newest to oldest) and use aggregate to merge the inner maps with map_concat. From your example, the update keeps the most recent value of each key and carries older keys forward, so before each concat we use map_filter to drop the keys that already exist in the accumulated map
  4. use to_json to get json string from the aggregated map and drop intermediary columns
from pyspark.sql import functions as F, Window

# Every window expression below works per ID, ordered by Timestamp.
w = Window.partitionBy("ID").orderBy("Timestamp")

_MAP_SCHEMA = "map<string,string>"


def _add_missing_keys(merged, props):
    """Return `merged` extended with the entries of `props` whose keys it lacks."""
    known_keys = F.map_keys(merged)
    unseen = F.map_filter(props, lambda key, _: ~F.array_contains(known_keys, key))
    return F.map_concat(merged, unseen)


# Current row's map first, then the earlier maps from most recent to oldest,
# so the first value seen for a key during the fold is the latest one.
newest_first = F.concat(
    F.array("current_prop_map"),
    F.reverse(F.col("cumulative_prev_props")),
)

# Typed empty map used as the starting accumulator of the fold.
empty_map = F.expr("cast(map() as map<string,string>)")

merged_json = F.to_json(F.aggregate(newest_first, empty_map, _add_missing_keys))

df1 = (
    df.withColumn("rownum", F.row_number().over(w))
    # Parse the previous row's and the current row's JSON strings into maps.
    .withColumn("prev_prop_map", F.from_json(F.lag("Properties").over(w), _MAP_SCHEMA))
    .withColumn("current_prop_map", F.from_json("Properties", _MAP_SCHEMA))
    # Cumulative array of the previous rows' maps over the window.
    .withColumn("cumulative_prev_props", F.collect_list("prev_prop_map").over(w))
    .withColumn("New_Props", merged_json)
    # The helper columns are only needed to build New_Props.
    .drop("prev_prop_map", "current_prop_map", "cumulative_prev_props")
)

df1.show(truncate=False)
#+---+---------+------------------------------------------+------+---------------------------------------+
#|ID |Timestamp|Properties                                |rownum|New_Props                              |
#+---+---------+------------------------------------------+------+---------------------------------------+
#|a  |5        |{"a1": 3, "a2": 12, "a4": "r"}            |1     |{"a1":"3","a2":"12","a4":"r"}          |
#|a  |7        |{"a1": 5, "a2": 8}                        |2     |{"a1":"5","a2":"8","a4":"r"}           |
#|a  |8        |{"a2": 4}                                 |3     |{"a2":"4","a1":"5","a4":"r"}           |
#|a  |10       |{"a3": "z", "a4": "t"}                    |4     |{"a3":"z","a4":"t","a2":"4","a1":"5"}  |
#|b  |12       |{"b1": 36, "b2": "u", "b3": 17, "b8": "c"}|1     |{"b1":"36","b2":"u","b3":"17","b8":"c"}|
#|b  |14       |{"b8": "y", "b3": 2}                      |2     |{"b8":"y","b3":"2","b1":"36","b2":"u"} |
#|b  |20       |{"b2": "k", "b3": 9}                      |3     |{"b2":"k","b3":"9","b8":"y","b1":"36"} |
#+---+---------+------------------------------------------+------+---------------------------------------+

If you prefer a SQL query, here's the equivalent Spark SQL:

-- Builds, for every row, the cumulative Properties of its ID as a JSON string.
-- Step 1 (props): number the rows per ID and parse the previous and current
-- Properties JSON strings into map<string,string>.
WITH props AS (
    SELECT  *,
            row_number() over(partition by ID order by Timestamp) AS rownum,
            from_json(lag(Properties) over(partition by ID order by Timestamp), 'map<string,string>') AS prev_prop_map,
            from_json(Properties, 'map<string,string>') AS current_prop_map
    FROM    props_tb
),  cumulative_props AS (
    -- Step 2: cumulative array of the previous rows' maps over the window.
    SELECT  *,
            collect_list(prev_prop_map) over(partition by ID order by Timestamp) AS cumulative_prev_props
    FROM    props
)

-- Step 3: fold the maps from newest (current row) to oldest, adding only the
-- keys the accumulator does not hold yet, so the most recent value wins.
-- Step 4: to_json turns the merged map back into a JSON string, matching the
-- New_Props column produced by the DataFrame version.
SELECT  ID,
        Timestamp,
        Properties,
        to_json(aggregate(
            concat(array(current_prop_map), reverse(cumulative_prev_props)),
            cast(map() as map<string,string>),
            (acc, x) -> map_concat(acc, map_filter(x, (k,v) -> ! array_contains(map_keys(acc), k)))
        )) AS New_Props,
        rownum
FROM    cumulative_props