Accessing nested data with key/value pairs in array
I have a dataframe with schema
root
|-- properties: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- key: string (nullable = true)
| | |-- value: string (nullable = true)
Here is an example of 3 rows from the df:
[]
[{"key":"id", "value":"as143"},
{"key":"user", "value":"John Doe"},
{"key":"email", "value":"[email protected]"},
{"key":"location", "value":"KY, USA"}]
[{"key":"id", "value":"bd143"},
{"key":"user", "value":"Adam Smith"},
{"key":"email", "value":"[email protected]"}]
So each user is in its own row of the df. The amount of data in each row can differ, and null values are allowed.
I would like to make a new df in which each key becomes a column name and that column holds all of the corresponding values for the key. For example, I would have a column titled 'user' and each row would have a different user name.
I have tried to access the sub-fields using this code but I get an error.
from pyspark.sql.functions import split, col

keys = table.select('properties.key').distinct().collect()[0][0]

table.withColumn('value', split(table.properties.value, ',')) \
    .select(col('value')[0].alias(keys[0]),
            col('value')[1].alias(keys[1]),
            col('value')[2].alias(keys[2]),
            col('value')[3].alias(keys[3])).display()
I have also tried to create a key/value map, because I need to do this for other columns in the dataframe as well, and for some of them it is hard to tell how many key/value pairs there are since null values are allowed, so I thought I might use pyspark.sql.functions.from_json.
I feel this is the preferred way; however, I have not had success with that either. I have not been able to convert to a map, and I think it is because rather than having "key":"value" pairs, I have "key": "key 1", "value": "value 1", "key": "key 2", "value": "value 2", "key": "key 3", "value": "value 3", etc., all in the same row.
This is the code I used:
table.withColumn('properties', from_json(table.properties, MapType(StringType(), StringType())))
and I get the error:
cannot resolve 'entries' due to data type mismatch: argument 1 requires string type, however 'table.properties' is of array<struct<key:string, value:string>> type.
I am not exactly sure how to go about converting an array type like this to a map.
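One direction I have been looking at, though I have not verified it does what I need, is map_from_entries (assuming Spark 2.4+, where it is available), which is supposed to turn an array of key/value structs directly into a map. Roughly:
from pyspark.sql import functions as F

# Assumption: Spark 2.4+, so map_from_entries exists.
# It converts the array<struct<key,value>> column into a
# map<string,string> without any JSON parsing.
mapped = table.withColumn('properties_map', F.map_from_entries('properties'))
mapped.select(F.col('properties_map')['user'].alias('user')).show()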
Solution 1:
Assuming this is your input dataframe:
data = '[{"properties":[]},{"properties":[{"key":"id","value":"as143"},{"key":"user","value":"John Doe"},{"key":"email","value":"[email protected]"},{"key":"location","value":"KY, USA"}]},{"properties":[{"key":"id","value":"bd143"},{"key":"user","value":"Adam Smith"},{"key":"email","value":"[email protected]"}]}]'
table = spark.read.json(spark.sparkContext.parallelize([data]))
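You can check that it matches the schema from the question:
table.printSchema()
#root
# |-- properties: array (nullable = true)
# |    |-- element: struct (containsNull = true)
# |    |    |-- key: string (nullable = true)
# |    |    |-- value: string (nullable = true)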
Your column properties is of type array of structs; that's why you get the data type mismatch error when you try to use the from_json function. You can simply expand the array into the 2 columns key and value with the inline generator, like this:
table.selectExpr("inline(properties)").show()
#+--------+-----------------+
#| key| value|
#+--------+-----------------+
#| id| as143|
#| user| John Doe|
#| email| [email protected]|
#|location| KY, USA|
#| id| bd143|
#| user| Adam Smith|
#| email|[email protected]|
#+--------+-----------------+
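If you prefer the DataFrame API over the SQL inline generator, an equivalent sketch (using the same table as above) first explodes the array and then flattens the struct fields:
from pyspark.sql import functions as F

# explode gives one row per struct in the array; the struct fields
# are then selected as plain key/value columns.
table.select(F.explode('properties').alias('prop')) \
     .select('prop.key', 'prop.value') \
     .show()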
That said, what you want now is to pivot the column key. In order to use pivot, we need some ID column to group by; if you don't have one, you can assign a row_id using the monotonically_increasing_id function before exploding the array:
from pyspark.sql import functions as F

result = table.withColumn("row_id", F.monotonically_increasing_id()) \
    .selectExpr("row_id", "inline(properties)") \
    .groupBy("row_id").pivot("key").agg(F.first("value")).drop("row_id")
result.show()
#+-----------------+-----+--------+----------+
#| email| id|location| user|
#+-----------------+-----+--------+----------+
#| [email protected]|as143| KY, USA| John Doe|
#|[email protected]|bd143| null|Adam Smith|
#+-----------------+-----+--------+----------+
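Note that inline (like explode) drops the row whose properties array is empty, which is why only 2 rows appear above. If you need to keep that row, a variant sketch uses inline_outer instead, and passing the list of keys to pivot up front also avoids the extra job Spark runs to discover the distinct pivot values:
from pyspark.sql import functions as F

# inline_outer keeps rows with an empty properties array (they come
# back as a row of nulls after the pivot); the explicit value list
# passed to pivot skips the extra pass needed to find the distinct keys.
result_all = table.withColumn("row_id", F.monotonically_increasing_id()) \
    .selectExpr("row_id", "inline_outer(properties)") \
    .groupBy("row_id") \
    .pivot("key", ["id", "user", "email", "location"]) \
    .agg(F.first("value")) \
    .drop("row_id")
result_all.show()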