DataFrame partitionBy to a single Parquet file (per partition)
I had the exact same problem and I found a way to do this using DataFrame.repartition(). The problem with using coalesce(1) is that your parallelism drops to 1, and it can be slow at best and error out at worst. Increasing that number doesn't help either -- if you do coalesce(10) you get more parallelism, but end up with 10 files per partition.
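For reference, this is roughly what such a coalesce-based write looks like (a sketch only; df, the column names and location are the same assumptions used in the repartition example below):

df.coalesce(1)                                   // parallelism collapses to a single task
  .write
  .partitionBy("entity", "year", "month", "day", "status")
  .parquet(s"$location")
// coalesce(10) would run 10 tasks instead, but each partition directory can then
// end up with up to 10 part files.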
To get one file per partition without using coalesce(), use repartition() with the same columns you want the output to be partitioned by. So in your case, do this:
import org.apache.spark.sql.SaveMode
import spark.implicits._

df.repartition($"entity", $"year", $"month", $"day", $"status")
  .write
  .partitionBy("entity", "year", "month", "day", "status")
  .mode(SaveMode.Append)
  .parquet(s"$location")
Once I do that, I get one Parquet file per output partition instead of multiple files. I tested this in Python, but the Scala snippet above should behave the same way.
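If you want to verify the result, one quick sanity check is to count the part files under a single partition directory (a sketch only; the partition values in the path are invented, and it assumes the default Hadoop FileSystem is reachable from the driver):

import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val partitionDir = new Path(s"$location/entity=acme/year=2018/month=1/day=1/status=done") // made-up values
val partFiles = fs.listStatus(partitionDir).map(_.getPath.getName).filter(_.startsWith("part-"))
println(s"${partFiles.length} part file(s) in $partitionDir") // expect exactly 1 after the repartition-based write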
By definition:

coalesce(numPartitions: Int): DataFrame
Returns a new DataFrame that has exactly numPartitions partitions.
You can use it to decrease the number of partitions in the RDD/DataFrame with the numPartitions parameter. It's useful for running operations more efficiently after filtering down a large dataset.
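As a small illustration of that use case (a sketch only; the filter condition and the target count of 8 partitions are arbitrary assumptions, and df and spark are the same as above):

import spark.implicits._

val filtered = df.filter($"status" === "done") // a selective filter can leave many nearly empty partitions
val compacted = filtered.coalesce(8)           // merge them into fewer partitions without a full shuffle
println(compacted.rdd.getNumPartitions)        // 8 (or fewer, if filtered already had fewer partitions)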
Concerning your code, it doesn't perform well because what you are actually doing is:

- putting everything into 1 partition, which overloads a single executor since all of the data has to be pulled into that one partition (and it is not good practice anyway); the partition-count sketch further down illustrates this
- moving all of that data across the network to build that single partition, which, much like a shuffle, may also result in performance loss.
The shuffle is Spark’s mechanism for re-distributing data so that it’s grouped differently across partitions. This typically involves copying data across executors and machines, making the shuffle a complex and costly operation.
The shuffle concept is important to understand and manage. It's always preferable to shuffle as little as possible, because the shuffle is an expensive operation involving disk I/O, data serialization, and network I/O. To organize data for the shuffle, Spark generates sets of tasks: map tasks to organize the data, and a set of reduce tasks to aggregate it. This nomenclature comes from MapReduce and does not directly relate to Spark's map and reduce operations.
Internally, results from individual map tasks are kept in memory until they can’t fit. Then, these are sorted based on the target partition and written to a single file. On the reduce side, tasks read the relevant sorted blocks.
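To make the first point above concrete, you can compare how the rows are spread across partitions before and after the coalesce (a sketch only; df is the DataFrame from your code, and the counts in the comments are made-up values):

val before = df.rdd.mapPartitions(it => Iterator(it.size)).collect()
val after  = df.coalesce(1).rdd.mapPartitions(it => Iterator(it.size)).collect()
println(before.mkString(", ")) // e.g. 12000, 11800, 12500, ... -- the work is spread across many tasks
println(after.mkString(", "))  // a single number: every row has been pulled into the same partition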
Concerning Parquet partitioning, I suggest that you read the answer here about Spark DataFrames with Parquet partitioning, and also this section of the Spark Programming Guide on performance tuning.
I hope this helps!