How do I skip a header from CSV files in Spark?
Suppose I give three files paths to a Spark context to read and each file has a schema in the first row. How can we skip schema lines from headers?
val rdd=sc.textFile("file1,file2,file3")
Now, how can we skip header lines from this rdd?
Solution 1:
data = sc.textFile('path_to_data')
header = data.first() #extract header
data = data.filter(row => row != header) #filter out header
Solution 2:
If there were just one header line in the first record, then the most efficient way to filter it out would be:
rdd.mapPartitionsWithIndex {
(idx, iter) => if (idx == 0) iter.drop(1) else iter
}
This doesn't help if of course there are many files with many header lines inside. You can union three RDDs you make this way, indeed.
You could also just write a filter
that matches only a line that could be a header. This is quite simple, but less efficient.
Python equivalent:
from itertools import islice
rdd.mapPartitionsWithIndex(
lambda idx, it: islice(it, 1, None) if idx == 0 else it
)
Solution 3:
In Spark 2.0 a CSV reader is build into Spark, so you can easily load a CSV file as follows:
spark.read.option("header","true").csv("filePath")
Solution 4:
From Spark 2.0 onwards what you can do is use SparkSession to get this done as a one liner:
val spark = SparkSession.builder.config(conf).getOrCreate()
and then as @SandeepPurohit said:
val dataFrame = spark.read.format("CSV").option("header","true").load(csvfilePath)
I hope it solved your question !
P.S: SparkSession is the new entry point introduced in Spark 2.0 and can be found under spark_sql package
Solution 5:
Working in 2018 (Spark 2.3)
Python
df = spark.read
.option("header", "true")
.format("csv")
.schema(myManualSchema)
.load("mycsv.csv")
Scala
val myDf = spark.read
.option("header", "true")
.format("csv")
.schema(myManualSchema)
.load("mycsv.csv")
PD1: myManualSchema is a predefined schema written by me, you could skip that part of code
UPDATE 2021 The same code works for Spark 3.x
df = spark.read
.option("header", "true")
.option("inferSchema", "true")
.format("csv")
.csv("mycsv.csv")