Servlet-3 Async Context, how to do asynchronous writes?

Solution 1:

I've found the Servlet 3.0 Asynchronous API tricky to implement correctly and helpful documentation to be sparse. After a lot of trial and error and trying many different approaches, I was able to find a robust solution that I've been very happy with. When I look at my code and compare it to yours, I notice one major difference that may help you with your particular problem. I use a ServletResponse to write the data and not a ServletOutputStream.

Here is my go-to asynchronous servlet class, adapted slightly for your some_big_data case:

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletResponse;
import javax.servlet.annotation.WebInitParam;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpSession;

import org.apache.log4j.Logger;

@javax.servlet.annotation.WebServlet(urlPatterns = { "/async" }, asyncSupported = true, initParams = { @WebInitParam(name = "threadpoolsize", value = "100") })
public class AsyncServlet extends HttpServlet {

  private static final Logger logger = Logger.getLogger(AsyncServlet.class);

  public static final int CALLBACK_TIMEOUT = 10000; // ms

  /** executor service */
  private ExecutorService exec;

  @Override
  public void init(ServletConfig config) throws ServletException {

    super.init(config);
    int size = Integer.parseInt(getInitParameter("threadpoolsize"));
    exec = Executors.newFixedThreadPool(size);
  }

  @Override
  public void service(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException {

    final AsyncContext ctx = req.startAsync();
    final HttpSession session = req.getSession();

    // set the timeout
    ctx.setTimeout(CALLBACK_TIMEOUT);

    // attach listener to respond to lifecycle events of this AsyncContext
    ctx.addListener(new AsyncListener() {

      @Override
      public void onComplete(AsyncEvent event) throws IOException {

        logger.info("onComplete called");
      }

      @Override
      public void onTimeout(AsyncEvent event) throws IOException {

        logger.info("onTimeout called");
      }

      @Override
      public void onError(AsyncEvent event) throws IOException {

        logger.info("onError called: " + event.toString());
      }

      @Override
      public void onStartAsync(AsyncEvent event) throws IOException {

        logger.info("onStartAsync called");
      }
    });

    enqueLongRunningTask(ctx, session);
  }

  /**
   * If something goes wrong in the task, it simply results in a timeout condition, which causes the async context listener to be invoked (after the fact).
   * <p/>
   * If {@link AsyncContext#getResponse()} returns null, this context has already timed out (and the context listener has already been invoked).
   */
  private void enqueLongRunningTask(final AsyncContext ctx, final HttpSession session) {

    exec.execute(new Runnable() {

      @Override
      public void run() {

        String some_big_data = getSomeBigData(); // stand-in for however you produce the payload

        try {

          ServletResponse response = ctx.getResponse();
          if (response != null) {
            response.getWriter().write(some_big_data);
            ctx.complete();
          } else {
            throw new IllegalStateException(); // this is caught below
          }
        } catch (IllegalStateException ex) {
          logger.error("Response object from context is null! (nothing to worry about.)"); // just means the context had already timed out and the timeout listener was already called
        } catch (Exception e) {
          logger.error("ERROR IN AsyncServlet", e);
        }
      }
    });
  }

  /** destroy the executor */
  @Override
  public void destroy() {

    exec.shutdown();
  }
}

Solution 2:

During my research on this topic, this thread kept popping up, so I figured I'd mention it here:

Servlet 3.1 introduced async operations on ServletInputStream and ServletOutputStream. See ServletOutputStream.setWriteListener.

An example can be found at http://docs.oracle.com/javaee/7/tutorial/servlets013.htm
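
For reference, a minimal sketch of that non-blocking pattern looks roughly like the following. The servlet name, URL pattern, chunk size and the getSomeBigData() placeholder are illustrative (not taken from the tutorial); the setWriteListener/isReady()/onWritePossible() calls are the actual Servlet 3.1 API:

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import javax.servlet.AsyncContext;
import javax.servlet.ServletOutputStream;
import javax.servlet.WriteListener;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

@WebServlet(urlPatterns = { "/async31" }, asyncSupported = true)
public class NonBlockingWriteServlet extends HttpServlet {

  @Override
  protected void doGet(HttpServletRequest req, HttpServletResponse res) throws IOException {

    final AsyncContext ctx = req.startAsync();
    final ServletOutputStream out = res.getOutputStream();
    final byte[] data = getSomeBigData().getBytes(StandardCharsets.UTF_8);

    out.setWriteListener(new WriteListener() {

      private int offset = 0;

      @Override
      public void onWritePossible() throws IOException {
        // write only while the container can accept data without blocking;
        // when isReady() returns false, the container calls onWritePossible() again later
        while (out.isReady() && offset < data.length) {
          int chunk = Math.min(8192, data.length - offset);
          out.write(data, offset, chunk);
          offset += chunk;
        }
        if (offset >= data.length) {
          ctx.complete();
        }
      }

      @Override
      public void onError(Throwable t) {
        log("non-blocking write failed", t);
        ctx.complete();
      }
    });
  }

  private String getSomeBigData() {
    return "..."; // stand-in for your real data source
  }
}

The key difference from the Solution 1 approach is that nothing here blocks on a slow client: you only write while isReady() returns true, and the container calls onWritePossible() again once the client has drained its buffers.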

Solution 3:

This might be helpful:

http://www.oracle.com/webfolder/technetwork/tutorials/obe/java/async-servlet/async-servlets.html

Solution 4:

We can't quite make the writes asynchronous. We realistically have to live with the limitation that when we write something out to a client, we expect to be able to do so promptly and can treat it as an error if we can't. That is, if our goal is to stream data to the client as fast as possible and use the blocking/non-blocking status of the channel as a way to control the flow, we're out of luck. But if we're sending data at a low rate that a client should be able to handle, we can at least promptly disconnect clients that don't read quickly enough.

For example, in your application, we send the keepalives at a slow-ish rate (every few seconds) and expect clients to be able to keep up with all the events they're being sent. We splurge the data to the client, and if it can't keep up, we can disconnect it promptly and cleanly. That's a bit more limited than true asynchronous I/O, but it should meet your need (and incidentally, mine).

The trick is that the output-writing methods that nominally just throw IOException actually do a bit more than that: in the implementation, all the calls to things that can be interrupt()ed are wrapped with something like this (taken from Jetty 9):

catch (InterruptedException x)
{
    throw (IOException)new InterruptedIOException().initCause(x);
}

(I also note that this doesn't happen in Jetty 8, where an InterruptedException is logged and the blocking loop is immediately retried. Presumably you need to make sure your servlet container is well-behaved in this respect to use this trick.)

That is, when a slow client causes a writing thread to block, we simply force the write to fail with an IOException by calling interrupt() on the thread. Think about it: the non-blocking code would consume a unit of time on one of our processing threads to execute anyway, so using blocking writes that are simply aborted (after, say, one millisecond) is really identical in principle. We're still just chewing up a short amount of time on the thread, only marginally less efficiently.

I've modified your code so that, just before we start each write, a job is run to bound the time spent in that write; the job is cancelled if the write completes quickly, which it should.

A final note: in a well-implemented servlet container, causing the I/O to throw ought to be safe. It would be nice if we could catch the InterruptedIOException and try the write again later; perhaps we'd like to give slow clients a subset of the events if they can't keep up with the full stream. As far as I can tell, in Jetty this isn't entirely safe: if a write throws, the internal state of the HttpResponse object might not be consistent enough to handle re-entering the write safely later. I expect it's not wise to push a servlet container in this way unless there are specific docs I've missed offering that guarantee. I think the idea is that a connection is designed to be shut down if an IOException happens.

Here's the code, with a modified version of RunJob::run() as a crude, simple illustration (in reality, we'd want to use the main timer thread here rather than spin one up per write, which is silly).

public void run()
{
    String message = null;
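    // pick up the newest message if it has changed since this job last ran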
    synchronized(HugeStreamWithThreads.this) {
        if(this.id != HugeStreamWithThreads.this.id) {
            this.id = HugeStreamWithThreads.this.id;
            message = HugeStreamWithThreads.this.message;
        }
    }
    if(message == null)
        message = ":keep-alive\n\n";
    else
        message = formatMessage(message);

    // watchdog: interrupt this thread if the blocking write below takes longer than 2 seconds
    final Thread curr = Thread.currentThread();
    Thread canceller = new Thread(new Runnable() {
        public void run()
        {
            try {
                Thread.sleep(2000);
                curr.interrupt();
            }
            catch(InterruptedException e) {
                // exit
            }
        }
    });
    canceller.start();

    try {
        if(!sendMessage(message))
            return;
    } finally {
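        // stop the watchdog and wait for it to exit, whether or not the write succeeded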
        canceller.interrupt();
        while (true) {
            try { canceller.join(); break; }
            catch (InterruptedException e) { }
        }
    }

    // if a new message arrived while we were writing, requeue this job to send it
    boolean once_again = false;
    synchronized(HugeStreamWithThreads.this) {
        if(this.id != HugeStreamWithThreads.this.id)
            once_again = true;
    }
    if(once_again)
        pool.submit(this);

}
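
As noted above, spinning up a canceller thread per write is silly; a rough sketch of the same idea using a shared timer, here a java.util.concurrent.ScheduledExecutorService (the scheduler field is my assumption, imagined to live alongside pool in HugeStreamWithThreads), would replace the canceller section of run() with something like:

// assumed shared field in HugeStreamWithThreads:
//   private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
// (uses java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledFuture, TimeUnit})

final Thread curr = Thread.currentThread();
// interrupt the write only if it is still blocked after 2 seconds
final ScheduledFuture<?> watchdog = scheduler.schedule(new Runnable() {
    public void run()
    {
        curr.interrupt();
    }
}, 2000, TimeUnit.MILLISECONDS);

try {
    if(!sendMessage(message))
        return;
} finally {
    watchdog.cancel(false);
    Thread.interrupted(); // clear an interrupt that raced with the cancel
}

Unlike the interrupt()/join() dance above, cancel(false) leaves a tiny window where a late interrupt can leak into the next task run on this pool thread, so whether that shortcut is acceptable depends on what else those threads do.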