Spark gives a StackOverflowError when training using ALS

When attempting to train a machine learning model using ALS in Spark's MLlib, I kept receiving a StackOverflowError. Here's a small sample of the stack trace:

Traceback (most recent call last):
  File "/Users/user/Spark/imf.py", line 31, in <module>
    model = ALS.train(rdd, rank, numIterations)
  File "/usr/local/Cellar/apache-spark/1.3.1_1/libexec/python/pyspark/mllib/recommendation.py", line 140, in train
    lambda_, blocks, nonnegative, seed)
  File "/usr/local/Cellar/apache-spark/1.3.1_1/libexec/python/pyspark/mllib/common.py", line 120, in callMLlibFunc
    return callJavaFunc(sc, api, *args)
  File "/usr/local/Cellar/apache-spark/1.3.1_1/libexec/python/pyspark/mllib/common.py", line 113, in callJavaFunc
    return _java2py(sc, func(*args))
  File "/usr/local/Cellar/apache-spark/1.3.1_1/libexec/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/usr/local/Cellar/apache-spark/1.3.1_1/libexec/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o35.trainALSModel.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 40.0 failed 1 times, most recent failure: Lost task 0.0 in stage 40.0 (TID 35, localhost): java.lang.StackOverflowError
        at java.io.ObjectInputStream$PeekInputStream.peek(ObjectInputStream.java:2296)
        at java.io.ObjectInputStream$BlockDataInputStream.peek(ObjectInputStream.java:2589)

The same error also appeared when calling .mean() to calculate the mean squared error. It occurred with both Spark 1.3.1_1 and Spark 1.4.1. I was using PySpark, and increasing the available memory did not help.


Solution 1:

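The solution was to enable checkpointing. ALS is iterative, so each iteration extends the RDD lineage; once the lineage is long enough, the recursive serialization and deserialization of it overflows the stack (note the ObjectInputStream frames in the trace). Checkpointing saves the RDD to disk and truncates the lineage, which keeps that recursion shallow. First, create a new directory to store the checkpoints. Then, before calling ALS.train, have your SparkContext use that directory for checkpointing. Here is the example in Python: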

sc.setCheckpointDir('checkpoint/')
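For context, the two steps above (create the directory, then register it) can be wrapped in a small helper. This is only a sketch: the function names `enable_checkpointing` and `train_with_checkpointing` are made up for illustration, and it assumes local mode, as in the trace above. On a cluster the checkpoint directory would need to be an HDFS path instead of a local one.

```python
import os


def enable_checkpointing(sc, checkpoint_dir="checkpoint/"):
    """Create checkpoint_dir if it is missing and register it with the SparkContext.

    Hypothetical helper; `sc` is expected to be a pyspark.SparkContext.
    """
    if not os.path.isdir(checkpoint_dir):
        os.makedirs(checkpoint_dir)
    sc.setCheckpointDir(checkpoint_dir)
    return checkpoint_dir


def train_with_checkpointing(sc, ratings, rank, num_iterations,
                             checkpoint_dir="checkpoint/"):
    """Set the checkpoint directory first, then train ALS as in the question."""
    # Imported here so the helper above can be used without pyspark installed.
    from pyspark.mllib.recommendation import ALS

    enable_checkpointing(sc, checkpoint_dir)
    return ALS.train(ratings, rank, num_iterations)
```

With the variables from the question, the call would look like `model = train_with_checkpointing(sc, rdd, rank, numIterations)`.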

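You may also need to set a checkpoint interval on ALS itself, but I haven't been able to determine whether that makes a difference. Note that ALS.train in pyspark.mllib does not take a checkpoint interval argument, so assigning this attribute on the Python class may well have no effect. To set it anyway (probably not necessary), just do: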

ALS.checkpointInterval = 2
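If the error still shows up in a later step, such as the .mean() call mentioned in the question, one option is to checkpoint the intermediate RDD explicitly. I did not test this as part of the fix above, so treat it as a sketch: `checkpoint_and_materialize` is a hypothetical helper name, and it assumes sc.setCheckpointDir(...) has already been called.

```python
def checkpoint_and_materialize(rdd):
    """Mark an RDD for checkpointing and run an action so the checkpoint is written.

    Hypothetical helper; requires that a checkpoint directory is already set.
    """
    rdd.cache()       # keep the data around so the checkpoint job does not recompute it
    rdd.checkpoint()  # must be called before any action runs on this RDD
    rdd.count()       # any action works; this one triggers the actual checkpoint
    return rdd
```

For example, with a hypothetical `squared_errors` RDD, you would call `checkpoint_and_materialize(squared_errors)` and then `squared_errors.mean()`.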