Automatically and Elegantly flatten DataFrame in Spark SQL

Solution 1:

The short answer is that there's no "accepted" way to do this, but you can do it very elegantly with a recursive function that generates your select(...) statement by walking through the DataFrame.schema.

The recursive function should return an Array[Column]. Every time the function hits a StructType, it calls itself on that nested schema and appends the returned Array[Column] to its own Array[Column].

Something like:

import org.apache.spark.sql.Column
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.functions.col

def flattenSchema(schema: StructType, prefix: String = null): Array[Column] = {
  schema.fields.flatMap(f => {
    // Build the fully qualified column name, e.g. "parent.child"
    val colName = if (prefix == null) f.name else (prefix + "." + f.name)

    f.dataType match {
      // Nested struct: recurse with the dotted path as the new prefix
      case st: StructType => flattenSchema(st, colName)
      // Leaf field: emit a single Column for the dotted path
      case _ => Array(col(colName))
    }
  })
}

You would then use it like this:

df.select(flattenSchema(df.schema):_*)
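
A quick worked example may help. This is a minimal sketch that assumes an active SparkSession (e.g. in spark-shell, where spark and its implicits are available); the data and column names are made up purely for illustration:

import org.apache.spark.sql.functions.struct
import spark.implicits._

val df = Seq(("Alice", "Berlin", "10115"))
  .toDF("name", "city", "zip")
  .select(col("name"), struct(col("city"), col("zip")).as("address"))
// df has two columns: name (string) and address (struct<city, zip>)

val flat = df.select(flattenSchema(df.schema):_*)
// flat.columns == Array("name", "city", "zip")

Note that each leaf column keeps only its final name segment, so two different structs that share a field name will produce duplicate column names in the flattened result; if that matters, you can alias each column inside the match, e.g. col(colName).as(colName.replace(".", "_")).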

Solution 2:

Just wanted to share my solution for PySpark - it's more or less a translation of @David Griffin's solution, so it supports any level of nested objects.

from pyspark.sql.types import StructType, ArrayType

# Return a flat list of dotted column names for every leaf field in the schema
def flatten(schema, prefix=None):
    fields = []
    for field in schema.fields:
        # Build the fully qualified name, e.g. "parent.child"
        name = prefix + '.' + field.name if prefix else field.name
        dtype = field.dataType
        # Step into arrays so arrays of structs are flattened as well
        if isinstance(dtype, ArrayType):
            dtype = dtype.elementType

        if isinstance(dtype, StructType):
            # Nested struct: recurse with the dotted path as the prefix
            fields += flatten(dtype, prefix=name)
        else:
            # Leaf field: keep its fully qualified name
            fields.append(name)

    return fields


df.select(flatten(df.schema)).show()
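
Because flatten returns plain column-name strings (not Column objects), the list can be passed straight to select, and Spark resolves the dotted names as struct field accesses. A minimal sketch, with a made-up DataFrame purely for illustration:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Made-up nested data: a name column plus an address struct
df = spark.createDataFrame(
    [("Alice", ("Berlin", "10115"))],
    "name string, address struct<city:string, zip:string>"
)

print(flatten(df.schema))
# ['name', 'address.city', 'address.zip']

flat = df.select(flatten(df.schema))
print(flat.columns)
# ['name', 'city', 'zip']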