Spark Streaming and connection pool implementation

Solution 1:

To address this "local resource" problem, what's needed is a singleton object - i.e. an object that is guaranteed to be instantiated once and only once per JVM. Luckily, Scala's object keyword provides this functionality out of the box.

The second thing to consider is that this singleton will provide a service to all tasks running on the JVM where it's hosted, so it MUST take care of concurrency and resource management.
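As a quick illustration of that guarantee (hypothetical names, not part of the solution): the body of a Scala object runs at most once per JVM, lazily on first access, and every task on that JVM then sees the same instance.

// Hypothetical sketch: the object body executes at most once per JVM.
object JvmSingleton {
    println("initializing")                  // printed at most once per JVM
    val createdAt: Long = System.nanoTime()  // same value for every task on this JVM
}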

Let's try to sketch(*) such a service:

import java.net.Socket
import org.apache.commons.pool2.ObjectPool

class ManagedSocket(private val pool: ObjectPool[Socket], val socket: Socket) {
    def release(): Unit = pool.returnObject(socket)
}

// singleton object
object SocketPool {
    var hostPortPool: Map[(String, Int), ObjectPool[Socket]] = Map()
    sys.addShutdownHook {
        hostPortPool.values.foreach(_.close()) // terminate each pool
    }

    // factory method
    def apply(host: String, port: Int): ManagedSocket = {
        val pool = hostPortPool.getOrElse((host, port), {
            val p: ObjectPool[Socket] = ??? // create new pool for (host, port)
            hostPortPool += ((host, port) -> p)
            p
        })
        new ManagedSocket(pool, pool.borrowObject())
    }
}

Then usage becomes:

val host = ???
val port = ???
stream.foreachRDD { rdd =>
    rdd.foreachPartition { partition => 
        val mSocket = SocketPool(host, port)
        partition.foreach{elem => 
            val os = mSocket.socket.getOutputStream()
            // do stuff with os + elem
        }
        mSocket.release()
    }
}

I'm assuming that the GenericObjectPool used in the question takes care of concurrent borrowing and returning. Otherwise, access to each pool instance needs to be guarded with some form of synchronization.
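For illustration, a minimal way to add that guard, reusing ManagedSocket from the sketch above (createPool is a hypothetical helper standing in for the ??? placeholder; only the lookup/creation of a pool is locked, borrowing relies on the pool's own thread safety):

import java.net.Socket
import org.apache.commons.pool2.ObjectPool

object SocketPool {
    private var hostPortPool: Map[(String, Int), ObjectPool[Socket]] = Map()

    // Guard the shared map so concurrent tasks on the same JVM
    // cannot create two pools for the same (host, port).
    private def poolFor(host: String, port: Int): ObjectPool[Socket] = synchronized {
        hostPortPool.getOrElse((host, port), {
            val p = createPool(host, port)
            hostPortPool += ((host, port) -> p)
            p
        })
    }

    def apply(host: String, port: Int): ManagedSocket = {
        val pool = poolFor(host, port)
        new ManagedSocket(pool, pool.borrowObject())
    }

    // hypothetical helper standing in for the ??? above
    private def createPool(host: String, port: Int): ObjectPool[Socket] = ???
}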

(*) The code is provided to illustrate the idea of how to design such an object; it needs additional work to be turned into a working version.

Solution 2:

The answer below is wrong! I'm leaving it here for reference, but it is wrong for the following reason: socketPool is declared as a lazy val, so it gets instantiated on the first access. Since the SocketPool case class is not Serializable, this means it gets instantiated within each partition. That makes the connection pool useless, because we want to keep connections alive across partitions and RDDs. It makes no difference whether this is implemented as a companion object or as a case class. Bottom line: the connection pool must be Serializable, and Apache Commons Pool is not.
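To make the failure mode concrete, here is a minimal sketch (hypothetical names, not part of the original code): the publisher is serialized and shipped to the executors with the closure, but the non-serializable pool behind the lazy val cannot travel with it, so each deserialized copy re-creates its own pool on first access.

// Hypothetical sketch of the pitfall described above.
class Pool                                   // not Serializable

class Publisher extends Serializable {
    // Shipped to executors inside the foreachPartition closure; the lazy val
    // is evaluated after deserialization, i.e. once per deserialized copy,
    // so each partition ends up with its own Pool instance.
    lazy val pool: Pool = new Pool
}

The original (flawed) implementation is kept below for reference: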

import java.io.PrintStream
import java.net.Socket

import org.apache.commons.pool2.{PooledObject, BasePooledObjectFactory}
import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool}
import org.apache.spark.streaming.dstream.DStream

/**
 * Publish a Spark stream to a socket.
 */
class PooledSocketStreamPublisher[T](host: String, port: Int)
  extends Serializable {

    lazy val socketPool = SocketPool(host, port)

    /**
     * Publish the stream to a socket.
     */
    def publishStream(stream: DStream[T], callback: (T) => String) = {
        stream.foreachRDD { rdd =>

            rdd.foreachPartition { partition =>

                val socket = socketPool.getSocket
                val out = new PrintStream(socket.getOutputStream)

                partition.foreach { event =>
                    val text : String = callback(event)
                    out.println(text)
                    out.flush()
                }

                out.close()
                socketPool.returnSocket(socket)

            }
        }
    }

}

class SocketFactory(host: String, port: Int) extends BasePooledObjectFactory[Socket] {

    def create(): Socket = {
        new Socket(host, port)
    }

    def wrap(socket: Socket): PooledObject[Socket] = {
        new DefaultPooledObject[Socket](socket)
    }

}

case class SocketPool(host: String, port: Int) {

    val socketPool = new GenericObjectPool[Socket](new SocketFactory(host, port))

    def getSocket: Socket = {
        socketPool.borrowObject
    }

    def returnSocket(socket: Socket) = {
        socketPool.returnObject(socket)
    }

}

which you can invoke as follows:

val socketStreamPublisher = new PooledSocketStreamPublisher[MyEvent](host = "10.10.30.101", port = 29009)
socketStreamPublisher.publishStream(myEventStream, (e: MyEvent) => Json.stringify(Json.toJson(e)))