How do I add a persistent column of row IDs to a Spark DataFrame?
Spark 2.0
This issue has been resolved in Spark 2.0 with SPARK-14241.
Another similar issue has been resolved in Spark 2.1 with SPARK-14393.
Spark 1.x
The problem you experience is rather subtle, but it can be reduced to a simple fact: monotonically_increasing_id is an extremely ugly function. It is clearly not pure, and its value depends on something that is completely out of your control.
It doesn't take any parameters, so from the optimizer's perspective it doesn't matter when it is called, and it can be pushed after all other operations. Hence the behavior you see.
If you take a look at the code, you'll find that this is explicitly marked by extending the MonotonicallyIncreasingID expression with Nondeterministic.
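To see why the position of the call in the plan matters, here is a plain-Python sketch of the ID layout that the Spark documentation describes for monotonically_increasing_id: the partition index goes in the upper 31 bits and the record's position within its partition goes in the lower 33 bits. The helper names (`monotonic_id`, `assign_ids`) and the toy partitions are hypothetical; this only mimics the arithmetic and does not call Spark.

```python
# Plain-Python sketch of how monotonically_increasing_id builds a value:
# partition index in the upper 31 bits, position within the partition in
# the lower 33 bits. Helper names and data are hypothetical.

def monotonic_id(partition_index: int, position: int) -> int:
    """Combine a partition index and an in-partition position into one ID."""
    return (partition_index << 33) + position


def assign_ids(partitions):
    """Return (row, id) pairs for rows grouped into partitions."""
    return [
        (row, monotonic_id(p, i))
        for p, rows in enumerate(partitions)
        for i, row in enumerate(rows)
    ]


# Two partitions; the boolean in each row plays the role of the filter column P.
partitions = [
    [("a", True), ("b", False), ("c", True)],
    [("d", False), ("e", True)],
]

# IDs computed before the filter (what the query intends).
ids_then_filter = [(r[0], i) for r, i in assign_ids(partitions) if r[1]]

# IDs computed after the filter (what happens if the call is moved past it).
filtered = [[r for r in rows if r[1]] for rows in partitions]
filter_then_ids = [(r[0], i) for r, i in assign_ids(filtered)]

print(ids_then_filter)  # [('a', 0), ('c', 2), ('e', 8589934593)]
print(filter_then_ids)  # [('a', 0), ('c', 1), ('e', 8589934592)]
```

The same rows get different IDs depending only on whether the filter ran first, which is exactly the kind of difference the optimizer is free to introduce for a nondeterministic expression.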
I don't think there is any elegant solution, but one way to handle this is to add an artificial dependency on the filtered value, for example with a UDF like this:
from pyspark.sql.types import LongType
from pyspark.sql.functions import monotonically_increasing_id, udf
# Identity UDF that ignores its first argument; passing "P" to it
# creates an artificial dependency on the filter column
bound = udf(lambda _, v: v, LongType())
(df
    .withColumn("rn", monotonically_increasing_id())
    # Due to nondeterministic behavior it has to be a separate step
    .withColumn("rn", bound("P", "rn"))
    .where("P"))
In general, it could be cleaner to add indices using zipWithIndex on an RDD and then convert it back to a DataFrame.
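zipWithIndex can produce consecutive indices because it first counts the elements in every partition (Spark runs an extra job for this when the RDD has more than one partition) and then offsets each partition's local positions by the total size of the partitions before it. A plain-Python sketch of that logic, with a hypothetical `zip_with_index` helper standing in for the RDD method and a list of lists standing in for partitions:

```python
from itertools import accumulate
from typing import List, Sequence, Tuple, TypeVar

T = TypeVar("T")


def zip_with_index(partitions: Sequence[Sequence[T]]) -> List[List[Tuple[T, int]]]:
    """Mimic RDD.zipWithIndex on a list of partitions (hypothetical helper)."""
    # Pass 1: size of each partition.
    counts = [len(p) for p in partitions]
    # Start offset of each partition = sum of the sizes of all earlier partitions.
    offsets = [0] + list(accumulate(counts))[:-1]
    # Pass 2: pair each element with (offset + local position), element first.
    return [
        [(x, start + i) for i, x in enumerate(p)]
        for p, start in zip(partitions, offsets)
    ]


parts = [["a", "c"], [], ["e", "f", "g"]]
print(zip_with_index(parts))
# [[('a', 0), ('c', 1)], [], [('e', 2), ('f', 3), ('g', 4)]]
```

Unlike the bit-packed IDs, the result has no gaps, even with empty partitions, at the cost of the extra counting pass.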
* The workaround shown above is no longer a valid solution (nor is it required) in Spark 2.x, where Python UDFs are subject to execution plan optimizations.
I couldn't reproduce this. I'm using Spark 2.0, though, so maybe the behaviour has changed, or I am not doing the same thing as you.
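import org.apache.spark.sql.functions.{col, monotonically_increasing_id}
// spark.implicits._ (needed for toDF) is already imported in spark-shell
val df = Seq(("one", 1, true), ("two", 2, false), ("three", 3, true), ("four", 4, true))
  .toDF("name", "value", "flag")
  .withColumn("rowd", monotonically_increasing_id())
df.show
val df2 = df.filter(col("flag") === true)
df2.show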
df: org.apache.spark.sql.DataFrame = [name: string, value: int ... 2 more fields]
+-----+-----+-----+----+
| name|value| flag|rowd|
+-----+-----+-----+----+
|  one|    1| true|   0|
|  two|    2|false|   1|
|three|    3| true|   2|
| four|    4| true|   3|
+-----+-----+-----+----+
df2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [name: string, value: int ... 2 more fields]
+-----+-----+----+----+
| name|value|flag|rowd|
+-----+-----+----+----+
|  one|    1|true|   0|
|three|    3|true|   2|
| four|    4|true|   3|
+-----+-----+----+----+
I was recently working on a similar problem. Although monotonically_increasing_id() is very fast, it is not reliable and will not give you consecutive row numbers, only increasing unique integers.
Creating a window partition and then using row_number().over(some_windows_partition) is extremely time-consuming (for a single global numbering the window has no partitionBy, so Spark moves all rows into one partition).
The best solution I have found so far is to use zipWithIndex and then convert the zipped RDD back to a DataFrame, with a new schema that includes the index column.
Try this:
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, LongType
# Copy the original schema and append a non-nullable index column
new_schema = StructType(original_dataframe.schema.fields[:] + [StructField("index", LongType(), False)])
# Pair every row with its position: (row, index)
zipped_rdd = original_dataframe.rdd.zipWithIndex()
# Append the index to each row's values and rebuild the DataFrame
indexed = (zipped_rdd.map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]])).toDF(new_schema))
where original_dataframe is the DataFrame you want to add an index to, and row_with_index is a Row factory with the new set of column names (including the index column). Define it before running the code above, for example:
row_with_index = Row(
"calendar_date"
,"year_week_number"
,"year_period_number"
,"realization"
,"index"
)
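The lambda in the map step unpacks each original row, appends the zipWithIndex position and passes the result to the Row factory, so the factory must list exactly one more field name than the original DataFrame has columns. The sketch below mimics that step with `collections.namedtuple` as a hypothetical stand-in for `pyspark.sql.Row` and made-up sample values, so it runs without Spark:

```python
from collections import namedtuple

# Stand-in for Row("calendar_date", ..., "index"); field order matters.
RowWithIndex = namedtuple(
    "RowWithIndex",
    ["calendar_date", "year_week_number", "year_period_number", "realization", "index"],
)

# Shape of one element produced by rdd.zipWithIndex(): (original_row, position).
# The values here are made up for illustration.
ri = (("2017-01-02", 1, 1, 42.0), 0)

# Same expression as in the map() call: original values followed by the index.
row = RowWithIndex(*list(ri[0]) + [ri[1]])
print(row)
```

If the factory lists fewer or more names than the values being passed, the call fails, which is a quick way to spot a mismatch before running the Spark job.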
Here, calendar_date, year_week_number, year_period_number, and realization were the columns of my original DataFrame. Replace them with the names of your own columns, in the same order. index is the name of the new column that holds the row numbers.
This approach was much more efficient and smoother than the row_number().over(some_windows_partition) method.
Hope this helps.
To get around the shifting evaluation of monotonically_increasing_id(), you could try writing the DataFrame to disk and reading it back. The id column is then simply a data field that is being read, rather than a value calculated dynamically at some point in the pipeline. Although it's a pretty ugly solution, it worked when I did a quick test.