Spark: disk I/O on stage boundaries explanation
I can't find any information about Spark persisting temporary data to disk in the official docs, only in some Spark optimization articles like this one:
At each stage boundary, data is written to disk by tasks in the parent stages and then fetched over the network by tasks in the child stage. Because they incur heavy disk and network I/O, stage boundaries can be expensive and should be avoided when possible.
Is persistence to disk at each stage boundary always applied for both HashJoin and SortMergeJoin? Why does Spark (an in-memory engine) persist temporary files before a shuffle? Is that done for task-level recovery or for something else?
P.S. The question relates mainly to the Spark SQL API, but I'm also interested in Streaming & Structured Streaming.
UPD: I found a mention, with more detail on why this happens, in the book "Stream Processing with Apache Spark". Look for the "Task Failure Recovery" and "Stage Failure Recovery" topics on the referenced page. As far as I understand, why = recovery and when = always, since these are the mechanics of Spark Core and the Shuffle Service, which is responsible for data transfer. Moreover, all of Spark's APIs (SQL, Streaming & Structured Streaming) are built on the same failover guarantees (those of Spark Core/RDD), so I suppose this is common behaviour for Spark in general.
Solution 1:
It's a good question: we keep hearing about in-memory Spark vs. Hadoop, so this is a little confusing. The docs are terrible, but I ran a few things and verified my observations by looking around, which led me to a most excellent source: http://hydronitrogen.com/apache-spark-shuffles-explained-in-depth.html
Assume an Action has been called (stated only to pre-empt the obvious comment), and that we are not talking about a ResultStage or a broadcast join; then we are talking about a ShuffleMapStage. We look at an RDD first.
Then, borrowing from the url:
- DAG dependency involving a shuffle means creation of a separate Stage.
- Map operations are followed by Reduce operations, then Map operations again, and so forth.
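The rule that every shuffle dependency starts a new Stage can be illustrated with a toy model. This is a minimal sketch in plain Python, not Spark's DAGScheduler: the `split_into_stages` function and the `(operation, needs_shuffle)` lineage format are hypothetical and exist only for illustration, and a real DAG is a graph rather than a linear list.

```python
from typing import List, Tuple

# Hypothetical lineage format: (operation name, whether it needs a shuffle).
Lineage = List[Tuple[str, bool]]


def split_into_stages(lineage: Lineage) -> List[List[str]]:
    """Cut a linear lineage into stages at every shuffle (wide) dependency."""
    stages: List[List[str]] = [[]]
    for op, needs_shuffle in lineage:
        if needs_shuffle and stages[-1]:
            # A wide dependency closes the current stage; the operation
            # that consumes the shuffled data opens the next one.
            stages.append([])
        stages[-1].append(op)
    return stages


lineage = [
    ("textFile", False),
    ("flatMap", False),
    ("map", False),
    ("reduceByKey", True),   # shuffle boundary
    ("mapValues", False),
    ("sortByKey", True),     # shuffle boundary
]

stages = split_into_stages(lineage)
for i, ops in enumerate(stages):
    print(f"Stage {i}: {' -> '.join(ops)}")
```

The narrow operations (`flatMap`, `map`, `mapValues`) stay fused inside a stage, while each operation marked as needing a shuffle begins a new one.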
CURRENT STAGE
- All the (fused) Map operations are performed intra-Stage.
- The next Stage requires a Reduce operation, e.g. a reduceByKey, which means the output is hashed or sorted by key (K) at the end of the Map operations of the current Stage.
- This grouped data is written to disk on the Worker where the Executor runs, or to the storage attached to it in a cloud deployment. (I would have thought in-memory was possible if the data is small, but this is Spark's architectural approach, as stated in the docs.)
- The ShuffleManager is notified that hashed, mapped data is available for consumption by the next Stage. ShuffleManager keeps track of all keys/locations once all of the map side work is done.
NEXT STAGE
- The next Stage, being a reduce, then gets the data from those locations by consulting the Shuffle Manager and using Block Manager.
- The Executor may be re-used, or it may be a new one on another Worker, or another Executor on the same Worker.
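The two stages described above can be sketched as a toy model in plain Python. This is an illustration under stated assumptions, not Spark's implementation: `map_task`, `reduce_task`, `partition_for`, `NUM_REDUCERS`, and the file naming are all hypothetical, and Spark's actual on-disk shuffle layout differs. The point it shows is only the flow: map tasks bucket records by key and write them to local files, and the next stage's reduce tasks look up those locations and read just their own partition.

```python
import os
import pickle
import tempfile
from collections import defaultdict

NUM_REDUCERS = 2  # assumed number of reduce-side partitions


def partition_for(key: str) -> int:
    # Deterministic toy partitioner (Python's built-in hash() of a str
    # is salted per process, so it is avoided here).
    return sum(key.encode("utf-8")) % NUM_REDUCERS


def map_task(map_id, records, shuffle_dir):
    """Map side: bucket records by target reducer, write each bucket to a local file."""
    buckets = defaultdict(list)
    for key, value in records:
        buckets[partition_for(key)].append((key, value))
    locations = {}
    for reduce_id, bucket in buckets.items():
        path = os.path.join(shuffle_dir, f"shuffle_{map_id}_{reduce_id}.bin")
        with open(path, "wb") as f:
            pickle.dump(bucket, f)
        locations[reduce_id] = path  # what a location tracker would record
    return locations


def reduce_task(reduce_id, map_outputs):
    """Reduce side: read every block for this partition and sum values by key."""
    totals = defaultdict(int)
    for locations in map_outputs:
        path = locations.get(reduce_id)
        if path is None:
            continue  # this map task produced nothing for this reducer
        with open(path, "rb") as f:
            for key, value in pickle.load(f):
                totals[key] += value
    return dict(totals)


with tempfile.TemporaryDirectory() as shuffle_dir:
    inputs = [
        [("a", 1), ("b", 1), ("a", 1)],  # input partition 0
        [("b", 1), ("c", 1)],            # input partition 1
    ]
    # Current stage: every map task finishes and reports its output locations.
    map_outputs = [map_task(i, part, shuffle_dir) for i, part in enumerate(inputs)]
    # Next stage: each reduce task reads only the files for its own partition.
    results = {}
    for r in range(NUM_REDUCERS):
        results.update(reduce_task(r, map_outputs))

print(results)
```

Because the map output sits in files rather than in the memory of the task that produced it, a reduce task in this model can be retried by re-reading the same files without re-running the map tasks, which is the recovery benefit discussed in the question.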
So my understanding is that, architecturally, a Stage boundary means writing to disk, even when there is enough memory. Given the finite resources of a Worker, it makes sense that writing to disk occurs for this type of operation. The more important point is, of course, the 'Map Reduce' implementation. I have summarized the excellent posting linked above; treat it as your canonical source.
Of course, this persistence also aids fault tolerance: less re-computation work is needed after a failure.
Similar aspects apply to DFs.
Solution 2:
Spark is not, and never was, an "in-memory engine". If you check the internals, it is pretty clear that it is neither optimized for in-memory processing nor tuned for in-memory-centered hardware.
On the contrary, almost all design decisions were clearly made on the assumption that the size of the data as a whole, as well as the inputs and outputs of individual tasks, can exceed the available memory of the cluster and of an individual executor / executor thread respectively. Furthermore, it is clearly designed to be used on commodity hardware.
Such an implementation can be used for recovery or to avoid recomputation (see, for example, What does "Stage Skipped" mean in Apache Spark web UI?), but this is repurposing rather than the initial goal.