How can I change column types in Spark SQL's DataFrame?

Suppose I'm doing something like:

val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "cars.csv", "header" -> "true"))
df.printSchema()

root
 |-- year: string (nullable = true)
 |-- make: string (nullable = true)
 |-- model: string (nullable = true)
 |-- comment: string (nullable = true)
 |-- blank: string (nullable = true)

df.show()
year make  model comment              blank
2012 Tesla S     No comment
1997 Ford  E350  Go get one now th...

But I really wanted the year as Int (and perhaps transform some other columns).

The best I could come up with was

df.withColumn("year2", 'year.cast("Int")).select('year2 as 'year, 'make, 'model, 'comment, 'blank)
org.apache.spark.sql.DataFrame = [year: int, make: string, model: string, comment: string, blank: string]

which is a bit convoluted.

I'm coming from R, and I'm used to being able to write, e.g.

df2 <- df %>%
   mutate(year = year %>% as.integer,
          make = make %>% toupper)

I'm likely missing something, since there should be a better way to do this in Spark/Scala...


Solution 1:

Edit: Newest version

Since Spark 2.x you can use .withColumn. Check the docs here:

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset@withColumn(colName:String,col:org.apache.spark.sql.Column):org.apache.spark.sql.DataFrame
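
For example, a minimal sketch (it assumes the df loaded in the question; note that cast also accepts a type name as a string, e.g. "int"):

import org.apache.spark.sql.functions.col

// reuse the existing column name, so the string column is replaced by the casted one
val df2 = df.withColumn("year", col("year").cast("int"))
df2.printSchema()  // year: integer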

Original answer

Since Spark version 1.4 you can apply the cast method with a DataType on the column:

import org.apache.spark.sql.types.IntegerType
val df2 = df.withColumn("yearTmp", df("year").cast(IntegerType))
    .drop("year")
    .withColumnRenamed("yearTmp", "year")

If you are using sql expressions you can also do:

val df2 = df.selectExpr("cast(year as int) year", 
                        "make", 
                        "model", 
                        "comment", 
                        "blank")

For more info check the docs: http://spark.apache.org/docs/1.6.0/api/scala/#org.apache.spark.sql.DataFrame

Solution 2:

[EDIT: March 2016: thanks for the votes! Though really, this is not the best answer; I think the solutions based on withColumn, withColumnRenamed and cast put forward by msemelman, Martin Senne and others are simpler and cleaner.]

I think your approach is OK; recall that a Spark DataFrame is an (immutable) RDD of Rows, so we're never really replacing a column, just creating a new DataFrame each time with a new schema.
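
To see that immutability concretely, here is a minimal sketch (it reuses the question's df with its lowercase year column, which is an assumption on my part):

// withColumn returns a brand-new DataFrame; the original df keeps its old schema
val casted = df.withColumn("year", df("year").cast("int"))
df.printSchema()      // year is still string here
casted.printSchema()  // year is integer only in the new DataFrame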

Assuming you have an original df with the following schema:

scala> df.printSchema
root
 |-- Year: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- DayofMonth: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- CRSDepTime: string (nullable = true)

And some UDFs defined on one or several columns:

import org.apache.spark.sql.functions._

val toInt    = udf[Int, String]( _.toInt)
val toDouble = udf[Double, String]( _.toDouble)
val toHour   = udf((t: String) => "%04d".format(t.toInt).take(2).toInt ) 
val days_since_nearest_holidays = udf( 
  (year:String, month:String, dayOfMonth:String) => year.toInt + 27 + month.toInt-12
 )

Changing column types or even building a new DataFrame from another can be written like this:

val featureDf = df
.withColumn("departureDelay", toDouble(df("DepDelay")))
.withColumn("departureHour",  toHour(df("CRSDepTime")))
.withColumn("dayOfWeek",      toInt(df("DayOfWeek")))              
.withColumn("dayOfMonth",     toInt(df("DayofMonth")))              
.withColumn("month",          toInt(df("Month")))              
.withColumn("distance",       toDouble(df("Distance")))              
.withColumn("nearestHoliday", days_since_nearest_holidays(
              df("Year"), df("Month"), df("DayofMonth"))
            )              
.select("departureDelay", "departureHour", "dayOfWeek", "dayOfMonth", 
        "month", "distance", "nearestHoliday")            

which yields:

scala> featureDf.printSchema
root
 |-- departureDelay: double (nullable = true)
 |-- departureHour: integer (nullable = true)
 |-- dayOfWeek: integer (nullable = true)
 |-- dayOfMonth: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- distance: double (nullable = true)
 |-- nearestHoliday: integer (nullable = true)

This is pretty close to your own solution. Simply keeping the type changes and other transformations as separate udf vals makes the code more readable and re-usable.

Solution 3:

As the cast operation is available on Spark Columns (and as I personally do not favour UDFs as proposed by @Svend at this point), how about:

df.select( df("year").cast(IntegerType).as("year"), ... )

to cast to the requested type? As a neat side effect, values that are not castable / "convertible" in that sense will become null.
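
A quick illustration of that null behaviour (a minimal sketch; the toy data and the SparkSession named spark with its implicits are assumptions, not part of the question):

import spark.implicits._   // assumes a SparkSession named spark
import org.apache.spark.sql.types.IntegerType

val demo = Seq("2012", "n/a").toDF("year")
demo.select(demo("year").cast(IntegerType).as("year")).show()
// "n/a" cannot be parsed as an Int, so the cast yields null instead of failing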

In case you need this as a helper method, use:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.DataType

object DFHelper{
  def castColumnTo( df: DataFrame, cn: String, tpe: DataType ) : DataFrame = {
    df.withColumn( cn, df(cn).cast(tpe) )
  }
}

which is used like:

import DFHelper._
import org.apache.spark.sql.types.IntegerType

val df2 = castColumnTo( df, "year", IntegerType )

Solution 4:

First, if you want to cast a type, do this:

import org.apache.spark.sql
df.withColumn("year", $"year".cast(sql.types.IntegerType))

With the same column name, the column will be replaced with the new one; you don't need separate add and drop steps.

Second, about Scala vs R.
This is the code most similar to R that I can come up with:

import org.apache.spark.sql.functions
import org.apache.spark.sql.types.IntegerType

val df2 = df.select(
   df.columns.map {
     case year @ "year" => df(year).cast(IntegerType).as(year)
     case make @ "make" => functions.upper(df(make)).as(make)
     case other         => df(other)
   }: _*
)

The code is a little longer than R's, but that has nothing to do with the verbosity of the language. In R, mutate is a special function for R data frames, while in Scala you can easily write an ad-hoc equivalent thanks to the language's expressive power. In short, Scala avoids special-purpose solutions, because the language design is good enough for you to quickly and easily build your own domain language (a small sketch follows below).
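
For instance, a mutate-like helper takes only a few lines. This is a sketch of the idea; castColumns and its Map argument are my own naming, not a Spark API:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{DataType, IntegerType}

// cast several columns in one call, leaving all other columns untouched
def castColumns(df: DataFrame, casts: Map[String, DataType]): DataFrame =
  casts.foldLeft(df) { case (acc, (name, tpe)) =>
    acc.withColumn(name, acc(name).cast(tpe))
  }

val df3 = castColumns(df, Map("year" -> IntegerType))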


Side note: df.columns is, surprisingly, an Array[String] instead of an Array[Column]; maybe they want it to look like Python pandas's DataFrame.

Solution 5:

You can use selectExpr to make it a little cleaner:

df.selectExpr("cast(year as int) as year", "upper(make) as make",
    "model", "comment", "blank")