How do you control the size of the output file?

Solution 1:

Spark can't directly control the size of Parquet files, because the DataFrame in memory has to be encoded and compressed before it is written to disk. Until that process finishes, there is no way to estimate the actual file size on disk.

So my solution is:

  • Write the DataFrame to HDFS, df.write.parquet(path)
  • Get the directory size and calculate the number of files

    import org.apache.hadoop.fs.{FileSystem, Path}

    val fs = FileSystem.get(sc.hadoopConfiguration)
    val dirSize = fs.getContentSummary(new Path(path)).getLength
    // let's say 512 MB per file; round up so a small directory still gets one file
    val fileNum = (dirSize / (512L * 1024 * 1024) + 1).toInt
    
  • Read the directory and re-write to HDFS

    val df = sqlContext.read.parquet(path)
    df.coalesce(fileNum).write.parquet(newPath)
    

    Do NOT reuse the original df, otherwise it will trigger your whole job a second time.

  • Delete the old directory and rename the new directory back

    fs.delete(new Path(path), true)
    fs.rename(new Path(newPath), new Path(path))
    

The drawback of this solution is that it writes the data twice, which doubles the disk IO, but for now there is no other way.

Solution 2:

There isn't a roll-after-a-specific-size option in Spark yet, but there is the second best: roll after a specific number of records.

Since Spark 2.2 it's possible to set maxRecordsPerFile.

See also https://stackoverflow.com/a/48143315/630269
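
For example, a minimal sketch (the 1,000,000-records-per-file value is just a placeholder you would tune to your row width and compression ratio):

    // per-write option, Spark 2.2+
    df.write
      .option("maxRecordsPerFile", 1000000)
      .parquet(path)

    // or session-wide via the SQL config
    spark.conf.set("spark.sql.files.maxRecordsPerFile", 1000000)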

Solution 3:

As others have mentioned, you can't explicitly hit a target size per file. You can, however, get all of your output files to have roughly the same number of rows. If you know on average what your compression ratio looks like, distributing rows evenly across output files up to a max_rows will give you consistent sizes close to your target.

This is easier said than done if you're doing a partitionBy before you write. Here's some pseudocode for how we do it:

-- #3 distribute partitionC's rows based on the partition columns plus a random integer that picks the file number
select t4.* from dataframe_table as t4
inner join

    -- #2 calculate the number of output files per partition
    (select t2.partitionA, t2.partitionB,
            cast(t2.partition_num_rows / max_rows as int) + 1 as partition_num_files
     from

        -- #1 determine the number of rows in each output partition
        (select partitionA, partitionB, count(*) as partition_num_rows
         from dataframe_table
         group by partitionA, partitionB) as t2
    ) as t3

on t3.partitionA = t4.partitionA and t3.partitionB = t4.partitionB
distribute by (t4.partitionA, t4.partitionC, floor(rand() * t3.partition_num_files)) sort by (partitionC, sortfield)

I included a sort on the partition here because in our use case this drastically improves compression while only minimally impacting performance.

And if the results from steps 1 and 2 are sufficiently small, Spark may be able to broadcast join them to speed things up.
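
If you prefer the DataFrame API, here is a rough Scala sketch of the same idea; df stands for dataframe_table, and the column names, maxRows value and output path are placeholders rather than anything from the original query:

    import org.apache.spark.sql.functions._

    val maxRows = 1000000L                   // placeholder: target rows per output file
    val outputPath = "/some/output/path"     // placeholder

    // #1 + #2: rows per partition -> number of output files per partition
    val fileCounts = df.groupBy("partitionA", "partitionB")
      .agg(count(lit(1)).as("partition_num_rows"))
      .withColumn("partition_num_files",
        (col("partition_num_rows") / maxRows).cast("int") + lit(1))

    // #3: join the counts back (broadcast, since they are small) and spread each
    // partition's rows across its files with a random file index
    df.join(broadcast(fileCounts), Seq("partitionA", "partitionB"))
      .withColumn("file_idx", floor(rand() * col("partition_num_files")))
      .repartition(col("partitionA"), col("partitionC"), col("file_idx"))
      .sortWithinPartitions("partitionC", "sortfield")
      .write.partitionBy("partitionA", "partitionB")
      .parquet(outputPath)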

Solution 4:

Ok, here's my perfected method when taking into account target file size, memory usage and execution time. These files also include snappy compression and dictionary encoding.

My HDFS Blocksize is 128 megs (128 * 1024 * 1024):

<property>
    <name>dfs.blocksize</name>
    <value>134217728</value>
</property>

Here are my final parquet files, which are all super close to the HDFS block size.

133916650 2018-07-06 07:05 /year=2018/month=01/HoldingDetail_201801_0001.parquet
133459404 2018-07-06 07:05 /year=2018/month=01/HoldingDetail_201801_0002.parquet
133668445 2018-07-06 07:05 /year=2018/month=01/HoldingDetail_201801_0003.parquet
134004329 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0004.parquet
134015650 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0005.parquet
132053162 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0006.parquet
132917851 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0007.parquet
122594040 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0008.parquet

This is how I did it:

A. Come up with a rough number of rows that will generate a bunch of SMALL parquet files in the range of 10 megs or so. In my case I chose 200,000 records. Lots of smaller parquet files are more space efficient than one large parquet file, because dictionary encoding and other compression techniques get abandoned if the data in a single file has more variety. Writing out roughly 10 megs at a time also releases memory. (One way to produce these small files is sketched after the listing below.)

Your files will look something like this:

07916650 2018-07-06 07:05 /year=2018/month=01/HoldingDetail_201801_0001.parquet
12259404 2018-07-06 07:05 /year=2018/month=01/HoldingDetail_201801_0002.parquet
11368445 2018-07-06 07:05 /year=2018/month=01/HoldingDetail_201801_0003.parquet
07044329 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0004.parquet
13145650 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0005.parquet
08534162 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0006.parquet
12178451 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0007.parquet
11940440 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0008.parquet
09166540 2018-07-06 07:05 /year=2018/month=01/HoldingDetail_201801_0009.parquet
12594044 2018-07-06 07:05 /year=2018/month=01/HoldingDetail_201801_0010.parquet
11684245 2018-07-06 07:05 /year=2018/month=01/HoldingDetail_201801_0011.parquet
07043129 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0012.parquet
13153650 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0013.parquet
08533162 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0014.parquet
12137851 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0015.parquet
11943040 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0016.parquet
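
One way to produce small files like these, as a sketch (not necessarily how the author generated them; the 200,000-record value comes from step A, while the compression option, partition columns and path are assumptions):

    df.write
      .option("compression", "snappy")
      .option("maxRecordsPerFile", 200000)   // step A: roughly 10 megs per file for this data
      .partitionBy("year", "month")
      .parquet("/HoldingDetail")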

B. Create a list of your smaller parquet files whose file sizes, when added together, do not exceed your HDFS block size (a first-fit grouping sketch follows the example). In the example above:

/year=2018/month=01/HoldingDetail_201801_0001.parquet
to
/year=2018/month=01/HoldingDetail_201801_0012.parquet
plus
/year=2018/month=01/HoldingDetail_201801_0014.parquet

Take up 133,408,651 bytes.
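
A first-fit grouping sketch for step B (sc is the SparkContext, as in Solution 1; the directory path is assumed, and files that don't fit are kept as leftovers for the next round):

    import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

    // Keep adding small files that still fit under the block size; anything that
    // doesn't fit goes into the leftovers for the next B/C round.
    def pickGroup(files: Seq[FileStatus], blockSize: Long): (Seq[FileStatus], Seq[FileStatus]) =
      files.foldLeft((Vector.empty[FileStatus], Vector.empty[FileStatus])) {
        case ((picked, rest), f) =>
          if (picked.map(_.getLen).sum + f.getLen <= blockSize) (picked :+ f, rest)
          else (picked, rest :+ f)
      }

    val fs = FileSystem.get(sc.hadoopConfiguration)
    val smallFiles = fs.listStatus(new Path("/year=2018/month=01"))
      .filter(_.getPath.getName.endsWith(".parquet"))
      .sortBy(_.getPath.getName)
      .toSeq
    val (group, leftovers) = pickGroup(smallFiles, 128L * 1024 * 1024)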

C. Open up a new file called HoldingDetail_201801_temp.parquet

Read all the smaller files in your list one at a time and write them to the temp file as parquet ROW GROUPs. It is very important to write each file in as a row group, which preserves the compression encoding and guarantees that the number of bytes written (minus schema metadata) will be the same as the original file size.

Delete all the smaller files in the list. Rename the temp file to HoldingDetail_201801_0001.parquet.

Repeat steps B and C for the remaining smaller files to create *_0002.parquet, *_0003.parquet, *_0004.parquet, etc., which will be target files with sizes just under the HDFS block size.

(I also add a check that if the sum of file sizes > 0.95 * dfs.blocksize then just go ahead and merge the files found)
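
Here is a rough Scala sketch of the step C merge, continuing from the step B sketch above (so `group` and `fs` come from there). It uses parquet-mr's ParquetFileWriter.appendFile, the same mechanism behind `parquet-tools merge`, to copy each small file in as intact row groups; the exact signatures vary between parquet-mr versions, so treat this as an outline rather than drop-in code:

    import org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER
    import org.apache.parquet.hadoop.{ParquetFileReader, ParquetFileWriter}

    val conf = sc.hadoopConfiguration
    val groupPaths = group.map(_.getPath)
    val tempFile = new Path("/year=2018/month=01/HoldingDetail_201801_temp.parquet")

    // all the small files were written by the same job, so they share one schema
    val footer = ParquetFileReader.readFooter(conf, groupPaths.head, NO_FILTER)
    val schema = footer.getFileMetaData.getSchema

    val writer = new ParquetFileWriter(conf, schema, tempFile, ParquetFileWriter.Mode.CREATE)
    writer.start()
    groupPaths.foreach(p => writer.appendFile(conf, p))   // append row groups as-is
    writer.end(footer.getFileMetaData.getKeyValueMetaData)

    // step C cleanup: drop the small files and rename the merged file into place
    groupPaths.foreach(p => fs.delete(p, true))
    fs.rename(tempFile, new Path("/year=2018/month=01/HoldingDetail_201801_0001.parquet"))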