How to overwrite the output directory in spark

I have a Spark Streaming application which produces a dataset every minute. I need to save/overwrite the results of the processed data.

When I try to overwrite the dataset, an org.apache.hadoop.mapred.FileAlreadyExistsException stops the execution.

I set the Spark property set("spark.files.overwrite", "true"), but there was no luck.

How can I overwrite or pre-delete the files from Spark?


Solution 1:

UPDATE: I'd suggest using DataFrames, plus something like ... .write.mode(SaveMode.Overwrite) ....

Handy pimp (an implicit class that adds an overwriting write method to RDD[String]):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SaveMode, SparkSession}

implicit class PimpedStringRDD(rdd: RDD[String]) {
  // Writes the RDD as text, replacing whatever is already at the path
  def write(p: String)(implicit ss: SparkSession): Unit = {
    import ss.implicits._
    rdd.toDF().as[String].write.mode(SaveMode.Overwrite).text(p)
  }
}
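
A quick usage sketch (the SparkSession setup and the output path here are hypothetical, just for illustration):

import org.apache.spark.sql.SparkSession

implicit val spark: SparkSession = SparkSession.builder()
  .appName("overwrite-example")
  .getOrCreate()

val lines = spark.sparkContext.parallelize(Seq("a", "b", "c"))

// Each call replaces whatever is already at the target directory
lines.write("/tmp/overwrite-example")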

For older versions try

yourSparkConf.set("spark.hadoop.validateOutputSpecs", "false")
val sc = new SparkContext(yourSparkConf)
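
With that setting in place, saveAsTextFile no longer fails when the directory already exists (a minimal sketch; the output path is a placeholder):

// Would otherwise throw FileAlreadyExistsException on the second run
sc.parallelize(Seq("a", "b")).saveAsTextFile("/tmp/minute-output")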

In Spark 1.1.0 you can set configuration settings using the spark-submit script with the --conf flag (e.g. --conf spark.hadoop.validateOutputSpecs=false).

WARNING (older versions): According to @piggybox there is a bug in Spark where it will only overwrite the files it needs to write its part- files; any other files in the directory will be left in place.

Solution 2:

Since df.save(path, source, mode) is deprecated (http://spark.apache.org/docs/1.5.0/api/scala/index.html#org.apache.spark.sql.DataFrame),

use df.write.format(source).mode("overwrite").save(path),
where df.write is a DataFrameWriter.

'source' can be ("com.databricks.spark.avro" | "parquet" | "json")
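
For example, a sketch of writing a DataFrame as Parquet with overwrite mode (the data and output path are made up for illustration):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("overwrite-example").getOrCreate()
import spark.implicits._

val df = Seq((1, "a"), (2, "b")).toDF("id", "value")

// Replaces any existing output at the target path instead of failing
df.write.format("parquet").mode("overwrite").save("/tmp/example-output")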

Solution 3:

From the pyspark.sql.DataFrame.save documentation (currently at 1.3.1), you can specify mode='overwrite' when saving a DataFrame:

myDataFrame.save(path='myPath', source='parquet', mode='overwrite')

I've verified that this will even remove leftover partition files. So if you had, say, 10 partitions/files originally, but then overwrote the folder with a DataFrame that only had 6 partitions, the resulting folder will have the 6 partitions/files.

See the Spark SQL documentation for more information about the mode options.

Solution 4:

The documentation for the parameter spark.files.overwrite says this: "Whether to overwrite files added through SparkContext.addFile() when the target file exists and its contents do not match those of the source." So it has no effect on saveAsTextFiles method.

You could do this before saving the file:

val hadoopConf = new org.apache.hadoop.conf.Configuration()
val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://localhost:9000"), hadoopConf)
// Recursively delete the output path; ignore the error if it does not exist
try { hdfs.delete(new org.apache.hadoop.fs.Path(filepath), true) } catch { case _: Throwable => }

As explained here: http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-td6696.html
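
Putting it together, a minimal sketch of pre-deleting and then saving (the HDFS URI, output path, and app name are placeholders):

import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("predelete-example"))
val outputPath = "hdfs://localhost:9000/user/me/minute-output"

// Remove the previous output so saveAsTextFile does not fail
val fs = FileSystem.get(new URI("hdfs://localhost:9000"), sc.hadoopConfiguration)
fs.delete(new Path(outputPath), true) // recursive; returns false if nothing was there

sc.parallelize(Seq("example record")).saveAsTextFile(outputPath)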