Scala-Spark Dynamically call groupby and agg with parameter values
I want to write a custom grouping and aggregation function that takes user-specified column names and a user-specified aggregation map. I do not know the column names or the aggregation map up front. I want to write a function similar to the one below, but I am new to Scala and cannot get it to work.
def groupAndAggregate(df: DataFrame, aggregateFun: Map[String, String], cols: List[String]): DataFrame = {
  val grouped = df.groupBy(cols)
  val aggregated = grouped.agg(aggregateFun)
  aggregated.show()
}
and I want to call it like this:
val listOfStrings = List("A", "B", "C")
val result = groupAndAggregate(df, Map("D"-> "SUM", "E"-> "COUNT"), listOfStrings)
How can I do this? Can anyone help me, please?
Solution 1:
Your code is almost correct - with two issues:
1. The return type of your function is DataFrame, but the last line is aggregated.show(), which returns Unit. Remove the call to show to return aggregated itself, or just return the result of agg immediately.
2. DataFrame.groupBy expects arguments as follows: col1: String, cols: String* - so you need to pass matching arguments: the first column, and then the rest of the columns as a list of arguments. You can do that as follows:
df.groupBy(cols.head, cols.tail: _*)
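The head/tail pattern is plain Scala varargs expansion and does not depend on Spark. As a minimal sketch, the hypothetical describe function below (not part of any library) has the same parameter shape as groupBy(col1: String, cols: String*), so it shows how a List[String] is split to fit that signature:

```scala
object VarargsDemo {
  // Hypothetical stand-in with the same parameter shape as
  // groupBy(col1: String, cols: String*): one required name, then varargs.
  def describe(first: String, rest: String*): String = {
    val restStr = rest.mkString("[", ", ", "]")
    s"first=$first, rest=$restStr"
  }

  def main(args: Array[String]): Unit = {
    val cols = List("A", "B", "C")
    // cols.head fills the required parameter; `cols.tail: _*` expands
    // the remaining elements into the varargs parameter.
    println(describe(cols.head, cols.tail: _*)) // prints "first=A, rest=[B, C]"
    // describe(cols) would not compile: a List[String] is not a String.
  }
}
```

Passing the list itself fails for the same reason the original groupBy(cols) call did: there is no overload that accepts a List[String].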
Altogether, your function would be:
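import org.apache.spark.sql.DataFrame

def groupAndAggregate(df: DataFrame, aggregateFun: Map[String, String], cols: List[String]): DataFrame = {
  // First column as the required argument, the rest expanded as varargs
  val grouped = df.groupBy(cols.head, cols.tail: _*)
  val aggregated = grouped.agg(aggregateFun)
  // The last expression is the return value, so this returns the DataFrame
  aggregated
}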
Or, a similar shorter version:
def groupAndAggregate(df: DataFrame, aggregateFun: Map[String, String], cols: List[String]): DataFrame = {
  df.groupBy(cols.head, cols.tail: _*).agg(aggregateFun)
}
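One caveat with cols.head is that it throws a NoSuchElementException when the list is empty. A possible variant, sketched here under the assumption that an empty list should mean "aggregate over the whole DataFrame", maps the names to Column objects and uses the groupBy(cols: Column*) overload instead. The name groupAndAggregateByColumns is hypothetical and chosen only to distinguish it from the function above:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

object GroupByColumns {
  // Variant of the answer's function (hypothetical name): converts each
  // column name to a Column so the whole list can be expanded as varargs.
  // With an empty list, groupBy() receives no columns and the aggregation
  // runs over the entire DataFrame.
  def groupAndAggregateByColumns(
      df: DataFrame,
      aggregateFun: Map[String, String],
      cols: List[String]): DataFrame =
    df.groupBy(cols.map(col): _*).agg(aggregateFun)
}
```

For a non-empty list this groups by the same columns as the head/tail version; the only difference is how the arguments are passed.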
If you do want to call show within your function, call it first and then return aggregated:
def groupAndAggregate(df: DataFrame, aggregateFun: Map[String, String], cols: List[String]): DataFrame = {
  val grouped = df.groupBy(cols.head, cols.tail: _*)
  val aggregated = grouped.agg(aggregateFun)
  aggregated.show()
  aggregated
}
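To try the function end to end, something like the following could be used. It is only a sketch: the local SparkSession settings, the object name, and the sample rows are assumptions made up for illustration, and the column names A to E simply mirror the call from the question.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object GroupAndAggregateExample {
  def groupAndAggregate(df: DataFrame, aggregateFun: Map[String, String], cols: List[String]): DataFrame =
    df.groupBy(cols.head, cols.tail: _*).agg(aggregateFun)

  def main(args: Array[String]): Unit = {
    // Assumed setup: a local session for experimentation only.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("group-and-aggregate")
      .getOrCreate()
    import spark.implicits._

    // Made-up sample data with the columns used in the question.
    val df = Seq(
      ("a1", "b1", "c1", 10, "x"),
      ("a1", "b1", "c1", 5, "y"),
      ("a2", "b2", "c2", 7, "z")
    ).toDF("A", "B", "C", "D", "E")

    val listOfStrings = List("A", "B", "C")
    val result = groupAndAggregate(df, Map("D" -> "SUM", "E" -> "COUNT"), listOfStrings)

    // One row per distinct (A, B, C) combination, plus one column per map entry.
    result.show()
    spark.stop()
  }
}
```

Because the function returns the DataFrame rather than calling show itself, the caller decides whether to display, write, or further transform the result.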