Create columns from an array of structs in PySpark
I'm pretty new to data processing. I have a deeply nested dataset with approximately this schema:
|-- col1: string
|-- col2: string
|-- col3: struct
|    |-- something: string
|    |-- elem: array
|    |    |-- registrationNumber: struct
|    |    |    |-- registrationNumber: string
|    |    |    |-- registrationNumberType: string
|    |    |    |-- registrationCode: int
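For reference, that schema could be declared explicitly like this (just a sketch built from the tree above; adjust the types to your actual data):
from pyspark.sql.types import (
    ArrayType, IntegerType, StringType, StructField, StructType
)

# Sketch of the schema above as an explicit StructType
schema = StructType([
    StructField("col1", StringType()),
    StructField("col2", StringType()),
    StructField("col3", StructType([
        StructField("something", StringType()),
        StructField("elem", ArrayType(StructType([
            StructField("registrationNumber", StringType()),
            StructField("registrationNumberType", StringType()),
            StructField("registrationCode", IntegerType()),
        ]))),
    ])),
])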
For the array, I will receive something like this. Keep in mind that the length is variable; I might receive no values, or 10, or even more:
[
  {
    "registrationNumber": "123456789",
    "registrationNumberType": "VAT",
    "registrationCode": 1234
  },
  {
    "registrationNumber": "ABCDERTYU",
    "registrationNumberType": "fiscal1",
    "registrationCode": 9876
  },
  {
    "registrationNumber": "123456789",
    "registrationNumberType": "foo",
    "registrationCode": 8765
  }
]
Is there a way to transform the schema to:
|-- col1: string
|-- col2: string
|-- col3: struct
|    |-- something: string
|    |-- VAT: string
|    |-- fiscal1: string
with the VAT and fiscal1 values being the corresponding registrationNumber values. I basically need to get the VAT and fiscal1 values as columns. Thanks so much.
Edit:
Here is a sample JSON of col3:
{
  "col3": {
    "somestring": "xxxxxx",
    "registrationNumbers": [
      {
        "registrationNumber": "something",
        "registrationNumberType": "VAT"
      },
      {
        "registrationNumber": "somethingelse",
        "registrationNumberType": "fiscal1"
      },
      {
        "registrationNumber": "something i dont need",
        "registrationNumberType": "fiscal2"
      }
    ]
  }
}
and here is what I would like to have:
{
  "col3": {
    "somestring": "xxxxxx",
    "VAT": "something",
    "fiscal1": "somethingelse"
  }
}
Maybe I could create a dataframe from the array and the primary key, create VAT and fiscal1 columns, select the data from that new dataframe to fill the column, and finally join the two dataframes using the primary key? Something like the sketch below is what I have in mind.
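A minimal sketch of that join idea, assuming a dataframe df with the schema above and col1 acting as the primary key (the question doesn't name the key, so that's an assumption):
import pyspark.sql.functions as F

# 1) One row per array entry, keyed by the (assumed) primary key col1.
#    Rows with an empty array disappear here; the left join below restores them.
exploded = (
    df.select("col1", F.explode("col3.registrationNumbers").alias("reg"))
      .select("col1", "reg.registrationNumber", "reg.registrationNumberType")
)

# 2) Keep only the wanted types and pivot them into columns
pivoted = (
    exploded.filter(F.col("registrationNumberType").isin("VAT", "fiscal1"))
            .groupBy("col1")
            .pivot("registrationNumberType", ["VAT", "fiscal1"])
            .agg(F.first("registrationNumber"))
)

# 3) Join back on the key and rebuild col3
result = (
    df.join(pivoted, "col1", "left")
      .withColumn("col3", F.struct("col3.somestring", "VAT", "fiscal1"))
      .drop("VAT", "fiscal1")
)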
You can use the inline function to explode and expand the struct elements of the col3.registrationNumbers array, then filter to keep only the rows where registrationNumberType is either VAT or fiscal1, and pivot. After the pivot, rebuild the struct column col3 from the pivoted columns:
import pyspark.sql.functions as F

exampleJSON = '{"col1":"col1_XX","col2":"col2_XX","col3":{"somestring":"xxxxxx","registrationNumbers":[{"registrationNumber":"something","registrationNumberType":"VAT"},{"registrationNumber":"somethingelse","registrationNumberType":"fiscal1"},{"registrationNumber":"something i dont need","registrationNumberType":"fiscal2"}]}}'
df = spark.read.json(sc.parallelize([exampleJSON]))

df1 = (
    df
    # inline() explodes the array of structs: one row per entry, with
    # registrationNumber / registrationNumberType as top-level columns
    .selectExpr("*", "inline(col3.registrationNumbers)")
    # keep only the types we want as columns
    .filter(F.col("registrationNumberType").isin(["VAT", "fiscal1"]))
    # pivot the types into VAT / fiscal1 columns
    .groupBy("col1", "col2", "col3")
    .pivot("registrationNumberType")
    .agg(F.first("registrationNumber"))
    # rebuild col3 from its original field plus the pivoted values
    .withColumn("col3", F.struct(F.col("col3.somestring"), F.col("VAT"), F.col("fiscal1")))
    .drop("VAT", "fiscal1")
)
df1.printSchema()
#root
# |-- col1: string (nullable = true)
# |-- col2: string (nullable = true)
# |-- col3: struct (nullable = false)
# | |-- somestring: string (nullable = true)
# | |-- VAT: string (nullable = true)
# | |-- fiscal1: string (nullable = true)
df1.show(truncate=False)
#+-------+-------+----------------------------------+
#|col1 |col2 |col3 |
#+-------+-------+----------------------------------+
#|col1_XX|col2_XX|{xxxxxx, something, somethingelse}|
#+-------+-------+----------------------------------+
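If you'd rather not pivot at all and you're on Spark 2.4+, here is another sketch: pull the values straight out of the array with the filter higher-order function. With default (non-ANSI) settings, indexing an empty result with [0] yields NULL, so missing types simply come out as null:
# Pivot-free sketch (Spark 2.4+): extract each type directly with the
# filter higher-order function; missing types come out as NULL
df2 = df.withColumn(
    "col3",
    F.struct(
        F.col("col3.somestring").alias("somestring"),
        F.expr("filter(col3.registrationNumbers, x -> x.registrationNumberType = 'VAT')[0].registrationNumber").alias("VAT"),
        F.expr("filter(col3.registrationNumbers, x -> x.registrationNumberType = 'fiscal1')[0].registrationNumber").alias("fiscal1"),
    ),
)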