collect() or toPandas() on a large DataFrame in pyspark/EMR
Solution 1:
TL;DR I believe you're seriously underestimating memory requirements.
Even assuming the data is fully cached, the storage info will show only a fraction of the peak memory required to bring the data back to the driver.
- First of all, Spark SQL uses compressed columnar storage for caching. Depending on the data distribution and compression algorithm, the in-memory size can be much smaller than the uncompressed Pandas output, not to mention a plain List[Row]. The latter also stores column names, further increasing memory usage.
- Data collection is indirect, with data being stored both on the JVM side and on the Python side. While JVM memory can be released once data goes through the socket, peak memory usage should account for both.
- The plain toPandas implementation collects Rows first, then creates the Pandas DataFrame locally. This further increases (possibly doubles) memory usage. Luckily, this part is already addressed on master (Spark 2.3), with a more direct approach using Arrow serialization (SPARK-13534 - Implement Apache Arrow serializer for Spark DataFrame for use in DataFrame.toPandas). For a possible solution independent of Apache Arrow you can check Faster and Lower memory implementation toPandas on the Apache Spark Developer List; a rough sketch of that idea is shown below.
Since the data is actually pretty large, I would consider writing it to Parquet and reading it back directly in Python using PyArrow (Reading and Writing the Apache Parquet Format), skipping all the intermediate stages completely.
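A minimal sketch of that route (the path below is just a placeholder; on EMR you would more likely point both sides at an S3 location):

# On the Spark side: write the data out without ever collecting it to the driver
spark_df.write.mode("overwrite").parquet("/tmp/my_table")  # placeholder path

# In plain Python (no Spark needed): read the Parquet directory back with PyArrow
import pyarrow.parquet as pq

table = pq.read_table("/tmp/my_table")  # reads all part files in the directory
pdf = table.to_pandas()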
Solution 2:
By enabling the Arrow setting you will see a speedup:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
Solution 3:
As mentioned above, when calling toPandas(), all records of the DataFrame are collected to the driver program, so it should only be done on a small subset of the data. (https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html)
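For example, either of these keeps the collected subset small (df is your DataFrame; the row count and sample fraction are arbitrary):

pdf_head = df.limit(10000).toPandas()  # only the first 10,000 rows
pdf_sample = df.sample(False, 0.01).toPandas()  # ~1% random sample, without replacement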