How to keep a fixed maximum number of running tasks?

Solution 1:

If you have all the data available up front via an IEnumerable<T>, the simplest way to do this is to use Parallel.ForEach() with ParallelOptions.MaxDegreeOfParallelism.

For example:

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Encapsulate the data.
public sealed class WorkItem
{
    public WorkItem(int data)
    {
        Data = data;
    }

    public int Data { get; }
}

static class Program
{
    static void Main()
    {
        const int MAX_TASKS = 4;
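        // MaxDegreeOfParallelism caps how many work items are processed concurrently.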
        var options = new ParallelOptions() { MaxDegreeOfParallelism = MAX_TASKS };
        Parallel.ForEach(WorkItems(), options, Process);
    }

    static void Process(WorkItem workItem)
    {
        Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId} is processing item: {workItem.Data}");
        Thread.Sleep(250); // Simulate load.
        Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId} has processed item: {workItem.Data}");
    }

    static IEnumerable<WorkItem> WorkItems()
    {
        const int TOTAL_WORK_ITEMS = 100;

        for (int i = 0; i < TOTAL_WORK_ITEMS; i++)
            yield return new WorkItem(i);
    }
}

However, if the data to be processed is being created on the fly (meaning you can't use Parallel.ForEach()), it's generally best handled with something like the TPL Dataflow library.
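
To give a feel for it, here's a minimal sketch of a throttled, bounded ActionBlock (this assumes the System.Threading.Tasks.Dataflow NuGet package is referenced; the class name, constants and delay are illustrative rather than taken from the samples below):

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class DataflowSketch
{
    static async Task Main()
    {
        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 4, // At most 4 items are processed concurrently.
            BoundedCapacity        = 10 // SendAsync() waits while the block's queue is full.
        };

        var block = new ActionBlock<int>(async item =>
        {
            Console.WriteLine($"Processing {item}");
            await Task.Delay(250); // Simulate load.
        }, options);

        for (int i = 0; i < 100; i++)
            await block.SendAsync(i); // Back-pressure: the producer is throttled here.

        block.Complete();       // Signal that no more items will be posted.
        await block.Completion; // Wait for all queued items to finish.
    }
}

The BoundedCapacity setting is what stops a fast producer from buffering everything in memory; without it the block queues every item it's given.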

There's a steep learning curve for Dataflow, but I think it's worth it. However, a somewhat simpler approach is to use a BlockingCollection<T> with a bounded capacity, so the queue can't grow indefinitely.

(NOTE: This approach is not a good fit if the processing can throw an exception, because an unhandled exception terminates that consumer task (the exception only surfaces at Task.WaitAll()), and if every consumer dies the producer blocks forever on Add(). In that case you'd have to use something like a CancellationToken to cancel the remaining tasks when one of them throws; see the sketch at the end of this answer.)

Try running this sample console application for an example:

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    public sealed class WorkItem
    {
        public WorkItem(int data)
        {
            Data = data;
        }

        public int Data { get; }
    }

    static class Program
    {
        static void Main()
        {
            const int MAX_TASKS        =   4;
            const int MAX_WORK_ITEMS   =  10;
            const int TOTAL_WORK_ITEMS = 100;

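            // The capacity argument makes the queue bounded: Add() blocks the producer
            // whenever MAX_WORK_ITEMS items are already waiting to be processed.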
            var workItems = new BlockingCollection<WorkItem>(MAX_WORK_ITEMS);

            // Create the tasks for processing the work items.

            var tasks = new Task[MAX_TASKS];

            for (int i = 0; i < MAX_TASKS; ++i)
                tasks[i] = Task.Run(() => Process(workItems));

            // Add the work items until there are no more.

            for (int i = 0; i < TOTAL_WORK_ITEMS; ++i)
            {
                var workItem = new WorkItem(i);
                Console.WriteLine($"Adding work item {workItem.Data}");
                workItems.Add(workItem);
                Console.WriteLine($"Added work item {workItem.Data}");
            }

            // Signal that there are no more items, so that the processing tasks exit.

            Console.WriteLine("Signalling end of source data.");
            workItems.CompleteAdding();

            Console.WriteLine("Waiting for tasks to complete");
            Task.WaitAll(tasks);
            Console.WriteLine("Finished waiting for all tasks to complete.");

            Console.ReadLine();
        }

        static void Process(BlockingCollection<WorkItem> workItems)
        {
            Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId} is starting");

            foreach (var workItem in workItems.GetConsumingEnumerable())
            {
                Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId} is processing item: {workItem.Data}");
                Thread.Sleep(250); // Simulate load.
                Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId} has processed item: {workItem.Data}");
            }

            Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId} is exiting");
        }
    }
}

As noted by Theodor, if the processing can throw an exception then you must handle that explicitly. For this reason it's better to use TPL Dataflow in that case (although of course Parallel.ForEach() is far simpler if you can use it).
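
If you do stick with the BlockingCollection<T> approach, one possible way to handle a failing consumer is to share a CancellationTokenSource between the tasks. Here's a hedged sketch of how the Process method above might be adapted (the ProcessSafely name and the cts parameter are illustrative, not part of the sample):

static void ProcessSafely(BlockingCollection<WorkItem> workItems, CancellationTokenSource cts)
{
    try
    {
        // Passing the token makes the loop throw OperationCanceledException as soon
        // as any other consumer has signalled cancellation.
        foreach (var workItem in workItems.GetConsumingEnumerable(cts.Token))
        {
            // ... process workItem ...
        }
    }
    catch (Exception)
    {
        cts.Cancel(); // Tell the other consumers (and the producer) to give up.
        throw;
    }
}

The producer would also need to call workItems.Add(workItem, cts.Token) (or TryAdd with a timeout) so that it can't block forever on a full queue once the consumers have stopped.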