Does toPandas() speed up as a pyspark dataframe gets smaller?
Solution 1:
Here is the source code for toPandas().
First of all, yes, toPandas() will be faster as your PySpark dataframe gets smaller; it behaves much like sdf.collect().
The difference is that toPandas() returns a pandas DataFrame while collect() returns a list.
As you can see from the source code, pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns):
the pandas DataFrame is generated by pd.DataFrame.from_records from the collected list.
So if your sdf is smaller, there is less data to transfer over the network, and from_records has less data to process on the driver's CPU.
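To make the comparison concrete, here is a minimal sketch, assuming sdf is an already-created PySpark DataFrame (the same name used above) and pandas is available on the driver:

    import pandas as pd

    # Both calls pull every row across the network to the driver;
    # only the return type differs.
    rows = sdf.collect()   # list of pyspark.sql.Row objects
    pdf = sdf.toPandas()   # pandas DataFrame

    # Per the source line quoted above, toPandas() is roughly equivalent to:
    pdf_equivalent = pd.DataFrame.from_records(sdf.collect(), columns=sdf.columns)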
The design of the second code snippet is different: the sdf stays distributed, and the code calls mapPartitions so every worker builds a pandas DataFrame from its own subset of the data. It then calls collect, so all of those pandas DataFrames are transferred over the network to the driver, and finally pd.concat concatenates them into one DataFrame.
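A minimal sketch of that pattern is below; to_pandas_via_partitions is a hypothetical helper name, and the actual snippet from the question may differ in detail:

    import pandas as pd

    def to_pandas_via_partitions(sdf, num_partitions=None):
        cols = sdf.columns
        if num_partitions:
            # Repartition so each partition stays small enough to convert
            sdf = sdf.repartition(num_partitions)

        def partition_to_pandas(rows):
            # Runs on the workers: build one pandas DataFrame per partition
            yield pd.DataFrame.from_records([r.asDict() for r in rows], columns=cols)

        # collect() now returns a list of pandas DataFrames, one per partition
        parts = sdf.rdd.mapPartitions(partition_to_pandas).collect()
        # Stitch the pieces together on the driver
        return pd.concat(parts, ignore_index=True)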
The benefits are:
- When converting to a pandas DataFrame, all the workers work on small subsets of the data in parallel, which is much better than bringing all the data to the driver and burning the driver's CPU to convert one giant dataset to pandas.
- There is a repartition step, which matters because if your dataset is huge and you only have a few partitions, the data on each partition will be huge, toPandas() can fail with a serializer OOM, and collecting the data will also be very slow.
The Drawbacks are:
- Now when you collect, you are not collecting the native sdf rows but pandas DataFrames, which carry more metadata and are generally larger, so the total size of the objects being transferred is bigger.
- pd.concat is slow, but it might still be better than from_records.
So there is no universal conclusion about which method is better; choose the right tool for the job. As in this question, toPandas() may be faster for a small sdf, but for a large sdf the mapPartitions snippet definitely works better.
Solution 2:
In our case, we found that just not doing toPandas() and using pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns) was fastest. We couldn't use the Arrow option because we got the error "arrow is not supported when using file-based collect".
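For reference, a sketch of what that looks like in practice, assuming sdf is your Spark DataFrame and spark is the active SparkSession; the Arrow config key shown is an assumption based on Spark 3.x and differs in older versions:

    import pandas as pd

    # Collect the rows ourselves and let pandas infer dtypes, skipping the
    # extra copying that toPandas() does (described below).
    pdf = pd.DataFrame.from_records(sdf.collect(), columns=sdf.columns)

    # If Arrow works in your environment (it did not in ours), enabling it
    # before toPandas() is the other common speed-up; older Spark versions
    # use spark.sql.execution.arrow.enabled instead.
    spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
    pdf_arrow = sdf.toPandas()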
Looking at the source code for toPandas(), one reason it may be slow is that it first creates the pandas DataFrame, and then copies each of the Series in that DataFrame over to the returned DataFrame. If you know that all of your columns have unique names, and that the data types will convert nicely by letting pandas infer the dtype values, there is no need to do any of that copying or dtype inference.
Side note: We were converting a Spark DataFrame on Databricks with about 2 million rows and 6 columns, so your mileage may vary depending on the size of your conversion.