How do I skip a header from CSV files in Spark?

Suppose I give three file paths to a Spark context to read, and each file has a schema header in its first row. How can we skip the header lines?

val rdd = sc.textFile("file1,file2,file3")

Now, how can we skip header lines from this rdd?


Solution 1:

data = sc.textFile('path_to_data')
header = data.first()  # extract the header line (triggers a small job)
data = data.filter(lambda row: row != header)  # filter out every line equal to the header
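
For completeness, a Scala equivalent (a minimal sketch; it assumes every file starts with an identical header line, since the filter drops each line that matches it):

val data = sc.textFile("path_to_data")
val header = data.first() // extract the header line (triggers a small job)
val rows = data.filter(row => row != header) // drop every line equal to the header

Note that this also removes the headers of the second and third files, as long as they match the first file's header exactly.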

Solution 2:

If there were just one header line in the first record, then the most efficient way to filter it out would be:

rdd.mapPartitionsWithIndex {
  (idx, iter) => if (idx == 0) iter.drop(1) else iter 
}

Of course, this doesn't help if there are many files, each with its own header line inside. In that case, you can read each file as its own RDD, strip its header this way, and union the results, as sketched below.
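
A minimal sketch of that union approach (the file names are the question's placeholders):

def dropHeader(rdd: org.apache.spark.rdd.RDD[String]): org.apache.spark.rdd.RDD[String] =
  rdd.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter }

// read each file separately, drop its header, then union everything
val combined = Seq("file1", "file2", "file3")
  .map(path => dropHeader(sc.textFile(path)))
  .reduce(_ union _)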

You could also just write a filter that matches only lines that could be a header. This is simple, but less efficient, since it has to test every line, as in the sketch below.
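
For example (a sketch that assumes the header begins with a known column name, here the hypothetical "id"):

val withoutHeaders = rdd.filter(line => !line.startsWith("id,"))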

A Python equivalent of the mapPartitionsWithIndex approach:

from itertools import islice

rdd.mapPartitionsWithIndex(
    lambda idx, it: islice(it, 1, None) if idx == 0 else it 
)

Solution 3:

In Spark 2.0 a CSV reader is built into Spark, so you can easily load a CSV file as follows:

spark.read.option("header","true").csv("filePath")
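
The reader also accepts several paths at once and drops the header of each file, so the original three-file case can be handled directly (a sketch using the question's placeholder file names):

val df = spark.read.option("header", "true").csv("file1", "file2", "file3")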

Solution 4:

From Spark 2.0 onwards, you can use SparkSession to get this done as a one-liner:

val spark = SparkSession.builder.config(conf).getOrCreate()

and then as @SandeepPurohit said:

val dataFrame = spark.read.format("csv").option("header", "true").load(csvfilePath)

I hope this answers your question!

P.S.: SparkSession is the new entry point introduced in Spark 2.0 and lives in the org.apache.spark.sql package.
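
Putting it together as a self-contained sketch (the app name and file path are placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("skip-csv-headers")
  .getOrCreate()

val dataFrame = spark.read
  .option("header", "true")
  .csv("path/to/file.csv")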

Solution 5:

Working in 2018 (Spark 2.3)

Python

# wrap the chain in parentheses so it can span multiple lines
df = (spark.read
    .option("header", "true")
    .format("csv")
    .schema(myManualSchema)
    .load("mycsv.csv"))

Scala

val myDf = spark.read
  .option("header", "true")
  .format("csv")
  .schema(myManualSchema)
  .load("mycsv.csv")

P.S.: myManualSchema is a predefined schema I wrote myself; you can skip that part of the code if you prefer to infer the schema instead.
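
For reference, a sketch of what such a manual schema could look like in Scala (the column names and types here are invented for illustration):

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// hypothetical schema for a two-column CSV: name,age
val myManualSchema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)
))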

UPDATE 2021: The same code works for Spark 3.x:

# .csv() already sets the format, so a separate .format("csv") is not needed
df = (spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("mycsv.csv"))