Parallel.ForEach can cause an "Out Of Memory" exception when working with an enumerable of large objects

I am trying to migrate a database in which images were stored in the database itself to one in which each record points at a file on the hard drive. I was trying to use Parallel.ForEach to speed up the process, using this method to query out the data.

However, I noticed that I was getting an OutOfMemoryException. I know Parallel.ForEach pulls a batch of items from the enumerable at a time to amortize the overhead of fetching them (so your source is more likely to have the next record cached in memory if you do a bunch of queries at once instead of spacing them out). The problem is that one of the fields I am returning is a 1-4 MB byte array, so this caching uses up the entire address space. (The program must run in x86 mode because the target platform will be a 32-bit machine.)

Is there any way to disable the caching or make it smaller for the TPL?


Here is an example program that shows the issue. It must be compiled in x86 mode. If it takes too long or the exception does not occur on your machine, bump up the size of the array (I found 1 << 20 takes about 30 seconds on my machine, and 4 << 20 was almost instantaneous).

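using System.Collections.Generic;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        // Touch one byte of every array; the body itself is trivial on purpose.
        Parallel.ForEach(CreateData(), (data) =>
            {
                data[0] = 1;
            });
    }

    // Endless source of large arrays, standing in for the image records.
    static IEnumerable<byte[]> CreateData()
    {
        while (true)
        {
            yield return new byte[1 << 20]; // 1 MB array
        }
    }
}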

Solution 1:

The default options for Parallel.ForEach only work well when the task is CPU-bound and scales linearly. When the task is CPU-bound, everything works perfectly. If you have a quad-core and no other processes running, then Parallel.ForEach uses all four processors. If you have a quad-core and some other process on your computer is using one full CPU, then Parallel.ForEach uses roughly three processors.

But if the task is not CPU-bound, then Parallel.ForEach keeps starting tasks, trying hard to keep all CPUs busy. Yet no matter how many tasks are running in parallel, there is always more unused CPU horsepower and so it keeps creating tasks.

How can you tell if your task is CPU-bound? Hopefully just by inspecting it. If you are factoring prime numbers, it is obvious. But other cases are not so obvious. The empirical way to tell if your task is CPU-bound is to limit the maximum degree of parallelism with ParallelOptions.MaxDegreeOfParallelism and observe how your program behaves. If your task is CPU-bound then you should see a pattern like this on a quad-core system:

  • ParallelOptions.MaxDegreeOfParallelism = 1: use one full CPU or 25% CPU utilization
  • ParallelOptions.MaxDegreeOfParallelism = 2: use two CPUs or 50% CPU utilization
  • ParallelOptions.MaxDegreeOfParallelism = 4: use all CPUs or 100% CPU utilization

If it behaves like this then you can use the default Parallel.ForEach options and get good results. Linear CPU utilization means good task scheduling.
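
One rough way to run that experiment is sketched below. It uses elapsed time as a stand-in for CPU utilization (for CPU-bound work, doubling the degree of parallelism should roughly halve the time), which is an assumption on my part rather than the measurement described above. ScalingProbe, IsPrime and the range of 2,000,000 numbers are made-up names and values for illustration only.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

static class ScalingProbe
{
    // Hypothetical CPU-bound work item: naive trial-division primality test.
    public static bool IsPrime(int n)
    {
        if (n < 2) return false;
        for (int d = 2; (long)d * d <= n; d++)
            if (n % d == 0) return false;
        return true;
    }

    // Runs the same workload with a capped degree of parallelism and times it.
    public static TimeSpan Measure(int degree)
    {
        var options = new ParallelOptions { MaxDegreeOfParallelism = degree };
        var stopwatch = Stopwatch.StartNew();
        Parallel.ForEach(Enumerable.Range(2, 2000000), options, n => { IsPrime(n); });
        return stopwatch.Elapsed;
    }

    static void Main()
    {
        foreach (int degree in new[] { 1, 2, 4 })
        {
            double ms = Measure(degree).TotalMilliseconds;
            Console.WriteLine("MaxDegreeOfParallelism = " + degree + ": " + ms.ToString("F0") + " ms");
        }
    }
}
```

If the times do not shrink as the degree goes up, the work is probably bound by something other than the CPU.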

But if I run your sample application on my Intel i7, I get about 20% CPU utilization no matter what maximum degree of parallelism I set. Why is this? So much memory is being allocated that the garbage collector is blocking threads. The application is resource-bound and the resource is memory.

Likewise an I/O-bound task that performs long running queries against a database server will also never be able to effectively utilize all the CPU resources available on the local computer. And in cases like that the task scheduler is unable to "know when to stop" starting new tasks.

If your task is not CPU-bound or the CPU utilization doesn't scale linearly with the maximum degree of parallelism, then you should tell Parallel.ForEach not to start too many tasks at once. The simplest way is to specify a number that permits some parallelism for overlapping I/O-bound tasks, but not so much that you exhaust the local computer's resources or overtax any remote servers. Trial and error is involved to get the best results:

static void Main(string[] args)
{
    Parallel.ForEach(CreateData(),
        new ParallelOptions { MaxDegreeOfParallelism = 4 },
        (data) =>
            {
                data[0] = 1;
            });
}

Solution 2:

So, while what Rick has suggested is definitely an important point, another thing I think is missing is the discussion of partitioning.

Parallel.ForEach will use a default Partitioner<T> implementation which, for an IEnumerable<T> that has no known length, will use a chunk partitioning strategy. What this means is that each worker thread Parallel.ForEach uses to work on the data set will read some number of elements from the IEnumerable<T>, which will then only be processed by that thread (ignoring work stealing for now). It does this to save the expense of constantly having to go back to the source, allocate some new work, and schedule it for another worker thread. So, usually, this is a good thing.

However, in your specific scenario, imagine you're on a quad core and you've set MaxDegreeOfParallelism to 4 threads for your work, and now each of those pulls a chunk of 100 elements from your IEnumerable<T>. Well, at 1-4 MB per element that's 100-400 megs right there just for that particular worker thread, right?

So how do you solve this? Easy, you write a custom Partitioner<T> implementation. Now, chunking is still useful in your case, so you probably don't want to go with a single-element partitioning strategy, because then you would introduce overhead with all the task coordination necessary for that. Instead I would write a configurable version that you can tune via an app setting until you find the optimal balance for your workload. The good news is that, while writing such an implementation is pretty straightforward, you don't actually have to write it yourself, because the PFX team already did it and put it into the parallel programming samples project.
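
To give an idea of what such a configurable partitioner involves, here is a minimal sketch. It is not the samples-project implementation; FixedChunkPartitioner, SharedSource, FixedChunkDemo and the chunk size of 4 are names and values I made up for illustration. Each worker takes at most chunkSize items from the shared source per trip, so the number of buffered arrays is bounded by chunkSize times the number of workers.

```csharp
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not the samples-project class): hands out at most
// chunkSize items per request, so each worker buffers a bounded amount.
sealed class FixedChunkPartitioner<T> : Partitioner<T>
{
    private readonly IEnumerable<T> _source;
    private readonly int _chunkSize;

    public FixedChunkPartitioner(IEnumerable<T> source, int chunkSize)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (chunkSize < 1) throw new ArgumentOutOfRangeException(nameof(chunkSize));
        _source = source;
        _chunkSize = chunkSize;
    }

    // Parallel.ForEach only accepts partitioners that support dynamic partitions.
    public override bool SupportsDynamicPartitions { get { return true; } }

    public override IEnumerable<T> GetDynamicPartitions()
    {
        return new SharedSource(_source.GetEnumerator(), _chunkSize);
    }

    public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
    {
        // All static partitions draw from the same shared source.
        IEnumerable<T> shared = GetDynamicPartitions();
        var partitions = new List<IEnumerator<T>>(partitionCount);
        for (int i = 0; i < partitionCount; i++)
            partitions.Add(shared.GetEnumerator());
        return partitions;
    }

    private sealed class SharedSource : IEnumerable<T>, IDisposable
    {
        private readonly IEnumerator<T> _enumerator;
        private readonly int _chunkSize;
        private readonly object _gate = new object();
        private bool _exhausted;

        public SharedSource(IEnumerator<T> enumerator, int chunkSize)
        {
            _enumerator = enumerator;
            _chunkSize = chunkSize;
        }

        // Each worker gets its own iterator; all iterators share one source enumerator.
        public IEnumerator<T> GetEnumerator()
        {
            var chunk = new List<T>(_chunkSize);
            while (true)
            {
                chunk.Clear();
                lock (_gate) // only one worker advances the source at a time
                {
                    while (!_exhausted && chunk.Count < _chunkSize)
                    {
                        if (_enumerator.MoveNext()) chunk.Add(_enumerator.Current);
                        else _exhausted = true;
                    }
                }
                if (chunk.Count == 0) yield break;
                foreach (T item in chunk) yield return item;
            }
        }

        IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); }

        public void Dispose() { _enumerator.Dispose(); }
    }
}

static class FixedChunkDemo
{
    static IEnumerable<byte[]> CreateData(int count)
    {
        for (int i = 0; i < count; i++)
            yield return new byte[1 << 20]; // 1 MB array, as in the question
    }

    static void Main()
    {
        int chunkSize = 4; // hypothetical value; read it from configuration in practice
        long processed = 0;
        Parallel.ForEach(new FixedChunkPartitioner<byte[]>(CreateData(64), chunkSize),
            data => { data[0] = 1; Interlocked.Increment(ref processed); });
        Console.WriteLine(processed);
    }
}
```

A chunk size of 1 degenerates into the single-element strategy; larger values trade memory for fewer trips through the lock.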

Solution 3:

This issue has everything to do with partitioners, not with the degree of parallelism. The solution is to implement a custom data partitioner.

If the dataset is large, it seems the Mono implementation of the TPL is guaranteed to run out of memory. This happened to me recently (essentially I was running the above loop, and found that the memory increased linearly until it gave me an OOM exception).

After tracing the issue, I found that by default Mono will divide up the enumerator using an EnumerablePartitioner class. Every time this class gives data out to a task, it "chunks" the data by an ever-increasing (and unchangeable) factor of 2. So the first time a task asks for data it gets a chunk of size 1, the next time of size 2*1=2, the next time 2*2=4, then 2*4=8, and so on. The result is that the amount of data handed to the task, and therefore stored in memory simultaneously, increases with the length of the task, and if a lot of data is being processed, an out-of-memory exception inevitably occurs.
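
To put numbers on that doubling, here is a small calculation. It only simulates the growth pattern described above and is not Mono's code; the 1 MB item size is borrowed from the question's sample, and ChunkGrowth and ChunkSize are made-up names.

```csharp
using System;

static class ChunkGrowth
{
    // Chunk size handed out on the n-th request (1-based) under the doubling scheme.
    public static long ChunkSize(int request)
    {
        return 1L << (request - 1);
    }

    static void Main()
    {
        const long itemBytes = 1L << 20; // assumed 1 MB per item, as in the question
        for (int request = 1; request <= 12; request++)
        {
            long items = ChunkSize(request);
            long megabytes = (items * itemBytes) >> 20;
            Console.WriteLine("request " + request + ": " + items + " items, " + megabytes + " MB held by one task");
        }
    }
}
```

By the twelfth request a single task would be handed 2,048 items, which at 1 MB each is 2 GB, more than a 32-bit process can typically allocate.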

Presumably, the original reason for this behavior is to avoid having each thread return multiple times to get data, but it seems to be based on the assumption that all the data being processed could fit into memory (not the case when reading from large files).

This issue can be avoided with a custom partitioner as stated previously. One generic example of one that simply returns the data to each task one item at a time is here:

https://gist.github.com/evolvedmicrobe/7997971

Simply instantiate that class first and hand it to Parallel.ForEach instead of the enumerable itself.
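
If you are on .NET 4.5 or later, the framework also has a built-in option that hands out items one at a time: Partitioner.Create with EnumerablePartitionerOptions.NoBuffering. The sketch below shows how a partitioner is passed to Parallel.ForEach in place of the enumerable. I have not checked how Mono implements this option, so treat it as an alternative to try rather than a confirmed fix; NoBufferingDemo, CreateData and Run are made-up names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

static class NoBufferingDemo
{
    // Finite stand-in for the large-object source in the question.
    static IEnumerable<byte[]> CreateData(int count)
    {
        for (int i = 0; i < count; i++)
            yield return new byte[1 << 20]; // 1 MB array
    }

    // Processes every item and returns how many were handled.
    public static long Run(int count)
    {
        // NoBuffering asks the partitioner to take one item at a time
        // instead of pulling chunks ahead of the workers.
        var partitioner = Partitioner.Create(CreateData(count),
            EnumerablePartitionerOptions.NoBuffering);

        long processed = 0;
        Parallel.ForEach(partitioner, data =>
        {
            data[0] = 1;
            Interlocked.Increment(ref processed);
        });
        return processed;
    }

    static void Main()
    {
        Console.WriteLine(Run(32));
    }
}
```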