How does Spark aggregate function - aggregateByKey work?

Say I have a distributed system on 3 nodes and my data is distributed among those nodes. For example, I have a test.csv file which exists on all 3 nodes, and it contains 2 columns:

**row   | id,  c.**
---------------
row1  | k1 , c1  
row2  | k1 , c2  
row3  | k1 , c3  
row4  | k2 , c4  
row5  | k2 , c5  
row6  | k2 , c6  
row7  | k3 , c7  
row8  | k3 , c8  
row9  | k3 , c9  
row10 | k4 , c10   
row11 | k4 , c11  
row12 | k4 , c12 

Then I use SparkContext.textFile to read the file in as an RDD. As far as I understand, each Spark worker node will read a portion of the file. So let's say each node stores:

  • node 1: row 1~4
  • node 2: row 5~8
  • node 3: row 9~12

Let's say I want to do a computation on this data, and one step needs to group the values by key, so the key-value pairs would be [k1 [{k1 c1} {k1 c2} {k1 c3}]] and so on.

There is a function called groupByKey() which is very expensive to use, and aggregateByKey() is recommended instead. So I'm wondering: how do groupByKey() and aggregateByKey() work under the hood? Can someone explain using the example I provided above? After shuffling, where do the rows reside on each node?


aggregateByKey() is quite different from reduceByKey(). In fact, reduceByKey() is a sort of special case of aggregateByKey().

aggregateByKey() will combine the values for a particular key, and the result of that combination can be any object that you specify. You have to specify how the values are combined ("added") inside one partition (which is executed on a single node) and how the results from different partitions (which may be on different nodes) are combined. reduceByKey() is a special case in two senses: the result of the combination (e.g. a sum) has the same type as the values, and the operation used to combine results from different partitions is the same as the operation used to combine values inside a partition.

An example: Imagine you have a list of pairs. You parallelize it:

val pairs = sc.parallelize(Array(("a", 3), ("a", 1), ("b", 7), ("a", 5)))

Now you want to "combine" them by key producing a sum. In this case reduceByKey and aggregateByKey are the same:

val resReduce = pairs.reduceByKey(_ + _) //the same operation for everything
resReduce.collect
res3: Array[(String, Int)] = Array((b,7), (a,9))

//0 is initial value, _+_ inside partition, _+_ between partitions
val resAgg = pairs.aggregateByKey(0)(_+_,_+_)
resAgg.collect
res4: Array[(String, Int)] = Array((b,7), (a,9))

Now, imagine that you want the aggregation to be a Set of the values. That is a different type from the values, which are integers (whereas the sum of integers is also an integer):

import scala.collection.mutable.HashSet
//the initial value is an empty Set. The first function (_+_) adds an element
//to a set inside a partition; the second (_++_) joins two sets from different partitions
val sets = pairs.aggregateByKey(new HashSet[Int])(_+_, _++_)
sets.collect
res5: Array[(String, scala.collection.mutable.HashSet[Int])]  =Array((b,Set(7)), (a,Set(1, 5, 3)))
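
To connect this to the rows in the question, here is a minimal sketch in plain Scala collections (no Spark involved) of the two phases described above. It is only a simulation of the idea, not Spark's actual implementation: `AggregateByKeySketch`, `combineLocally`, `mergeAcross` and `aggregate` are names made up for illustration, and each inner `Seq` stands in for the rows held by one node.

```scala
object AggregateByKeySketch {
  // One inner Seq per "node", holding the rows from the question.
  val partitions: Seq[Seq[(String, String)]] = Seq(
    Seq("k1" -> "c1", "k1" -> "c2", "k1" -> "c3", "k2" -> "c4"),   // node 1: rows 1-4
    Seq("k2" -> "c5", "k2" -> "c6", "k3" -> "c7", "k3" -> "c8"),   // node 2: rows 5-8
    Seq("k3" -> "c9", "k4" -> "c10", "k4" -> "c11", "k4" -> "c12") // node 3: rows 9-12
  )

  // Phase 1 (inside one partition): fold each value into the accumulator
  // for its key with seqOp, starting from the zero value.
  def combineLocally[K, V, U](part: Seq[(K, V)], zero: U)(seqOp: (U, V) => U): Map[K, U] =
    part.foldLeft(Map.empty[K, U]) { case (acc, (k, v)) =>
      acc.updated(k, seqOp(acc.getOrElse(k, zero), v))
    }

  // Phase 2 (across partitions): merge the per-partition accumulators
  // that share a key with combOp.
  def mergeAcross[K, U](locals: Seq[Map[K, U]])(combOp: (U, U) => U): Map[K, U] =
    locals.flatMap(_.toSeq).foldLeft(Map.empty[K, U]) { case (acc, (k, u)) =>
      acc.updated(k, acc.get(k).map(combOp(_, u)).getOrElse(u))
    }

  // Both phases together, mirroring the shape of aggregateByKey(zero)(seqOp, combOp).
  def aggregate[K, V, U](parts: Seq[Seq[(K, V)]], zero: U)(
      seqOp: (U, V) => U, combOp: (U, U) => U): Map[K, U] =
    mergeAcross(parts.map(p => combineLocally(p, zero)(seqOp)))(combOp)

  def main(args: Array[String]): Unit = {
    val grouped = aggregate(partitions, Set.empty[String])(_ + _, _ ++ _)
    grouped.toSeq.sortBy(_._1).foreach(println)
  }
}
```

In this simulation, k2 is spread over nodes 1 and 2, so node 1 produces a partial set for k2 from row 4, node 2 produces another from rows 5 and 6, and only those two partial results need to be merged in the second phase.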

aggregateByKey() is almost identical to reduceByKey() (both calling combineByKey() behind the scenes), except you give a starting value for aggregateByKey(). Most people are familiar with reduceByKey(), so I will use that in the explanation.

The reason reduceByKey() is so much better is that it makes use of a MapReduce feature called a combiner. Any associative and commutative function such as + or * can be used in this fashion, because neither the order nor the grouping of the elements it is called on changes the result. This allows Spark to start "reducing" values with the same key inside each partition, even before they have all been brought into the same partition.

On the flip side, groupByKey() gives you more versatility, since you write a function that takes an Iterable, meaning you could even pull all the elements into an array. However, it is inefficient because, for it to work, all the (K, V) pairs for a given key have to be moved into one partition first, with nothing combined beforehand.
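
The difference in how much data has to move can be sketched with plain Scala collections. This is a toy simulation under assumed data, not a measurement of Spark: the two partitions and the names `ShuffleVolumeSketch`, `recordsShuffledWithoutCombine`, `recordsShuffledWithCombine` and `finalSums` are hypothetical.

```scala
object ShuffleVolumeSketch {
  // Hypothetical input: two partitions of (key, value) pairs.
  val partitions: Seq[Seq[(String, Int)]] = Seq(
    Seq("a" -> 3, "a" -> 1, "b" -> 7),
    Seq("a" -> 5, "b" -> 2, "b" -> 4)
  )

  // groupByKey-style: every record crosses the shuffle unchanged.
  def recordsShuffledWithoutCombine(parts: Seq[Seq[(String, Int)]]): Int =
    parts.map(_.size).sum

  // reduceByKey-style: each partition first sums its own values per key.
  def combineLocally(part: Seq[(String, Int)]): Map[String, Int] =
    part.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).sum }

  // After the local combine, only one record per key per partition is shuffled.
  def recordsShuffledWithCombine(parts: Seq[Seq[(String, Int)]]): Int =
    parts.map(p => combineLocally(p).size).sum

  // The final result: merge the partial sums that share a key.
  def finalSums(parts: Seq[Seq[(String, Int)]]): Map[String, Int] =
    parts.flatMap(p => combineLocally(p).toSeq)
      .groupBy(_._1)
      .map { case (k, kvs) => k -> kvs.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    println(s"without combine: ${recordsShuffledWithoutCombine(partitions)} records")
    println(s"with combine:    ${recordsShuffledWithCombine(partitions)} records")
    println(s"sums:            ${finalSums(partitions)}")
  }
}
```

The gap grows with the number of values per key in each partition: the combined version sends at most one record per key per partition, no matter how many values that key has there.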

The step that moves the data around in a reduce-type operation is generally called the shuffle. At the very simplest level, the data is partitioned to each node (often with a hash partitioner) and then sorted on each node.
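
As for where the rows end up after the shuffle: with a hash partitioner, the target partition is derived from the key's hash code. The sketch below shows that idea in plain Scala. As far as I know, Spark's HashPartitioner does essentially this (the key's hashCode taken modulo the number of partitions, kept non-negative), but treat the code as an illustration: `HashPartitionSketch` and `partitionFor` are made-up names, and which physical node hosts which partition is decided by the scheduler, not by this function.

```scala
object HashPartitionSketch {
  // Modulo that never returns a negative number, since hashCode can be negative.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  // The partition a key is sent to, given the number of partitions.
  def partitionFor(key: Any, numPartitions: Int): Int =
    nonNegativeMod(key.hashCode, numPartitions)

  def main(args: Array[String]): Unit = {
    // The four keys from the question, spread over 3 partitions.
    val keys = Seq("k1", "k2", "k3", "k4")
    keys.foreach(k => println(s"$k -> partition ${partitionFor(k, 3)}"))
  }
}
```

The important property is that the same key always maps to the same partition, so after the shuffle all values (or partial results) for k1 sit together, all for k2 sit together, and so on. With more keys than partitions, several keys share a partition.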