Overwrite specific partitions in Spark DataFrame write method

I want to overwrite specific partitions in Spark instead of all of them. I am trying the following command:

df.write.orc('maprfs:///hdfs-base-path','overwrite',partitionBy='col4')

where df is a DataFrame containing the incremental data to be written.

hdfs-base-path contains the master data.

When I run the above command, it deletes all the existing partitions and writes only those present in df to the HDFS path.

My requirement is to overwrite only the partitions present in df at the specified HDFS path. Can someone please help me with this?


Solution 1:

Finally! This is now a feature in Spark 2.3.0: SPARK-20236

To use it, set spark.sql.sources.partitionOverwriteMode to dynamic; the dataset must be partitioned, and the write mode must be overwrite. Example:

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
data.write.mode("overwrite").insertInto("partitioned_table")

I recommend repartitioning on your partition column before writing. Otherwise every task writes its own file into each partition folder it has rows for, and you can end up with 400 files per folder.
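As a rough PySpark sketch of the two steps above (dynamic mode plus a repartition on the partition column), assuming a SparkSession, Spark 2.3.0 or later, and a partitioned table that already exists. The function name overwrite_touched_partitions and its arguments are placeholders for illustration, not part of any Spark API:

```python
def overwrite_touched_partitions(spark, df, table_name, partition_col):
    """Overwrite only the partitions of table_name that appear in df.

    Assumes Spark 2.3.0+ and an existing table partitioned by partition_col.
    """
    # With the default mode ("static"), overwrite replaces every partition.
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
    (
        df.repartition(partition_col)  # group rows with the same partition value into one task
          .write
          .mode("overwrite")
          .insertInto(table_name)      # columns are matched by position, not by name
    )
```

Because insertInto matches columns by position, the column order of df should match the table's.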

Before Spark 2.3.0, the best solution would be to launch SQL statements to delete those partitions and then write them with mode append.
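One way the delete-then-append approach could look, as a sketch only. The helper drop_partition_statements is hypothetical; it assumes a single partition column, values of one type that contain no quote characters, and a table registered in the metastore (for external tables, dropping a partition may leave the files in place):

```python
def drop_partition_statements(table_name, partition_col, values):
    """Build one ALTER TABLE ... DROP PARTITION statement per distinct value."""
    statements = []
    for value in sorted(set(values)):
        # Quote string values; leave numeric values bare.
        literal = "'{}'".format(value) if isinstance(value, str) else str(value)
        statements.append(
            "ALTER TABLE {} DROP IF EXISTS PARTITION ({}={})".format(
                table_name, partition_col, literal
            )
        )
    return statements


# Possible usage with a live session (not executed here):
# values = [row[0] for row in df.select("col4").distinct().collect()]
# for stmt in drop_partition_statements("partitioned_table", "col4", values):
#     spark.sql(stmt)
# df.write.mode("append").insertInto("partitioned_table")
```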

Solution 2:

This is a common problem. The only solution with Spark up to 2.0 is to write directly into the partition directory, e.g.,

df.write.mode(SaveMode.Overwrite).save("/root/path/to/data/partition_col=value")

If you are using Spark prior to 2.0, you'll need to stop Spark from emitting metadata files (because they will break automatic partition discovery) using:

sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")

If you are using Spark prior to 1.6.2, you will also need to delete the _SUCCESS file in /root/path/to/data/partition_col=value or its presence will break automatic partition discovery. (I strongly recommend using 1.6.2 or later.)
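For PySpark users, writing directly into the partition directories might be sketched as follows. Both function names are made up for this example, and it assumes a single partition column with no null values. The partition column is dropped before each write because its value is already encoded in the directory name:

```python
def partition_path(base_path, partition_col, value):
    """Return the directory that holds one partition, e.g. /data/col=value."""
    return "{}/{}={}".format(base_path.rstrip("/"), partition_col, value)


def write_partitions_directly(df, base_path, partition_col, fmt="parquet"):
    """Overwrite only the partition directories whose values occur in df."""
    values = [row[0] for row in df.select(partition_col).distinct().collect()]
    for value in values:
        (
            df.filter(df[partition_col] == value)
              .drop(partition_col)  # the value lives in the path, not in the files
              .write
              .mode("overwrite")    # replaces this one directory only
              .format(fmt)
              .save(partition_path(base_path, partition_col, value))
        )
```

This runs one write job per partition value, so it suits incremental loads that touch a handful of partitions rather than thousands.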

You can get a few more details about how to manage large partitioned tables from my Spark Summit talk on Bulletproof Jobs.

Solution 3:

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
data.toDF().write.mode("overwrite").format("parquet").partitionBy("date", "name").save("s3://path/to/somewhere")

This works for me on AWS Glue ETL jobs (Glue 1.0 - Spark 2.4 - Python 2)

Solution 4:

Adding 'overwrite=True' parameter in the insertInto statement solves this:

hiveContext.setConf("hive.exec.dynamic.partition", "true")
hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

df.write.mode("overwrite").insertInto("database_name.partitioned_table", overwrite=True)

By default overwrite=False. Setting it to True overwrites only the partitions of partitioned_table that are present in df, instead of replacing the entire contents of partitioned_table with df.

Solution 5:

Using Spark 1.6...

The HiveContext can simplify this process greatly. The key is that you must create the table in Hive first using a CREATE EXTERNAL TABLE statement with partitioning defined. For example:

-- Hive SQL
CREATE EXTERNAL TABLE test
(name STRING)
PARTITIONED BY
(age INT)
STORED AS PARQUET
LOCATION 'hdfs:///tmp/tables/test'

From here, let's say you have a DataFrame with new records for a specific partition (or multiple partitions). You can use a HiveContext SQL statement to perform an INSERT OVERWRITE from this DataFrame, which overwrites only the partitions of the table that are contained in the DataFrame:

# PySpark
hiveContext = HiveContext(sc)
update_dataframe.registerTempTable('update_dataframe')

hiveContext.sql("""INSERT OVERWRITE TABLE test PARTITION (age)
                   SELECT name, age
                   FROM update_dataframe""")

Note: update_dataframe in this example has a schema that matches that of the target test table.
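In a dynamic-partition INSERT OVERWRITE, the partition columns have to come last in the SELECT list, in the same order as in the PARTITION clause. A small helper can build the statement so that the order is always right; insert_overwrite_sql is a hypothetical name used only for this sketch:

```python
def insert_overwrite_sql(table, data_cols, partition_cols, source_view):
    """Build a dynamic-partition INSERT OVERWRITE statement.

    Data columns are selected first and partition columns last, which is
    the order Hive expects for dynamic partition inserts.
    """
    select_list = ", ".join(list(data_cols) + list(partition_cols))
    return "INSERT OVERWRITE TABLE {} PARTITION ({}) SELECT {} FROM {}".format(
        table, ", ".join(partition_cols), select_list, source_view
    )


# Possible usage with the objects from the example above (not executed here):
# hiveContext.sql(insert_overwrite_sql("test", ["name"], ["age"], "update_dataframe"))
```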

One easy mistake to make with this approach is to skip the CREATE EXTERNAL TABLE step in Hive and just create the table using the DataFrame API's write methods. For Parquet-based tables in particular, the table will not be defined appropriately to support Hive's INSERT OVERWRITE ... PARTITION statement.

Hope this helps.