Spark: produce RDD[(X, X)] of all possible combinations from RDD[X]
Solution 1:
The Cartesian product and combinations are two different things. The Cartesian product creates an RDD of size rdd.count() ^ 2,
while combinations create an RDD of size rdd.count() choose 2.
val rdd = sc.parallelize(1 to 5)
val combinations = rdd.cartesian(rdd).filter { case (a, b) => a < b }
combinations.collect()
Note that this only works if an ordering is defined on the elements of the RDD, since we use <. This version only works for choosing two elements, but it can easily be extended by making sure the relationship a < b holds for each consecutive pair a, b in the sequence.
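As a rough sketch of the two points above (not part of the original answer), the filter can be extended to triples by chaining a second cartesian and requiring a < b < c, and the ordering requirement can be sidestepped by comparing positions from zipWithIndex instead of values. The function names triples and pairsByIndex are made up for illustration. Both versions still materialize the full Cartesian product before filtering.

```scala
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

// Hypothetical helper: all 3-element combinations, keeping only strictly increasing triples.
// Requires an Ordering on T, just like the two-element version.
def triples[T: ClassTag: Ordering](rdd: RDD[T]): RDD[(T, T, T)] = {
  val ord = implicitly[Ordering[T]]
  rdd.cartesian(rdd).cartesian(rdd)
    .filter { case ((a, b), c) => ord.lt(a, b) && ord.lt(b, c) }
    .map { case ((a, b), c) => (a, b, c) }
}

// Hypothetical helper: pairs for element types without an Ordering.
// Each element gets a unique Long index, and the filter compares indices, not values,
// so equal values at different positions are treated as distinct elements.
def pairsByIndex[T](rdd: RDD[T]): RDD[(T, T)] = {
  val indexed = rdd.zipWithIndex() // RDD[(T, Long)]
  indexed.cartesian(indexed)
    .filter { case ((_, i), (_, j)) => i < j }
    .map { case ((a, _), (b, _)) => (a, b) }
}
```

In spark-shell, triples(sc.parallelize(1 to 5)).count should give 10, i.e. 5 choose 3.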
Solution 2:
This is supported natively by Spark RDDs through the cartesian transformation, e.g.:
val rdd = sc.parallelize(1 to 5)
val cartesian = rdd.cartesian(rdd)
cartesian.collect
Array[(Int, Int)] = Array((1,1), (1,2), (1,3), (1,4), (1,5),
(2,1), (2,2), (2,3), (2,4), (2,5),
(3,1), (3,2), (3,3), (3,4), (3,5),
(4,1), (4,2), (4,3), (4,4), (4,5),
(5,1), (5,2), (5,3), (5,4), (5,5))
Solution 3:
As discussed, cartesian will give you the n^2 elements of the Cartesian product of the RDD with itself.
This algorithm computes the C(n,2) combinations of an RDD without having to compute the n^2 elements first. (It uses String as the element type; generalizing to a type T takes some plumbing with ClassTags that would obscure the purpose here.)
This is probably less time-efficient than cartesian + filtering, because the repeated count and take actions force computation of the RDD at every step, but it is more space-efficient, as it calculates only the C(n,2) = n!/(2!(n-2)!) = n*(n-1)/2 elements instead of the n^2 elements of the Cartesian product.
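import org.apache.spark.rdd.RDD

// Pairs one element with all the others, then recurses on the remaining elements.
// Uses `sc`, the SparkContext provided by spark-shell.
def combs(rdd: RDD[String]): RDD[(String, String)] = {
  val count = rdd.count // computed once; every count is a full Spark action
  if (count < 2) {
    sc.makeRDD[(String, String)](Seq.empty)
  } else if (count == 2) {
    val values = rdd.collect
    sc.makeRDD[(String, String)](Seq((values(0), values(1))))
  } else {
    val elem = rdd.take(1) // Array holding the single picked element
    val elemRdd = sc.makeRDD(elem)
    // subtract removes every occurrence of the picked element, so duplicates are dropped
    val subtracted = rdd.subtract(elemRdd)
    val comb = subtracted.map(e => (elem(0), e))
    comb.union(combs(subtracted))
  }
}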
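For reference, the ClassTag plumbing mentioned above might look roughly like the following. This is only a sketch, not the original author's code: the name combsGeneric is invented here, it takes the SparkContext from the RDD itself rather than from the shell, and it assumes the elements are distinct and serializable. As with combs, the recursion depth and the RDD lineage grow with the number of elements, so it is only practical for small inputs.

```scala
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

// Hypothetical generic variant of combs; assumes distinct elements.
def combsGeneric[T: ClassTag](rdd: RDD[T]): RDD[(T, T)] = {
  val sc = rdd.sparkContext
  if (rdd.count() < 2) {
    // Fewer than two elements: no pairs to form.
    sc.emptyRDD[(T, T)]
  } else {
    val head = rdd.first()                          // pick one element
    val rest = rdd.subtract(sc.makeRDD(Seq(head)))  // everything else
    // Pair the picked element with the rest, then recurse on the rest.
    rest.map(e => (head, e)).union(combsGeneric(rest))
  }
}
```

The two-element case needs no special branch here: the picked element is paired with the single remaining one, and the recursive call on a one-element RDD returns an empty RDD.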