How to pass whole Row to UDF - Spark DataFrame filter
I'm writing filter function for complex JSON dataset with lot's of inner structures. Passing individual columns is too cumbersome.
So I declared the following UDF:
val records:DataFrame = = sqlContext.jsonFile("...")
def myFilterFunction(r:Row):Boolean=???
sqlc.udf.register("myFilter", (r:Row)=>myFilterFunction(r))
Intuitively I'm thinking it will work like this:
records.filter("myFilter(*)=true")
What is the actual syntax?
Solution 1:
You have to use struct()
function for constructing the row while making a call to the function, follow these steps.
Import Row,
import org.apache.spark.sql._
Define the UDF
def myFilterFunction(r:Row) = {r.get(0)==r.get(1)}
Register the UDF
sqlContext.udf.register("myFilterFunction", myFilterFunction _)
Create the dataFrame
val records = sqlContext.createDataFrame(Seq(("sachin", "sachin"), ("aggarwal", "aggarwal1"))).toDF("text", "text2")
Use the UDF
records.filter(callUdf("myFilterFunction",struct($"text",$"text2"))).show
When u want all columns to be passed to UDF.
records.filter(callUdf("myFilterFunction",struct(records.columns.map(records(_)) : _*))).show
Result:
+------+------+
| text| text2|
+------+------+
|sachin|sachin|
+------+------+
Solution 2:
scala> inputDF
res40: org.apache.spark.sql.DataFrame = [email: string, first_name: string ... 3 more fields]
scala> inputDF.printSchema
root
|-- email: string (nullable = true)
|-- first_name: string (nullable = true)
|-- gender: string (nullable = true)
|-- id: long (nullable = true)
|-- last_name: string (nullable = true)
Now, I would like to filter the rows based on the Gender Field. I can accomplish that by using the .filter($"gender" === "Male")
but I would like to do with the .filter(function)
.
So, defined my anonymous functions
val isMaleRow = (r:Row) => {r.getAs("gender") == "Male"}
val isFemaleRow = (r:Row) => { r.getAs("gender") == "Female" }
inputDF.filter(isMaleRow).show()
inputDF.filter(isFemaleRow).show()
I felt the requirement can be done in a better way i.e without declaring as UDF and invoke it.
Solution 3:
In addition to the first answer. When we want all columns to be passed to UDF we can use
struct("*")