Multiple parquet files have a different data type for 1-2 columns

Background

I was struggling with a very similar issue at work. I had an audit folder where each file had the same column names, but in the older files the Date column was a StringType rather than a TimestampType.

For background, I'm originally a C# developer and have recently started using Azure Synapse Analytics. I'm fairly new to PySpark, so I spent a couple of days on this problem and struggled to find anything while looking around. I eventually figured out a solution on my own, so I thought I'd come back and give some guidance to you or any future developers in the same pickle.

Solution

Firstly, I added the imports I needed and defined a schema matching the columns of my newer, correct files.

import os
from pyspark.sql.functions import col, to_timestamp
from pyspark.sql.types import StructType, StructField, TimestampType, StringType, LongType

schema = StructType([
    StructField("Date", TimestampType(), False),
    StructField("PipelineName", StringType(), False),
    StructField("SourceFileName", StringType(), False),
    StructField("ActivityName", StringType(), True),
    StructField("InsertedRows", LongType(), True),
    StructField("FailedRows", LongType(), True),
    StructField("Message", StringType(), True)])

I then created a blank dataframe with the schema I just created.

df = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema)

Then I iterated through every file that was in my audit directory.

auditDir = "DirectoryNameHere"
for f in mssparkutils.fs.ls(auditDir):

Within my for loop, I used the directory name and the file name to create the file path (the loop body is shown in full below).

I then create a new dataframe that reads this file, using a withColumn that takes the Date column and converts it to a timestamp. This seems to work whether the column is a StringType or already a TimestampType.

I presume something similar can be done to convert your strings into doubles.
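
For example, here is a minimal sketch assuming your mismatched column is called "Value" (a made-up name; swap in your own), reusing the per-file path that is built inside the loop below:

from pyspark.sql.functions import col

# cast("double") works whether the column was read back as a string or is already a double;
# under Spark's default (non-ANSI) casting, unparseable strings simply become null
newDf = spark.read.parquet(path).withColumn("Value", col("Value").cast("double"))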

Because all my parquet files have identical columns, I have just used the union method rather than unionByName; this appends each file's data onto an ever-growing dataframe.

    # this sits inside the for loop above
    path = os.path.join(auditDir, f.name)
    newDf = spark.read.parquet(path).withColumn("Date", to_timestamp(col("Date")))
    df = df.union(newDf)
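
As an aside, if your files' columns ever differ in name or order, unionByName would be the safer merge. A minimal sketch (the allowMissingColumns flag needs Spark 3.1+, so check your pool's runtime version):

# tolerates files that are missing some of the expected columns
df = df.unionByName(newDf, allowMissingColumns=True)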

For my purposes, I then write everything out as one partition, ordered with the oldest date first, back into the same folder. However, if you can, I would recommend writing to another folder and keeping your old files as a backup.

df.orderBy(col("Date")).coalesce(1).write.mode("append").parquet(auditDir)
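
If you take that advice, the write just needs to point at a different folder. Here is a sketch, where auditDirClean is a made-up output path:

auditDirClean = "CleanDirectoryNameHere"

# writing to a separate folder keeps the original files untouched as a backup
df.orderBy(col("Date")).coalesce(1).write.mode("overwrite").parquet(auditDirClean)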

While iterating over files isn't ideal, this has been the only way I have found. Thankfully, my data size was small so it did not take too long.

Thanks for reading. I hope this helps you and others.

-Josh