Reduce a key-value pair into a key-list pair with Apache Spark
I am writing a Spark application and want to combine a set of Key-Value pairs (K, V1), (K, V2), ..., (K, Vn) into one Key-Multivalue pair (K, [V1, V2, ..., Vn]). I feel like I should be able to do this using the reduceByKey function with something of the flavor:
My_KMV = My_KV.reduce(lambda a, b: a.append([b]))
The error that I get when I try this is:
'NoneType' object has no attribute 'append'
My keys are integers and values V1,...,Vn are tuples. My goal is to create a single pair with the key and a list of the values (tuples).
Map and ReduceByKey
The input type and output type of reduce must be the same; therefore, if you want to aggregate the values into a list, you have to map the input to lists first. Afterwards you combine the lists into one list.
Combining lists
You'll need a method to combine lists into one list. Python provides some methods to combine lists.
append modifies the first list and will always return None.
x = [1, 2, 3]
x.append([4, 5])
# x is [1, 2, 3, [4, 5]]
extend does the same, but unwraps the second list:
x = [1, 2, 3]
x.extend([4, 5])
# x is [1, 2, 3, 4, 5]
Both methods return None, but you'll need a method that returns the combined list; therefore, just use the plus sign.
x = [1, 2, 3] + [4, 5]
# x is [1, 2, 3, 4, 5]
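To see why the plus sign fits the reduce contract, here is a plain-Python illustration (my own sketch, not part of the original answer) of what Spark effectively does for one key once the values have been wrapped in lists:
from functools import reduce
values = [('a', 'b'), ('x', 'y')]                             # V1, ..., Vn for one key
combined = reduce(lambda a, b: a + b, [[v] for v in values])  # lists in, list out
# combined is [('a', 'b'), ('x', 'y')]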
Spark
file = sc.textFile("hdfs://...")
counts = (file.flatMap(lambda line: line.split(" "))
              # build (key, value) pairs keyed by the first comma-separated field
              .map(lambda actor: (actor.split(",")[0], actor))
              # transform each value into a list
              .map(lambda nameTuple: (nameTuple[0], [nameTuple[1]]))
              # combine lists: ([1,2,3] + [4,5]) becomes [1,2,3,4,5]
              .reduceByKey(lambda a, b: a + b))
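Applied to the question's data (integer keys, tuple values), the same pattern would look roughly like this, assuming My_KV is the asker's RDD of (K, V) pairs:
# turn each value into a one-element list, then concatenate the lists per key
My_KMV = My_KV.map(lambda kv: (kv[0], [kv[1]])) \
              .reduceByKey(lambda a, b: a + b)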
CombineByKey
It's also possible to solve this with combineByKey, which is used internally to implement reduceByKey, but it's more complex, and "using one of the specialized per-key combiners in Spark can be much faster". Your use case is simple enough for the solution above.
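For completeness, a minimal combineByKey version of the same grouping might look like this (a sketch, again assuming My_KV is the asker's RDD of (K, V) pairs):
My_KMV = My_KV.combineByKey(
    lambda v: [v],           # createCombiner: wrap the first value seen for a key in a list
    lambda c, v: c + [v],    # mergeValue: add another value to an existing list
    lambda c1, c2: c1 + c2   # mergeCombiners: concatenate lists built on different partitions
)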
GroupByKey
It's also possible to solve this with groupByKey, but it reduces parallelization and therefore could be much slower for big data sets.
tl;dr If you really require an operation like this, use groupByKey as suggested by @MariusIon. Every other solution proposed here is either blatantly inefficient or at least suboptimal compared to direct grouping.
reduceByKey with list concatenation is not an acceptable solution because:
- It requires initialization of O(N) lists.
- Each application of + to a pair of lists requires a full copy of both lists (O(N)), effectively increasing the overall complexity to O(N²).
- It doesn't address any of the problems introduced by groupByKey. The amount of data that has to be shuffled, as well as the size of the final structure, is the same.
- Contrary to what one of the answers suggests, there is no difference in the level of parallelism between implementations using reduceByKey and groupByKey.
combineByKey with list.extend is a suboptimal solution because:
- It creates O(N) list objects in MergeValue (this could be optimized by using list.append directly on the new item).
- If optimized with list.append, it is exactly equivalent to the old (Spark <= 1.3) implementation of groupByKey and ignores all the optimizations introduced by SPARK-3074, which enables external (on-disk) grouping of larger-than-memory structures.
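For reference, the direct grouping recommended in the tl;dr can be written as a one-liner (a sketch, reusing the asker's My_KV name; mapValues(list) just materializes the iterables that groupByKey yields):
My_KMV = My_KV.groupByKey().mapValues(list)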
I'm kind of late to the conversation, but here's my suggestion:
>>> foo = sc.parallelize([(1, ('a','b')), (2, ('c','d')), (1, ('x','y'))])
>>> foo.map(lambda kv: (kv[0], [kv[1]])).reduceByKey(lambda p, q: p + q).collect()
[(1, [('a', 'b'), ('x', 'y')]), (2, [('c', 'd')])]
You can use the RDD groupByKey method.
Input:
data = [(1, 'a'), (1, 'b'), (2, 'c'), (2, 'd'), (2, 'e'), (3, 'f')]
rdd = sc.parallelize(data)
result = rdd.groupByKey().mapValues(list).collect()
Output:
[(1, ['a', 'b']), (2, ['c', 'd', 'e']), (3, ['f'])]
If you want to do a reduceByKey where the type in the reduced KV pairs is different from the type in the original KV pairs, then you can use the function combineByKey. What the function does is take KV pairs and combine them (by key) into KC pairs, where C is a different type than V.
One specifies 3 functions: createCombiner, mergeValue, and mergeCombiners. The first specifies how to transform a type V into a type C, the second describes how to combine a type C with a type V, and the last specifies how to combine a type C with another type C. My code has already created the K-V pairs (My_KV).
Define the 3 functions as follows:
def Combiner(a):    # turns value a (a tuple) into a list of a single tuple
    return [a]

def MergeValue(a, b):    # a is the new type [(,), (,), ..., (,)] and b is the old type (,)
    a.extend([b])
    return a

def MergeCombiners(a, b):    # a is the new type [(,), ..., (,)] and so is b; combine them
    a.extend(b)
    return a
Then, My_KMV = My_KV.combineByKey(Combiner, MergeValue, MergeCombiners)
The best resource I found on using this function is: http://abshinn.github.io/python/apache-spark/2014/10/11/using-combinebykey-in-apache-spark/
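A quick way to exercise the call on a made-up toy dataset (my own example, not from the linked resource) would be:
My_KV = sc.parallelize([(1, ('a', 'b')), (2, ('c', 'd')), (1, ('x', 'y'))])
My_KMV = My_KV.combineByKey(Combiner, MergeValue, MergeCombiners)
print(My_KMV.collect())
# expected, up to ordering: [(1, [('a', 'b'), ('x', 'y')]), (2, [('c', 'd')])]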
As others have pointed out, a.append(b) and a.extend(b) both return None. So reduceByKey(lambda a, b: a.append(b)) returns None for the first pair of KV values, and then fails on the second pair because None.append(b) fails. You could work around this by defining a separate function:
def My_Extend(a, b):
    a.extend(b)
    return a
Then call reduceByKey(lambda a, b: My_Extend(a,b))
(The use of the lambda function here may be unnecessary, but I have not tested this case.)
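Note that My_Extend expects both arguments to be lists, so the values have to be mapped to one-element lists first, as in the other answers. A minimal sketch with made-up data:
pairs = sc.parallelize([(1, ('a', 'b')), (2, ('c', 'd')), (1, ('x', 'y'))])
result = pairs.map(lambda kv: (kv[0], [kv[1]])).reduceByKey(My_Extend).collect()
# result, up to ordering: [(1, [('a', 'b'), ('x', 'y')]), (2, [('c', 'd')])]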