Java 8 Supplier Exception handling with CompletableFuture

Consider the following code:

public class TestCompletableFuture {

    BiConsumer<Integer, Throwable> biConsumer = (x,y) -> {
        System.out.println(x);
        System.out.println(y);
    };

    public static void main(String args[]) {
        TestCompletableFuture testF = new TestCompletableFuture();
        testF.start();      
    }

    public void start() {
        Supplier<Integer> numberSupplier = new Supplier<Integer>() {
            @Override
            public Integer get() {
                return SupplyNumbers.sendNumbers();                     
            }
        };
        CompletableFuture<Integer> testFuture = CompletableFuture.supplyAsync(numberSupplier).whenComplete(biConsumer);         
    }       
}

class SupplyNumbers {
    public static Integer sendNumbers(){
        return 25; // placeholder value, just to make the example work
    }
}

The code above works fine. However, in my case sendNumbers could also throw a checked exception, like this:

class SupplyNumbers {
    public static Integer sendNumbers() throws Exception {
        return 25; // placeholder value, just to make the example work
    }
}

Now I want to receive this exception as y in my biConsumer, so that I can handle both the result and the exception (if any) inside a single function (biConsumer).

Any ideas? Can I use CompletableFuture.exceptionally(fn) here, or something else?


Solution 1:

The factory methods that take the standard functional interfaces aren’t helpful when you want to handle checked exceptions. If you put the code that catches the exception inside the lambda expression, you run into a chicken-and-egg problem: the catch clause needs the CompletableFuture instance in order to set the exception, while the factory method needs the Supplier before it can create that instance.

You could use an instance field of a class to allow mutation after creation, but in the end the resulting code isn’t clean and is more complicated than a straightforward Executor-based solution. The documentation of CompletableFuture says:

  • All async methods without an explicit Executor argument are performed using the ForkJoinPool.commonPool()

So you know that the following code shows the standard behavior of CompletableFuture.supplyAsync(Supplier) while handling checked exceptions in a straightforward way:

CompletableFuture<Integer> f=new CompletableFuture<>();
ForkJoinPool.commonPool().submit(()-> {
  try { f.complete(SupplyNumbers.sendNumbers()); }
  catch(Exception ex) { f.completeExceptionally(ex); }
});

The documentation also says:

… To simplify monitoring, debugging, and tracking, all generated asynchronous tasks are instances of the marker interface CompletableFuture.AsynchronousCompletionTask.

If you want to adhere to this convention, so that the solution behaves even more like the original supplyAsync method, change the code to:

CompletableFuture<Integer> f=new CompletableFuture<>();
ForkJoinPool.commonPool().submit(
  (Runnable&CompletableFuture.AsynchronousCompletionTask)()-> {
    try { f.complete(SupplyNumbers.sendNumbers()); }
    catch(Exception ex) { f.completeExceptionally(ex); }
});
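To see the whole round trip, here is a minimal sketch that wraps the approach above in a helper and feeds the outcome into a BiConsumer like the one in the question. The names CheckedSupplyDemo and supplyAsyncChecked are made up for illustration and are not part of the JDK, and sendNumbers here is a stand-in that always fails:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;

public class CheckedSupplyDemo {

    // Hypothetical helper (not a JDK method): runs a Callable, which may throw
    // checked exceptions, on the common pool and completes the future manually.
    static <T> CompletableFuture<T> supplyAsyncChecked(Callable<T> task) {
        CompletableFuture<T> f = new CompletableFuture<>();
        ForkJoinPool.commonPool().submit(() -> {
            try { f.complete(task.call()); }
            catch (Exception ex) { f.completeExceptionally(ex); }
        });
        return f;
    }

    // Stand-in for SupplyNumbers.sendNumbers() that always throws a checked exception.
    static Integer sendNumbers() throws Exception {
        throw new Exception("no numbers available");
    }

    public static void main(String[] args) {
        supplyAsyncChecked(CheckedSupplyDemo::sendNumbers)
            .whenComplete((x, y) -> {
                System.out.println(x); // the result, null on failure
                System.out.println(y); // the exception, null on success
            })
            .handle((x, y) -> null)    // swallow the failure so join() does not throw
            .join();                   // keep main alive until the callback has run
    }
}
```

Because the future is completed with completeExceptionally, a callback attached directly to it receives the original exception itself rather than a CompletionException wrapper.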

Solution 2:

You are already catching the exception in y. Maybe you did not see it because main exited before your CompletableFuture had a chance to complete?

The code below prints "null" followed by the exception (a CompletionException wrapping the RuntimeException with the message "Hello"), as expected:

public static void main(String args[]) throws InterruptedException {
  TestCompletableFuture testF = new TestCompletableFuture();
  testF.start();
  Thread.sleep(1000); //wait for the CompletableFuture to complete
}

public static class TestCompletableFuture {
  BiConsumer<Integer, Throwable> biConsumer = (x, y) -> {
    System.out.println(x);
    System.out.println(y);
  };
  public void start() {
    CompletableFuture.supplyAsync(SupplyNumbers::sendNumbers)
            .whenComplete(biConsumer);
  }
}

static class SupplyNumbers {
  public static Integer sendNumbers() {
    throw new RuntimeException("Hello");
  }
}
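If the fixed sleep feels fragile, one possible alternative is to block on the stage returned by whenComplete. The sketch below assumes that blocking the calling thread is acceptable; the class name JoinInsteadOfSleep and the method observed are made up for illustration:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class JoinInsteadOfSleep {

    static Integer sendNumbers() {
        throw new RuntimeException("Hello");
    }

    // Returns the Throwable that the whenComplete callback received.
    static Throwable observed() {
        Throwable[] seen = new Throwable[1];
        CompletableFuture<Integer> stage = CompletableFuture
                .supplyAsync(JoinInsteadOfSleep::sendNumbers)
                .whenComplete((x, y) -> seen[0] = y);
        try {
            stage.join(); // waits until the callback has run, then rethrows the failure
        } catch (CompletionException expected) {
            // the failure was already handled in the callback
        }
        return seen[0];
    }

    public static void main(String[] args) {
        Throwable y = observed();
        System.out.println(y);            // the CompletionException wrapper
        System.out.println(y.getCause()); // the RuntimeException thrown by sendNumbers
    }
}
```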

Solution 3:

I am not quite sure what you are trying to achieve. If your supplier throws an exception, calling testFuture.get() throws a java.util.concurrent.ExecutionException whose cause is the exception thrown by the supplier. You can retrieve that original exception by calling getCause() on the ExecutionException.
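A minimal sketch of that get()/getCause() route, assuming a supplier that always fails (the class name GetCauseDemo and the method causeOfFailure are made up for illustration):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class GetCauseDemo {

    // Returns the exception thrown by the supplier, or null if it succeeded.
    static Throwable causeOfFailure() {
        CompletableFuture<Integer> testFuture = CompletableFuture.<Integer>supplyAsync(() -> {
            throw new IllegalStateException("supplier failed");
        });
        try {
            testFuture.get();
            return null;
        } catch (ExecutionException e) {
            return e.getCause(); // the exception thrown inside the supplier
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return e;
        }
    }

    public static void main(String[] args) {
        System.out.println(causeOfFailure());
    }
}
```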

Or, just as you mentioned, you can use exceptionally in the CompletableFuture. This code:

public class TestCompletableFuture {

    private static BiConsumer<Integer, Throwable> biConsumer = (x,y) -> {
        System.out.println(x);
        System.out.println(y);
    };

    public static void main(String args[]) throws Exception {
        Supplier<Integer> numberSupplier = () -> {
            throw new RuntimeException(); // or return integer
        };

        CompletableFuture<Integer> testFuture = CompletableFuture.supplyAsync(numberSupplier)
                .whenComplete(biConsumer)
                .exceptionally(exception -> 7);

        System.out.println("result = " + testFuture.get());
    }

}

Prints this result:

null
java.util.concurrent.CompletionException: java.lang.RuntimeException
result = 7

EDIT:

If you have checked exceptions, you can simply add a try-catch.

Original code:

Supplier<Integer> numberSupplier = new Supplier<Integer>() {
    @Override
    public Integer get() {
        return SupplyNumbers.sendNumbers();                     
    }
};

Modified code:

Supplier<Integer> numberSupplier = new Supplier<Integer>() {
    @Override
    public Integer get() {
        try {
            return SupplyNumbers.sendNumbers();                     
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
};
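With this wrapping, the y that reaches the biConsumer is not the checked exception itself: it sits at the bottom of a cause chain. The sketch below shows one way to dig it out again. The class name WrapCheckedDemo and the helpers rootCause and run are made up for illustration, and sendNumbers is a stand-in that always fails:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class WrapCheckedDemo {

    // Stand-in for SupplyNumbers.sendNumbers() that always throws a checked exception.
    static Integer sendNumbers() throws Exception {
        throw new Exception("checked failure");
    }

    // Hypothetical helper: follows getCause() down to the innermost exception.
    static Throwable rootCause(Throwable t) {
        while (t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }

    // Runs the pipeline and returns "<result> / <message of the root cause>".
    static String run() {
        Supplier<Integer> numberSupplier = () -> {
            try {
                return sendNumbers();
            } catch (Exception e) {
                throw new RuntimeException(e); // wrap the checked exception
            }
        };
        StringBuilder seen = new StringBuilder();
        CompletableFuture.supplyAsync(numberSupplier)
                .whenComplete((x, y) -> seen.append(x).append(" / ")
                        .append(y == null ? null : rootCause(y).getMessage()))
                .handle((x, y) -> null) // swallow the failure so join() does not throw
                .join();                // wait until the callback has run
        return seen.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```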