How to convert a DataFrame back to normal RDD in pyspark?
I need to use the
(rdd.)partitionBy(npartitions, custom_partitioner)
method, which is not available on the DataFrame. All of the DataFrame methods return only DataFrame results. So how do I create an RDD from the DataFrame data?
Note: this is a change (in 1.3.0) from 1.2.0.
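For context, a minimal sketch of the intended use, assuming rdd has already been recovered from the DataFrame somehow; custom_partitioner is a hypothetical function mapping a key to a partition index:

# hypothetical partitioner: split integer keys into two ranges
def custom_partitioner(key):
    return 0 if key < 100 else 1

# partitionBy is defined only on key-value pair RDDs,
# so the rows must first be keyed, e.g. as (key, row) tuples
partitioned = rdd.map(lambda row: (row[0], row)).partitionBy(2, custom_partitioner)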
Update from @dapangmao's answer: the method is .rdd. I was interested to understand whether (a) it is public and (b) what the performance implications are.
Well, (a) is yes, and for (b) you can see below that there are significant performance implications: a new RDD must be created by invoking mapPartitions.
In dataframe.py (note the file name changed as well; it was sql.py):
@property
def rdd(self):
    """
    Return the content of the :class:`DataFrame` as an :class:`RDD`
    of :class:`Row` s.
    """
    if not hasattr(self, '_lazy_rdd'):
        jrdd = self._jdf.javaToPython()
        rdd = RDD(jrdd, self.sql_ctx._sc, BatchedSerializer(PickleSerializer()))
        schema = self.schema

        def applySchema(it):
            cls = _create_cls(schema)
            return itertools.imap(cls, it)

        self._lazy_rdd = rdd.mapPartitions(applySchema)
    return self._lazy_rdd
Use the .rdd method, like this:
rdd = df.rdd
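For illustration, a minimal sketch assuming an active SparkSession; the DataFrame and its contents are made up for the example:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "alice"), (2, "bob")], ["id", "name"])

rdd = df.rdd  # an RDD of Row objects; evaluation stays lazy
print(rdd.take(2))  # [Row(id=1, name='alice'), Row(id=2, name='bob')]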
@dapangmao's answer works, but it doesn't give the regular Spark RDD; it returns an RDD of Row objects. If you want the regular RDD format, try this:
rdd = df.rdd.map(tuple)
or
rdd = df.rdd.map(list)
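Continuing the sketch DataFrame from above, the two variants produce tuples and lists respectively:

print(df.rdd.map(tuple).take(2))  # [(1, 'alice'), (2, 'bob')]
print(df.rdd.map(list).take(2))   # [[1, 'alice'], [2, 'bob']]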
The answer given by kennyut/Kistian works very well, but to get an exact RDD-like output when the RDD consists of a list of attributes, e.g. [1,2,3,4], we can use the flatMap command as below:
rdd = df.rdd.flatMap(list)
or
rdd = df.rdd.flatMap(lambda x: list(x))
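With the same sketch DataFrame as above, flatMap flattens every row's values into one flat RDD of elements:

print(df.rdd.flatMap(list).collect())  # [1, 'alice', 2, 'bob']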