How to pass whole Row to UDF - Spark DataFrame filter

I'm writing a filter function for a complex JSON dataset with lots of inner structures. Passing individual columns is too cumbersome.

So I declared the following UDF:

val records: DataFrame = sqlContext.jsonFile("...")
def myFilterFunction(r: Row): Boolean = ???
sqlContext.udf.register("myFilter", (r: Row) => myFilterFunction(r))

Intuitively, I expected it to work like this:

records.filter("myFilter(*)=true")

What is the actual syntax?


Solution 1:

You have to use the struct() function to construct the Row when you call the UDF. Follow these steps.

Import Row and the functions object that provides struct and callUdf

import org.apache.spark.sql._
import org.apache.spark.sql.functions._

Define the UDF

def myFilterFunction(r: Row): Boolean = r.get(0) == r.get(1)

Register the UDF

sqlContext.udf.register("myFilterFunction", myFilterFunction _)

Create the dataFrame

val records = sqlContext.createDataFrame(Seq(("sachin", "sachin"), ("aggarwal", "aggarwal1"))).toDF("text", "text2")

Use the UDF (callUdf was deprecated in Spark 1.5 in favor of callUDF, so use that spelling on later versions)

records.filter(callUdf("myFilterFunction",struct($"text",$"text2"))).show

When you want all columns to be passed to the UDF:

records.filter(callUdf("myFilterFunction",struct(records.columns.map(records(_)) : _*))).show 

Result:

+------+------+
|  text| text2|
+------+------+
|sachin|sachin|
+------+------+
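
For newer Spark versions, the steps above can be sketched roughly as follows. This is only a sketch under assumptions that are not in the original answer: Spark 2.x or later with a SparkSession, a local master, and a hypothetical object name WholeRowFilter. It wraps the predicate with udf(...) instead of callUdf, and also shows the string-expression form the question asked about.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.{col, struct, udf}

// Hypothetical wrapper object; the name is only for illustration.
object WholeRowFilter {
  // Same predicate as in the answer: keep rows whose first two fields are equal.
  def myFilterFunction(r: Row): Boolean = r.get(0) == r.get(1)

  def main(args: Array[String]): Unit = {
    // Assumption: a local SparkSession (Spark 2.x or later).
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("whole-row-filter")
      .getOrCreate()
    import spark.implicits._

    val records = Seq(("sachin", "sachin"), ("aggarwal", "aggarwal1"))
      .toDF("text", "text2")

    // Wrap the predicate as a column-level UDF; no registration by name is needed.
    val myFilter = udf(myFilterFunction _)

    // Pack every column into a single struct; the UDF receives it as a Row.
    val allColumns = struct(records.columns.map(col): _*)
    records.filter(myFilter(allColumns)).show()

    // String-expression form: register the predicate by name and build the struct in SQL.
    spark.udf.register("myFilter", myFilterFunction _)
    records.filter("myFilter(struct(text, text2))").show()

    spark.stop()
  }
}
```

Both filter calls apply the same predicate to the same two columns, so they should select the same rows. The Column form avoids looking the function up by name, while the registered form is the one to use from SQL strings.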

Solution 2:

scala> inputDF
res40: org.apache.spark.sql.DataFrame = [email: string, first_name: string ... 3 more fields]

scala> inputDF.printSchema
root
 |-- email: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- id: long (nullable = true)
 |-- last_name: string (nullable = true)

Now, I would like to filter the rows based on the gender field. I can accomplish that with .filter($"gender" === "Male"), but I would like to do it by passing a function to .filter instead.

So I defined these anonymous functions:

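// getAs needs an explicit type parameter; without it the type is inferred as Nothing and the call fails at runtime
val isMaleRow = (r: Row) => r.getAs[String]("gender") == "Male"

val isFemaleRow = (r: Row) => r.getAs[String]("gender") == "Female"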

inputDF.filter(isMaleRow).show()

inputDF.filter(isFemaleRow).show()

I felt the requirement could be met in a simpler way, i.e. without declaring a UDF and then invoking it.

Solution 3:

In addition to the first answer: when we want all columns to be passed to the UDF, we can use

struct("*")