Spark 2.0 Dataset vs DataFrame
Starting out with Spark 2.0.1, I have some questions. I have read a lot of documentation but so far could not find sufficient answers:
- What is the difference between df.select("foo") and df.select($"foo")?
- Do I understand correctly that myDataSet.map(foo.someVal) is type-safe and will not convert into an RDD, but stays in the Dataset representation with no additional overhead (performance-wise, for 2.0.0)?
- All the other commands, e.g. select, are just syntactic sugar. They are not type-safe and a map could be used instead. How could I do df.select("foo") type-safely without a map statement?
- Why should I use a UDF / UDAF instead of a map (assuming map stays in the Dataset representation)?
- The difference between df.select("foo") and df.select($"foo") is the signature. The former takes at least one String, the latter zero or more Columns. There is no practical difference beyond that.
- myDataSet.map(foo.someVal) type checks, but, like any Dataset operation, it uses an RDD of objects, and compared to DataFrame operations there is a significant overhead. Let's take a look at a simple example:
  case class FooBar(foo: Int, bar: String)
  val ds = Seq(FooBar(1, "x")).toDS
  ds.map(_.foo).explain
  == Physical Plan ==
  *SerializeFromObject [input[0, int, true] AS value#123]
  +- *MapElements <function1>, obj#122: int
     +- *DeserializeToObject newInstance(class $line67.$read$$iw$$iw$FooBar), obj#121: $line67.$read$$iw$$iw$FooBar
        +- LocalTableScan [foo#117, bar#118]
  As you can see, this execution plan requires access to all fields and has to DeserializeToObject.
- No. In general, other methods are not syntactic sugar and generate a significantly different execution plan. For example:
  ds.select($"foo").explain
  == Physical Plan ==
  LocalTableScan [foo#117]
  Compared to the plan shown before, it can access the column directly. It is not so much a limitation of the API as a result of a difference in the operational semantics.
- How could I df.select("foo") type-safe without a map statement?
  There is no such option. While typed columns allow you to transform a statically typed Dataset into another statically typed Dataset:
  ds.select($"bar".as[Int])
  they are not type-safe. There are some other attempts to include type-safe optimized operations, like typed aggregations, but this is an experimental API.
- Why should I use a UDF / UDAF instead of a map?
  It is completely up to you. Each distributed data structure in Spark provides its own advantages and disadvantages (see for example Spark UDAF with ArrayType as bufferSchema performance issues).
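The first two points above can be tried out with a minimal sketch like the following. It assumes a local SparkSession and reuses the FooBar case class from the example; the object name SelectVsMap and the application name are only illustrative. Exact plan output differs between Spark versions, so none is reproduced here.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Same shape as the FooBar case class used in the answer.
case class FooBar(foo: Int, bar: String)

// Hypothetical object name, used only for this sketch.
object SelectVsMap {
  def main(args: Array[String]): Unit = {
    // Assumption: a local session is enough for a demonstration.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("select-vs-map")
      .getOrCreate()
    import spark.implicits._

    val ds: Dataset[FooBar] = Seq(FooBar(1, "x")).toDS()

    // select(String, String*) and select(Column*) both return an
    // untyped DataFrame containing the same column.
    val byName: DataFrame = ds.select("foo")
    val byColumn: DataFrame = ds.select($"foo")
    assert(byName.schema == byColumn.schema)

    // The typed map keeps a static type (Dataset[Int]) but works on
    // deserialized FooBar objects; compare the two printed plans.
    val mapped: Dataset[Int] = ds.map(_.foo)
    mapped.explain()
    byColumn.explain()

    spark.stop()
  }
}
```

Comparing the two printed plans should show the object deserialization step only for the map variant, as in the plans quoted above.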
Personally, I find the statically typed Dataset to be the least useful:
- It doesn't provide the same range of optimizations as Dataset[Row] (although they share a storage format and some execution plan optimizations, it doesn't fully benefit from code generation or off-heap storage), nor access to all the analytical capabilities of the DataFrame.
- Typed transformations are black boxes and effectively create an analysis barrier for the optimizer. For example, selections (filters) cannot be pushed over a typed transformation:
  ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].filter(x => true).where($"foo" === 1).explain
  == Physical Plan ==
  *Filter (foo#133 = 1)
  +- *Filter <function1>.apply
     +- *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))])
        +- Exchange hashpartitioning(foo#133, 200)
           +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))])
              +- LocalTableScan [foo#133, bar#134]
  Compared to:
  ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].where($"foo" === 1).explain
  == Physical Plan ==
  *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))])
  +- Exchange hashpartitioning(foo#133, 200)
     +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))])
        +- *Filter (foo#133 = 1)
           +- LocalTableScan [foo#133, bar#134]
  This impacts features like predicate pushdown or projection pushdown.
- They are not as flexible as RDDs, with only a small subset of types supported natively.
- "Type safety" with Encoders is disputable when a Dataset is converted using the as method. Because the data shape is not encoded in the signature, a compiler can only verify the existence of an Encoder.
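The last point can be illustrated with a small sketch, assuming a local SparkSession; the object name and the column names a and b are hypothetical. The conversion compiles because only an Encoder[FooBar] is required, and the mismatch shows up only at runtime (at the as call or at the first action, depending on the Spark version, which is why the sketch wraps both).

```scala
import org.apache.spark.sql.SparkSession
import scala.util.Try

// Same shape as the FooBar case class used in the answer.
case class FooBar(foo: Int, bar: String)

// Hypothetical object name, used only for this sketch.
object AsIsCheckedAtRuntime {
  def main(args: Array[String]): Unit = {
    // Assumption: a local session is enough for a demonstration.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("as-runtime-check")
      .getOrCreate()
    import spark.implicits._

    // Column names a and b do not match the fields of FooBar.
    val df = Seq((1, "x")).toDF("a", "b")

    // This compiles: the compiler only needs an implicit Encoder[FooBar].
    // Spark's analyzer rejects the mismatch at runtime instead.
    val attempt = Try(df.as[FooBar].collect())
    assert(attempt.isFailure)

    spark.stop()
  }
}
```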
Related questions:
- Perform a typed join in Scala with Spark Datasets
- Spark 2.0 DataSets groupByKey and divide operation and type safety
A Spark Dataset is way more powerful than a Spark DataFrame. Small example: you can only create a DataFrame of Row, Tuple or any primitive data type, but Dataset gives you the power to create a Dataset of any non-primitive type too, i.e. you can literally create a Dataset of an object type.
Ex:
case class Employee(id: Int, name: String)
Dataset[Employee] // is valid
DataFrame[Employee] // is invalid
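As background for why the last line is invalid: in Spark 2.0 and later, DataFrame is a type alias for Dataset[Row], so it takes no type parameter. A minimal sketch of moving between the two, assuming a local SparkSession (the object name and the sample value "Ann" are made up for illustration):

```scala
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

case class Employee(id: Int, name: String)

// Hypothetical object name, used only for this sketch.
object DataFrameIsDatasetOfRow {
  def main(args: Array[String]): Unit = {
    // Assumption: a local session is enough for a demonstration.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("df-alias")
      .getOrCreate()
    import spark.implicits._

    val employees: Dataset[Employee] = Seq(Employee(1, "Ann")).toDS()

    // toDF() drops the static element type and yields a DataFrame.
    val df: DataFrame = employees.toDF()

    // Compiles without conversion because DataFrame = Dataset[Row].
    val rows: Dataset[Row] = df

    // as[Employee] restores the static type using an implicit Encoder.
    val typedAgain: Dataset[Employee] = rows.as[Employee]
    assert(typedAgain.collect().toSeq == Seq(Employee(1, "Ann")))

    spark.stop()
  }
}
```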