Multiple parquet files have a different data type for 1-2 columns
Background
I was struggling with a very similar issue at work. I had an audit folder where every file had the same column names, but in the older files the Date column was a StringType rather than a TimestampType.
For background, I'm originally a C# developer and I have started using Azure Synapse Analytics. I am fairly new to PySpark, so I spent a couple of days on this problem, looking around but struggling to find anything. I eventually figured out a solution on my own, so I thought I'd come back to give guidance to you or to any future developers who are in a pickle.
Solution
Firstly, I defined the schema that matched the columns of my newer and correct files.
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, LongType
from pyspark.sql.functions import col, to_timestamp
import os

# Schema matching the newer, correct files
schema = StructType([
    StructField("Date", TimestampType(), False),
    StructField("PipelineName", StringType(), False),
    StructField("SourceFileName", StringType(), False),
    StructField("ActivityName", StringType(), True),
    StructField("InsertedRows", LongType(), True),
    StructField("FailedRows", LongType(), True),
    StructField("Message", StringType(), True)])
I then created a blank dataframe with the schema I just created.
df = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema)
Then I iterated through every file that was in my audit directory.
auditDir = "DirectoryNameHere"
for f in mssparkutils.fs.ls(auditDir):
Within my for loop, I used the directory name and the file name to create the file path.
I then created a new dataframe that reads this file, using withColumn with to_timestamp to convert the Date column to a timestamp. This seems to work whether the column is a StringType or a TimestampType.
I presume something similar can be done to convert your strings into doubles.
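For example, here is a minimal sketch of that idea using cast (the column name "Amount" is just a placeholder for whichever of your columns should be a double):
# Hypothetical example: cast a column to double whether it arrives as a string or a double
newDf = spark.read.parquet(path).withColumn("Amount", col("Amount").cast("double"))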
Because all my parquet files have identical columns, I just used the union method rather than unionByName, which appends this file's data onto an ever-growing dataframe.
path = os.path.join(auditDir, f.name)
newDf = spark.read.parquet(path).withColumn("Date", to_timestamp(col("Date")))
df = df.union(newDf)
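If your files' column sets ever differ slightly, a possible alternative (assuming you are on Spark 3.1 or later, which added the allowMissingColumns flag) is unionByName in place of the union line:
    # Matches columns by name and fills any missing columns with nulls
    df = df.unionByName(newDf, allowMissingColumns=True)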
For my purposes, I then write everything out as a single partition, ordered with the oldest date first, back into the same folder. However, if you can, I would recommend writing to another folder and keeping your old files as a backup.
df.orderBy(col("Date")).coalesce(1).write.mode("append").parquet(auditDir)
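If you would rather keep the original files untouched, a small variation is to write the combined dataframe to a different folder instead (the folder name here is just a placeholder):
mergedDir = "MergedAuditDirectoryHere"
df.orderBy(col("Date")).coalesce(1).write.mode("overwrite").parquet(mergedDir)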
While iterating over the files one by one isn't ideal, it is the only way I have found. Thankfully, my data size was small, so it did not take too long.
Thanks for reading. I hope this helps you and others a lot.
-Josh