How is the fork/join framework better than a thread pool?

What are the benefits of using the new fork/join framework over simply splitting the big task into N subtasks in the beginning, sending them to a cached thread pool (from Executors) and waiting for each task to complete? I fail to see how using the fork/join abstraction simplifies the problem or makes the solution more efficient than what we've had for years now.

For example, the parallelized blurring algorithm in the tutorial example could be implemented like this:

public class ForkBlur implements Runnable {
    private int[] mSource;
    private int mStart;
    private int mLength;
    private int[] mDestination;

    private int mBlurWidth = 15; // Processing window size, should be odd.

    public ForkBlur(int[] src, int start, int length, int[] dst) {
        mSource = src;
        mStart = start;
        mLength = length;
        mDestination = dst;
    }

    public void run() {
        computeDirectly();
    }

    protected void computeDirectly() {
        // As in the example, omitted for brevity
    }
}

Split in the beginning and send tasks to a thread pool:

// source image pixels are in src
// destination image pixels are in dst
// threadPool is a (cached) thread pool

int maxSize = 100000; // analogous to F-J's "sThreshold"
List<Future<?>> futures = new ArrayList<>();

// Send stuff to thread pool:
for (int i = 0; i < src.length; i += maxSize) {
    int size = Math.min(maxSize, src.length - i);
    ForkBlur task = new ForkBlur(src, i, size, dst);
    Future<?> f = threadPool.submit(task);
    futures.add(f);
}

// Wait for all sent tasks to complete:
for (Future<?> future : futures) {
    future.get(); // may throw InterruptedException or ExecutionException
}

// Done!

The tasks go to the thread pool's queue, from which they're executed as worker threads become available. As long as the splitting is granular enough (so that nobody waits long for the last task) and the thread pool has enough threads (at least as many as there are processors), all processors are working at full speed until the whole computation is done.

Am I missing something? What's the added value of using the fork/join framework?


Solution 1:

I think the basic misunderstanding is that the Fork/Join examples do NOT show work stealing but only some kind of standard divide and conquer.

Work stealing would be like this: Worker B has finished his work. He is a kind one, so he looks around and sees Worker A still working very hard. He strolls over and asks: "Hey lad, I could give you a hand." A replies. "Cool, I have this task of 1000 units. So far I have finished 345 leaving 655. Could you please work on number 673 to 1000, I'll do the 346 to 672." B says "OK, let's start so we can go to the pub earlier."

You see - the workers must communicate with each other even after they have started the real work. This is the missing part in the examples.

The examples on the other hand show only something like "use subcontractors":

Worker A: "Dang, I have 1000 units of work. Too much for me. I'll do 500 myself and subcontract 500 to someone else." This goes on until the big task is broken down into small packets of 10 units each. These will be executed by the available workers. But if one packet is a kind of poison pill and takes considerably longer than other packets -- bad luck, the divide phase is over.

The only remaining difference between Fork/Join and splitting the task upfront is this: When splitting upfront you have the work queue full right from start. Example: 1000 units, the threshold is 10, so the queue has 100 entries. These packets are distributed to the threadpool members.

Fork/Join is more complex and tries to keep the number of packets in the queue smaller:

  • Step 1: Put one packet containing (1...1000) into the queue.
  • Step 2: One worker pops the packet (1...1000) and replaces it with two packets: (1...500) and (501...1000).
  • Step 3: One worker pops packet (501...1000) and pushes (501...750) and (751...1000).
  • Step n: The stack contains these packets: (1...500), (501...750), (751...875), ... (991...1000)
  • Step n+1: Packet (991...1000) is popped and executed.
  • Step n+2: Packet (981...990) is popped and executed.
  • Step n+3: Packet (961...980) is popped and split into (961...970) and (971...980). ....

You see: in Fork/Join the queue is smaller (6 in the example) and the "split" and "work" phases are interleaved.

When multiple workers are popping and pushing simultaneously the interactions are not so clear of course.
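
As a rough sketch of the interleaved "split" and "work" phases described above, the packet example could be written as a RecursiveAction. The class name RangeTask, the counters and the run helper are made up for illustration, and the "work" is just counting units; halving a range of 1000 also does not produce packets of exactly 10 units, only packets no larger than the threshold.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical task: "processes" the units from..to (inclusive) by counting them.
class RangeTask extends RecursiveAction {
    static final int THRESHOLD = 10; // maximum packet size, as in the example above
    static final AtomicInteger leafPackets = new AtomicInteger();
    static final AtomicInteger unitsDone = new AtomicInteger();

    private final int from;
    private final int to;

    RangeTask(int from, int to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected void compute() {
        int size = to - from + 1;
        if (size <= THRESHOLD) {
            // Small enough: do the work directly.
            leafPackets.incrementAndGet();
            unitsDone.addAndGet(size);
            return;
        }
        // Too big: split in half. The split happens inside a worker thread,
        // so splitting and working are interleaved rather than done upfront.
        int mid = from + size / 2 - 1;
        invokeAll(new RangeTask(from, mid), new RangeTask(mid + 1, to));
    }

    // Processes the range 1..units and returns the number of units handled.
    static int run(int units) {
        leafPackets.set(0);
        unitsDone.set(0);
        ForkJoinPool pool = new ForkJoinPool();
        try {
            pool.invoke(new RangeTask(1, units));
        } finally {
            pool.shutdown();
        }
        return unitsDone.get();
    }
}
```

Calling RangeTask.run(1000) processes all 1000 units; at no point does a queue have to hold every small packet at once.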

Solution 2:

If you have n busy threads all working away at 100% independently, that's going to be better than n threads in a Fork-Join (FJ) pool. But it never works out that way.

You might not be able to split the problem precisely into n equal pieces. Even if you can, thread scheduling is some way off being fair, so you'll end up waiting for the slowest thread. If you have multiple tasks, then each can run with less than n-way parallelism (generally more efficient), yet go up to n-way when other tasks have finished.

So why don't we just cut the problem up into FJ-size pieces and have a thread pool work on that? Typical FJ usage cuts the problem into tiny pieces. Doing these in a random order requires much co-ordination at a hardware level. The overheads would be a killer. In FJ, tasks are put onto a queue that the thread reads off in Last In First Out order (LIFO/stack), and work stealing (in core work, generally) is done First In First Out (FIFO/"queue"). The result is that long array processing can be done largely sequentially, even though it is broken into tiny chunks. (It is also the case that it might not be trivial to break the problem up into small evenly sized chunks in one big bang. Say, when dealing with some form of hierarchy without balancing.)

Conclusion: FJ allows more efficient use of hardware threads in uneven situations, which will be always if you have more than one thread.
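
To make the LIFO/FIFO point a bit more concrete, here is a toy model of a single worker's queue. It uses a plain ArrayDeque purely for illustration (the class WorkDequeModel and its method names are invented here); the real ForkJoinPool uses its own concurrent structure, so this only shows the ordering, not the implementation.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model only: a plain ArrayDeque standing in for one worker's task queue.
class WorkDequeModel {
    private final Deque<String> tasks = new ArrayDeque<>();

    // The owning worker pushes newly forked tasks at the tail.
    void fork(String task) {
        tasks.addLast(task);
    }

    // The owner takes from the tail: LIFO, i.e. the most recently forked task.
    String ownerTake() {
        return tasks.pollLast();
    }

    // A thief takes from the head: FIFO, i.e. the oldest task in the queue.
    String thiefSteal() {
        return tasks.pollFirst();
    }

    static String demo() {
        WorkDequeModel deque = new WorkDequeModel();
        deque.fork("[501..1000]"); // forked first, largest chunk
        deque.fork("[251..500]");
        deque.fork("[126..250]");  // forked last, smallest chunk
        String stolen = deque.thiefSteal();
        String own = deque.ownerTake();
        return "thief=" + stolen + " owner=" + own;
    }
}
```

In a divide-and-conquer split, the oldest queued task tends to be the biggest one, so a thief gets a large chunk in one go while the owner keeps working on the small, recently touched pieces.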

Solution 3:

The ultimate goals of thread pools and Fork/Join are alike: Both want to utilize the available CPU power the best they can for maximum throughput. Maximum throughput means that as many tasks as possible should be completed over a long period of time. What is needed to do that? (For the following we will assume that there is no shortage of calculation tasks: There is always enough to do for 100% CPU utilisation. Additionally, I use "CPU" equivalently for cores or virtual cores in case of hyper-threading.)

  1. At least as many threads need to be running as there are CPUs available, because running fewer threads will leave a core unused.
  2. At most as many threads must be running as there are CPUs available, because running more threads creates additional load for the scheduler, which assigns CPUs to the different threads, so some CPU time goes to the scheduler rather than to our computational task.

Thus we figured out that for maximum throughput we need to have exactly the same number of threads as CPUs. In Oracle's blurring example you can either take a fixed-size thread pool with the number of threads equal to the number of available CPUs or use a Fork/Join pool. It won't make a difference, you are right!
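
A minimal sketch of sizing both kinds of pool to the CPU count, assuming Runtime.availableProcessors() is an acceptable measure of "available CPUs" (the class PoolSizing and its method names are made up for illustration):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;

class PoolSizing {
    // Number of CPUs (including virtual cores) the JVM reports.
    static int cpuCount() {
        return Runtime.getRuntime().availableProcessors();
    }

    // Classic thread pool with exactly one thread per CPU.
    static ExecutorService fixedPool() {
        return Executors.newFixedThreadPool(cpuCount());
    }

    // Fork/Join pool with a parallelism level of one worker per CPU.
    static ForkJoinPool forkJoinPool() {
        return new ForkJoinPool(cpuCount());
    }
}
```

The no-argument ForkJoinPool constructor uses the same processor count as its default parallelism, so passing it explicitly here is only for clarity.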

So when will you get into trouble with a thread pool? When a thread blocks because it is waiting for another task to complete. Assume the following example:

class AbcAlgorithm implements Runnable {
    public void run() {
        Future<StepAResult> aFuture = threadPool.submit(new ATask());
        StepBResult bResult = stepB();
        StepAResult aResult = aFuture.get();
        stepC(aResult, bResult);
    }
}

What we see here is an algorithm that consists of three steps A, B and C. A and B can be performed independently of each other, but step C needs the result of step A AND B. What this algorithm does is submit task A to the thread pool and perform task B directly. After that the thread will wait for task A to be done as well and continue with step C. If A and B are completed at the same time, then everything is fine. But what if A takes longer than B? That may be because the nature of task A dictates it, but it may also be the case because there is no thread available for task A in the beginning and task A needs to wait. (If there is only a single CPU available and thus your thread pool has only a single thread, this will even cause a deadlock, but for now that is beside the point.) The point is that the thread that just executed task B is now blocked. Since we have the same number of threads as CPUs and one thread is blocked, that means that one CPU is idle.
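
The single-thread deadlock mentioned in parentheses can be reproduced with a small sketch. To keep it from hanging forever, this version waits with a timeout instead of a plain get(); the class name, the 200 ms timeout and the constant 42 are arbitrary choices for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SingleThreadStarvation {
    // Returns true if the outer task timed out while waiting for the inner task.
    static boolean innerTaskStarves() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            Future<Boolean> outer = pool.submit(() -> {
                // The inner task is queued behind the outer one, which
                // occupies the pool's only thread.
                Future<Integer> inner = pool.submit(() -> 42);
                try {
                    inner.get(200, TimeUnit.MILLISECONDS);
                    return false; // would mean the inner task actually ran
                } catch (TimeoutException e) {
                    return true;  // the inner task never got a thread
                }
            });
            return outer.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }
}
```

With an untimed get() the outer task would wait forever, because the only thread that could run the inner task is the one doing the waiting.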

Fork/Join solves this problem: In the fork/join framework you'd write the same algorithm as follows:

class AbcAlgorithm implements Runnable {
    public void run() {
        ATask aTask = new ATask();
        aTask.fork();
        StepBResult bResult = stepB();
        StepAResult aResult = aTask.join();
        stepC(aResult, bResult);
    }
}

Looks the same, does it not? However, the key is that aTask.join will not block. Instead, this is where work-stealing comes into play: the thread will look around for other tasks that have been forked in the past and will continue with those. First it checks whether the tasks it has forked itself have started processing. So if A has not been started by another thread yet, it will do A next; otherwise it will check the queues of other threads and steal their work. Once this other task has completed, it will check whether A is completed now. If it is, the above algorithm can call stepC. Otherwise it will look for yet another task to steal. Thus fork/join pools can achieve 100% CPU utilisation, even in the face of blocking actions.
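
As a hedged illustration of join not blocking, the following sketch runs the same A/B/C shape in a ForkJoinPool with a single worker. The task classes and the constant values are placeholders invented here; stepB and stepC are reduced to trivial arithmetic.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class JoinWithoutStarvation {
    // Hypothetical step A: just returns a constant.
    static class ATask extends RecursiveTask<Integer> {
        @Override
        protected Integer compute() {
            return 40;
        }
    }

    static class AbcTask extends RecursiveTask<Integer> {
        @Override
        protected Integer compute() {
            ATask a = new ATask();
            a.fork();               // queued on this worker's own queue
            int b = 2;              // stands in for stepB()
            int aResult = a.join(); // nobody else picked up A, so this thread runs it
            return aResult + b;     // stands in for stepC(aResult, bResult)
        }
    }

    static int run() {
        ForkJoinPool pool = new ForkJoinPool(1); // a single worker thread
        try {
            return pool.invoke(new AbcTask());
        } finally {
            pool.shutdown();
        }
    }
}
```

Even with only one worker, run() completes, whereas the equivalent code on a single-threaded classic pool would wait forever on the future.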

However, there is a trap: work-stealing is only possible for the join call of ForkJoinTasks. It cannot be done for external blocking actions like waiting for another thread or waiting for an I/O action. So what about that, given that waiting for I/O to complete is a common task? In this case, the second-best thing would be to add an additional thread to the Fork/Join pool that is stopped again as soon as the blocking action has completed. And the ForkJoinPool can actually do just that if we are using ManagedBlockers.
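
A minimal sketch of a ManagedBlocker, assuming the blocking action can be wrapped in a single call. The class SleepBlocker is hypothetical and simulates I/O with a sleep; ForkJoinPool.managedBlock and the two interface methods are the actual JDK API.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

// Hypothetical blocker that wraps a slow, blocking call (simulated with sleep).
class SleepBlocker implements ForkJoinPool.ManagedBlocker {
    private final long millis;
    private String result; // null until the blocking call has finished

    SleepBlocker(long millis) {
        this.millis = millis;
    }

    @Override
    public boolean block() throws InterruptedException {
        if (result == null) {
            TimeUnit.MILLISECONDS.sleep(millis); // stands in for real I/O
            result = "done";
        }
        return true; // no further blocking is necessary
    }

    @Override
    public boolean isReleasable() {
        return result != null; // true once there is nothing left to wait for
    }

    static String blockingCall(long millis) {
        SleepBlocker blocker = new SleepBlocker(millis);
        try {
            // Tells the pool that the current thread is about to block, so the
            // pool may activate a spare worker if it runs inside a ForkJoinPool.
            ForkJoinPool.managedBlock(blocker);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return blocker.result;
    }
}
```

Inside a task's compute method you would call something like SleepBlocker.blockingCall(...) instead of blocking directly; outside a ForkJoinPool the call simply blocks the current thread.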

Fibonacci

The JavaDoc for RecursiveTask contains an example for calculating Fibonacci numbers using Fork/Join. For comparison, here is the classic recursive solution:

public static long fib(long n) { // long, because fib(50) does not fit into an int
    if (n <= 1) {
        return n;
    }
    return fib(n - 1) + fib(n - 2);
}

As is explained in the JavaDocs, this is a pretty dumb way to calculate Fibonacci numbers, as this algorithm has O(2^n) complexity while simpler ways are possible. However, this algorithm is very simple and easy to understand, so we stick with it. Let's assume we want to speed this up with Fork/Join. A naive implementation would look like this:

class Fibonacci extends RecursiveTask<Long> {
    private final long n;

    Fibonacci(long n) {
        this.n = n;
    }

    public Long compute() {
        if (n <= 1) {
            return n;
        }
        Fibonacci f1 = new Fibonacci(n - 1);
        f1.fork();
        Fibonacci f2 = new Fibonacci(n - 2);
        return f2.compute() + f1.join();
    }
}

The steps that this task is split into are way too short, and thus this will perform horribly, but you can see how the framework generally works very well: The two summands can be calculated independently, but then we need both of them to build the final result. So one half is done in another thread. Have fun doing the same with thread pools without getting a deadlock (possible, but not nearly as simple).

Just for completeness: If you'd actually want to calculate Fibonacci numbers using this recursive approach here is an optimized version:

class FibonacciBigSubtasks extends RecursiveTask<Long> {
    private final long n;

    FibonacciBigSubtasks(long n) {
        this.n = n;
    }

    public Long compute() {
        return fib(n);
    }

    private long fib(long n) {
        if (n <= 1) {
            return n; // same base case as the classic version: fib(0) = 0, fib(1) = 1
        }
        if (n > 10 && getSurplusQueuedTaskCount() < 2) {
            final FibonacciBigSubtasks f1 = new FibonacciBigSubtasks(n - 1);
            final FibonacciBigSubtasks f2 = new FibonacciBigSubtasks(n - 2);
            f1.fork();
            return f2.compute() + f1.join();
        } else {
            return fib(n - 1) + fib(n - 2);
        }
    }
}

This keeps the number of subtasks much smaller, because tasks are only split when n > 10 && getSurplusQueuedTaskCount() < 2 is true, which means that there are significantly more than 100 method calls to do (n > 10) and there are not very many tasks already waiting (getSurplusQueuedTaskCount() < 2).

On my computer (4 cores (8 when counting Hyper-threading), Intel(R) Core(TM) i7-2720QM CPU @ 2.20GHz), fib(50) takes 64 seconds with the classic approach and just 18 seconds with the Fork/Join approach, which is quite a noticeable gain, although not as much as theoretically possible.

Summary

  • Yes, in your example Fork/Join has no advantage over classic thread pools.
  • Fork/Join can drastically improve performance when blocking is involved
  • Fork/Join circumvents some deadlock problems