Java: ExecutorService that blocks on submission after a certain queue size [duplicate]

I have done this same thing. The trick is to create a BlockingQueue where the offer() method is really a put(). (you can use whatever base BlockingQueue impl you want).

public class LimitedQueue<E> extends LinkedBlockingQueue<E> 
{
    public LimitedQueue(int maxSize)
    {
        super(maxSize);
    }

    @Override
    public boolean offer(E e)
    {
        // turn offer() and add() into a blocking calls (unless interrupted)
        try {
            put(e);
            return true;
        } catch(InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return false;
    }

}

Note that this only works for thread pool where corePoolSize==maxPoolSize so be careful there (see comments).


The currently accepted answer has a potentially significant problem - it changes the behavior of ThreadPoolExecutor.execute such that if you have a corePoolSize < maxPoolSize, the ThreadPoolExecutor logic will never add additional workers beyond the core.

From ThreadPoolExecutor.execute(Runnable):

    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);

Specifically, that last 'else' block willl never be hit.

A better alternative is to do something similar to what OP is already doing - use a RejectedExecutionHandler to do the same put logic:

public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    try {
        if (!executor.isShutdown()) {
            executor.getQueue().put(r);
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RejectedExecutionException("Executor was interrupted while the task was waiting to put on work queue", e);
    }
}

There are some things to watch out for with this approach, as pointed out in the comments (referring to this answer):

  1. If corePoolSize==0, then there is a race condition where all threads in the pool may die before the task is visible
  2. Using an implementation that wraps the queue tasks (not applicable to ThreadPoolExecutor) will result in issues unless the handler also wraps it the same way.

Keeping those gotchas in mind, this solution will work for most typical ThreadPoolExecutors, and will properly handle the case where corePoolSize < maxPoolSize.


Here is how I solved this on my end:

(note: this solution does block the thread that submits the Callable, so it prevents RejectedExecutionException from being thrown )

public class BoundedExecutor extends ThreadPoolExecutor{

    private final Semaphore semaphore;

    public BoundedExecutor(int bound) {
        super(bound, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        semaphore = new Semaphore(bound);
    }

    /**Submits task to execution pool, but blocks while number of running threads 
     * has reached the bound limit
     */
    public <T> Future<T> submitButBlockIfFull(final Callable<T> task) throws InterruptedException{

        semaphore.acquire();            
        return submit(task);                    
    }


    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);

        semaphore.release();
    }
}

How about using the CallerBlocksPolicy class if you are using spring-integration?

This class implements the RejectedExecutionHandler interface, which is a handler for tasks that cannot be executed by a ThreadPoolExecutor.

You can use this policy like this.

executor.setRejectedExecutionHandler(new CallerBlocksPolicy());

The main difference between CallerBlocksPolicy and CallerRunsPolicy is whether it blocks or runs the task in the caller thread.

Please refer to this code.