Concatenate columns in Apache Spark DataFrame

How do we concatenate two columns in an Apache Spark DataFrame? Is there any function in Spark SQL which we can use?


Solution 1:

With raw SQL you can use CONCAT:

  • In Python

    df = sqlContext.createDataFrame([("foo", 1), ("bar", 2)], ("k", "v"))
    df.registerTempTable("df")
    sqlContext.sql("SELECT CONCAT(k, ' ',  v) FROM df")
    
  • In Scala

    import sqlContext.implicits._
    
    val df = sc.parallelize(Seq(("foo", 1), ("bar", 2))).toDF("k", "v")
    df.registerTempTable("df")
    sqlContext.sql("SELECT CONCAT(k, ' ',  v) FROM df")
    

Since Spark 1.5.0 you can use concat function with DataFrame API:

  • In Python :

    from pyspark.sql.functions import concat, col, lit
    
    df.select(concat(col("k"), lit(" "), col("v")))
    
  • In Scala :

    import org.apache.spark.sql.functions.{concat, lit}
    
    df.select(concat($"k", lit(" "), $"v"))
    

There is also concat_ws function which takes a string separator as the first argument.

Solution 2:

Here's how you can do custom naming

import pyspark
from pyspark.sql import functions as sf
sc = pyspark.SparkContext()
sqlc = pyspark.SQLContext(sc)
df = sqlc.createDataFrame([('row11','row12'), ('row21','row22')], ['colname1', 'colname2'])
df.show()

gives,

+--------+--------+
|colname1|colname2|
+--------+--------+
|   row11|   row12|
|   row21|   row22|
+--------+--------+

create new column by concatenating:

df = df.withColumn('joined_column', 
                    sf.concat(sf.col('colname1'),sf.lit('_'), sf.col('colname2')))
df.show()

+--------+--------+-------------+
|colname1|colname2|joined_column|
+--------+--------+-------------+
|   row11|   row12|  row11_row12|
|   row21|   row22|  row21_row22|
+--------+--------+-------------+

Solution 3:

One option to concatenate string columns in Spark Scala is using concat.

It is necessary to check for null values. Because if one of the columns is null, the result will be null even if one of the other columns do have information.

Using concat and withColumn:

val newDf =
  df.withColumn(
    "NEW_COLUMN",
    concat(
      when(col("COL1").isNotNull, col("COL1")).otherwise(lit("null")),
      when(col("COL2").isNotNull, col("COL2")).otherwise(lit("null"))))

Using concat and select:

val newDf = df.selectExpr("concat(nvl(COL1, ''), nvl(COL2, '')) as NEW_COLUMN")

With both approaches you will have a NEW_COLUMN which value is a concatenation of the columns: COL1 and COL2 from your original df.