Python random sample with a generator / iterable / iterator

Do you know if there is a way to get python's random.sample to work with a generator object. I am trying to get a random sample from a very large text corpus. The problem is that random.sample() raises the following error.

TypeError: object of type 'generator' has no len()

I was thinking that maybe there is some way of doing this with something from itertools but couldn't find anything with a bit of searching.

A somewhat made up example:

import random
def list_item(ls):
    for item in ls:
        yield item

random.sample( list_item(range(100)), 20 )


UPDATE


As per MartinPieters's request I did some timing of the currently proposed three methods. The results are as follows.

Sampling 1000 from 10000
Using iterSample 0.0163 s
Using sample_from_iterable 0.0098 s
Using iter_sample_fast 0.0148 s

Sampling 10000 from 100000
Using iterSample 0.1786 s
Using sample_from_iterable 0.1320 s
Using iter_sample_fast 0.1576 s

Sampling 100000 from 1000000
Using iterSample 3.2740 s
Using sample_from_iterable 1.9860 s
Using iter_sample_fast 1.4586 s

Sampling 200000 from 1000000
Using iterSample 7.6115 s
Using sample_from_iterable 3.0663 s
Using iter_sample_fast 1.4101 s

Sampling 500000 from 1000000
Using iterSample 39.2595 s
Using sample_from_iterable 4.9994 s
Using iter_sample_fast 1.2178 s

Sampling 2000000 from 5000000
Using iterSample 798.8016 s
Using sample_from_iterable 28.6618 s
Using iter_sample_fast 6.6482 s

So it turns out that the array.insert has a serious drawback when it comes to large sample sizes. The code I used to time the methods

from heapq import nlargest
import random
import timeit


def iterSample(iterable, samplesize):
    results = []
    for i, v in enumerate(iterable):
        r = random.randint(0, i)
        if r < samplesize:
            if i < samplesize:
                results.insert(r, v) # add first samplesize items in random order
            else:
                results[r] = v # at a decreasing rate, replace random items

    if len(results) < samplesize:
        raise ValueError("Sample larger than population.")

    return results

def sample_from_iterable(iterable, samplesize):
    return (x for _, x in nlargest(samplesize, ((random.random(), x) for x in iterable)))

def iter_sample_fast(iterable, samplesize):
    results = []
    iterator = iter(iterable)
    # Fill in the first samplesize elements:
    for _ in xrange(samplesize):
        results.append(iterator.next())
    random.shuffle(results)  # Randomize their positions
    for i, v in enumerate(iterator, samplesize):
        r = random.randint(0, i)
        if r < samplesize:
            results[r] = v  # at a decreasing rate, replace random items

    if len(results) < samplesize:
        raise ValueError("Sample larger than population.")
    return results

if __name__ == '__main__':
    pop_sizes = [int(10e+3),int(10e+4),int(10e+5),int(10e+5),int(10e+5),int(10e+5)*5]
    k_sizes = [int(10e+2),int(10e+3),int(10e+4),int(10e+4)*2,int(10e+4)*5,int(10e+5)*2]

    for pop_size, k_size in zip(pop_sizes, k_sizes):
        pop = xrange(pop_size)
        k = k_size
        t1 = timeit.Timer(stmt='iterSample(pop, %i)'%(k_size), setup='from __main__ import iterSample,pop')
        t2 = timeit.Timer(stmt='sample_from_iterable(pop, %i)'%(k_size), setup='from __main__ import sample_from_iterable,pop')
        t3 = timeit.Timer(stmt='iter_sample_fast(pop, %i)'%(k_size), setup='from __main__ import iter_sample_fast,pop')

        print 'Sampling', k, 'from', pop_size
        print 'Using iterSample', '%1.4f s'%(t1.timeit(number=100) / 100.0)
        print 'Using sample_from_iterable', '%1.4f s'%(t2.timeit(number=100) / 100.0)
        print 'Using iter_sample_fast', '%1.4f s'%(t3.timeit(number=100) / 100.0)
        print ''

I also ran a test to check that all the methods indeed do take an unbiased sample of the generator. So for all methods I sampled 1000 elements from 10000 100000 times and computed the average frequency of occurrence of each item in the population which turns out to be ~.1 as one would expect for all three methods.


While the answer of Martijn Pieters is correct, it does slow down when samplesize becomes large, because using list.insert in a loop may have quadratic complexity.

Here's an alternative that, in my opinion, preserves the uniformity while increasing performance:

def iter_sample_fast(iterable, samplesize):
    results = []
    iterator = iter(iterable)
    # Fill in the first samplesize elements:
    try:
        for _ in xrange(samplesize):
            results.append(iterator.next())
    except StopIteration:
        raise ValueError("Sample larger than population.")
    random.shuffle(results)  # Randomize their positions
    for i, v in enumerate(iterator, samplesize):
        r = random.randint(0, i)
        if r < samplesize:
            results[r] = v  # at a decreasing rate, replace random items
    return results

The difference slowly starts to show for samplesize values above 10000. Times for calling with (1000000, 100000):

  • iterSample: 5.05s
  • iter_sample_fast: 2.64s

You can't.

You have two options: read the whole generator into a list, then sample from that list, or use a method that reads the generator one by one and picks the sample from that:

import random

def iterSample(iterable, samplesize):
    results = []

    for i, v in enumerate(iterable):
        r = random.randint(0, i)
        if r < samplesize:
            if i < samplesize:
                results.insert(r, v) # add first samplesize items in random order
            else:
                results[r] = v # at a decreasing rate, replace random items

    if len(results) < samplesize:
        raise ValueError("Sample larger than population.")

    return results

This method adjusts the chance that the next item is part of the sample based on the number of items in the iterable so far. It doesn't need to hold more than samplesize items in memory.

The solution isn't mine; it was provided as part of another answer here on SO.


Just for the heck of it, here's a one-liner that samples k elements without replacement from the n items generated in O(n lg k) time:

from heapq import nlargest

def sample_from_iterable(it, k):
    return (x for _, x in nlargest(k, ((random.random(), x) for x in it)))