Why does join fail with "java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]"?
This happens because Spark tries to do a broadcast hash join, and one of the DataFrames is very large, so broadcasting it takes a long time.
You can:

- Set a higher `spark.sql.broadcastTimeout` to increase the timeout, e.g. `spark.conf.set("spark.sql.broadcastTimeout", 36000)` (see the sketch below)
- `persist()` both DataFrames; Spark will then use a shuffle join instead
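A minimal sketch of both options, assuming an active `SparkSession` named `spark` and two DataFrames `df1` and `df2` being joined (those names, the join column, and the 36000-second value are illustrative):

```scala
// Option 1: raise the broadcast timeout (in seconds) from the 300s default
spark.conf.set("spark.sql.broadcastTimeout", 36000)

// Option 2: persist both sides; Spark then plans a shuffle join
// instead of broadcasting one of them
df1.persist()
df2.persist()
val joined = df1.join(df2, Seq("key"))  // "key" is an illustrative column name
```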
PySpark
In PySpark, you can set the config when you build the SparkSession, in the following manner:
```python
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Your App") \
    .config("spark.sql.broadcastTimeout", "36000") \
    .getOrCreate()
```
Just to add some code context to the very concise answer from @T. Gawęda.
In your Spark application, Spark SQL chose a broadcast hash join for the join because "libriFirstTable50Plus3DF has 766,151 records", which happened to put its estimated size below the so-called broadcast threshold (defaults to 10MB).
You can control the broadcast threshold using the `spark.sql.autoBroadcastJoinThreshold` configuration property.

> **spark.sql.autoBroadcastJoinThreshold** Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1 broadcasting can be disabled. Note that currently statistics are only supported for Hive Metastore tables where the command `ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan` has been run.
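For instance (a sketch, assuming an active `SparkSession` named `spark`; the 100MB figure is illustrative):

```scala
// The default threshold is 10485760 bytes (10MB)
println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

// Raise it (value in bytes), or pass -1 to disable broadcast joins entirely
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100L * 1024 * 1024)
```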
You can find that particular type of join in the stack trace:
org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:110)
The `BroadcastHashJoin` physical operator in Spark SQL uses a broadcast variable to distribute the smaller dataset to Spark executors (rather than shipping a copy of it with every task).
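As an illustration only (not from the original question), here is a minimal, self-contained sketch whose physical plan shows such a broadcast; the DataFrames are made up:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("BroadcastJoinDemo")  // illustrative app name
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val largeDF = (1 to 1000000).toDF("id")
val smallDF = (1 to 100).toDF("id")

// The plan printed by explain() contains BroadcastHashJoin and
// BroadcastExchange when Spark decides to broadcast the small side
largeDF.join(smallDF, Seq("id")).explain()
```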
If you used `explain` to review the physical query plan, you'd notice that the query uses the `BroadcastExchangeExec` physical operator. This is where you can see the underlying machinery for broadcasting the smaller table (and the timeout):
```scala
override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
  ThreadUtils.awaitResult(relationFuture, timeout).asInstanceOf[broadcast.Broadcast[T]]
}
```
`doExecuteBroadcast` is part of the `SparkPlan` contract that every physical operator in Spark SQL follows, which allows for broadcasting results if needed. `BroadcastExchangeExec` happens to need it.
The timeout parameter is what you are looking for.
```scala
private val timeout: Duration = {
  val timeoutValue = sqlContext.conf.broadcastTimeout
  if (timeoutValue < 0) {
    Duration.Inf
  } else {
    timeoutValue.seconds
  }
}
```
As you can see, you can disable the timeout completely (using a negative value), which means waiting indefinitely for the broadcast variable to be shipped to executors. `sqlContext.conf.broadcastTimeout` is exactly the `spark.sql.broadcastTimeout` configuration property. Its default value is `5 * 60` seconds (i.e. 300 seconds), which you can see in the stack trace:
java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
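So, to sidestep the timeout entirely (a sketch, assuming an active `SparkSession` named `spark`):

```scala
// A negative value maps to Duration.Inf above: wait indefinitely
// for the broadcast variable to reach the executors
spark.conf.set("spark.sql.broadcastTimeout", -1)
```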
In addition to increasing `spark.sql.broadcastTimeout` or calling `persist()` on both DataFrames, you may try the following (sketched below):

1. Disable broadcast by setting `spark.sql.autoBroadcastJoinThreshold` to `-1`.
2. Increase the Spark driver memory by setting `spark.driver.memory` to a higher value.
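A sketch of the first suggestion at runtime (assuming an active `SparkSession` named `spark`); note that `spark.driver.memory` must be set before the driver JVM starts, so it belongs on the launch command rather than in a runtime `conf.set`:

```scala
// Suggestion 1: disable automatic broadcast so Spark falls back to a
// shuffle-based join (e.g. SortMergeJoin)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

// Suggestion 2 goes on the launch command, not at runtime, e.g.:
//   spark-submit --driver-memory 8g ...   (8g is an illustrative value)
```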