How to transpose an RDD in Spark

Solution 1:

Say you have an N×M matrix.

If both N and M are so small that you can hold N×M items in memory, it doesn't make much sense to use an RDD. But transposing it is easy:

val rdd = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9)))
val transposed = sc.parallelize(rdd.collect.toSeq.transpose)

If N or M is so large that you cannot hold N or M entries in memory, then you cannot have a row of an RDD of this size, and either the original or the transposed matrix is impossible to represent.

N and M may be of a medium size: you can hold N or M entries in memory, but you cannot hold N×M entries. In this case you have to blow up the matrix and put it together again:

val rdd = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9)))
// Split the matrix into one number per line.
val byColumnAndRow = rdd.zipWithIndex.flatMap {
  case (row, rowIndex) => row.zipWithIndex.map {
    case (number, columnIndex) => columnIndex -> (rowIndex, number)
  }
}
// Build up the transposed matrix. Group and sort by column index first.
val byColumn = byColumnAndRow.groupByKey.sortByKey().values
// Then sort by row index.
val transposed = byColumn.map {
  indexedRow => indexedRow.toSeq.sortBy(_._1).map(_._2)
}
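To sanity-check the result on the toy matrix, you can collect it back to the driver (reasonable only because this example is tiny):

// Expected rows: 1 4 7 / 2 5 8 / 3 6 9
transposed.collect().foreach(row => println(row.mkString(" ")))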

Solution 2:

A first draft without using collect(), so that everything runs on the worker side and nothing is done on the driver:

val rdd = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9)))

rdd.flatMap(row => row.zipWithIndex) // flatMap, keeping each value's column position (zipWithIndex avoids the duplicate-value problem of indexOf)
   .map(v => (v._2, v._1))           // key by column position
   .groupByKey.sortByKey()           // regroup on column position, so all elements from the first column end up in the first row
   .map(_._2)                        // discard the key, keep only the values

The problem with this solution is that, when the job runs distributed, the elements within each row of the transposed matrix (the original columns) can end up in shuffled order. I will think of an improved version.

My idea is that, in addition to attaching the column number to each element of the matrix, we also attach the row number. We could then key by column position and regroup by key as in the example above, then reorder each row on the row number and finally strip the row/column numbers from the result. I just don't have a way to know the row number when importing a file into an RDD.
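A rough sketch of getting the row number at load time with zipWithIndex (the file name, delimiter, and numeric type here are assumptions for illustration):

// Each line of the file is one matrix row; zipWithIndex supplies its row number.
val lines = sc.textFile("matrix.txt")
val indexed = lines.zipWithIndex.flatMap { case (line, rowIndex) =>
  line.split(",").zipWithIndex.map { case (value, columnIndex) =>
    (columnIndex, (rowIndex, value.toDouble))
  }
}
// From here: group by column index, sort each group by row index, strip the indices.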

You might think it is heavy to attach a column and a row number to each matrix element, but I guess that is the price to pay for being able to process your input in chunks in a distributed fashion and thus handle huge matrices.

Will update the answer when I find a solution to the ordering problem.

Solution 3:

As of Spark 1.6 you can use the pivot operation on DataFrames. Depending on the actual shape of your data, if you put it into a DataFrame you can pivot columns to rows. The Databricks blog post on pivoting is very useful, as it describes a number of pivoting use cases in detail with code examples.
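For illustration, a rough sketch of transposing via pivot, assuming the matrix is first put into long form with explicit row and column indices (shown with the Spark 2.x SparkSession API; on 1.6 the equivalent goes through SQLContext):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.first

val spark = SparkSession.builder.appName("transpose").getOrCreate()
import spark.implicits._

// One (rowIdx, colIdx, value) record per matrix element.
val matrix = Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9))
val longForm = matrix.zipWithIndex.flatMap { case (row, r) =>
  row.zipWithIndex.map { case (v, c) => (r, c, v) }
}.toDF("rowIdx", "colIdx", "value")

// Pivot: original row indices become columns, so each output row is an original column.
val transposed = longForm
  .groupBy("colIdx")
  .pivot("rowIdx")
  .agg(first("value"))
  .orderBy("colIdx")

transposed.show()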