How do I implement task prioritization using an ExecutorService in Java 5?

I am implementing a thread pooling mechanism in which I'd like to execute tasks of varying priorities. I'd like to have a nice mechanism whereby I can submit a high priority task to the service and have it be scheduled before other tasks. The priority of the task is an intrinsic property of the task itself (whether I express that task as a Callable or a Runnable is not important to me).

Now, superficially it looks like I could use a PriorityBlockingQueue as the task queue in my ThreadPoolExecutor, but that queue contains Runnable objects, which may or may not be the Runnable tasks I've submitted to it. Moreover, if I've submitted Callable tasks, it's not clear how this would ever map.

Is there a way to do this? I'd really rather not roll my own for this, since I'm far more likely to get it wrong that way.

(An aside; yes, I'm aware of the possibility of starvation for lower-priority jobs in something like this. Extra points (?!) for solutions that have a reasonable guarantee of fairness)


I have solved this problem in a reasonable fashion, and I'll describe it below for future reference to myself and anyone else who runs into this problem with the Java Concurrent libraries.

Using a PriorityBlockingQueue as the means for holding onto tasks for later execution is indeed a movement in the correct direction. The problem is that the PriorityBlockingQueue must be generically instantiated to contain Runnable instances, and it is impossible to call compareTo (or similiar) on a Runnable interface.

Onto solving the problem. When creating the Executor, it must be given a PriorityBlockingQueue. The queue should further be given a custom Comparator to do proper in place sorting:

new PriorityBlockingQueue<Runnable>(size, new CustomTaskComparator());

Now, a peek at CustomTaskComparator:

public class CustomTaskComparator implements Comparator<MyType> {

    @Override
    public int compare(MyType first, MyType second) {
         return comparison;
    }

}

Everything looking pretty straight forward up to this point. It gets a bit sticky here. Our next problem is to deal with the creation of FutureTasks from the Executor. In the Executor, we must override newTaskFor as so:

@Override
protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
    //Override the default FutureTask creation and retrofit it with
    //a custom task. This is done so that prioritization can be accomplished.
    return new CustomFutureTask(c);
}

Where c is the Callable task that we're trying to execute. Now, let's have a peek at CustomFutureTask:

public class CustomFutureTask extends FutureTask {

    private CustomTask task;

    public CustomFutureTask(Callable callable) {
        super(callable);
        this.task = (CustomTask) callable;
    }

    public CustomTask getTask() {
        return task;
    }

}

Notice the getTask method. We're gonna use that later to grab the original task out of this CustomFutureTask that we've created.

And finally, let's modify the original task that we were trying to execute:

public class CustomTask implements Callable<MyType>, Comparable<CustomTask> {

    private final MyType myType;

    public CustomTask(MyType myType) {
        this.myType = myType;
    }

    @Override
    public MyType call() {
        //Do some things, return something for FutureTask implementation of `call`.
        return myType;
    }

    @Override
    public int compareTo(MyType task2) {
        return new CustomTaskComparator().compare(this.myType, task2.myType);
    }

}

You can see that we implement Comparable in the task to delegate to the actual Comparator for MyType.

And there you have it, customized prioritization for an Executor using the Java libraries! It takes some bit of bending, but it's the cleanest that I've been able to come up with. I hope this is helpful to someone!


At first blush it would seem you could define an interface for your tasks that extends Runnable or Callable<T> and Comparable. Then wrap a ThreadPoolExecutor with a PriorityBlockingQueue as the queue, and only accept tasks that implement your interface.

Taking your comment into account, it looks like one option is to extend ThreadPoolExecutor, and override the submit() methods. Refer to AbstractExecutorService to see what the default ones look like; all they do is wrap the Runnable or Callable in a FutureTask and execute() it. I'd probably do this by writing a wrapper class that implements ExecutorService and delegates to an anonymous inner ThreadPoolExecutor. Wrap them in something that has your priority, so that your Comparator can get at it.


You can use these helper classes:

public class PriorityFuture<T> implements RunnableFuture<T> {

    private RunnableFuture<T> src;
    private int priority;

    public PriorityFuture(RunnableFuture<T> other, int priority) {
        this.src = other;
        this.priority = priority;
    }

    public int getPriority() {
        return priority;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return src.cancel(mayInterruptIfRunning);
    }

    public boolean isCancelled() {
        return src.isCancelled();
    }

    public boolean isDone() {
        return src.isDone();
    }

    public T get() throws InterruptedException, ExecutionException {
        return src.get();
    }

    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return src.get(timeout, unit);
    }

    public void run() {
        src.run();
    }

    public static Comparator<Runnable> COMP = new Comparator<Runnable>() {
        public int compare(Runnable o1, Runnable o2) {
            if (o1 == null && o2 == null)
                return 0;
            else if (o1 == null)
                return -1;
            else if (o2 == null)
                return 1;
            else {
                int p1 = ((PriorityFuture<?>) o1).getPriority();
                int p2 = ((PriorityFuture<?>) o2).getPriority();

                return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1);
            }
        }
    };
}

AND

public interface PriorityCallable<T> extends Callable<T> {

    int getPriority();

}

AND this helper method:

public static ThreadPoolExecutor getPriorityExecutor(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
            new PriorityBlockingQueue<Runnable>(10, PriorityFuture.COMP)) {

        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
            return new PriorityFuture<T>(newTaskFor, ((PriorityCallable<T>) callable).getPriority());
        }
    };
}

AND then use it like this:

class LenthyJob implements PriorityCallable<Long> {
    private int priority;

    public LenthyJob(int priority) {
        this.priority = priority;
    }

    public Long call() throws Exception {
        System.out.println("Executing: " + priority);
        long num = 1000000;
        for (int i = 0; i < 1000000; i++) {
            num *= Math.random() * 1000;
            num /= Math.random() * 1000;
            if (num == 0)
                num = 1000000;
        }
        return num;
    }

    public int getPriority() {
        return priority;
    }
}

public class TestPQ {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ThreadPoolExecutor exec = getPriorityExecutor(2);

        for (int i = 0; i < 20; i++) {
            int priority = (int) (Math.random() * 100);
            System.out.println("Scheduling: " + priority);
            LenthyJob job = new LenthyJob(priority);
            exec.submit(job);
        }
    }
}