Operate on neighbor elements in RDD in Spark
The most efficient solution is to use the sliding method from MLlib's RDDFunctions:
import org.apache.spark.mllib.rdd.RDDFunctions._
val rdd = sc.parallelize(Seq(1, 3, -1, 0, 2, -4, 6))
  .sortBy(identity)
  .sliding(2)
  .map { case Array(x, y) => y - x }
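To see what each window contains, the same steps can be traced on a local collection. This is only a sketch: it uses the standard library's sliding on a Vector as a stand-in for the RDD method, and the names sorted, windows, and diffs are illustrative, not part of the original answer.

```scala
// Local stand-in for the RDD pipeline: sort, take windows of 2, subtract.
val sorted = Vector(1, 3, -1, 0, 2, -4, 6).sorted
// sorted: Vector(-4, -1, 0, 1, 2, 3, 6)

// Each window holds an element and its right-hand neighbor.
val windows = sorted.sliding(2).toVector

// Difference between the two neighbors in every window.
val diffs = windows.map { case Seq(x, y) => y - x }

println(diffs) // Vector(3, 1, 1, 1, 1, 3)
```

With n elements there are n - 1 windows, so the result is one element shorter than the input.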
Suppose you have something like this:
val seq = sc.parallelize(List(1, 3, -1, 0, 2, -4, 6)).sortBy(identity)
Let's first create a collection keyed by index, as Ton Torres suggested:
val original = seq.zipWithIndex.map(_.swap)
Now we can build a collection shifted by one element:
val shifted = original.map { case (idx, v) => (idx - 1, v) }.filter(_._1 >= 0)
Next, we can calculate the needed differences, ordered by index descending:
val diffs = original.join(shifted)
  .sortBy(_._1, ascending = false)
  .map { case (idx, (v1, v2)) => v2 - v1 }
So
println(diffs.collect.toSeq)
shows
WrappedArray(3, 1, 1, 1, 1, 3)
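To make the intermediate values visible, here is a rough local trace of the index, shift, and join steps. It is a sketch only: plain Maps stand in for the pair RDDs, the inner join is emulated by intersecting key sets, and the names sortedElems and joined are made up for illustration.

```scala
// Local trace of the index / shift / join steps; Maps stand in for pair RDDs.
val sortedElems = List(1, 3, -1, 0, 2, -4, 6).sorted

// index -> value, like zipWithIndex.map(_.swap) on the RDD.
val original: Map[Int, Int] = sortedElems.zipWithIndex.map(_.swap).toMap

// Move every value one index down and drop the one that falls off the front.
val shifted: Map[Int, Int] =
  original.collect { case (idx, v) if idx >= 1 => (idx - 1, v) }

// Emulated inner join: keep only the indices present on both sides.
val joined: List[(Int, (Int, Int))] =
  original.keySet.intersect(shifted.keySet).toList
    .map(idx => (idx, (original(idx), shifted(idx))))

// Sort by index descending, then subtract each value from its successor.
val diffs = joined
  .sortBy { case (idx, _) => -idx }
  .map { case (_, (v1, v2)) => v2 - v1 }

println(diffs) // List(3, 1, 1, 1, 1, 3)
```

The last index (6) has no partner in shifted, so the join drops it and six differences remain for seven elements.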
Note that you can skip the sortBy step if the order of the results is not critical; without it, join gives no guarantee about ordering.
Also note that for a local collection this could be computed much more simply:
val elems = List(1, 3, -1, 0, 2, -4, 6).sorted
(elems.tail, elems).zipped.map(_ - _).reverse
But in the case of an RDD, the zip method requires both collections to have the same number of elements in each partition. So if you implemented tail like
val tail = seq.zipWithIndex().filter(_._2 > 0).map(_._1)
tail.zip(seq)
this would not work, since both collections need an equal number of elements in each partition, and here one element from each partition would have to move to the previous partition.
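The mismatch can be illustrated without a cluster. The sketch below assumes a hypothetical layout of the sorted values across three partitions (the real layout depends on the partitioner) and models each partition as a local Vector. It only compares partition sizes and does not call Spark.

```scala
// Hypothetical layout: the sorted values spread over three partitions.
val partitions = Vector(Vector(-4, -1, 0), Vector(1, 2), Vector(3, 6))

// Filtering out global index 0 removes one element from the first
// partition only; a filter does not move data between partitions.
val tailPartitions = partitions.updated(0, partitions(0).drop(1))

val sizes = partitions.map(_.size)         // Vector(3, 2, 2)
val tailSizes = tailPartitions.map(_.size) // Vector(2, 2, 2)

// zip pairs partitions positionally and needs matching sizes in each pair.
val canZip = sizes == tailSizes

println(s"sizes=$sizes tailSizes=$tailSizes canZip=$canZip")
```

Here the first pair of partitions holds 3 and 2 elements, which is the situation zip rejects, and this is why the index-and-join approach is used instead.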