How do I convert a CSV file to an RDD?

I'm new to Spark. I want to perform some operations on particular data in a CSV record.

I'm trying to read a CSV file and convert it to an RDD. My further operations are based on the heading provided in the CSV file.

(From comments) This is my code so far:

final JavaRDD<String> File = sc.textFile(Filename).cache();
final JavaRDD<String> lines = File.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterable<String> call(String s) {
        return Arrays.asList(EOL.split(s));
    }
});
final String heading=lines.first().toString();

I can get the header values like this:

final String[] header=heading.split(" "); 

Now I want to map these header values to each record in the CSV file.

In Java I'm using CSVReader's record.getColumnValue(Column header) to get a particular value. I need to do something similar to that here.


A simple approach is to preserve the header and use it to look columns up by name.

Let's say you have a file.csv like:

user, topic, hits
om,  scala, 120
daniel, spark, 80
3754978, spark, 1

We can define a header class that uses a parsed version of the first row:

class SimpleCSVHeader(header:Array[String]) extends Serializable {
  val index = header.zipWithIndex.toMap
  def apply(array:Array[String], key:String):String = array(index(key))
}

We can then use that header to address the data further down the road:

val csv = sc.textFile("file.csv")  // original file
val data = csv.map(line => line.split(",").map(elem => elem.trim)) //lines in rows
val header = new SimpleCSVHeader(data.take(1)(0)) // we build our header with the first line
val rows = data.filter(line => header(line,"user") != "user") // filter the header out
val users = rows.map(row => header(row,"user"))
val usersByHits = rows.map(row => header(row,"user") -> header(row,"hits").toInt)
...

Note that the header is not much more than a simple map from a mnemonic to the array index. Pretty much all of this could be done using the ordinal position of the element in the array instead, e.g. user = row(0).
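For example, a minimal sketch of the same pipeline addressed purely by ordinal position (using the file.csv from above):

val csv = sc.textFile("file.csv")
val data = csv.map(line => line.split(",").map(_.trim))
val rows = data.filter(row => row(0) != "user")             // drop the header row by its first field
val usersByHits = rows.map(row => row(0) -> row(2).toInt)   // user is column 0, hits is column 2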

PS: Welcome to Scala :-)


You can use the spark-csv library: https://github.com/databricks/spark-csv

This is directly from the documentation:

import java.util.HashMap;

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

SQLContext sqlContext = new SQLContext(sc);

HashMap<String, String> options = new HashMap<String, String>();
options.put("header", "true");
options.put("path", "cars.csv");

DataFrame df = sqlContext.load("com.databricks.spark.csv", options);
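If you're using the Scala API, the equivalent looks roughly like this (a sketch assuming Spark 1.4+ with spark-csv on the classpath, using the DataFrameReader API):

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")   // treat the first line as column names
  .load("cars.csv")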

Firstly, I must say that it's much, much simpler if you put your headers in separate files - this is the convention in big data.

Anyway, Daniel's answer is pretty good, but it has an inefficiency and a bug, so I'm going to post my own. The inefficiency is that you don't need to check every record to see whether it's the header; you only need to check the first record of each partition. The bug is that .split(",") silently drops trailing empty strings, so if a record ends in one or more empty fields you could get an exception thrown or read the wrong column - to correct that you need to use .split(",", -1).
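A quick illustration of the difference on a made-up record:

"a,b,,".split(",")      // Array(a, b)         - the trailing empty fields are dropped
"a,b,,".split(",", -1)  // Array(a, b, "", "") - all four fields are kept

So here is the full code: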

// path and columnName are assumed to be defined elsewhere
// read just the first line of the file through the Hadoop FileSystem API
val header =
  scala.io.Source.fromInputStream(
    org.apache.hadoop.fs.FileSystem.get(new java.net.URI(path), sc.hadoopConfiguration)
      .open(new org.apache.hadoop.fs.Path(path)))
    .getLines.next()

val columnIndex = header.split(",", -1).indexOf(columnName)

// only the first record of each partition needs to be compared against the header
sc.textFile(path).mapPartitions(iterator => {
  val head = iterator.next()
  if (head == header) iterator else Iterator(head) ++ iterator
})
.map(_.split(",", -1)(columnIndex))

Final points: consider Parquet if you only want to fish out certain columns. Or at least consider implementing a lazily evaluated split function if you have wide rows.
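For the Parquet route, a rough sketch (assuming Spark 1.4+ and a hypothetical users.parquet file) - Parquet is columnar, so selecting only some columns means only those columns are read from disk:

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

val df = sqlContext.read.parquet("users.parquet")   // hypothetical Parquet version of the data
val usersByHits = df.select("user", "hits")         // only these two columns are scanned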