Load CSV file with Spark
I'm new to Spark and I'm trying to read CSV data from a file with Spark. Here's what I am doing:
sc.textFile('file.csv')
    .map(lambda line: (line.split(',')[0], line.split(',')[1]))
    .collect()
I would expect this call to give me a list of the first two columns of my file, but I'm getting this error:
File "", line 1, in IndexError: list index out of range
even though my CSV file has more than one column.
Solution 1:
Spark 2.0.0+
You can use the built-in csv data source directly:
spark.read.csv(
    "some_input_file.csv",
    header=True,
    mode="DROPMALFORMED",
    schema=schema
)
or
(
    spark.read
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .csv("some_input_file.csv")
)
without including any external dependencies.
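If the goal is the first two columns, as in the question, you can then project them from the resulting DataFrame. A minimal sketch (column names come from the header row; some_input_file.csv stands in for your file):

df = (
    spark.read
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .csv("some_input_file.csv")
)
# take the first two columns by position and collect them as (col1, col2) tuples
pairs = df.select(df.columns[0], df.columns[1]).rdd.map(tuple).collect()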
Spark < 2.0.0:
Instead of manual parsing, which is far from trivial in the general case, I would recommend spark-csv.
Make sure that Spark CSV is included in the path (--packages, --jars, --driver-class-path).
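For example, when starting the shell (the exact package coordinates depend on your Scala and spark-csv versions, so treat this line as an illustration):

bin/pyspark --packages com.databricks:spark-csv_2.10:1.5.0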
And load your data as follows:
df = (
    sqlContext
    .read.format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv")
)
It can handle loading, schema inference, and dropping malformed lines, and it doesn't require passing data from Python to the JVM.
Note:
If you know the schema, it is better to avoid schema inference and pass it to DataFrameReader. Assuming you have three columns (integer, double, and string):
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
schema = StructType([
    StructField("A", IntegerType()),
    StructField("B", DoubleType()),
    StructField("C", StringType())
])
(
    sqlContext
    .read
    .format("com.databricks.spark.csv")
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv")
)
Solution 2:
Are you sure that all the lines have at least 2 columns? Can you try something like this, just to check:
sc.textFile("file.csv") \
.map(lambda line: line.split(",")) \
.filter(lambda line: len(line)>1) \
.map(lambda line: (line[0],line[1])) \
.collect()
Alternatively, you could print the culprit (if any):
sc.textFile("file.csv") \
.map(lambda line: line.split(",")) \
.filter(lambda line: len(line)<=1) \
.collect()
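Keep in mind that splitting on a bare comma will still misparse quoted fields that contain commas, which is why the first solution calls manual parsing non-trivial. If you have to stay on the RDD API, one option (a sketch, not code from either answer above) is to let Python's standard csv module do the line parsing; this still assumes one record per line:

import csv

rows = (
    sc.textFile("file.csv")
      .mapPartitions(lambda lines: csv.reader(lines))  # csv.reader copes with quoted commas
      .filter(lambda row: len(row) > 1)
      .map(lambda row: (row[0], row[1]))
      .collect()
)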