Mind blown: RDD.zip() method

I just discovered the RDD.zip() method and I cannot imagine what its contract could possibly be.

I understand what it does, of course. However, it has always been my understanding that

  • the order of elements in an RDD is a meaningless concept
  • the number of partitions and their sizes is an implementation detail only available to the user for performance tuning

In other words, an RDD is a (multi)set, not a sequence (and, of course, trying to zip a plain set in, say, Python gets you AttributeError: 'set' object has no attribute 'zip')

What is wrong with my understanding above?

What was the rationale behind this method?

Is it legal to use outside trivial contexts like a.map(f).zip(a)?

EDIT 1:

  • Another crazy method is zipWithIndex(), as well as the various zipPartitions() variants.
  • Note that first() and take() are not crazy because they are just (non-random) samples of the RDD.
  • collect() is also okay - it just converts a set to a sequence, which is perfectly legit.

EDIT 2: Solution 2 below says:

when you compute one RDD from another the order of elements in the new RDD may not correspond to that in the old one.

This appears to imply that even the trivial a.map(f).zip(a) is not guaranteed to be equivalent to a.map(x => (f(x), x)). Under what circumstances are zip() results reproducible?


Solution 1:

It is not true that RDDs are always unordered. An RDD has a guaranteed order if it is the result of a sortBy operation, for example. An RDD is not a set; it can contain duplicates. Partitioning is not opaque to the caller: it can be controlled and queried. Many operations do preserve both partitioning and order, like map. That said, I find it a little easy to accidentally violate the assumptions that zip depends on, since they're a little subtle, but it certainly has a purpose.
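
For instance, here is a minimal sketch (Scala shell; the values are illustrative) of the assumptions zip depends on: both RDDs must have the same number of partitions and the same number of elements in each partition.

    val a = sc.parallelize(1 to 6, 2)   // 2 partitions, 3 elements each
    val b = a.map(_ * 10)               // map preserves partitioning and per-partition order
    a.zip(b).collect()                  // Array((1,10), (2,20), ..., (6,60))

    // Violating the assumptions fails at runtime: filter changes the
    // number of elements per partition, so this throws a SparkException
    // when the job actually runs:
    // a.zip(a.filter(_ % 2 == 0)).collect()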

Solution 2:

The mental model I use (and recommend) is that the elements of an RDD are ordered, but when you compute one RDD from another the order of elements in the new RDD may not correspond to that in the old one.

For those who want to be aware of partitions, I'd say that:

  1. The partitions of an RDD have an order.
  2. The elements within a partition have an order.
  3. If you think of "concatenating" the partitions (say, laying them "end to end" in order) using the order of elements within them, the overall ordering you end up with corresponds to the order of elements if you ignore partitions; the sketch just after this list illustrates this.
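
A minimal sketch of this model (Scala shell; glom() collects each partition into an array, and the values shown are illustrative):

    val rdd = sc.parallelize(1 to 6, 3)
    rdd.glom().collect()   // Array(Array(1, 2), Array(3, 4), Array(5, 6))
    rdd.collect()          // Array(1, 2, 3, 4, 5, 6) -- the "concatenation"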

But again, if you compute one RDD from another, all bets about the order relationships of the two RDDs are off.

Several members of the RDD class (I'm referring to the Scala API) strongly suggest an order concept (as does their documentation):

collect()
first()
partitions
take()
zipWithIndex()

as does Partition.index as well as SparkContext.parallelize() and SparkContext.makeRDD() (which both take a Seq[T]).

In my experience these ways of "observing" order give results that are consistent with each other, and the ones that translate back and forth between RDDs and ordered Scala collections behave as you would expect -- they preserve the overall order of elements. This is why I say that, in practice, RDDs have a meaningful order concept.
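
To illustrate that consistency (again a sketch, assuming a local Scala shell; the names are mine):

    val data = Seq("a", "b", "c", "d")
    val rdd = sc.parallelize(data, 2)
    rdd.collect()                  // Array(a, b, c, d) -- round-trips the Seq's order
    rdd.zipWithIndex().collect()   // Array((a,0), (b,1), (c,2), (d,3))
    rdd.first()                    // a -- agrees with collect()(0)
    rdd.take(3)                    // Array(a, b, c) -- a prefix, in the same order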

Furthermore, while there are obviously many situations where computing an RDD from another must change the order, in my experience order tends to be preserved where it is possible/reasonable to do so. Operations that don't re-partition and don't fundamentally change the set of elements especially tend to preserve order.
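
For example (a sketch; the repartitioned output is deliberately not shown, since it is just one of several possible orders):

    val rdd = sc.parallelize(1 to 8, 2)
    rdd.map(_ + 1).collect()          // Array(2, 3, 4, 5, 6, 7, 8, 9) -- order preserved
    rdd.filter(_ % 2 == 0).collect()  // Array(2, 4, 6, 8) -- order preserved
    rdd.repartition(4).collect()      // shuffles: element order is NOT guaranteed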

But this brings me to your question about "contract", and indeed the documentation has a problem in this regard. I have not seen a single place where an operation's effect on element order is made clear. (The OrderedRDDFunctions class doesn't count, because it refers to an ordering based on the data, which may differ from the raw order of elements within the RDD. Likewise the RangePartitioner class.) I can see how this might lead you to conclude that there is no concept of element order, but the examples I've given above make that model unsatisfying to me.