This command works with HiveQL:

insert overwrite directory '/data/home.csv' select * from testtable;

But with Spark SQL I'm getting an error with an org.apache.spark.sql.hive.HiveQl stack trace:

java.lang.RuntimeException: Unsupported language features in query:
    insert overwrite directory '/data/home.csv' select * from testtable

Please guide me to write export to CSV feature in Spark SQL.


Solution 1:

You can use below statement to write the contents of dataframe in CSV format df.write.csv("/data/home/csv")

If you need to write the whole dataframe into a single CSV file, then use df.coalesce(1).write.csv("/data/home/sample.csv")

For spark 1.x, you can use spark-csv to write the results into CSV files

Below scala snippet would help

import org.apache.spark.sql.hive.HiveContext
// sc - existing spark context
val sqlContext = new HiveContext(sc)
val df = sqlContext.sql("SELECT * FROM testtable")
df.write.format("com.databricks.spark.csv").save("/data/home/csv")

To write the contents into a single file

import org.apache.spark.sql.hive.HiveContext
// sc - existing spark context
val sqlContext = new HiveContext(sc)
val df = sqlContext.sql("SELECT * FROM testtable")
df.coalesce(1).write.format("com.databricks.spark.csv").save("/data/home/sample.csv")

Solution 2:

Since Spark 2.X spark-csv is integrated as native datasource. Therefore, the necessary statement simplifies to (windows)

df.write
  .option("header", "true")
  .csv("file:///C:/out.csv")

or UNIX

df.write
  .option("header", "true")
  .csv("/var/out.csv")

Notice: as the comments say, it is creating the directory by that name with the partitions in it, not a standard CSV file. This, however, is most likely what you want since otherwise your either crashing your driver (out of RAM) or you could be working with a non distributed environment.

Solution 3:

The answer above with spark-csv is correct but there is an issue - the library creates several files based on the data frame partitioning. And this is not what we usually need. So, you can combine all partitions to one:

df.coalesce(1).
    write.
    format("com.databricks.spark.csv").
    option("header", "true").
    save("myfile.csv")

and rename the output of the lib (name "part-00000") to a desire filename.

This blog post provides more details: https://fullstackml.com/2015/12/21/how-to-export-data-frame-from-apache-spark/

Solution 4:

The simplest way is to map over the DataFrame's RDD and use mkString:

  df.rdd.map(x=>x.mkString(","))

As of Spark 1.5 (or even before that) df.map(r=>r.mkString(",")) would do the same if you want CSV escaping you can use apache commons lang for that. e.g. here's the code we're using

 def DfToTextFile(path: String,
                   df: DataFrame,
                   delimiter: String = ",",
                   csvEscape: Boolean = true,
                   partitions: Int = 1,
                   compress: Boolean = true,
                   header: Option[String] = None,
                   maxColumnLength: Option[Int] = None) = {

    def trimColumnLength(c: String) = {
      val col = maxColumnLength match {
        case None => c
        case Some(len: Int) => c.take(len)
      }
      if (csvEscape) StringEscapeUtils.escapeCsv(col) else col
    }
    def rowToString(r: Row) = {
      val st = r.mkString("~-~").replaceAll("[\\p{C}|\\uFFFD]", "") //remove control characters
      st.split("~-~").map(trimColumnLength).mkString(delimiter)
    }

    def addHeader(r: RDD[String]) = {
      val rdd = for (h <- header;
                     if partitions == 1; //headers only supported for single partitions
                     tmpRdd = sc.parallelize(Array(h))) yield tmpRdd.union(r).coalesce(1)
      rdd.getOrElse(r)
    }

    val rdd = df.map(rowToString).repartition(partitions)
    val headerRdd = addHeader(rdd)

    if (compress)
      headerRdd.saveAsTextFile(path, classOf[GzipCodec])
    else
      headerRdd.saveAsTextFile(path)
  }

Solution 5:

The error message suggests this is not a supported feature in the query language. But you can save a DataFrame in any format as usual through the RDD interface (df.rdd.saveAsTextFile). Or you can check out https://github.com/databricks/spark-csv.