一、CompletableFuture基本功能

CompletableFuture是JDK1.8版本中的新特性,主要是对JDK5中的Future的补充,弥补了Future的局限性,实现了FutureCompletionStage两个接口。同时CompletableFuture实现了对任务编排的能力。借助这项能力,可以轻松地组织不同任务的运行顺序、规则以及方式。

CompletionStage接口定义了任务编排的方法,执行某一阶段,可以向下执行后续阶段。异步执行的,默认线程池是ForkJoinPool.commonPool(),但为了业务之间互不影响,且便于定位问题,推荐使用自定义线程池

二、CompletableFuture使用介绍

在以下的例子中会使用到 SmallTool 这个工具类,它有两个静态方法:一个是睡眠指定毫秒;一个是打印当前时间和线程信息。

public class SmallTool {
    
    public static void sleepMillis(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void printTimeAndThread(String tag) {
        String result = new StringJoiner("\t|\t")
                .add(String.valueOf(System.currentTimeMillis()))
                .add(String.valueOf(Thread.currentThread().getId()))
                .add(Thread.currentThread().getName())
                .add(tag)
                .toString();
        System.out.println(result);
    }

}

创建异步任务

CompletableFuture 创建异步任务的方法有 2 类共 4 种:

// supplyAsync 方法有返回值
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

// runAsync 方法没有返回值
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)

supplyAsync

supplyAsync用来创建带有返回值的异步任务,它有如下两个方法:

  • 使用默认线程池(ForkJoinPool.commonPool)的方法

    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
    

    Java

  • 指定自定义线程池的重载方法

    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
    

例子:

使用默认线程池:

public static void main(String[] args) {
    CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
        SmallTool.printTimeAndThread("在线程中做的一些事情...");
        return "线程的返回值";
    });

    SmallTool.printTimeAndThread(String.format("执行结果:%s", cf.join()));
}
/* 输出:
1691999870367   |   12  |   ForkJoinPool.commonPool-worker-1    |   在线程中做的一些事情...
1691999870391   |   1   |   main    |   执行结果:线程的返回值
 */

使用自定义线程池:

public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(3);

    CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
        SmallTool.printTimeAndThread("在线程中做的一些事情...");
        return "线程的返回值";
    }, executorService);

    SmallTool.printTimeAndThread(String.format("执行结果:%s", cf.join()));
}
/* 输出:
1692346286341   |   12  |   pool-1-thread-1 |   在线程中做的一些事情...
1692346286364   |   1   |   main    |   执行结果:线程的返回值
 */

runAsync

runAsync用来创建没有返回值的异步任务,它有如下两个方法:

  • 使用默认线程池(ForkJoinPool.commonPool)的方法

    public static void main(String[] args) {
        CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
            SmallTool.printTimeAndThread("在线程中做的一些事情...");
        });
    
        SmallTool.printTimeAndThread(String.format("执行结果:%s", cf.join()));
    }
    /* 输出:
    1692346581755 |   12  |   ForkJoinPool.commonPool-worker-1    |   在线程中做的一些事情...
    1692346581777 |   1   |   main    |   执行结果:null
     */
    

  • 指定自定义线程池的重载方法

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
    
        CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
            SmallTool.printTimeAndThread("在线程中做的一些事情...");
        }, executorService);
    
        SmallTool.printTimeAndThread(String.format("执行结果:%s", cf.join()));
    }
    /* 输出:
    1692346689126 |   12  |   pool-1-thread-1 |   在线程中做的一些事情...
    1692346689153 |   1   |   main    |   执行结果:null
     */
    

获取执行结果

获取执行结果有get()join方法,它们都是阻塞式获取。

get

get() 方法是阻塞的,调用它会等待CompletableFuture 的计算完成,并返回计算的结果。如果计算还未完成,调用该方法会导致当前线程阻塞。如果计算过程中发生异常,异常会被包装为ExecutionException 并在调用get() 方法时抛出。

public T get() throws InterruptedException, ExecutionException

join

join() 方法与get() 方法类似,但不会抛出受检异常。如果计算过程中发生异常,异常会被包装为CompletionException 并在调用join() 方法时抛出。由于不抛出受检异常,join() 在某些情况下更加方便。

public T join()

串联异步操作

把不同的任务前后连接起来。

thenCompose / thenComposeAsync

thenCompose() 用于将两个异步操作串联起来,它的作用是在一个操作完成后,将其结果传递给另一个操作,并返回一个新的CompletableFuture 对象,表示串联后的异步操作。

具体来说,thenCompose() 方法用于处理异步操作的"链式调用",其中第一个操作的结果会作为参数传递给第二个操作,从而实现操作的顺序执行。这种组合方式可以避免嵌套的回调地狱,使异步操作的代码更加清晰和易于理解。

public <U> CompletableFuture<U> thenCompose(
        Function<? super T, ? extends CompletionStage<U>> fn)

例子:

public static void main(String[] args) {
    
    CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
        SmallTool.printTimeAndThread("在线程中做的一些事情...");
        return "结果1";
    }).thenCompose(result -> {
        SmallTool.printTimeAndThread(String.format("result = %s", result));
        return CompletableFuture.supplyAsync(() -> result + ",结果2");
    });

    SmallTool.printTimeAndThread(String.format("执行结果:%s", cf.join()));
}
/* 输出:
1692347615316   |   12  |   ForkJoinPool.commonPool-worker-1    |   在线程中做的一些事情...
1692347615340   |   12  |   ForkJoinPool.commonPool-worker-1    |   result = 结果1
1692347615341   |   1   |   main    |   执行结果:结果1,结果2
 */

thenComposeAsync()thenCompose() 类似,用于将两个异步操作串联起来。不同的是thenComposeAsync()方法会在指定的执行器(线程池)中执行操作,从而实现异步操作的并行执行。

具体来说,thenComposeAsync() 方法的作用是将第一个操作的结果传递给第二个操作,并返回一个新的CompletableFuture 对象,表示串联后的异步操作。不同之处在于,第二个操作会在指定的执行器(线程池)中执行,而不会阻塞当前线程。

它有如下两个方法:

  • 使用默认线程池(ForkJoinPool.commonPool)的方法

    public <U> CompletableFuture<U> thenComposeAsync(
            Function<? super T, ? extends CompletionStage<U>> fn)
    

  • 指定自定义线程池的重载方法

    public <U> CompletableFuture<U> thenComposeAsync(
            Function<? super T, ? extends CompletionStage<U>> fn,
            Executor executor)
    

例子:

public static void main(String[] args) {

    ExecutorService executorService = Executors.newSingleThreadExecutor();

    CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
        SmallTool.printTimeAndThread("在线程中做的一些事情...");
        return "结果1";
    }).thenComposeAsync(result -> {
        SmallTool.sleepMillis(1000);
        SmallTool.printTimeAndThread(String.format("result = %s", result));
        return CompletableFuture.supplyAsync(() -> result + ",结果2");
    }, executorService);

    SmallTool.printTimeAndThread(String.format("执行结果:%s", cf.join()));
}
/* 输出:
1692350376975   |   12  |   ForkJoinPool.commonPool-worker-1    |   在线程中做的一些事情...
1692350378021   |   13  |   pool-1-thread-1 |   result = 结果1
1692350378022   |   1   |   main    |   执行结果:结果1,结果2
 */

组合异步操作

把不同的任务前后组合起来。

thenCombine / thenCombineAsync

thenCombine() 用于组合两个独立的异步操作的结果,然后将这两个结果传递给一个函数进行处理,并返回一个新的CompletableFuture 对象,表示组合后的异步操作。

具体来说,thenCombine() 方法的作用是将两个独立的异步操作的结果合并,然后使用一个函数对这两个结果进行处理。这个函数可以是一个BiFunction,它接受两个参数,分别是两个操作的结果,然后返回一个新的结果。返回的结果会作为新的CompletableFuture 的计算结果。

public <U,V> CompletableFuture<V> thenCombine(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn)

例子:

public static void main(String[] args) {

    CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
        SmallTool.printTimeAndThread("在线程中做的一些事情...");
        return "结果1";
    }).thenCombine(CompletableFuture.supplyAsync(() -> {
        SmallTool.printTimeAndThread("在线程中做的另外一些事情...");
        return "结果2";
    }), (result1, result2) -> {
        SmallTool.printTimeAndThread("combine...");
        return result1 + " " + result2;
    });

    SmallTool.printTimeAndThread(String.format("执行结果:%s", cf.join()));
}
/* 输出:
1692348618510   |   12  |   ForkJoinPool.commonPool-worker-1    |   在线程中做的一些事情...
1692348618510   |   13  |   ForkJoinPool.commonPool-worker-2    |   在线程中做的另外一些事情...
1692348618510   |   13  |   ForkJoinPool.commonPool-worker-2    |   combine...
1692348618534   |   1   |   main    |   执行结果:结果1 结果2
 */

thenCombineAsync()类似于thenCombine(),用于将两个独立的异步操作的结果合并,并在指定的执行器(线程池)中执行合并操作。

具体来说,thenCombineAsync() 方法的作用是将两个独立的异步操作的结果合并,然后通过一个函数对这两个结果进行处理。这个函数可以是一个BiFunction,它接受两个参数,分别是两个操作的结果,然后返回一个新的结果。返回的结果会作为新的CompletableFuture 的计算结果。

它有如下两个方法:

  • 使用默认线程池(ForkJoinPool.commonPool)的方法

    public <U,V> CompletableFuture<V> thenCombineAsync(
            CompletionStage<? extends U> other,
            BiFunction<? super T,? super U,? extends V> fn)
    

  • 指定自定义线程池的重载方法

    public <U,V> CompletableFuture<V> thenCombineAsync(
            CompletionStage<? extends U> other,
            BiFunction<? super T,? super U,? extends V> fn, Executor executor) 
    

例子:

public static void main(String[] args) {

    ExecutorService executorService = Executors.newSingleThreadExecutor();

    CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
        SmallTool.printTimeAndThread("在线程中做的一些事情...");
        return "结果1";
    }).thenCombineAsync(CompletableFuture.supplyAsync(() -> {
        SmallTool.sleepMillis(1000);
        SmallTool.printTimeAndThread("在线程中做的另外一些事情...");
        return "结果2";
    }), (result1, result2) -> {
        SmallTool.printTimeAndThread("combine...");
        return result1 + " " + result2;
    }, executorService);

    SmallTool.printTimeAndThread(String.format("执行结果:%s", cf.join()));
}
/* 输出:
1692350429739   |   12  |   ForkJoinPool.commonPool-worker-1    |   在线程中做的一些事情...
1692350430739   |   13  |   ForkJoinPool.commonPool-worker-2    |   在线程中做的另外一些事情...
1692350430740   |   14  |   pool-1-thread-1 |   combine...
1692350430770   |   1   |   main    |   执行结果:结果1 结果2
 */

thenAcceptBoth / thenAcceptBothAsync

thenAcceptBoth 方法的作用是在两个CompletableFuture 都完成时,对它们的结果进行操作。与此同时,它不返回任何结果

具体来说,thenAcceptBoth 方法接受两个参数:另一个CompletableFuture 对象和一个消费者(Consumer)函数。这个消费者函数会在两个CompletableFuture 都完成时被调用,传入这两个任务的结果作为参数。

public <U> CompletableFuture<Void> thenAcceptBoth(
        CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action)

例子:

public static void main(String[] args) {

    CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() -> {
        SmallTool.printTimeAndThread("在线程中做的一些事情...");
        return "结果1";
    }).thenAcceptBoth(CompletableFuture.supplyAsync(() -> {
        SmallTool.printTimeAndThread("在线程中做的另外一些事情...");
        return "结果2";
    }), (result1, result2) -> {
        SmallTool.printTimeAndThread(String.format("处理:%s和%s,没有返回值", result1, result2));
    });

    SmallTool.printTimeAndThread(String.format("执行结果:%s", cf.join()));
}
/* 输出:
1692582882023   |   12  |   ForkJoinPool.commonPool-worker-1    |   在线程中做的一些事情...
1692582882023   |   13  |   ForkJoinPool.commonPool-worker-2    |   在线程中做的另外一些事情...
1692582882042   |   13  |   ForkJoinPool.commonPool-worker-2    |   处理:结果1和结果2,没有返回值
1692582882043   |   1   |   main    |   执行结果:null
 */

thenAcceptBothAsync 方法的作用与thenAcceptBoth 类似,但是允许你在两个CompletableFuture 都完成时,以异步的方式执行一个操作,不会阻塞当前线程。

具体来说,thenAcceptBothAsync 方法接受两个参数:另一个CompletableFuture 对象和一个消费者(Consumer)函数。这个消费者函数会在两个CompletableFuture 都完成时以异步的方式被调用,传入这两个任务的结果作为参数。

它有如下两个方法:

  • 使用默认线程池(ForkJoinPool.commonPool)的方法

    public <U> CompletableFuture<Void> thenAcceptBothAsync(
            CompletionStage<? extends U> other,
            BiConsumer<? super T, ? super U> action) 
    

  • 指定自定义线程池的重载方法

    public <U> CompletableFuture<Void> thenAcceptBothAsync(
            CompletionStage<? extends U> other,
            BiConsumer<? super T, ? super U> action, Executor executor)
    

例子:

public static void main(String[] args) {

    ExecutorService executorService = Executors.newSingleThreadExecutor();

    CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() -> {
        SmallTool.printTimeAndThread("在线程中做的一些事情...");
        return "结果1";
    }).thenAcceptBothAsync(CompletableFuture.supplyAsync(() -> {
        SmallTool.sleepMillis(1000);
        SmallTool.printTimeAndThread("在线程中做的另外一些事情...");
        return "结果2";
    }), (result1, result2) -> {
        SmallTool.printTimeAndThread(String.format("异步处理:%s和%s,没有返回值", result1, result2));
    }, executorService);

    SmallTool.printTimeAndThread(String.format("执行结果:%s", cf.join()));
}
/* 输出:
1692582845466   |   12  |   ForkJoinPool.commonPool-worker-1    |   在线程中做的一些事情...
1692582846466   |   13  |   ForkJoinPool.commonPool-worker-2    |   在线程中做的另外一些事情...
1692582846496   |   14  |   pool-1-thread-1 |   异步处理:结果1和结果2,没有返回值
1692582846497   |   1   |   main    |   执行结果:null
 */

runAfterBoth / runAfterBothAsync

runAfterBoth 方法的作用是在两个CompletableFuture 都完成时,执行一个操作,但不关心任务的结果,也不返回任何结果。它类似于执行一个后续的清理或结束操作。

具体来说,runAfterBoth 方法接受两个参数:另一个CompletableFuture 对象和一个Runnable 接口的实例。这个Runnable 对象会在两个CompletableFuture 都完成时被调用,执行其中的操作。

public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
                                                Runnable action)

例子:

public static void main(String[] args) {

    CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() -> {
        SmallTool.printTimeAndThread("在线程中做的一些事情...");
        return "结果1";
    }).runAfterBoth(CompletableFuture.supplyAsync(() -> {
        SmallTool.printTimeAndThread("在线程中做的另外一些事情...");
        return "结果2";
    }), () -> {
        SmallTool.printTimeAndThread("处理结果,既不接收参数,也没有返回值");
    });

    SmallTool.printTimeAndThread(String.format("执行结果:%s", cf.join()));
}
/* 输出:
1692583102562   |   12  |   ForkJoinPool.commonPool-worker-1    |   在线程中做的一些事情...
1692583102562   |   13  |   ForkJoinPool.commonPool-worker-2    |   在线程中做的另外一些事情...
1692583102562   |   13  |   ForkJoinPool.commonPool-worker-2    |   处理结果,既不接收参数,也没有返回值
1692583102593   |   1   |   main    |   执行结果:null
 */

runAfterBothAsync方法的作用与runAfterBoth 类似,但是允许你在两个CompletableFuture 都完成时,以异步的方式执行一个操作,不会阻塞当前线程。

它有如下两个方法:

  • 使用默认线程池(ForkJoinPool.commonPool)的方法

    public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
                                                         Runnable action) 
    

  • 指定自定义线程池的重载方法

    public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
                                                         Runnable action,
                                                         Executor executor)
    

例子:

public static void main(String[] args) {

    ExecutorService executorService = Executors.newSingleThreadExecutor();

    CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() -> {
        SmallTool.printTimeAndThread("在线程中做的一些事情...");
        return "结果1";
    }).runAfterBothAsync(CompletableFuture.supplyAsync(() -> {
        SmallTool.printTimeAndThread("在线程中做的另外一些事情...");
        return "结果2";
    }), () -> {
        SmallTool.printTimeAndThread("异步处理结果,既不接收参数,也没有返回值");
    }, executorService);

    SmallTool.printTimeAndThread(String.format("执行结果:%s", cf.join()));
}
/* 输出:
1692583224578   |   13  |   ForkJoinPool.commonPool-worker-2    |   在线程中做的另外一些事情...
1692583224578   |   12  |   ForkJoinPool.commonPool-worker-1    |   在线程中做的一些事情...
1692583224579   |   14  |   pool-1-thread-1 |   异步处理结果,既不接收参数,也没有返回值
1692583224598   |   1   |   main    |   执行结果:null
 */

处理结果

处理前一个任务的执行情况。

thenApply / thenApplyAsync

thenApply()用于在异步操作完成后对其结果进行处理,并返回一个新的CompletableFuture 对象,表示处理后的结果。

具体来说,thenApply() 方法的作用是在当前异步操作完成后,应用一个函数(Function)来处理操作的结果,并将处理后的结果作为新的CompletableFuture 的计算结果。

相当于 Stream Api 中的map操作。

public <U> CompletableFuture<U> thenApply(
        Function<? super T,? extends U> fn)

例子:

public static void main(String[] args) {

    CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
        SmallTool.printTimeAndThread("在线程中做的一些事情...");
        return "结果1";
    }).thenApply(result -> {
        SmallTool.printTimeAndThread("处理前一个任务的执行结果");
        return "处理结果1";
    });

    SmallTool.printTimeAndThread(String.format("执行结果:%s", cf.join()));
}
/* 输出:
1692349583570   |   12  |   ForkJoinPool.commonPool-worker-1    |   在线程中做的一些事情...
1692349583571   |   12  |   ForkJoinPool.commonPool-worker-1    |   处理前一个任务的执行结果
1692349583594   |   1   |   main    |   执行结果:处理结果1
 */

thenApplyAsync()thenApply() 类似,用于在异步操作完成后对其结果进行处理。然而,thenApplyAsync() 具有并行执行操作的能力,可以在指定的执行器中执行处理操作,从而实现异步操作的并行处理。

具体来说,thenApplyAsync() 方法的作用是在当前异步操作完成后,使用一个函数(Function)来处理操作的结果,并在指定的执行器中执行处理操作,然后返回一个新的CompletableFuture 对象,表示处理后的结果。

它有如下两个方法:

  • 使用默认线程池(ForkJoinPool.commonPool)的方法

    public <U> CompletableFuture<U> thenApplyAsync(
            Function<? super T,? extends U> fn) 
    

  • 指定自定义线程池的重载方法

    public <U> CompletableFuture<U> thenApplyAsync(
            Function<? super T,? extends U> fn, Executor executor)
    

例子:

public static void main(String[] args) {

    ExecutorService executorService = Executors.newSingleThreadExecutor();

    CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
        SmallTool.printTimeAndThread("在线程中做的一些事情...");
        return "结果1";
    }).thenApplyAsync(result -> {
        SmallTool.sleepMillis(1000);
        SmallTool.printTimeAndThread("处理前一个任务的执行结果");
        return "处理结果1";
    }, executorService);

    SmallTool.printTimeAndThread(String.format("执行结果:%s", cf.join()));
}
/* 输出:
1692350475858   |   12  |   ForkJoinPool.commonPool-worker-1    |   在线程中做的一些事情...
1692350476861   |   13  |   pool-1-thread-1 |   处理前一个任务的执行结果
1692350476899   |   1   |   main    |   执行结果:处理结果1
 */

thenAccept / thenAcceptAsync

thenAccept作用是在一个CompletableFuture 完成时,对其结果进行处理,但不返回任何结果。换句话说,它允许你在异步操作完成后执行一个处理结果的操作。

具体来说,thenAccept 方法接受一个参数,该参数是一个消费者(Consumer)函数。这个函数会在前一个CompletableFuture 完成时被调用,并传入前一个CompletableFuture 的结果作为参数,从而让你能够对结果进行处理。

thenAccept 方法适用于那些你只需要处理异步任务结果而不需要返回值的情况,它可以让你在异步任务完成时执行一些操作,如日志记录、结果处理等。

public CompletableFuture<Void> thenAccept(Consumer<? super T> action)

例子:

public static void main(String[] args) {

    CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() -> {
        SmallTool.printTimeAndThread("在线程中做的一些事情...");
        return "结果1";
    }).thenAccept(result -> {
        SmallTool.printTimeAndThread("处理前一个任务的执行结果,没有返回值:" + result);
    });

    SmallTool.printTimeAndThread(String.format("执行结果:%s", cf.join()));
}
/* 输出:
1692582199690   |   12  |   ForkJoinPool.commonPool-worker-1    |   在线程中做的一些事情...
1692582199691   |   12  |   ForkJoinPool.commonPool-worker-1    |   处理前一个任务的执行结果,没有返回值:结果1
1692582199716   |   1   |   main    |   执行结果:null
 */

thenAcceptAsync 方法的作用与thenAccept 类似,但是允许你在异步任务完成时以异步的方式执行一个处理结果的操作。

具体来说,thenAcceptAsync 方法会将传入的消费者函数(Consumer)以异步的方式执行,而不会阻塞当前线程。这样可以更好地利用线程资源,特别是在处理大量异步任务时。

它有如下两个方法:

  • 使用默认线程池(ForkJoinPool.commonPool)的方法

    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
    

  • 指定自定义线程池的重载方法

    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
                                                       Executor executor)
    

例子:

public static void main(String[] args) {

    ExecutorService executorService = Executors.newSingleThreadExecutor();

    CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() -> {
        SmallTool.printTimeAndThread("在线程中做的一些事情...");
        return "结果1";
    }).thenAcceptAsync(result -> {
        SmallTool.sleepMillis(1000);
        SmallTool.printTimeAndThread("异步处理前一个任务的执行结果,没有返回值:" + result);
    }, executorService);

    SmallTool.printTimeAndThread(String.format("执行结果:%s", cf.join()));
}
/* 输出:
1692582152269   |   12  |   ForkJoinPool.commonPool-worker-1    |   在线程中做的一些事情...
1692582153274   |   13  |   pool-1-thread-1 |   异步处理前一个任务的执行结果,没有返回值:结果1
1692582153299   |   1   |   main    |   执行结果:null
 */

thenRun / thenRunAsync

thenRun 方法的作用是在一个CompletableFuture 完成时执行一个操作,不关心前一个任务的结果,并且不返回任何结果。换句话说,它用于在异步任务完成后执行一个操作,而这个操作不需要处理结果。

具体来说,thenRun 方法接受一个参数,该参数是一个 Runnable 接口的实例,表示要执行的操作。这个操作会在前一个CompletableFuture 完成时被调用。

thenRun 方法适用于那些你只需要在异步任务完成后执行一些操作,而不关心任务结果的情况。这可以用于清理资源、记录日志等操作。

public CompletableFuture<Void> thenRun(Runnable action)

例子:

public static void main(String[] args) {

    CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() -> {
        SmallTool.printTimeAndThread("在线程中做的一些事情...");
        return "结果1";
    }).thenRun(() -> {
        SmallTool.printTimeAndThread("处理前一个任务的执行结果,既没有参数,也没有返回值");
    });

    SmallTool.printTimeAndThread(String.format("执行结果:%s", cf.join()));
}
/* 输出:
1692582241672   |   12  |   ForkJoinPool.commonPool-worker-1    |   在线程中做的一些事情...
1692582241672   |   12  |   ForkJoinPool.commonPool-worker-1    |   处理前一个任务的执行结果,既没有参数,也没有返回值
1692582241691   |   1   |   main    |   执行结果:null
 */

thenRunAsync 方法的作用与thenRun 类似,但是允许你以异步的方式执行一个操作,不会阻塞当前线程。

具体来说,thenRunAsync 方法接受一个参数,该参数是一个Runnable 接口的实例,表示要执行的操作。这个操作会在前一个CompletableFuture 完成时以异步的方式被调用,因此不会阻塞当前线程。

它有如下两个方法:

  • 使用默认线程池(ForkJoinPool.commonPool)的方法

    public CompletableFuture<Void> thenRunAsync(Runnable action)
    

  • 指定自定义线程池的重载方法

    public CompletableFuture<Void> thenRunAsync(Runnable action,
                                                    Executor executor)
    

例子:

public static void main(String[] args) {

    ExecutorService executorService = Executors.newSingleThreadExecutor();

    CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() -> {
        SmallTool.printTimeAndThread("在线程中做的一些事情...");
        return "结果1";
    }).thenRunAsync(() -> {
        SmallTool.printTimeAndThread("处理前一个任务的执行结果,既没有参数,也没有返回值");
    }, executorService);

    SmallTool.printTimeAndThread(String.format("执行结果:%s", cf.join()));
}
/* 输出:
1692582366668   |   12  |   ForkJoinPool.commonPool-worker-1    |   在线程中做的一些事情...
1692582366668   |   13  |   pool-1-thread-1 |   处理前一个任务的执行结果,既没有参数,也没有返回值
1692582366689   |   1   |   main    |   执行结果:null
 */

多个任务获取最先完成

对首先完成的任务单独处理。

applyToEither / applyToEitherAsync

applyToEither()用于在多个异步操作中,只要有一个操作完成就对其结果进行处理。它允许你将一个函数应用于第一个完成的操作的结果,并返回一个新的CompletableFuture 对象,表示处理后的结果。

具体来说,applyToEither() 方法的作用是在多个异步操作中,等待任意一个操作完成,然后将其结果应用于一个函数,得到一个新的计算结果。

public <U> CompletableFuture<U> applyToEither(
        CompletionStage<? extends T> other, Function<? super T, U> fn)

例子:

public static void main(String[] args) {
    
    CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
        SmallTool.printTimeAndThread("在线程中做的一些事情...");
        SmallTool.sleepMillis(2000);
        return "结果1";
    }).applyToEither(CompletableFuture.supplyAsync(() -> {
        SmallTool.printTimeAndThread("在线程中做的另外一些事情...");
        SmallTool.sleepMillis(1000);
        return "结果2";
    }), (result) -> {
        SmallTool.printTimeAndThread("处理首先完成的任务");
        return result + "首先完成";
    });

    SmallTool.printTimeAndThread(String.format("执行结果:%s", cf.join()));
}
/* 输出:
1692350912821   |   12  |   ForkJoinPool.commonPool-worker-1    |   在线程中做的一些事情...
1692350912821   |   13  |   ForkJoinPool.commonPool-worker-2    |   在线程中做的另外一些事情...
1692350913831   |   13  |   ForkJoinPool.commonPool-worker-2    |   处理首先完成的任务
1692350913865   |   1   |   main    |   执行结果:结果2首先完成
 */

applyToEitherAsync()类似于applyToEither(),但允许在指定的执行器中异步地执行操作。与applyToEither() 不同的是,applyToEitherAsync() 方法可以在多个异步操作中,等待任意一个操作完成后,将其结果应用于一个函数,并在指定的执行器中执行这个处理操作,然后返回一个新的CompletableFuture 对象,表示处理后的结果。

具体来说,applyToEitherAsync() 方法的作用是在多个异步操作中,等待任意一个操作完成,然后将其结果应用于一个函数,并在指定的执行器中执行这个处理操作,得到一个新的计算结果。

它有如下两个方法:

  • 使用默认线程池(ForkJoinPool.commonPool)的方法

    public <U> CompletableFuture<U> applyToEitherAsync(
            CompletionStage<? extends T> other, Function<? super T, U> fn)
    

  • 指定自定义线程池的重载方法

    public <U> CompletableFuture<U> applyToEitherAsync(
            CompletionStage<? extends T> other, Function<? super T, U> fn,
            Executor executor)
    

例子:

public static void main(String[] args) {

    ExecutorService executorService = Executors.newSingleThreadExecutor();

    CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
        SmallTool.printTimeAndThread("在线程中做的一些事情...");
        SmallTool.sleepMillis(2000);
        return "结果1";
    }).applyToEitherAsync(CompletableFuture.supplyAsync(() -> {
        SmallTool.printTimeAndThread("在线程中做的另外一些事情...");
        SmallTool.sleepMillis(1000);
        return "结果2";
    }), (result) -> {
        SmallTool.printTimeAndThread("处理首先完成的任务");
        return result + "首先完成";
    }, executorService);

    SmallTool.printTimeAndThread(String.format("执行结果:%s", cf.join()));
}
/* 输出:
1692351114832   |   12  |   ForkJoinPool.commonPool-worker-1    |   在线程中做的一些事情...
1692351114832   |   13  |   ForkJoinPool.commonPool-worker-2    |   在线程中做的另外一些事情...
1692351115838   |   14  |   pool-1-thread-1 |   处理首先完成的任务
1692351115861   |   1   |   main    |   执行结果:结果2首先完成
 */

acceptEither / acceptEitherAsync

acceptEither 方法用于指定一个回调函数(Consumer),该函数会在两个 CompletableFuture 中的任意一个完成时被调用。这个方法并不关心哪个 CompletableFuture 先完成,只要有一个完成,就会调用你提供的回调函数。回调函数接受完成的结果作为参数,但不返回任何结果。这个方法适用于你只关心结果的消费操作,例如日志记录或通知等。

public static void main(String[] args) {

    CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() -> {
        SmallTool.printTimeAndThread("在线程中做的一些事情...");
        SmallTool.sleepMillis(2000);
        return "结果1";
    }).acceptEither(CompletableFuture.supplyAsync(() -> {
        SmallTool.printTimeAndThread("在线程中做的另外一些事情...");
        SmallTool.sleepMillis(1000);
        return "结果2";
    }), (result) -> {
        SmallTool.printTimeAndThread(String.format("处理首先完成的任务:%s,没有返回值", result));
    });

    SmallTool.printTimeAndThread(String.format("执行结果:%s", cf.join()));
}
/* 输出:
1692583971736   |   12  |   ForkJoinPool.commonPool-worker-1    |   在线程中做的一些事情...
1692583971736   |   13  |   ForkJoinPool.commonPool-worker-2    |   在线程中做的另外一些事情...
1692583972785   |   13  |   ForkJoinPool.commonPool-worker-2    |   处理首先完成的任务:结果2,没有返回值
1692583972785   |   1   |   main    |   执行结果:null
 */

acceptEitherAsyncacceptEither 方法类似,但具有异步执行的特性。

具体来说,acceptEitherAsync 方法会在两个 CompletableFuture 中的任意一个完成时,将提供的回调函数提交给默认的ForkJoinPool 或指定的Executor 进行异步执行。这可以提供更好的并发性能,尤其在大规模的异步任务处理中。

它有如下两个方法:

  • 使用默认线程池(ForkJoinPool.commonPool)的方法

    public CompletableFuture<Void> acceptEitherAsync(
            CompletionStage<? extends T> other, Consumer<? super T> action)
    

  • 指定自定义线程池的重载方法

    public CompletableFuture<Void> acceptEitherAsync(
            CompletionStage<? extends T> other, Consumer<? super T> action,
            Executor executor)
    

例子:

public static void main(String[] args) {

    ExecutorService executorService = Executors.newSingleThreadExecutor();

    CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() -> {
        SmallTool.printTimeAndThread("在线程中做的一些事情...");
        SmallTool.sleepMillis(2000);
        return "结果1";
    }).acceptEitherAsync(CompletableFuture.supplyAsync(() -> {
        SmallTool.printTimeAndThread("在线程中做的另外一些事情...");
        SmallTool.sleepMillis(1000);
        return "结果2";
    }), (result) -> {
        SmallTool.printTimeAndThread(String.format("处理首先完成的任务:%s,没有返回值", result));
    }, executorService);

    SmallTool.printTimeAndThread(String.format("执行结果:%s", cf.join()));
}
/* 输出:
1692584162667   |   13  |   ForkJoinPool.commonPool-worker-2    |   在线程中做的另外一些事情...
1692584162667   |   12  |   ForkJoinPool.commonPool-worker-1    |   在线程中做的一些事情...
1692584163692   |   14  |   pool-1-thread-1 |   处理首先完成的任务:结果2,没有返回值
1692584163692   |   1   |   main    |   执行结果:null
 */

runAfterEither / runAfterEitherAsync

runAfterEither 用于在两个 CompletableFuture 中的任意一个完成时执行一个无返回值的动作(Runnable)。它允许你定义一个动作,该动作会在其中一个 CompletableFuture 完成时被触发,无论哪个 CompletableFuture 先完成。

具体来说,runAfterEither 方法会在其中一个 CompletableFuture 完成时,触发指定的动作(Runnable)。这个方法不关心任务完成的结果,也不返回结果。这在你需要在两个异步任务中的任意一个完成后执行某个操作时很有用。

public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
                                                  Runnable action)

例子:

public static void main(String[] args) {

    CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() -> {
        SmallTool.printTimeAndThread("在线程中做的一些事情...");
        SmallTool.sleepMillis(2000);
        return "结果1";
    }).runAfterEither(CompletableFuture.supplyAsync(() -> {
        SmallTool.printTimeAndThread("在线程中做的另外一些事情...");
        SmallTool.sleepMillis(1000);
        return "结果2";
    }), () -> {
        SmallTool.printTimeAndThread("处理首先完成的任务,既不接收参数,也没有返回值");
    });

    SmallTool.printTimeAndThread(String.format("执行结果:%s", cf.join()));
}
/* 输出:
1692584517959   |   12  |   ForkJoinPool.commonPool-worker-1    |   在线程中做的一些事情...
1692584517959   |   13  |   ForkJoinPool.commonPool-worker-2    |   在线程中做的另外一些事情...
1692584518961   |   13  |   ForkJoinPool.commonPool-worker-2    |   处理首先完成的任务,既不接收参数,也没有返回值
1692584519007   |   1   |   main    |   执行结果:null
 */

runAfterEitherAsync用于在两个 CompletableFuture 中的任意一个完成后,异步执行一个动作(Runnable)。这个方法类似于runAfterEither,既不关心任务完成的结果,也不返回结果,但它会将指定的动作提交给默认的ForkJoinPool 或指定的Executor 进行异步执行。

具体来说,runAfterEitherAsync 方法会在其中一个 CompletableFuture 完成后,将指定的动作异步执行,无论哪一个 CompletableFuture 先完成。与runAfterEither 不同,runAfterEitherAsync 提供了异步执行的特性,可以更好地处理大规模的异步任务。

它有如下两个方法:

  • 使用默认线程池(ForkJoinPool.commonPool)的方法

    public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
                                                           Runnable action)
    

    Java

  • 指定自定义线程池的重载方法

    public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
                                                           Runnable action,
                                                           Executor executor) 
    

例子:

public static void main(String[] args) {

    ExecutorService executorService = Executors.newSingleThreadExecutor();

    CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() -> {
        SmallTool.printTimeAndThread("在线程中做的一些事情...");
        SmallTool.sleepMillis(2000);
        return "结果1";
    }).runAfterEitherAsync(CompletableFuture.supplyAsync(() -> {
        SmallTool.printTimeAndThread("在线程中做的另外一些事情...");
        SmallTool.sleepMillis(1000);
        return "结果2";
    }), () -> {
        SmallTool.printTimeAndThread("异步处理首先完成的任务,既不接收参数,也没有返回值");
    }, executorService);

    SmallTool.printTimeAndThread(String.format("执行结果:%s", cf.join()));
}
/* 输出:
1692584839813   |   13  |   ForkJoinPool.commonPool-worker-2    |   在线程中做的另外一些事情...
1692584839813   |   12  |   ForkJoinPool.commonPool-worker-1    |   在线程中做的一些事情...
1692584840826   |   14  |   pool-1-thread-1 |   异步处理首先完成的任务,既不接收参数,也没有返回值
1692584840845   |   1   |   main    |   执行结果:null
 *

处理异常

处理任务执行中发生的异常情况。

exceptionally

exceptionally()用于处理异步操作中发生的异常情况。它允许你注册一个回调函数,当异步操作抛出异常时,该回调函数会被调用,并提供一个默认值或另一个计算结果作为处理。

具体来说,exceptionally() 方法的作用是为异步操作设置一个异常处理器,以便在操作出现异常时执行特定的处理逻辑,而不是让异常传播到上层调用。

exceptionally()可以在链路中的结尾加,也能在链路中的中间加。

public CompletableFuture<T> exceptionally(
        Function<Throwable, ? extends T> fn)

例子:

public static void main(String[] args) {

    ExecutorService executorService = Executors.newSingleThreadExecutor();

    CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
                SmallTool.printTimeAndThread("在线程中做的一些事情...");
                throw new RuntimeException("啊,我发生异常了");
            }, executorService)
            .applyToEitherAsync(CompletableFuture.supplyAsync(() -> {
                SmallTool.printTimeAndThread("在线程中做的另外一些事情...");
                SmallTool.sleepMillis(1000);
                return "另外一些事情的执行结果";
            }), (result) -> {
                SmallTool.printTimeAndThread("处理首先完成的任务");
                return result + "首先完成";
            })
            .exceptionally(throwable -> {
                SmallTool.printTimeAndThread("处理异常");
                return "发生了异常:" + throwable.getMessage();
            });

    SmallTool.printTimeAndThread(String.format("执行结果:%s", cf.join()));
}
/* 输出:
1692585253214   |   12  |   pool-1-thread-1 |   在线程中做的一些事情...
1692585253214   |   13  |   ForkJoinPool.commonPool-worker-1    |   在线程中做的另外一些事情...
1692585253215   |   1   |   main    |   处理异常
1692585253234   |   1   |   main    |   执行结果:发生了异常:java.lang.RuntimeException: 啊,我发生异常了
 */

handle / handleAsync

handle 用于在异步任务完成后执行一个处理函数,这个函数可以处理任务的结果或异常handle 方法允许你在任务完成后进行一些后续处理,无论任务是否成功完成。

具体来说,handle 方法接收一个BiFunction 参数,该参数会在任务完成后被调用。如果任务正常完成,处理函数会接收任务的结果作为第一个参数,而如果任务发生异常,处理函数会接收异常作为第一个参数,你可以在处理函数中进行相应的处理,然后返回一个新的结果或异常。

public <U> CompletableFuture<U> handle(
        BiFunction<? super T, Throwable, ? extends U> fn)

例子:

public static void main(String[] args) {

    CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
                SmallTool.printTimeAndThread("在线程中做的一些事情...");
                throw new RuntimeException("啊,我发生异常了");
            })
            .applyToEitherAsync(CompletableFuture.supplyAsync(() -> {
                SmallTool.printTimeAndThread("在线程中做的另外一些事情...");
                SmallTool.sleepMillis(1000);
                return "另外一些事情的执行结果";
            }), (result) -> {
                SmallTool.printTimeAndThread("处理首先完成的任务");
                return result + "首先完成";
            })
            .handle((result, throwable) -> {
                if (throwable != null) {
                    SmallTool.printTimeAndThread("处理异常");
                    return "发生了异常:" + throwable.getMessage();
                } else {
                    SmallTool.printTimeAndThread("处理任务结果");
                    return "任务结果:" + result;
                }
            });

    SmallTool.printTimeAndThread(String.format("执行结果:%s", cf.join()));
}
/* 输出:
1692585214727   |   12  |   pool-1-thread-1 |   在线程中做的一些事情...
1692585214728   |   13  |   ForkJoinPool.commonPool-worker-1    |   在线程中做的另外一些事情...
1692585214728   |   1   |   main    |   处理异常
1692585214762   |   1   |   main    |   执行结果:发生了异常:java.lang.RuntimeException: 啊,我发生异常了
 */

handleAsync用于在异步任务完成后异步执行一个处理函数,类似于handle 方法,但handleAsync 具有异步执行的特性。

具体来说,handleAsync 方法接收一个BiFunction 参数,该参数会在任务完成后被调用。如果任务正常完成,处理函数会接收任务的结果作为第一个参数,而如果任务发生异常,处理函数会接收异常作为第一个参数。与handle 不同的是,handleAsync 方法会将处理函数提交给默认的ForkJoinPool 或指定的Executor 进行异步执行。

它有如下两个方法:

  • 使用默认线程池(ForkJoinPool.commonPool)的方法

    public <U> CompletableFuture<U> handleAsync(
            BiFunction<? super T, Throwable, ? extends U> fn)
    

  • 指定自定义线程池的重载方法

    public <U> CompletableFuture<U> handleAsync(
            BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)
    

例子:

public static void main(String[] args) {

    ExecutorService executorService = Executors.newSingleThreadExecutor();

    CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
                SmallTool.printTimeAndThread("在线程中做的一些事情...");
                throw new RuntimeException("啊,我发生异常了");
            })
            .applyToEitherAsync(CompletableFuture.supplyAsync(() -> {
                SmallTool.printTimeAndThread("在线程中做的另外一些事情...");
                SmallTool.sleepMillis(1000);
                return "另外一些事情的执行结果";
            }), (result) -> {
                SmallTool.printTimeAndThread("处理首先完成的任务");
                return result + "首先完成";
            })
            .handleAsync((result, throwable) -> {
                if (throwable != null) {
                    SmallTool.printTimeAndThread("异步处理异常");
                    return "发生了异常:" + throwable.getMessage();
                } else {
                    SmallTool.printTimeAndThread("异步处理任务结果");
                    return "任务结果:" + result;
                }
            }, executorService);

    SmallTool.printTimeAndThread(String.format("执行结果:%s", cf.join()));
}
/* 输出:
1692585547974   |   13  |   ForkJoinPool.commonPool-worker-2    |   在线程中做的另外一些事情...
1692585547974   |   12  |   ForkJoinPool.commonPool-worker-1    |   在线程中做的一些事情...
1692585547975   |   15  |   pool-1-thread-1 |   异步处理异常
1692585548001   |   1   |   main    |   执行结果:发生了异常:java.lang.RuntimeException: 啊,我发生异常了
 */

whenComplete / whenCompleteAsync

whenComplete用于在异步任务完成后执行一个动作(Consumer)。不同于handle 方法,whenComplete 方法无论任务成功完成还是发生异常,都会执行指定的动作。

具体来说,whenComplete 方法接收一个BiConsumer 参数,该参数会在任务完成后被调用。如果任务正常完成,处理函数会接收任务的结果作为第一个参数,而如果任务发生异常,处理函数会接收异常作为第一个参数。无论是哪种情况,你都可以在处理函数中进行相应的操作。

注意,当出现异常时whenComplete虽然可以处理异常,但是出现异常时就也会抛出异常,需要你手动处理该异常,比如在whenComplete后链式调用exceptionally方法。

public CompletableFuture<T> whenComplete(
        BiConsumer<? super T, ? super Throwable> action)

例子:

public static void main(String[] args) {

    CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
                SmallTool.printTimeAndThread("在线程中做的一些事情...");
                throw new RuntimeException("啊,我发生异常了");
            })
            .applyToEitherAsync(CompletableFuture.supplyAsync(() -> {
                SmallTool.printTimeAndThread("在线程中做的另外一些事情...");
                SmallTool.sleepMillis(1000);
                return "另外一些事情的执行结果";
            }), (result) -> {
                SmallTool.printTimeAndThread("处理首先完成的任务");
                return result + "首先完成";
            })
            .whenComplete((result, throwable) -> {
                if (throwable != null) {
                    SmallTool.printTimeAndThread(String.format("处理异常:%s,没有返回值", throwable.getMessage()));
                } else {
                    SmallTool.printTimeAndThread(String.format("处理结果:%s,没有返回值", result));
                }
            });

    SmallTool.printTimeAndThread(String.format("执行结果:%s", cf.join()));
}
/* 输出:
1692585857210   |   13  |   ForkJoinPool.commonPool-worker-2    |   在线程中做的另外一些事情...
1692585857210   |   12  |   ForkJoinPool.commonPool-worker-1    |   在线程中做的一些事情...
1692585857228   |   12  |   ForkJoinPool.commonPool-worker-1    |   处理异常:java.lang.RuntimeException: 啊,我发生异常了,没有返回值
Exception in thread "main" java.util.concurrent.CompletionException: java.lang.RuntimeException: 啊,我发生异常了
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
    at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1596)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
Caused by: java.lang.RuntimeException: 啊,我发生异常了
    at App.lambda$main$0(App.java:18)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
    ... 5 more
 */

whenCompleteAsync用于在异步任务完成后异步执行一个动作(Consumer)。类似于whenComplete 方法,但whenCompleteAsync 具有异步执行的特性。

具体来说,whenCompleteAsync 方法接收一个BiConsumer 参数,该参数会在任务完成后被调用。如果任务正常完成,处理函数会接收任务的结果作为第一个参数,而如果任务发生异常,处理函数会接收异常作为第一个参数。与whenComplete 不同的是,whenCompleteAsync 方法会将处理函数提交给默认的ForkJoinPool 或指定的Executor 进行异步执行。

它有如下两个方法:

  • 使用默认线程池(ForkJoinPool.commonPool)的方法

    public CompletableFuture<T> whenCompleteAsync(
            BiConsumer<? super T, ? super Throwable> action)
    

    Java

  • 指定自定义线程池的重载方法

    public CompletableFuture<T> whenCompleteAsync(
            BiConsumer<? super T, ? super Throwable> action, Executor executor)
    

例子:

public static void main(String[] args) {

    ExecutorService executorService = Executors.newSingleThreadExecutor();

    CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
                SmallTool.printTimeAndThread("在线程中做的一些事情...");
                throw new RuntimeException("啊,我发生异常了");
            })
            .applyToEitherAsync(CompletableFuture.supplyAsync(() -> {
                SmallTool.printTimeAndThread("在线程中做的另外一些事情...");
                SmallTool.sleepMillis(1000);
                return "另外一些事情的执行结果";
            }), (result) -> {
                SmallTool.printTimeAndThread("处理首先完成的任务");
                return result + "首先完成";
            })
            .whenCompleteAsync((result, throwable) -> {
                if (throwable != null) {
                    SmallTool.printTimeAndThread(String.format("处理异常:%s,没有返回值", throwable.getMessage()));
                } else {
                    SmallTool.printTimeAndThread(String.format("处理结果:%s,没有返回值", result));
                }
            }, executorService)
            .exceptionally(throwable -> {
                SmallTool.printTimeAndThread("exceptionally 中处理异常");
                return "exceptionally 处理异常:" + throwable.getMessage();
            });

    SmallTool.printTimeAndThread(String.format("执行结果:%s", cf.join()));
}
/* 输出:
1692586364355   |   13  |   ForkJoinPool.commonPool-worker-2    |   在线程中做的另外一些事情...
1692586364355   |   12  |   ForkJoinPool.commonPool-worker-1    |   在线程中做的一些事情...
1692586364374   |   15  |   pool-1-thread-1 |   处理异常:java.lang.RuntimeException: 啊,我发生异常了,没有返回值
1692586364374   |   15  |   pool-1-thread-1 |   exceptionally 中处理异常
1692586364378   |   1   |   main    |   执行结果:exceptionally 处理异常:java.lang.RuntimeException: 啊,我发生异常了
 */

批量执行任务

场景:需要批量执行多个任务,每次任务数量不固定,可能是 10 个,也可能是 20 个,需要等到所有任务都执行完毕才能进行下一步。比方说批量请求第三方接口查询价格,等所有价格都查询结束后再进行后续操作。

public static void main(String[] args) {

    ExecutorService executorService = Executors.newFixedThreadPool(6);

    // 存放任务集合
    List<CompletableFuture<String>> cfList = new ArrayList<>();
    // 存放任务结果
    Map<String, String> map = new ConcurrentHashMap<>(20);

    for (int i = 0; i < new Random().nextInt(5) + 3; i++) {
        // 模拟随机任务个数
        int finalI = i;
        CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
            SmallTool.printTimeAndThread(String.format("执行任务 %s,查询价格", finalI));
            // 模拟任务执行时间
            SmallTool.sleepMillis(new Random().nextInt(3) * 1000);
            String result = String.format("任务 %s 价格", finalI);
            map.put("任务" + finalI, result);
            return result;
        }, executorService);

        cfList.add(cf);
    }

    // 等待所有任务执行完毕
    CompletableFuture<Void> allOf = CompletableFuture.allOf(cfList.toArray(new CompletableFuture[0]));
    allOf.join();

    SmallTool.printTimeAndThread(String.format("任务执行完毕,执行结果:%s", map));

    for (CompletableFuture<String> future : cfList) {
        // 获取单个任务执行结果
        String result = future.join();
        SmallTool.printTimeAndThread(result);
    }
}
/* 输出:
1692935524076	|	14	|	pool-1-thread-3	|	执行任务 2,查询价格
1692935524076	|	13	|	pool-1-thread-2	|	执行任务 1,查询价格
1692935524076	|	12	|	pool-1-thread-1	|	执行任务 0,查询价格
1692935524076	|	15	|	pool-1-thread-4	|	执行任务 3,查询价格
1692935526078	|	1	|	main	|	任务执行完毕,执行结果:{任务0=任务 0 价格, 任务1=任务 1 价格, 任务2=任务 2 价格, 任务3=任务 3 价格}
1692935526078	|	1	|	main	|	任务 0 价格
1692935526079	|	1	|	main	|	任务 1 价格
1692935526079	|	1	|	main	|	任务 2 价格
1692935526079	|	1	|	main	|	任务 3 价格
*/

三、使用案例

实现最优的“烧水泡茶”程序

著名数学家华罗庚先生在《统筹方法》这篇文章里介绍了一个烧水泡茶的例子,文中提到最优的工序应该是下面这样:

对于烧水泡茶这个程序,一种最优的分工方案:用两个线程 T1 和 T2 来完成烧水泡茶程序,T1 负责洗水壶、烧开水、泡茶这三道工序,T2 负责洗茶壶、洗茶杯、拿茶叶三道工序,其中 T1 在执行泡茶这道工序时需要等待 T2 完成拿茶叶的工序。

基于Future实现

public class FutureTaskTest{

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 创建任务T2的FutureTask
        FutureTask<String> ft2 = new FutureTask<>(new T2Task());
        // 创建任务T1的FutureTask
        FutureTask<String> ft1 = new FutureTask<>(new T1Task(ft2));

        // 线程T1执行任务ft2
        Thread T1 = new Thread(ft2);
        T1.start();
        // 线程T2执行任务ft1
        Thread T2 = new Thread(ft1);
        T2.start();
        // 等待线程T1执行结果
        System.out.println(ft1.get());

    }
}

// T1Task需要执行的任务:
// 洗水壶、烧开水、泡茶
class T1Task implements Callable<String> {
    FutureTask<String> ft2;
    // T1任务需要T2任务的FutureTask
    T1Task(FutureTask<String> ft2){
        this.ft2 = ft2;
    }
    @Override
    public String call() throws Exception {
        System.out.println("T1:洗水壶...");
        TimeUnit.SECONDS.sleep(1);

        System.out.println("T1:烧开水...");
        TimeUnit.SECONDS.sleep(15);
        // 获取T2线程的茶叶
        String tf = ft2.get();
        System.out.println("T1:拿到茶叶:"+tf);

        System.out.println("T1:泡茶...");
        return "上茶:" + tf;
    }
}
// T2Task需要执行的任务:
// 洗茶壶、洗茶杯、拿茶叶
class T2Task implements Callable<String> {
    @Override
    public String call() throws Exception {
        System.out.println("T2:洗茶壶...");
        TimeUnit.SECONDS.sleep(1);

        System.out.println("T2:洗茶杯...");
        TimeUnit.SECONDS.sleep(2);

        System.out.println("T2:拿茶叶...");
        TimeUnit.SECONDS.sleep(1);
        return "龙井";
    }
}

基于CompletableFuture实现

public class CompletableFutureTest {

    public static void main(String[] args) {

        //任务1:洗水壶->烧开水
        CompletableFuture<Void> f1 = CompletableFuture
            .runAsync(() -> {
                System.out.println("T1:洗水壶...");
                sleep(1, TimeUnit.SECONDS);

                System.out.println("T1:烧开水...");
                sleep(15, TimeUnit.SECONDS);
            });
        //任务2:洗茶壶->洗茶杯->拿茶叶
        CompletableFuture<String> f2 = CompletableFuture
            .supplyAsync(() -> {
                System.out.println("T2:洗茶壶...");
                sleep(1, TimeUnit.SECONDS);

                System.out.println("T2:洗茶杯...");
                sleep(2, TimeUnit.SECONDS);

                System.out.println("T2:拿茶叶...");
                sleep(1, TimeUnit.SECONDS);
                return "龙井";
            });
        //任务3:任务1和任务2完成后执行:泡茶
        CompletableFuture<String> f3 = f1.thenCombine(f2, (__, tf) -> {
            System.out.println("T1:拿到茶叶:" + tf);
            System.out.println("T1:泡茶...");
            return "上茶:" + tf;
        });
        //等待任务3执行结果
        System.out.println(f3.join());
    }

    static void sleep(int t, TimeUnit u){
        try {
            u.sleep(t);
        } catch (InterruptedException e) {
        }
    }
}