Better way to convert a string field into timestamp in Spark

Spark >= 2.2

Since Spark 2.2 you can provide the format string directly:

import org.apache.spark.sql.functions.to_timestamp

val ts = to_timestamp($"dts", "MM/dd/yyyy HH:mm:ss")

df.withColumn("ts", ts).show(2, false)

// +---+-------------------+-------------------+
// |id |dts                |ts                 |
// +---+-------------------+-------------------+
// |1  |05/26/2016 01:01:01|2016-05-26 01:01:01|
// |2  |#$@#@#             |null               |
// +---+-------------------+-------------------+

Spark >= 1.6, < 2.2

You can use the date processing functions introduced in Spark 1.5. Assuming you have the following data:

val df = Seq((1L, "05/26/2016 01:01:01"), (2L, "#$@#@#")).toDF("id", "dts")

You can use unix_timestamp to parse strings and cast the result to timestamp:

import org.apache.spark.sql.functions.unix_timestamp

val ts = unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("timestamp")

df.withColumn("ts", ts).show(2, false)

// +---+-------------------+---------------------+
// |id |dts                |ts                   |
// +---+-------------------+---------------------+
// |1  |05/26/2016 01:01:01|2016-05-26 01:01:01.0|
// |2  |#$@#@#             |null                 |
// +---+-------------------+---------------------+

As you can see, this covers both parsing and error handling. The format string should be compatible with Java SimpleDateFormat.
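
For instance, if your strings instead used an ISO-like layout (a hypothetical variation on the data above), only the pattern would change:

// Hypothetical variation: same approach, different SimpleDateFormat pattern
val isoTs = unix_timestamp($"dts", "yyyy-MM-dd'T'HH:mm:ss").cast("timestamp")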

Spark >= 1.5, < 1.6

You'll have to use something like this:

unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("double").cast("timestamp")

or

(unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss") * 1000).cast("timestamp")

due to SPARK-11724.
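
Applied to the example DataFrame above, a minimal sketch of the first workaround looks like this:

// Double-cast workaround for SPARK-11724, using the df defined earlier
val tsCompat = unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("double").cast("timestamp")

df.withColumn("ts", tsCompat).show(2)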

Spark < 1.5

You should be able to use these with expr and HiveContext.
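
A rough sketch mirroring the workaround above, assuming df was created through a HiveContext so the unix_timestamp Hive UDF is available to the SQL parser:

// unix_timestamp here is the Hive UDF resolved by the HiveContext SQL parser,
// not the DataFrame function added in Spark 1.5
df.selectExpr(
  "id",
  "dts",
  "cast(cast(unix_timestamp(dts, 'MM/dd/yyyy HH:mm:ss') as double) as timestamp) as ts"
).show(2)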


I haven't played with Spark SQL yet, but I think this would be more idiomatic Scala (using null is not considered good practice):

import java.sql.Timestamp
import java.text.SimpleDateFormat
import scala.util.{Try, Success, Failure}

// Returns None for empty or unparseable strings instead of null
def getTimestamp(s: String): Option[Timestamp] = s match {
  case "" => None
  case _ =>
    val format = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss")
    Try(new Timestamp(format.parse(s).getTime)) match {
      case Success(t) => Some(t)
      case Failure(_) => None
    }
}

Please note that I assume you know the Row element types beforehand (if you read them from a CSV file, they are all String), which is why I use a proper type like String and not Any (everything is a subtype of Any).

It also depends on how you want to handle parsing exceptions. Here, if a parsing exception occurs, None is simply returned.

You could use it further on with:

rows.map(row => Row(row(0), row(1), row(2), getTimestamp(row(3))))
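
If you then need a DataFrame back, here is a rough sketch; the column names and schema below are hypothetical, and the Option is unwrapped with orNull so the TimestampType column accepts missing values:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Hypothetical schema; adjust names and types to your data
val schema = StructType(Seq(
  StructField("col1", StringType),
  StructField("col2", StringType),
  StructField("col3", StringType),
  StructField("ts", TimestampType)
))

val withTs = rows.map(row =>
  // unwrap the Option so the Row holds either a Timestamp or null
  Row(row(0), row(1), row(2), getTimestamp(row(3).toString).orNull)
)

val dfWithTs = sqlContext.createDataFrame(withTs, schema)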

I have ISO 8601 timestamps in my dataset and I needed to convert them to "yyyy-MM-dd" format. This is what I did:

import org.joda.time.{DateTime, DateTimeZone}

// Serializable so it can be referenced from UDFs shipped to executors
object DateUtils extends Serializable {
  def dtFromUtcSeconds(seconds: Int): DateTime = new DateTime(seconds * 1000L, DateTimeZone.UTC)
  def dtFromIso8601(isoString: String): DateTime = new DateTime(isoString, DateTimeZone.UTC)
}

sqlContext.udf.register("formatTimeStamp", (isoTimestamp: String) =>
  DateUtils.dtFromIso8601(isoTimestamp).toString("yyyy-MM-dd"))

And you can just use the UDF in your Spark SQL query.
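
For example (the table and column names here are hypothetical):

// Hypothetical registered table "events" with an ISO 8601 string column "event_time"
val formatted = sqlContext.sql(
  "SELECT formatTimeStamp(event_time) AS event_date FROM events")

formatted.show()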