I have been reading about the thread-pool pattern and I can't seem to find the usual solution for the following problem.

I sometimes want tasks to be executed serially. For example, I read chunks of text from a file and for some reason I need the chunks to be processed in that order. So basically I want to eliminate concurrency for some of the tasks.

Consider this scenario where the tasks with * need to be processed in the order they were pushed in. The other tasks can be processed in any order.

push task1
push task2
push task3   *
push task4   *
push task5
push task6   *
....
and so on

In the context of a thread-pool, without this constraint, a single queue of pending tasks works fine but clearly here it doesn't.

I thought about having some of the threads operate on a thread-specific queue and the others on the "global" queue. Then, in order to execute some of the tasks serially, I simply have to push them onto a queue where a single thread looks. It does sounds a bit clumsy.

So, the real question in this long story: how would you solve this ? How would you ensure those tasks are ordered?

EDIT

As a more general problem, suppose the scenario above becomes

push task1
push task2   **
push task3   *
push task4   *
push task5
push task6   *
push task7   **
push task8   *
push task9
....
and so on

What I mean is that the tasks within a group should be executed sequentially, but the groups themselves can mix. So you can have 3-2-5-4-7 for example.

One other thing to note is that I don't have access to all the tasks in a group upfront (and I can't wait for all of them to arrive before starting the group).


Solution 1:

Something like the following will allow serial and parallel tasks to be queued, where serial tasks will be executed one after the other, and parallel tasks will be executed in any order, but in parallel. This gives you the ability to serialize tasks where necessary, also have parallel tasks, but do this as tasks are received i.e. you do not need to know about the entire sequence up-front, execution order is maintained dynamically.

internal class TaskQueue
{
    private readonly object _syncObj = new object();
    private readonly Queue<QTask> _tasks = new Queue<QTask>();
    private int _runningTaskCount;

    public void Queue(bool isParallel, Action task)
    {
        lock (_syncObj)
        {
            _tasks.Enqueue(new QTask { IsParallel = isParallel, Task = task });
        }

        ProcessTaskQueue();
    }

    public int Count
    {
        get{lock (_syncObj){return _tasks.Count;}}
    }

    private void ProcessTaskQueue()
    {
        lock (_syncObj)
        {
            if (_runningTaskCount != 0) return;

            while (_tasks.Count > 0 && _tasks.Peek().IsParallel)
            {
                QTask parallelTask = _tasks.Dequeue();

                QueueUserWorkItem(parallelTask);
            }

            if (_tasks.Count > 0 && _runningTaskCount == 0)
            {
                QTask serialTask = _tasks.Dequeue();

                QueueUserWorkItem(serialTask);
            }
        }
    }

    private void QueueUserWorkItem(QTask qTask)
    {
        Action completionTask = () =>
        {
            qTask.Task();

            OnTaskCompleted();
        };

        _runningTaskCount++;

        ThreadPool.QueueUserWorkItem(_ => completionTask());
    }

    private void OnTaskCompleted()
    {
        lock (_syncObj)
        {
            if (--_runningTaskCount == 0)
            {
                ProcessTaskQueue();
            }
        }
    }

    private class QTask
    {
        public Action Task { get; set; }
        public bool IsParallel { get; set; }
    }
}

Update

To handle task groups with serial and parallel task mixes, a GroupedTaskQueue can manage a TaskQueue for each group. Again, you do not need to know about groups up-front, it is all dynamically managed as tasks are received.

internal class GroupedTaskQueue
{
    private readonly object _syncObj = new object();
    private readonly Dictionary<string, TaskQueue> _queues = new Dictionary<string, TaskQueue>();
    private readonly string _defaultGroup = Guid.NewGuid().ToString();

    public void Queue(bool isParallel, Action task)
    {
        Queue(_defaultGroup, isParallel, task);
    }

    public void Queue(string group, bool isParallel, Action task)
    {
        TaskQueue queue;

        lock (_syncObj)
        {
            if (!_queues.TryGetValue(group, out queue))
            {
                queue = new TaskQueue();

                _queues.Add(group, queue);
            }
        }

        Action completionTask = () =>
        {
            task();

            OnTaskCompleted(group, queue);
        };

        queue.Queue(isParallel, completionTask);
    }

    private void OnTaskCompleted(string group, TaskQueue queue)
    {
        lock (_syncObj)
        {
            if (queue.Count == 0)
            {
                _queues.Remove(group);
            }
        }
    }
}

Solution 2:

Thread pools are good for cases where the relative order of the tasks doesn't matter, provided they all get done. In particular, it must be OK for them all to be done in parallel.

If your tasks must be done in a specific order, then they are not suitable for parallelism, so a thread pool is not appropriate.

If you want to move these serial tasks off the main thread, then a single background thread with a task queue would be appropriate for those tasks. You can continue to use a thread pool for the remaining tasks which are suitable for parallelism.

Yes, it means you have to decide where to submit the task depending on whether it is an in-order task or a "may be parallelized" task, but this is not a big deal.

If you have groups that must be serialized, but which can run in parallel with other tasks then you have multiple choices:

  1. Create a single task for each group, which does the relevant group tasks in order, and post this task to the thread pool.
  2. Have each task in a group explicitly wait for the previous task in the group, and post them to the thread pool. This requires that your thread pool can handle the case where a thread is waiting for a not-yet-scheduled task without deadlocking.
  3. Have a dedicated thread for each group, and post group tasks on the appropriate message queue.