Left Anti join in Spark?

I've defined two tables like this:

 val tableName = "table1"
    val tableName2 = "table2"

    val format = new SimpleDateFormat("yyyy-MM-dd")
      val data = List(
        List("mike", 26, true),
        List("susan", 26, false),
        List("john", 33, true)
      )
    val data2 = List(
        List("mike", "grade1", 45, "baseball", new java.sql.Date(format.parse("1957-12-10").getTime)),
        List("john", "grade2", 33, "soccer", new java.sql.Date(format.parse("1978-06-07").getTime)),
        List("john", "grade2", 32, "golf", new java.sql.Date(format.parse("1978-06-07").getTime)),
        List("mike", "grade2", 26, "basketball", new java.sql.Date(format.parse("1978-06-07").getTime)),
        List("lena", "grade2", 23, "baseball", new java.sql.Date(format.parse("1978-06-07").getTime))
      )

      val rdd = sparkContext.parallelize(data).map(Row.fromSeq(_))
      val rdd2 = sparkContext.parallelize(data2).map(Row.fromSeq(_))
      val schema = StructType(Array(
        StructField("name", StringType, true),
        StructField("age", IntegerType, true),
        StructField("isBoy", BooleanType, false)
      ))
    val schema2 = StructType(Array(
        StructField("name", StringType, true),
        StructField("grade", StringType, true),
        StructField("howold", IntegerType, true),
        StructField("hobby", StringType, true),
        StructField("birthday", DateType, false)
      ))

      val df = sqlContext.createDataFrame(rdd, schema)
      val df2 = sqlContext.createDataFrame(rdd2, schema2)
      df.createOrReplaceTempView(tableName)
      df2.createOrReplaceTempView(tableName2)

I'm trying to build query to return rows from table1 that doesn't have matching row in table2. I've tried to do it using this query:

Select * from table1 LEFT JOIN table2 ON table1.name = table2.name AND table1.age = table2.howold AND table2.name IS NULL AND table2.howold IS NULL

but this just gives me all rows from table1:

List({"name":"john","age":33,"isBoy":true}, {"name":"susan","age":26,"isBoy":false}, {"name":"mike","age":26,"isBoy":true})

How to make this type of join in Spark efficiently?

I'm looking for an SQL query because I need to be able to specify columns which to compare between two tables, not just compare row by row like it is done in other recommended questions. Like using subtract, except etc.


Solution 1:

You can use the "left anti" join type - either with DataFrame API or with SQL (DataFrame API supports everything that SQL supports, including any join condition you need):

DataFrame API:

df.as("table1").join(
  df2.as("table2"),
  $"table1.name" === $"table2.name" && $"table1.age" === $"table2.howold",
  "leftanti"
)

SQL:

sqlContext.sql(
  """SELECT table1.* FROM table1
    | LEFT ANTI JOIN table2
    | ON table1.name = table2.name AND table1.age = table2.howold
  """.stripMargin)

NOTE: it's also worth noting that there's a shorter, more concise way of creating the sample data without specifying the schema separately, using tuples and the implicit toDF method, and then "fixing" the automatically-inferred schema where needed:

import spark.implicits._
val df = List(
  ("mike", 26, true),
  ("susan", 26, false),
  ("john", 33, true)
).toDF("name", "age", "isBoy")

val df2 = List(
  ("mike", "grade1", 45, "baseball", new java.sql.Date(format.parse("1957-12-10").getTime)),
  ("john", "grade2", 33, "soccer", new java.sql.Date(format.parse("1978-06-07").getTime)),
  ("john", "grade2", 32, "golf", new java.sql.Date(format.parse("1978-06-07").getTime)),
  ("mike", "grade2", 26, "basketball", new java.sql.Date(format.parse("1978-06-07").getTime)),
  ("lena", "grade2", 23, "baseball", new java.sql.Date(format.parse("1978-06-07").getTime))
).toDF("name", "grade", "howold", "hobby", "birthday").withColumn("birthday", $"birthday".cast(DateType))

Solution 2:

You can do it with the built in function except (I would have used the code you provided, but you didn't include the imports, so I couldn't just c/p it :( )

val a = sc.parallelize(Seq((1,"a",123),(2,"b",456))).toDF("col1","col2","col3")
val b= sc.parallelize(Seq((4,"a",432),(2,"t",431),(2,"b",456))).toDF("col1","col2","col3")

scala> a.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   a| 123|
|   2|   b| 456|
+----+----+----+


scala> b.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
|   4|   a| 432|
|   2|   t| 431|
|   2|   b| 456|
+----+----+----+

scala> a.except(b).show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   a| 123|
+----+----+----+

Solution 3:

you can use left anti.

dfRcc20.as("a").join(dfClientesDuplicados.as("b")
  ,col("a.eteerccdiid")===col("b.eteerccdiid")&&
    col("a.eteerccdinr")===col("b.eteerccdinr")
  ,"left_anti")