Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects

Getting strange behavior when calling function outside of a closure:

  • when function is in a object everything is working
  • when function is in a class get :

Task not serializable: java.io.NotSerializableException: testing

The problem is I need my code in a class and not an object. Any idea why this is happening? Is a Scala object serialized (default?)?

This is a working code example:

object working extends App {
    val list = List(1,2,3)

    val rddList = Spark.ctx.parallelize(list)
    //calling function outside closure 
    val after = rddList.map(someFunc(_))

    def someFunc(a:Int)  = a+1

    after.collect().map(println(_))
}

This is the non-working example :

object NOTworking extends App {
  new testing().doIT
}

//adding extends Serializable wont help
class testing {  
  val list = List(1,2,3)  
  val rddList = Spark.ctx.parallelize(list)

  def doIT =  {
    //again calling the fucntion someFunc 
    val after = rddList.map(someFunc(_))
    //this will crash (spark lazy)
    after.collect().map(println(_))
  }

  def someFunc(a:Int) = a+1
}

Solution 1:

RDDs extend the Serialisable interface, so this is not what's causing your task to fail. Now this doesn't mean that you can serialise an RDD with Spark and avoid NotSerializableException

Spark is a distributed computing engine and its main abstraction is a resilient distributed dataset (RDD), which can be viewed as a distributed collection. Basically, RDD's elements are partitioned across the nodes of the cluster, but Spark abstracts this away from the user, letting the user interact with the RDD (collection) as if it were a local one.

Not to get into too many details, but when you run different transformations on a RDD (map, flatMap, filter and others), your transformation code (closure) is:

  1. serialized on the driver node,
  2. shipped to the appropriate nodes in the cluster,
  3. deserialized,
  4. and finally executed on the nodes

You can of course run this locally (as in your example), but all those phases (apart from shipping over network) still occur. [This lets you catch any bugs even before deploying to production]

What happens in your second case is that you are calling a method, defined in class testing from inside the map function. Spark sees that and since methods cannot be serialized on their own, Spark tries to serialize the whole testing class, so that the code will still work when executed in another JVM. You have two possibilities:

Either you make class testing serializable, so the whole class can be serialized by Spark:

import org.apache.spark.{SparkContext,SparkConf}

object Spark {
  val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}

object NOTworking extends App {
  new Test().doIT
}

class Test extends java.io.Serializable {
  val rddList = Spark.ctx.parallelize(List(1,2,3))

  def doIT() =  {
    val after = rddList.map(someFunc)
    after.collect().foreach(println)
  }

  def someFunc(a: Int) = a + 1
}

or you make someFunc function instead of a method (functions are objects in Scala), so that Spark will be able to serialize it:

import org.apache.spark.{SparkContext,SparkConf}

object Spark {
  val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}

object NOTworking extends App {
  new Test().doIT
}

class Test {
  val rddList = Spark.ctx.parallelize(List(1,2,3))

  def doIT() =  {
    val after = rddList.map(someFunc)
    after.collect().foreach(println)
  }

  val someFunc = (a: Int) => a + 1
}

Similar, but not the same problem with class serialization can be of interest to you and you can read on it in this Spark Summit 2013 presentation.

As a side note, you can rewrite rddList.map(someFunc(_)) to rddList.map(someFunc), they are exactly the same. Usually, the second is preferred as it's less verbose and cleaner to read.

EDIT (2015-03-15): SPARK-5307 introduced SerializationDebugger and Spark 1.3.0 is the first version to use it. It adds serialization path to a NotSerializableException. When a NotSerializableException is encountered, the debugger visits the object graph to find the path towards the object that cannot be serialized, and constructs information to help user to find the object.

In OP's case, this is what gets printed to stdout:

Serialization stack:
    - object not serializable (class: testing, value: testing@2dfe2f00)
    - field (class: testing$$anonfun$1, name: $outer, type: class testing)
    - object (class testing$$anonfun$1, <function1>)

Solution 2:

Grega's answer is great in explaining why the original code does not work and two ways to fix the issue. However, this solution is not very flexible; consider the case where your closure includes a method call on a non-Serializable class that you have no control over. You can neither add the Serializable tag to this class nor change the underlying implementation to change the method into a function.

Nilesh presents a great workaround for this, but the solution can be made both more concise and general:

def genMapper[A, B](f: A => B): A => B = {
  val locker = com.twitter.chill.MeatLocker(f)
  x => locker.get.apply(x)
}

This function-serializer can then be used to automatically wrap closures and method calls:

rdd map genMapper(someFunc)

This technique also has the benefit of not requiring the additional Shark dependencies in order to access KryoSerializationWrapper, since Twitter's Chill is already pulled in by core Spark