how to make saveAsTextFile NOT split output into multiple file?
Solution 1:
The reason it saves it as multiple files is because the computation is distributed. If the output is small enough such that you think you can fit it on one machine, then you can end your program with
val arr = year.collect()
And then save the resulting array as a file, Another way would be to use a custom partitioner, partitionBy
, and make it so everything goes to one partition though that isn't advisable because you won't get any parallelization.
If you require the file to be saved with saveAsTextFile
you can use coalesce(1,true).saveAsTextFile()
. This basically means do the computation then coalesce to 1 partition. You can also use repartition(1)
which is just a wrapper for coalesce
with the shuffle argument set to true. Looking through the source of RDD.scala is how I figured most of this stuff out, you should take a look.
Solution 2:
For those working with a larger dataset:
rdd.collect()
should not be used in this case as it will collect all data as anArray
in the driver, which is the easiest way to get out of memory.rdd.coalesce(1).saveAsTextFile()
should also not be used as the parallelism of upstream stages will be lost to be performed on a single node, where data will be stored from.rdd.coalesce(1, shuffle = true).saveAsTextFile()
is the best simple option as it will keep the processing of upstream tasks parallel and then only perform the shuffle to one node (rdd.repartition(1).saveAsTextFile()
is an exact synonym).rdd.saveAsSingleTextFile()
as provided bellow additionally allows one to store the rdd in a single file with a specific name while keeping the parallelism properties ofrdd.coalesce(1, shuffle = true).saveAsTextFile()
.
Something that can be inconvenient with rdd.coalesce(1, shuffle = true).saveAsTextFile("path/to/file.txt")
is that it actually produces a file whose path is path/to/file.txt/part-00000
and not path/to/file.txt
.
The following solution rdd.saveAsSingleTextFile("path/to/file.txt")
will actually produce a file whose path is path/to/file.txt
:
package com.whatever.package
import org.apache.spark.rdd.RDD
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.hadoop.io.compress.CompressionCodec
object SparkHelper {
// This is an implicit class so that saveAsSingleTextFile can be attached to
// SparkContext and be called like this: sc.saveAsSingleTextFile
implicit class RDDExtensions(val rdd: RDD[String]) extends AnyVal {
def saveAsSingleTextFile(path: String): Unit =
saveAsSingleTextFileInternal(path, None)
def saveAsSingleTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit =
saveAsSingleTextFileInternal(path, Some(codec))
private def saveAsSingleTextFileInternal(
path: String, codec: Option[Class[_ <: CompressionCodec]]
): Unit = {
// The interface with hdfs:
val hdfs = FileSystem.get(rdd.sparkContext.hadoopConfiguration)
// Classic saveAsTextFile in a temporary folder:
hdfs.delete(new Path(s"$path.tmp"), true) // to make sure it's not there already
codec match {
case Some(codec) => rdd.saveAsTextFile(s"$path.tmp", codec)
case None => rdd.saveAsTextFile(s"$path.tmp")
}
// Merge the folder of resulting part-xxxxx into one file:
hdfs.delete(new Path(path), true) // to make sure it's not there already
FileUtil.copyMerge(
hdfs, new Path(s"$path.tmp"),
hdfs, new Path(path),
true, rdd.sparkContext.hadoopConfiguration, null
)
// Working with Hadoop 3?: https://stackoverflow.com/a/50545815/9297144
hdfs.delete(new Path(s"$path.tmp"), true)
}
}
}
which can be used this way:
import com.whatever.package.SparkHelper.RDDExtensions
rdd.saveAsSingleTextFile("path/to/file.txt")
// Or if the produced file is to be compressed:
import org.apache.hadoop.io.compress.GzipCodec
rdd.saveAsSingleTextFile("path/to/file.txt.gz", classOf[GzipCodec])
This snippet:
First stores the rdd with
rdd.saveAsTextFile("path/to/file.txt")
in a temporary folderpath/to/file.txt.tmp
as if we didn't want to store data in one file (which keeps the processing of upstream tasks parallel)And then only, using the hadoop file system api, we proceed with the merge (
FileUtil.copyMerge()
) of the different output files to create our final output single filepath/to/file.txt
.
Solution 3:
You could call coalesce(1)
and then saveAsTextFile()
- but it might be a bad idea if you have a lot of data. Separate files per split are generated just like in Hadoop in order to let separate mappers and reducers write to different files. Having a single output file is only a good idea if you have very little data, in which case you could do collect() as well, as @aaronman said.
Solution 4:
As others have mentioned, you can collect or coalesce your data set to force Spark to produce a single file. But this also limits the number of Spark tasks that can work on your dataset in parallel. I prefer to let it create a hundred files in the output HDFS directory, then use hadoop fs -getmerge /hdfs/dir /local/file.txt
to extract the results into a single file in the local filesystem. This makes the most sense when your output is a relatively small report, of course.
Solution 5:
You can call repartition()
and follow this way:
val year = sc.textFile("apat63_99.txt").map(_.split(",")(1)).flatMap(_.split(",")).map((_,1)).reduceByKey((_+_)).map(_.swap)
var repartitioned = year.repartition(1)
repartitioned.saveAsTextFile("C:/Users/TheBhaskarDas/Desktop/wc_spark00")