Encoder error while trying to map dataframe row to updated row
When I m trying to do the same thing in my code as mentioned below
dataframe.map(row => {
val row1 = row.getAs[String](1)
val make = if (row1.toLowerCase == "tesla") "S" else row1
Row(row(0),make,row(2))
})
I have taken the above reference from here: Scala: How can I replace value in Dataframs using scala But I am getting encoder error as
Unable to find encoder for type stored in a Dataset. Primitive types (Int, S tring, etc) and Product types (case classes) are supported by importing spark.im plicits._ Support for serializing other types will be added in future releases.
Note: I am using spark 2.0!
Solution 1:
There is nothing unexpected here. You're trying to use code which has been written with Spark 1.x and is no longer supported in Spark 2.0:
- in 1.x
DataFrame.map
is((Row) ⇒ T)(ClassTag[T]) ⇒ RDD[T]
- in 2.x
Dataset[Row].map
is((Row) ⇒ T)(Encoder[T]) ⇒ Dataset[T]
To be honest it didn't make much sense in 1.x either. Independent of version you can simply use DataFrame
API:
import org.apache.spark.sql.functions.{when, lower}
val df = Seq(
(2012, "Tesla", "S"), (1997, "Ford", "E350"),
(2015, "Chevy", "Volt")
).toDF("year", "make", "model")
df.withColumn("make", when(lower($"make") === "tesla", "S").otherwise($"make"))
If you really want to use map
you should use statically typed Dataset
:
import spark.implicits._
case class Record(year: Int, make: String, model: String)
df.as[Record].map {
case tesla if tesla.make.toLowerCase == "tesla" => tesla.copy(make = "S")
case rec => rec
}
or at least return an object which will have implicit encoder:
df.map {
case Row(year: Int, make: String, model: String) =>
(year, if(make.toLowerCase == "tesla") "S" else make, model)
}
Finally if for some completely crazy reason you really want to map over Dataset[Row]
you have to provide required encoder:
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
// Yup, it would be possible to reuse df.schema here
val schema = StructType(Seq(
StructField("year", IntegerType),
StructField("make", StringType),
StructField("model", StringType)
))
val encoder = RowEncoder(schema)
df.map {
case Row(year, make: String, model) if make.toLowerCase == "tesla" =>
Row(year, "S", model)
case row => row
} (encoder)
Solution 2:
For scenario where dataframe schema is known in advance answer given by @zero323 is the solution
but for scenario with dynamic schema / or passing multiple dataframe to a generic function: Following code has worked for us while migrating from 1.6.1 from 2.2.0
import org.apache.spark.sql.Row
val df = Seq(
(2012, "Tesla", "S"), (1997, "Ford", "E350"),
(2015, "Chevy", "Volt")
).toDF("year", "make", "model")
val data = df.rdd.map(row => {
val row1 = row.getAs[String](1)
val make = if (row1.toLowerCase == "tesla") "S" else row1
Row(row(0),make,row(2))
})
this code executes on both the versions of spark.
disadvantage : optimization provided by spark on dataframe/datasets api wont be applied.
Solution 3:
Just to add a few other important-to-know points in order to well understand the other answers (especially the final point of @zero323's answer about map
over Dataset[Row]
):
- First of all,
Dataframe.map
gives you aDataset
(more specifically,Dataset[T]
, rather thanDataset[Row]
)! - And
Dataset[T]
always requires an encoder, that's what this sentence "Dataset[Row].map
is((Row) ⇒ T)(Encoder[T]) ⇒ Dataset[T]
" means. - There are indeed lots of encoders predefined already by Spark (which can be
import
ed by doingimport spark.implicits._
), but still the list would not be able to cover many domain specific types that developers may create, in which case you need to create encoders yourself. - In the specific example on this page,
df.map
returns aRow
type forDataset
, and hang on a minute,Row
type is not within the list of types that have encoders predefined by Spark, hence you are going to create one on your own. - And I admit that creating an encoder for
Row
type is a bit different than the approach described in the above link, and you have to useRowEncoder
which takesStructType
as param describing type of a row, like what @zero323 provides above:
// this describes the internal type of a row
val schema = StructType(Seq(StructField("year", IntegerType), StructField("make", StringType), StructField("model", StringType)))
// and this completes the creation of encoder
// for the type `Row` with internal schema described above
val encoder = RowEncoder(schema)