Updating a dataframe column in spark
While you cannot modify a column as such, you may operate on a column and return a new DataFrame reflecting that change. For that you'd first create a UserDefinedFunction
implementing the operation to apply and then selectively apply that function to the targeted column only. In Python:
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType
name = 'target_column'
udf = UserDefinedFunction(lambda x: 'new_value', StringType())
new_df = old_df.select(*[udf(column).alias(name) if column == name else column for column in old_df.columns])
new_df
now has the same schema as old_df
(assuming that old_df.target_column
was of type StringType
as well) but all values in column target_column
will be new_value
.
Commonly when updating a column, we want to map an old value to a new value. Here's a way to do that in pyspark without UDF's:
# update df[update_col], mapping old_value --> new_value
from pyspark.sql import functions as F
df = df.withColumn(update_col,
F.when(df[update_col]==old_value,new_value).
otherwise(df[update_col])).
DataFrames
are based on RDDs. RDDs are immutable structures and do not allow updating elements on-site. To change values, you will need to create a new DataFrame by transforming the original one either using the SQL-like DSL or RDD operations like map
.
A highly recommended slide deck: Introducing DataFrames in Spark for Large Scale Data Science.
Just as maasg says you can create a new DataFrame from the result of a map applied to the old DataFrame. An example for a given DataFrame df
with two rows:
val newDf = sqlContext.createDataFrame(df.map(row =>
Row(row.getInt(0) + SOMETHING, applySomeDef(row.getAs[Double]("y")), df.schema)
Note that if the types of the columns change, you need to give it a correct schema instead of df.schema
. Check out the api of org.apache.spark.sql.Row
for available methods: https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Row.html
[Update] Or using UDFs in Scala:
import org.apache.spark.sql.functions._
val toLong = udf[Long, String] (_.toLong)
val modifiedDf = df.withColumn("modifiedColumnName", toLong(df("columnName"))).drop("columnName")
and if the column name needs to stay the same you can rename it back:
modifiedDf.withColumnRenamed("modifiedColumnName", "columnName")