Spark 2.1 Hangs while reading a huge datasets

Solution 1:

First, you are initiating two SparkSessions which is quite useless and you are just splitting resources. So don't do that !

Secondly, and here is where the problem is. There is a misunderstanding concerning the parallelism and the jdbc source with Apache Spark (don't worry, it's a gotcha ! ).

It's mainly due to missing documentation. (The last time I have checked)

So back to the problem. What's actually happening is that following line :

val destination = spark.read.jdbc(url, queryDestination, connectionProperties).rdd.map(_.mkString(","))

is that it's delegating reads to a single worker.

So mainly, if you had enough memory and you succeeded in reading that data. The whole destination data will be in one partition. And one partition means troubles ! a.k.a possible :

java.lang.OutOfMemoryError: GC overhead limit exceeded

So what happened is that the single executor which has been chosen to fetch the data is overwhelmed and it's JVM blew up.

Let's solve this now :

(Disclaimer : the following code is an excerpt from spark-gotchas and I'm one of it's authors.)

So let's create some sample data and save them in our database :

val options = Map(
  "url" -> "jdbc:postgresql://127.0.0.1:5432/spark",
  "dbtable" -> "data",
  "driver" -> "org.postgresql.Driver",
  "user" -> "spark",
  "password" -> "spark"
)

val newData = spark.range(1000000)
  .select($"id", lit(""), lit(true), current_timestamp())
  .toDF("id", "name", "valid", "ts")

newData.write.format("jdbc").options(options).mode("append").save

Apache Spark provides two methods which be used for distributed data loading over JDBC. The first one partitions data using an integer column:

val dfPartitionedWithRanges = spark.read.options(options)
  .jdbc(options("url"), options("dbtable"), "id", 1, 5, 4, new java.util.Properties())

dfPartitionedWithRanges.rdd.partitions.size
// Int = 4

dfPartitionedWithRanges.rdd.glom.collect
// Array[Array[org.apache.spark.sql.Row]] = Array(
//   Array([1,foo,true,2012-01-01 00:03:00.0]),
//   Array([2,foo,false,2013-04-02 10:10:00.0]),
//   Array([3,bar,true,2015-11-02 22:00:00.0]),
//   Array([4,bar,false,2010-11-02 22:00:00.0]))
Partition column and bounds can provided using options as well:

val optionsWithBounds = options ++ Map(
  "partitionColumn" -> "id",
  "lowerBound" -> "1",
  "upperBound" -> "5",
  "numPartitions" -> "4"
)

spark.read.options(optionsWithBounds).format("jdbc").load

Partition column and bounds can provided using options as well:

val optionsWithBounds = options ++ Map(
  "partitionColumn" -> "id",
  "lowerBound" -> "1",
  "upperBound" -> "5",
  "numPartitions" -> "4"
)

spark.read.options(optionsWithBounds).format("jdbc").load

Another option would be to use a sequence of predicates but I won't be talking about it here.

You can read more about Spark SQL and the JDBC Source here along with some other gotchas.

I hope this helps.