Calculating the averages for each KEY in a Pairwise (K,V) RDD in Spark with Python

Now a much better way to do this is to use the rdd.aggregateByKey() method. Because this method is so poorly documented in the Apache Spark with Python documentation -- and is why I wrote this Q&A -- until recently I had been using the above code sequence. But again, it's less efficient, so avoid doing it that way unless necessary.

Here's how to do the same using the rdd.aggregateByKey() method (recommended):

By KEY, simultaneously calculate the SUM (the numerator for the average that we want to compute), and COUNT (the denominator for the average that we want to compute):

>>> aTuple = (0,0) # As of Python3, you can't pass a literal sequence to a function.
>>> rdd1 = rdd1.aggregateByKey(aTuple, lambda a,b: (a[0] + b,    a[1] + 1),
                                       lambda a,b: (a[0] + b[0], a[1] + b[1]))

Where the following is true about the meaning of each a and b pair above (so you can visualize what's happening):

   First lambda expression for Within-Partition Reduction Step::
   a: is a TUPLE that holds: (runningSum, runningCount).
   b: is a SCALAR that holds the next Value

   Second lambda expression for Cross-Partition Reduction Step::
   a: is a TUPLE that holds: (runningSum, runningCount).
   b: is a TUPLE that holds: (nextPartitionsSum, nextPartitionsCount).

Finally, calculate the average for each KEY, and collect results.

>>> finalResult = rdd1.mapValues(lambda v: v[0]/v[1]).collect()
>>> print(finalResult)
      [(u'2013-09-09', 11.235365503035176),
       (u'2013-09-01', 23.39500642456595),
       (u'2013-09-03', 13.53240060820617),
       (u'2013-09-05', 13.141148418977687),
   ... snip ...
  ]

I hope this question and answer with aggregateByKey() will help.


To my mind a more readable equivalent to an aggregateByKey with two lambdas is:

rdd1 = rdd1 \
    .mapValues(lambda v: (v, 1)) \
    .reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1]))

In this way the whole average calculation would be:

avg_by_key = rdd1 \
    .mapValues(lambda v: (v, 1)) \
    .reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1])) \
    .mapValues(lambda v: v[0]/v[1]) \
    .collectAsMap()