Spark difference between reduceByKey vs. groupByKey vs. aggregateByKey vs. combineByKey
Can anyone explain the difference between reduceByKey, groupByKey, aggregateByKey and combineByKey? I have read the documentation on this, but couldn't understand the exact differences. An explanation with examples would be great.
Solution 1:
groupByKey:
Syntax:
sparkContext.textFile("hdfs://")
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .groupByKey()
  .map { case (word, counts) => (word, counts.sum) }
groupByKey can cause out-of-disk problems, because every value for a key is sent over the network and collected on the reduce workers.
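To make that shuffle cost concrete, here is a minimal spark-shell sketch (the input data and the names words, grouped and counts are hypothetical) showing that groupByKey ships and buffers every single value per key before anything is summed:
val words = sc.parallelize(Seq("a", "b", "a", "a", "b"))
val grouped = words.map(w => (w, 1)).groupByKey()
grouped.collect.foreach(println)
// (a,CompactBuffer(1, 1, 1))   <- all the 1s are shuffled and buffered
// (b,CompactBuffer(1, 1))
val counts = grouped.map { case (word, ones) => (word, ones.sum) }
counts.collect.foreach(println) // (a,3), (b,2) -- output order may vary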
reduceByKey:
Syntax:
sparkContext.textFile("hdfs://")
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey((x, y) => x + y)
Data is combined at each partition, so only one output per key leaves each partition to be sent over the network. reduceByKey requires combining all your values into another value of the exact same type.
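That same-type constraint is what pushes you toward aggregateByKey or combineByKey for something like an average. As a hypothetical sketch (the names scores, sumCount and avg are mine): you cannot reduce Int values directly into a Double, so you first reshape the values into (sum, count) pairs and reduce those:
val scores = sc.parallelize(Seq(("foo", 10), ("foo", 20), ("bar", 5)))
val sumCount = scores
  .mapValues(v => (v, 1))                             // value -> (sum, count)
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))  // same type in and out
val avg = sumCount.mapValues { case (sum, count) => sum.toDouble / count }
avg.collect.foreach(println) // (foo,15.0), (bar,5.0) -- order may vary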
aggregateByKey:
Same as reduceByKey, but it also takes an initial (zero) value.
It takes 3 parameters as input:
- initial (zero) value
- sequence op logic (folds a value into the accumulator within a partition)
- combiner logic (merges accumulators across partitions)
Example:
val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D")
val data = sc.parallelize(keysWithValuesList)
//Create key value pairs
val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache()
val initialCount = 0
val addToCounts = (n: Int, v: String) => n + 1          // seqOp: count values within a partition
val sumPartitionCounts = (p1: Int, p2: Int) => p1 + p2  // combOp: merge per-partition counts
val countByKey = kv.aggregateByKey(initialCount)(addToCounts, sumPartitionCounts)
Output: Aggregate By Key sum Results: bar -> 3, foo -> 5
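The real differentiator of aggregateByKey is that the result type can differ from the value type. As a hedged sketch reusing kv from above (the name uniqueByKey is mine), here the String values are aggregated into a Set[String] per key:
val uniqueByKey = kv.aggregateByKey(Set.empty[String])(
  (set, v) => set + v,   // seqOp: add a String value to the per-partition Set
  (s1, s2) => s1 ++ s2   // combOp: union the per-partition Sets
)
uniqueByKey.collect.foreach(println)
// (bar,Set(C, D))
// (foo,Set(A, B))   -- output order may vary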
combineByKey:
It takes 3 parameters as input:
- initial value: unlike aggregateByKey, you need not always pass a constant; you can pass a function that returns a new value (it is applied to the first value seen for each key)
- merging function (folds a value into the accumulator within a partition)
- combine function (merges accumulators across partitions)
Example:
// `rdd` is assumed to be an RDD of (key, Int) pairs
val result = rdd.combineByKey(
  (v: Int) => (v, 1),                                     // createCombiner: (sum, count)
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),  // mergeValue (within a partition)
  (acc1: (Int, Int), acc2: (Int, Int)) =>
    (acc1._1 + acc2._1, acc1._2 + acc2._2)                // mergeCombiners (across partitions)
).map { case (k, v) => (k, v._1 / v._2.toDouble) }        // per-key average
result.collect.foreach(println)
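The answer never defines rdd; as a minimal sketch, assuming an input like the following, the pipeline above yields per-key averages:
val rdd = sc.parallelize(Seq(("foo", 10), ("foo", 20), ("bar", 6)))
// result.collect would then give:
// (foo,15.0)   <- (10 + 20) / 2
// (bar,6.0)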
reduceByKey, aggregateByKey and combineByKey are preferred over groupByKey.
Reference: Avoid groupByKey
Solution 2:
- groupByKey() just groups your dataset based on a key. It will result in data shuffling when the RDD is not already partitioned.
- reduceByKey() is something like grouping + aggregation. We can say reduceByKey() is equivalent to dataset.group(...).reduce(...). It will shuffle less data than groupByKey() (see the sketch after this list).
- aggregateByKey() is logically the same as reduceByKey(), but it lets you return the result in a different type. In other words, it lets you have an input of type x and an aggregate result of type y. For example, (1,2),(1,4) as input and (1,"six") as output. It also takes a zero value that will be applied at the beginning of each key.
Note: one similarity is that they are all wide (shuffle) operations.