How does Spark partition(ing) work on files in HDFS?
When Spark reads a file from HDFS, it creates a single partition for a single input split. The input split is determined by the Hadoop InputFormat used to read the file. For instance, if you use textFile(), the InputFormat is Hadoop's TextInputFormat, which gives you a single partition for a single HDFS block (although the split between partitions is made on line boundaries, not the exact block boundaries), unless you have a compressed text file. For a compressed file (with a non-splittable codec such as gzip) you get a single partition for the whole file, since such files cannot be split.
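To make this concrete, here is a minimal sketch (the HDFS paths are made up for illustration) that reads the same data uncompressed and gzipped and compares the resulting partition counts:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("partition-inspection"))

// Plain text: TextInputFormat produces one input split per HDFS block,
// so the partition count is roughly fileSize / blockSize.
val plain = sc.textFile("hdfs:///data/logs/events.txt")       // hypothetical path
println(s"uncompressed partitions = ${plain.getNumPartitions}")

// Gzip is not splittable, so the whole file becomes a single partition,
// no matter how many HDFS blocks it occupies.
val gzipped = sc.textFile("hdfs:///data/logs/events.txt.gz")  // hypothetical path
println(s"gzipped partitions = ${gzipped.getNumPartitions}")
```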
When you call rdd.repartition(x), it performs a shuffle of the data from the N partitions you currently have in rdd to the x partitions you want to have; the data is distributed across the new partitions on a round-robin basis.
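As a small illustration of that round-robin redistribution (reusing the SparkContext from the sketch above), glom() lets you inspect how many records end up in each partition after the shuffle:

```scala
val rdd = sc.parallelize(1 to 1000000, numSlices = 8)  // 8 initial partitions

// repartition(3) performs a full shuffle; records are spread round-robin,
// so the resulting partitions are roughly the same size.
val repartitioned = rdd.repartition(3)

// glom() turns each partition into an array so its size can be inspected.
repartitioned.glom().map(_.length).collect().foreach(println)
// Expect three counts, each close to 1000000 / 3.
```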
If you have a 30GB uncompressed text file stored on HDFS, then with the default HDFS block size setting (128MB) it would be stored in 235 blocks, which means the RDD you read from this file would have 235 partitions. When you call repartition(1000), your RDD is only marked for repartitioning; it is actually shuffled into 1000 partitions when you execute an action on top of this RDD (lazy evaluation).
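A sketch of that lazy behaviour (the file path is hypothetical): repartition(1000) only records the transformation, and the shuffle runs when an action such as count() is executed:

```scala
// With 128MB HDFS blocks, a 30GB text file loads as one partition per block.
val big = sc.textFile("hdfs:///data/big/30gb_file.txt")
println(big.getNumPartitions)      // ~235

// Nothing is shuffled yet; this only builds the lineage.
val wide = big.repartition(1000)

// The shuffle into 1000 partitions happens here, when the action runs.
println(wide.count())
```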
When reading non-bucketed HDFS files (e.g. parquet) with spark-sql, the number of DataFrame partitions (df.rdd.getNumPartitions) depends on these factors:

- spark.default.parallelism (roughly translates to the number of cores available to the application)
- spark.sql.files.maxPartitionBytes (default 128MB)
- spark.sql.files.openCostInBytes (default 4MB)
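For illustration, a sketch of reading a non-bucketed parquet dataset and checking the partition count Spark chose (the path is made up; the two config values shown are just the defaults spelled out explicitly):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("sql-partitioning")
  .config("spark.sql.files.maxPartitionBytes", "128MB")  // default
  .config("spark.sql.files.openCostInBytes", "4MB")      // default
  .getOrCreate()

// Hypothetical non-bucketed parquet dataset on HDFS.
val df = spark.read.parquet("hdfs:///data/warehouse/events.parquet")

// Number of scan partitions Spark decided on for this read.
println(df.rdd.getNumPartitions)
```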
A rough estimation of the number of partitions is:

- If you have enough cores to read all your data in parallel (i.e. at least one core for every 128MB of your data):

  AveragePartitionSize ≈ max(4MB, TotalDataSize / #cores)
  NumberOfPartitions ≈ TotalDataSize / AveragePartitionSize

- If you don't have enough cores:

  AveragePartitionSize ≈ 128MB
  NumberOfPartitions ≈ TotalDataSize / AveragePartitionSize
The exact calculation is slightly more complicated and can be found in the code base, in FileSourceScanExec.
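Here is a rough sketch of that estimation in isolation, mirroring the min/max clamping Spark applies when computing the maximum split size (the function name and argument defaults are made up for illustration, not Spark API):

```scala
def estimatePartitions(
    totalDataSize: Long,
    cores: Int,
    maxPartitionBytes: Long = 128L * 1024 * 1024,  // spark.sql.files.maxPartitionBytes
    openCostInBytes: Long = 4L * 1024 * 1024       // spark.sql.files.openCostInBytes
): Long = {
  val bytesPerCore = totalDataSize / cores
  // Target split size: capped at 128MB, floored at 4MB.
  val targetSplitSize = math.min(maxPartitionBytes, math.max(openCostInBytes, bytesPerCore))
  math.max(1L, totalDataSize / targetSplitSize)
}

// Plenty of cores: ~77MB per partition, roughly one partition per core.
println(estimatePartitions(totalDataSize = 30L * 1024 * 1024 * 1024, cores = 400))  // ~400

// Too few cores: partitions are capped at ~128MB each.
println(estimatePartitions(totalDataSize = 30L * 1024 * 1024 * 1024, cores = 24))   // ~240
```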
Here is a snapshot of how blocks in HDFS are loaded into Spark workers as partitions.
In this image, 4 HDFS blocks are loaded as Spark partitions into the memory of 3 workers.
Example: I put a 30GB text file on HDFS, which distributes it over 10 nodes.

Will Spark

a) use the same 10 partitions?

Spark loads the same 10 HDFS blocks into the workers' memory as partitions. For a 30 GB file to end up in only 10 blocks, the HDFS block size would have to be 3 GB (since, by default, one block becomes one partition).

b) shuffle 30GB across the cluster when I call repartition(1000)?

Yes, Spark shuffles the data among the worker nodes in order to create 1000 partitions in the workers' memory.
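A quick way to see that shuffle in the lineage (hypothetical path again) is toDebugString, which shows the ShuffledRDD stage introduced by repartition(1000):

```scala
val file = sc.textFile("hdfs:///data/big/30gb_file.txt")
println(file.getNumPartitions)   // a) 10, one partition per HDFS block

val reshaped = file.repartition(1000)

// b) The lineage contains a ShuffledRDD: the cluster-wide shuffle that
// redistributes the 30GB into 1000 partitions when an action runs.
println(reshaped.toDebugString)
```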
Note:

HDFS block -> Spark partition: one block is represented as one partition (by default).

Spark partition -> workers: one worker can hold one or many partitions.