I would like to read a CSV file in Spark, convert it to a DataFrame, store it in HDFS, and then register it as a table with df.registerTempTable("table_name").

I have tried:

scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")

The error I got:

java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
    at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)
    at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
    at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

What is the right command to load a CSV file as a DataFrame in Apache Spark?


Solution 1:

spark-csv functionality is part of core Spark as of Spark 2.x and doesn't require a separate library, so you could just do, for example:

df = spark.read.format("csv").option("header", "true").load("csvfile.csv")

In Scala (this works for any delimited format: specify "," as the delimiter for CSV, "\t" for TSV, and so on):

val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("delimiter", ",")
    .load("csvfile.csv")
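
Since the question also asks about registerTempTable: once the CSV is loaded, you can register the DataFrame as a temporary view and query it with SQL. A minimal sketch assuming Spark 2.x, where createOrReplaceTempView is the current name for the deprecated registerTempTable (the table name is taken from the question):

val df = spark.read
    .format("csv")
    .option("header", "true")
    .load("hdfs:///csv/file/dir/file.csv")

// register the DataFrame so it can be queried by name
df.createOrReplaceTempView("table_name")

// query it through Spark SQL
spark.sql("SELECT * FROM table_name LIMIT 10").show()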

Solution 2:

Parse CSV and load as DataFrame/DataSet with Spark 2.x

First, initialize a SparkSession object; by default it will be available in the shells as spark

val spark = org.apache.spark.sql.SparkSession.builder
        .master("local")             // change it as per your cluster
        .appName("Spark CSV Reader")
        .getOrCreate()

Use any one of the following ways to load the CSV as a DataFrame/Dataset

1. Do it in a programmatic way

 val df = spark.read
         .format("csv")
         .option("header", "true") //first line in file has headers
         .option("mode", "DROPMALFORMED")
         .load("hdfs:///csv/file/dir/file.csv")

Update: adding all the options from here in case the link is broken in the future (see the sketch after this list for several of them in use)

  • path: location of files. Like other Spark paths, it can accept standard Hadoop globbing expressions.
  • header: when set to true, the first line of the files will be used to name columns and will not be included in the data. All types will be assumed to be string. The default value is false.
  • delimiter: by default columns are delimited using ,, but the delimiter can be set to any character
  • quote: by default the quote character is ", but can be set to any character. Delimiters inside quotes are ignored
  • escape: by default, the escape character is \, but it can be set to any character. Escaped quote characters are ignored
  • parserLib: by default, it is "commons" that can be set to "univocity" to use that library for CSV parsing.
  • mode: determines the parsing mode. By default it is PERMISSIVE. Possible values are:
    • PERMISSIVE: tries to parse all lines: nulls are inserted for missing tokens and extra tokens are ignored.
    • DROPMALFORMED: drops lines that have fewer or more tokens than expected or tokens which do not match the schema
    • FAILFAST: aborts with a RuntimeException if it encounters any malformed line
  • charset: defaults to 'UTF-8' but can be set to other valid charset names
  • inferSchema: automatically infers column types. It requires one extra pass over the data and is false by default
  • comment: skip lines beginning with this character. Default is "#". Disable comments by setting this to null.
  • nullValue: specifies a string that indicates a null value, any fields matching this string will be set as nulls in the DataFrame
  • dateFormat: specifies a string that indicates the date format to use when reading dates or timestamps. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to both DateType and TimestampType. By default, it is null which means trying to parse times and date by java.sql.Timestamp.valueOf() and java.sql.Date.valueOf().
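
As an illustration, here is a sketch that combines several of the options above in one read. It assumes the Spark 2.x built-in CSV reader, which accepts most of the same options as spark-csv (the delimiter is also available there under the name sep); the values chosen are just examples:

// semicolon-delimited file with a header row; infer column types,
// treat "NA" as null, parse dates as yyyy-MM-dd, and drop malformed lines
val df = spark.read
    .format("csv")
    .option("header", "true")
    .option("delimiter", ";")
    .option("inferSchema", "true")
    .option("nullValue", "NA")
    .option("dateFormat", "yyyy-MM-dd")
    .option("mode", "DROPMALFORMED")
    .load("hdfs:///csv/file/dir/file.csv")

df.printSchema()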

2. You can do it the SQL way as well

 val df = spark.sql("SELECT * FROM csv.`hdfs:///csv/file/dir/file.csv`")
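
Since the original question also mentions storing the result in HDFS: once loaded, the DataFrame can be written back out, for example as Parquet (so that a later default load, which expects Parquet, will work) or as CSV. A sketch with hypothetical output paths:

// write back to HDFS as Parquet
df.write.mode("overwrite").parquet("hdfs:///csv/file/dir/output_parquet")

// or write back as CSV with a header row
df.write.option("header", "true").csv("hdfs:///csv/file/dir/output_csv")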

Dependencies:

 "org.apache.spark" % "spark-core_2.11" % 2.0.0,
 "org.apache.spark" % "spark-sql_2.11" % 2.0.0,

Spark version < 2.0

val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true") 
    .option("mode", "DROPMALFORMED")
    .load("csv/file/path"); 

Dependencies:

"org.apache.spark" % "spark-sql_2.10" % 1.6.0,
"com.databricks" % "spark-csv_2.10" % 1.6.0,
"com.univocity" % "univocity-parsers" % LATEST,