Concatenate columns in Apache Spark DataFrame
How do we concatenate two columns in an Apache Spark DataFrame? Is there any function in Spark SQL which we can use?
Solution 1:
With raw SQL you can use CONCAT
:
-
In Python
df = sqlContext.createDataFrame([("foo", 1), ("bar", 2)], ("k", "v")) df.registerTempTable("df") sqlContext.sql("SELECT CONCAT(k, ' ', v) FROM df")
-
In Scala
import sqlContext.implicits._ val df = sc.parallelize(Seq(("foo", 1), ("bar", 2))).toDF("k", "v") df.registerTempTable("df") sqlContext.sql("SELECT CONCAT(k, ' ', v) FROM df")
Since Spark 1.5.0 you can use concat
function with DataFrame API:
-
In Python :
from pyspark.sql.functions import concat, col, lit df.select(concat(col("k"), lit(" "), col("v")))
-
In Scala :
import org.apache.spark.sql.functions.{concat, lit} df.select(concat($"k", lit(" "), $"v"))
There is also concat_ws
function which takes a string separator as the first argument.
Solution 2:
Here's how you can do custom naming
import pyspark
from pyspark.sql import functions as sf
sc = pyspark.SparkContext()
sqlc = pyspark.SQLContext(sc)
df = sqlc.createDataFrame([('row11','row12'), ('row21','row22')], ['colname1', 'colname2'])
df.show()
gives,
+--------+--------+
|colname1|colname2|
+--------+--------+
| row11| row12|
| row21| row22|
+--------+--------+
create new column by concatenating:
df = df.withColumn('joined_column',
sf.concat(sf.col('colname1'),sf.lit('_'), sf.col('colname2')))
df.show()
+--------+--------+-------------+
|colname1|colname2|joined_column|
+--------+--------+-------------+
| row11| row12| row11_row12|
| row21| row22| row21_row22|
+--------+--------+-------------+
Solution 3:
One option to concatenate string columns in Spark Scala is using concat
.
It is necessary to check for null values. Because if one of the columns is null, the result will be null even if one of the other columns do have information.
Using concat
and withColumn
:
val newDf =
df.withColumn(
"NEW_COLUMN",
concat(
when(col("COL1").isNotNull, col("COL1")).otherwise(lit("null")),
when(col("COL2").isNotNull, col("COL2")).otherwise(lit("null"))))
Using concat
and select
:
val newDf = df.selectExpr("concat(nvl(COL1, ''), nvl(COL2, '')) as NEW_COLUMN")
With both approaches you will have a NEW_COLUMN which value is a concatenation of the columns: COL1 and COL2 from your original df.