How to assign unique contiguous numbers to elements in a Spark RDD

I have a dataset of (user, product, review) triples and want to feed it into MLlib's ALS algorithm.

The algorithm needs users and products to be numbers, while mine are String usernames and String SKUs.

Right now, I get the distinct users and SKUs, then assign numeric IDs to them outside of Spark.

I was wondering whether there was a better way of doing this. One approach I've thought of is to write a custom RDD that essentially enumerates 1 through n, then call zip on the two RDDs.


Solution 1:

Starting with Spark 1.0, there are two methods you can use to solve this easily:

  • RDD.zipWithIndex is just like Seq.zipWithIndex: it assigns contiguous Long indices starting at 0. It needs to count the elements in each partition first, so your input will be evaluated twice; cache your input RDD if you want to use this (see the sketch after this list).
  • RDD.zipWithUniqueId also gives you unique Long IDs, but they are not guaranteed to be contiguous. (They will only be contiguous if each partition has the same number of elements.) The upside is that it does not need to know anything about the input, so it will not cause double-evaluation.
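
A minimal sketch of the zipWithIndex route, assuming an input RDD named reviews of (username, sku, score) triples (the names are illustrative, not from the question):

// Cache the distinct users so zipWithIndex's extra counting pass
// does not recompute the upstream lineage.
val users = reviews.map(_._1).distinct().cache()

// Pair each username with a contiguous Long index starting at 0.
val userIds = users.zipWithIndex()  // RDD[(String, Long)]

// Join the numeric IDs back onto the original records.
val withUserIds = reviews
  .map { case (user, sku, score) => (user, (sku, score)) }
  .join(userIds)
  .map { case (_, ((sku, score), uid)) => (uid, sku, score) }

Repeating the same distinct/zipWithIndex/join steps for the SKU column yields the second numeric ID.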

Solution 2:

For a similar use case, I just hashed the string values. See http://blog.cloudera.com/blog/2014/03/why-apache-spark-is-a-crossover-hit-for-data-scientists/

// Mask the hash to a non-negative value so it can be used as a numeric ID
def nnHash(tag: String) = tag.hashCode & 0x7FFFFF
val tagHashes = postIDTags.map(_._2).distinct.map(tag => (nnHash(tag), tag))

It sounds like you're already doing something like this. Hashing can be easier to manage, but note that the IDs are not contiguous and that two distinct strings can collide to the same ID.
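
If the end goal is ALS, the hashed IDs can be plugged straight into Rating objects. A hypothetical sketch, again assuming a reviews RDD of (username, sku, score) triples, and using a 31-bit mask rather than the blog's 23-bit one to reduce collisions:

import org.apache.spark.mllib.recommendation.Rating

// Clear the sign bit so the hash is a non-negative Int,
// since mllib's Rating takes Int user and product IDs.
def nnHash(s: String): Int = s.hashCode & 0x7FFFFFFF

val ratings = reviews.map { case (user, sku, score) =>
  Rating(nnHash(user), nnHash(sku), score)
}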

Matei suggested an approach to emulating zipWithIndex on an RDD, which amounts to assigning IDs within each partition that end up globally unique: https://groups.google.com/forum/#!topic/spark-users/WxXvcn2gl1E. A rough sketch of that idea follows.
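
This is my paraphrase, not Matei's exact code (rdd stands for any input RDD): count the elements in each partition in one pass, turn the counts into per-partition starting offsets, then assign offset plus local index in a second pass.

// First pass: count the elements in each partition.
val counts = rdd
  .mapPartitionsWithIndex { (i, it) => Iterator((i, it.size)) }
  .collect()
  .sortBy(_._1)
  .map(_._2)

// A running sum of the counts gives each partition's starting ID.
val offsets = counts.scanLeft(0L)(_ + _)

// Second pass: contiguous global ID = partition offset + local index.
val withIds = rdd.mapPartitionsWithIndex { (i, it) =>
  it.zipWithIndex.map { case (x, j) => (x, offsets(i) + j) }
}

This is essentially what RDD.zipWithIndex now does internally, which is why that method needs the extra counting pass mentioned in Solution 1.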

Solution 3:

Another easy option, if you are using DataFrames and are only concerned about uniqueness, is the built-in function monotonically_increasing_id. The generated IDs are guaranteed to be unique and increasing, but not contiguous.

import org.apache.spark.sql.functions.monotonically_increasing_id
val newDf = df.withColumn("uniqueIdColumn", monotonically_increasing_id())

Edit: monotonicallyIncreasingId was deprecated in Spark 2.0 and has since been removed; the function is now called monotonically_increasing_id.