What is going wrong with `unionAll` of Spark `DataFrame`?

Using Spark 1.5.0 and given the following code, I expect unionAll to union DataFrames based on their column name. In the code, I'm using some FunSuite for passing in SparkContext sc:

object Entities {

  case class A (a: Int, b: Int)
  case class B (b: Int, a: Int)

  val as = Seq(
    A(1,3),
    A(2,4)
  )

  val bs = Seq(
    B(5,3),
    B(6,4)
  )
}

class UnsortedTestSuite extends SparkFunSuite {

  configuredUnitTest("The truth test.") { sc =>
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    val aDF = sc.parallelize(Entities.as, 4).toDF
    val bDF = sc.parallelize(Entities.bs, 4).toDF
    aDF.show()
    bDF.show()
    aDF.unionAll(bDF).show
  }
}

Output:

+---+---+
|  a|  b|
+---+---+
|  1|  3|
|  2|  4|
+---+---+

+---+---+
|  b|  a|
+---+---+
|  5|  3|
|  6|  4|
+---+---+

+---+---+
|  a|  b|
+---+---+
|  1|  3|
|  2|  4|
|  5|  3|
|  6|  4|
+---+---+

Why does the result contain intermixed "b" and "a" columns, instead of aligning columns bases on column names? Sounds like a serious bug!?


Solution 1:

It doesn't look like a bug at all. What you see is a standard SQL behavior and every major RDMBS, including PostgreSQL, MySQL, Oracle and MS SQL behaves exactly the same. You'll find SQL Fiddle examples linked with names.

To quote PostgreSQL manual:

In order to calculate the union, intersection, or difference of two queries, the two queries must be "union compatible", which means that they return the same number of columns and the corresponding columns have compatible data types

Column names, excluding the first table in the set operation, are simply ignored.

This behavior comes directly form the Relational Algebra where basic building block is a tuple. Since tuples are ordered an union of two sets of tuples is equivalent (ignoring duplicates handling) to the output you get here.

If you want to match using names you can do something like this

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

def unionByName(a: DataFrame, b: DataFrame): DataFrame = {
  val columns = a.columns.toSet.intersect(b.columns.toSet).map(col).toSeq
  a.select(columns: _*).unionAll(b.select(columns: _*))
}

To check both names and types it is should be enough to replace columns with:

a.dtypes.toSet.intersect(b.dtypes.toSet).map{case (c, _) => col(c)}.toSeq

Solution 2:

This issue is getting fixed in spark2.3. They are adding support of unionByName in the dataset.

https://issues.apache.org/jira/browse/SPARK-21043

Solution 3:

no issues/bugs - if you observe your case class B very closely then you will be clear. Case Class A --> you have mentioned the order (a,b), and Case Class B --> you have mentioned the order (b,a) ---> this is expected as per order

case class A (a: Int, b: Int) case class B (b: Int, a: Int)

thanks, Subbu

Solution 4:

Use unionByName:

Excerpt from the documentation:

def unionByName(other: Dataset[T]): Dataset[T]

The difference between this function and union is that this function resolves columns by name (not by position):

val df1 = Seq((1, 2, 3)).toDF("col0", "col1", "col2")
val df2 = Seq((4, 5, 6)).toDF("col1", "col2", "col0")
df1.union(df2).show

// output:
// +----+----+----+
// |col0|col1|col2|
// +----+----+----+
// |   1|   2|   3|
// |   4|   5|   6|
// +----+----+----+