Pyspark: Pass multiple columns in UDF
I am writing a User Defined Function which will take all the columns except the first one in a dataframe and do sum (or any other operation). Now the dataframe can sometimes have 3 columns or 4 columns or more. It will vary.
I know I can hard code 4 column names as pass in the UDF but in this case it will vary so I would like to know how to get it done?
Here are two examples in the first one we have two columns to add and in the second one we have three columns to add.
If all columns you want to pass to UDF have the same data type you can use array as input parameter, for example:
>>> from pyspark.sql.types import IntegerType
>>> from pyspark.sql.functions import udf, array
>>> sum_cols = udf(lambda arr: sum(arr), IntegerType())
>>> spark.createDataFrame([(101, 1, 16)], ['ID', 'A', 'B']) \
... .withColumn('Result', sum_cols(array('A', 'B'))).show()
+---+---+---+------+
| ID| A| B|Result|
+---+---+---+------+
|101| 1| 16| 17|
+---+---+---+------+
>>> spark.createDataFrame([(101, 1, 16, 8)], ['ID', 'A', 'B', 'C'])\
... .withColumn('Result', sum_cols(array('A', 'B', 'C'))).show()
+---+---+---+---+------+
| ID| A| B| C|Result|
+---+---+---+---+------+
|101| 1| 16| 8| 25|
+---+---+---+---+------+
Another simple way without Array and Struct.
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
def sum(x, y):
return x + y
sum_cols = udf(sum, IntegerType())
a=spark.createDataFrame([(101, 1, 16)], ['ID', 'A', 'B'])
a.show()
a.withColumn('Result', sum_cols('A', 'B')).show()
Use struct instead of array
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf, struct
sum_cols = udf(lambda x: x[0]+x[1], IntegerType())
a=spark.createDataFrame([(101, 1, 16)], ['ID', 'A', 'B'])
a.show()
a.withColumn('Result', sum_cols(struct('A', 'B'))).show()
Maybe it's a late answer, but I don't like using UDFs without necessity, so:
from pyspark.sql.functions import col
from functools import reduce
data = [["a",1,2,5],["b",2,3,7],["c",3,4,8]]
df = spark.createDataFrame(data,["id","v1","v2",'v3'])
calculate = reduce(lambda a, x: a+x, map(col, ["v1","v2",'v3']))
df.withColumn("Result", calculate)
#
#id v1 v2 v3 Result
#a 1 2 5 8
#b 2 3 7 12
#c 3 4 8 15
Here u could to use any operation which implement in Column
. Also if u want to write a custom udf
with specific logic, u could use it, because Column
provide tree execution operations. Without collecting to array and sum on it.
If compared with process as array operations, it will be bad from performance perspective, let's take a look at the physical plan, in my case and array case, in my case and array
cased.
my case:
== Physical Plan ==
*(1) Project [id#355, v1#356L, v2#357L, v3#358L, ((v1#356L + v2#357L) + v3#358L) AS Result#363L]
+- *(1) Scan ExistingRDD[id#355,v1#356L,v2#357L,v3#358L]
array case:
== Physical Plan ==
*(2) Project [id#339, v1#340L, v2#341L, v3#342L, pythonUDF0#354 AS Result#348]
+- BatchEvalPython [<lambda>(array(v1#340L, v2#341L, v3#342L))], [pythonUDF0#354]
+- *(1) Scan ExistingRDD[id#339,v1#340L,v2#341L,v3#342L]
When possible - we need to avoid using UDFs as Catalyst does not know how to optimize those