Apache Spark Moving Average
I have a huge file in HDFS containing time series data points (Yahoo stock prices).
I want to compute the moving average of this time series. How do I go about writing an Apache Spark job to do that?
Solution 1:
You can use the sliding function from MLlib, which probably does the same thing as Daniel's answer. You will have to sort the data by time before using the sliding function.
import org.apache.spark.mllib.rdd.RDDFunctions._
// sc is the SparkContext provided by spark-shell.
sc.parallelize(1 to 100, 10)
  .sliding(3)                                              // windows of 3 consecutive elements
  .map(curSlice => curSlice.sum.toDouble / curSlice.size)  // toDouble avoids integer division
  .collect()
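For intuition, here is a local, Spark-free sketch of the same sort-then-slide pipeline on an in-memory collection. The `Quote` case class and the `movingAverages` helper are hypothetical and not part of MLlib. The helper also mirrors one documented difference: MLlib's `sliding` yields an empty RDD when the window is larger than the number of items, whereas Scala's collection `sliding` would return a single short window, so that case is guarded explicitly.

```scala
// Hypothetical record type: a timestamp (e.g. epoch millis) and a price.
final case class Quote(time: Long, price: Double)

object SlidingAverageSketch {
  // Sort by time first (sliding relies on element order), then average each window.
  def movingAverages(quotes: Seq[Quote], window: Int): Seq[Double] = {
    require(window > 0, "window must be positive")
    val prices = quotes.sortBy(_.time).map(_.price)
    // Mirror MLlib's behaviour: no output when there are fewer items than the window.
    if (prices.size < window) Seq.empty
    else prices.sliding(window).map(w => w.sum / w.size).toList
  }

  def main(args: Array[String]): Unit = {
    // Deliberately out of order to show that sorting matters.
    val quotes = Seq(Quote(3L, 3.0), Quote(1L, 1.0), Quote(5L, 5.0), Quote(2L, 2.0), Quote(4L, 4.0))
    println(movingAverages(quotes, 3)) // List(2.0, 3.0, 4.0)
  }
}
```

Skipping the sort would average prices in whatever order they happen to arrive, which is why the answer stresses sorting by time first.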
Solution 2:
Moving average is a tricky problem for Spark, and for any distributed system. When the data is spread across multiple machines, some time windows will cross partition boundaries. To handle this, we copy the first window - 1 elements of each partition into the previous partition, so that calculating the moving average within each partition gives complete coverage.
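The duplication idea can be checked locally without a cluster. In this hypothetical sketch each inner `Vector` stands in for one partition of already sorted data, and plain Scala collections replace the Spark APIs. Every partition receives the first `window - 1` elements of the next partition; windows computed independently per partition then match the windows of one pass over the whole sequence. Like the Spark version, it assumes each partition (except possibly the last) holds at least `window - 1` elements.

```scala
object OverlapSketch {
  // Append the first (window - 1) elements of partition i + 1 to partition i.
  def withOverlap(parts: Vector[Vector[Int]], window: Int): Vector[Vector[Int]] =
    parts.indices.toVector.map { i =>
      val spill = if (i + 1 < parts.length) parts(i + 1).take(window - 1) else Vector.empty[Int]
      parts(i) ++ spill
    }

  // Windows computed per partition; a partition shorter than the window contributes nothing.
  def perPartitionWindows(parts: Vector[Vector[Int]], window: Int): Vector[Vector[Int]] =
    parts.flatMap(p => if (p.size < window) Vector.empty[Vector[Int]] else p.sliding(window).toVector)

  def main(args: Array[String]): Unit = {
    val window = 3
    val data = (0 until 20).toVector
    val parts = data.grouped(5).toVector // 4 "partitions": 0-4, 5-9, 10-14, 15-19
    val local = perPartitionWindows(withOverlap(parts, window), window)
    val global = data.sliding(window).toVector
    println(local == global) // true
  }
}
```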
Here is a way to do this in Spark. The example data:
val ts = sc.parallelize(0 to 100, 10)
val window = 3
A simple partitioner that puts each row in the partition specified by its key:
// Sends each (key, value) pair to the partition whose index is the Int key.
class StraightPartitioner(p: Int) extends org.apache.spark.Partitioner {
  def numPartitions: Int = p
  def getPartition(key: Any): Int = key.asInstanceOf[Int]
}
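Create the data with the first window - 1 rows of each partition copied to the previous partition:
val partitioned = ts.mapPartitionsWithIndex((i, p) => {
  // First (window - 1) elements of this partition; they also complete windows that start in partition i - 1.
  val overlap = p.take(window - 1).toArray
  // Copies of those elements, keyed for the previous partition.
  val spill = overlap.iterator.map((i - 1, _))
  // The whole partition (overlap re-attached, since take consumed it from p), keyed by its own index.
  val keep = (overlap.iterator ++ p).map((i, _))
  // Partition 0 has no predecessor, so it spills nothing.
  if (i == 0) keep else keep ++ spill
}).partitionBy(new StraightPartitioner(ts.partitions.length)).values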
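Then calculate the moving average within each partition, using a running sum:
val movingAverage = partitioned.mapPartitions(p => {
  val sorted = p.toSeq.sorted
  val olds = sorted.iterator  // element leaving the window
  val news = sorted.iterator  // element entering the window
  // Pre-load the first (window - 1) elements; this advances `news` by window - 1.
  var sum = news.take(window - 1).sum
  (olds zip news).map({ case (o, n) => {
    sum += n
    val v = sum.toDouble / window  // divide the window sum to get the average
    sum -= o
    v
  }})
})
Because of the duplicated segments, there are no gaps in coverage. The windows are (0, 1, 2), (1, 2, 3), ..., (98, 99, 100), so the averages are exactly 1.0 through 99.0:
scala> movingAverage.collect.sameElements((1 to 99).map(_.toDouble))
res0: Boolean = true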
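The per-partition step relies on a running sum: add the element entering the window and subtract the one leaving it, so each window costs O(1) instead of O(window). Here is a Spark-free sketch of that idea over one sorted sequence of prices; the name `runningAverages` is hypothetical, and it indexes into the sequence rather than zipping two iterators.

```scala
object RunningSumSketch {
  // Trailing moving average of `xs` (assumed already sorted by time).
  // Keeps a running sum: add the element entering the window, subtract the one leaving it.
  def runningAverages(xs: IndexedSeq[Double], window: Int): IndexedSeq[Double] = {
    require(window > 0, "window must be positive")
    if (xs.length < window) IndexedSeq.empty
    else {
      var sum = xs.take(window - 1).sum
      (window - 1 until xs.length).map { i =>
        sum += xs(i)               // element entering the window
        val avg = sum / window
        sum -= xs(i - window + 1)  // element leaving before the next step
        avg
      }
    }
  }

  def main(args: Array[String]): Unit = {
    println(runningAverages(Vector(1.0, 2.0, 3.0, 4.0, 5.0), 3)) // Vector(2.0, 3.0, 4.0)
  }
}
```

With floating-point prices, a running sum can accumulate small rounding errors over very long series; recomputing the sum from scratch every so often is one way to bound that.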
Solution 3:
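Spark 1.4 introduced window functions, so you can compute a moving average as follows (adjust the window size with rowsBetween):
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.avg
// toDF and $"..." also need the SQL implicits, which are normally already in scope in spark-shell and Zeppelin.
val schema = Seq("id", "cykle", "value")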
val data = Seq(
(1, 1, 1),
(1, 2, 11),
(1, 3, 1),
(1, 4, 11),
(1, 5, 1),
(1, 6, 11),
(2, 1, 1),
(2, 2, 11),
(2, 3, 1),
(2, 4, 11),
(2, 5, 1),
(2, 6, 11)
)
val dft = sc.parallelize(data).toDF(schema: _*)
dft.select('*).show
// PARTITION BY id ORDER BY cykle ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING (5)
val w = Window.partitionBy("id").orderBy("cykle").rowsBetween(-2, 2)
val x = dft.select($"id",$"cykle",avg($"value").over(w))
x.show
Output (in Zeppelin):
schema: Seq[String] = List(id, cykle, value)
data: Seq[(Int, Int, Int)] = List((1,1,1), (1,2,11), (1,3,1), (1,4,11), (1,5,1), (1,6,11), (2,1,1), (2,2,11), (2,3,1), (2,4,11), (2,5,1), (2,6,11))
dft: org.apache.spark.sql.DataFrame = [id: int, cykle: int, value: int]
+---+-----+-----+
| id|cykle|value|
+---+-----+-----+
| 1| 1| 1|
| 1| 2| 11|
| 1| 3| 1|
| 1| 4| 11|
| 1| 5| 1|
| 1| 6| 11|
| 2| 1| 1|
| 2| 2| 11|
| 2| 3| 1|
| 2| 4| 11|
| 2| 5| 1|
| 2| 6| 11|
+---+-----+-----+
w: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@55cd666f
x: org.apache.spark.sql.DataFrame = [id: int, cykle: int, 'avg(value) WindowSpecDefinition ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING: double]
+---+-----+-------------------------------------------------------------------------+
| id|cykle|'avg(value) WindowSpecDefinition ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING|
+---+-----+-------------------------------------------------------------------------+
| 1| 1| 4.333333333333333|
| 1| 2| 6.0|
| 1| 3| 5.0|
| 1| 4| 7.0|
| 1| 5| 6.0|
| 1| 6| 7.666666666666667|
| 2| 1| 4.333333333333333|
| 2| 2| 6.0|
| 2| 3| 5.0|
| 2| 4| 7.0|
| 2| 5| 6.0|
| 2| 6| 7.666666666666667|
+---+-----+-------------------------------------------------------------------------+
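To make the frame semantics concrete, here is a plain-Scala sketch (not Spark) of what `partitionBy("id").orderBy("cykle").rowsBetween(-2, 2)` computes. Within each `id`, every row averages the rows from two before to two after it, with the frame clipped at the ends of the partition, which is why the first and last rows of each group average only three values. The helper `frameAverages` is hypothetical. This frame is centered; for a trailing average, as usually wanted for stock prices, the analogous Spark frame would be `rowsBetween(-2, 0)`.

```scala
object FrameSketch {
  // Average over rows [i + lower, i + upper], clipped to the bounds of `values`.
  def frameAverages(values: IndexedSeq[Double], lower: Int, upper: Int): IndexedSeq[Double] =
    values.indices.map { i =>
      val start = math.max(0, i + lower)
      val end = math.min(values.length, i + upper + 1)
      val frame = values.slice(start, end)
      frame.sum / frame.size
    }

  def main(args: Array[String]): Unit = {
    // (id, cykle, value) rows from the example above.
    val rows = Seq((1, 1, 1), (1, 2, 11), (1, 3, 1), (1, 4, 11), (1, 5, 1), (1, 6, 11),
                   (2, 1, 1), (2, 2, 11), (2, 3, 1), (2, 4, 11), (2, 5, 1), (2, 6, 11))
    // Like PARTITION BY id ORDER BY cykle.
    rows.groupBy(_._1).toSeq.sortBy(_._1).foreach { case (id, group) =>
      val sorted = group.sortBy(_._2)
      val avgs = frameAverages(sorted.map(_._3.toDouble).toIndexedSeq, -2, 2)
      sorted.zip(avgs).foreach { case ((_, cykle, _), a) => println(s"$id $cykle $a") }
    }
  }
}
```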