CompletableFuture实现了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步回调、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。
创建异步任务
Future.submit、supplyAsync / runAsync
异步回调
thenApply / thenApplyAsync、thenAccept / thenRun、exceptionally、whenComplete、handle
组合处理
thenCombine / thenAcceptBoth / runAfterBoth、applyToEither / acceptEither / runAfterEither、thenCompose、allOf / anyOf
创建异步任务
Future.submit
通常的线程池接口类ExecutorService,其中execute方法的返回值是void,即无法获取异步任务的执行状态,
3个重载的submit方法的返回值是Future,可以据此获取任务执行的状态和结果,示例如下:
1 |
|
supplyAsync / runAsync
supplyAsync表示创建带返回值的异步任务的,相当于ExecutorService submit(Callable<T> task)
方法,
runAsync表示创建无返回值的异步任务,相当于ExecutorService submit(Runnable task)
方法,
这两方法的效果跟submit是一样的,测试用例如下:
1 |
|
这两方法各有一个重载版本,可以指定执行异步任务的Executor实现,如果不指定,默认使用ForkJoinPool.commonPool()
,如果机器是单核的,则默认使用ThreadPerTaskExecutor,该类是一个内部类,每次执行execute都会创建一个新线程。测试用例如下:
1 |
|
异步回调
thenApply / thenApplyAsync
thenApply 表示某个任务执行完成后执行的动作,即回调方法,会将该任务的执行结果即方法返回值作为入参传递到回调方法中,测试用例如下:
1 |
|
thenApplyAsync与thenApply的区别在于,前者是将job2提交到线程池中异步执行,实际执行job2的线程可能是另外一个线程,后者是由执行job1的线程立即执行job2,即两个job都是同一个线程执行的。
thenApplyAsync有一个重载版本,可以指定执行异步任务的Executor实现,如果不指定,默认使用ForkJoinPool.commonPool()
。
下述的多个方法,每个方法都有两个以Async结尾的方法,一个使用默认的Executor实现,一个使用指定的Executor实现,不带Async的方法是由触发该任务的线程执行该任务,带Async的方法是由触发该任务的线程将任务提交到线程池,执行任务的线程跟触发任务的线程不一定是同一个。
thenAccept / thenRun
thenAccept 同 thenApply 接收上一个任务的返回值作为参数,但是无返回值;thenRun 的方法没有入参,也没有返回值,测试用例如下:
1 |
|
exceptionally
exceptionally方法指定某个任务执行异常时执行的回调方法,会将抛出异常作为参数传递到回调方法中,如果该任务正常执行则会exceptionally方法返回的CompletionStage的result就是该任务正常执行的结果,测试用例如下:
1 |
|
whenComplete
whenComplete是当某个任务执行完成后执行的回调方法,会将执行结果或者执行期间抛出的异常传递给回调方法,如果是正常执行则异常为null,回调方法对应的CompletableFuture的result和该任务一致,如果该任务正常执行,则get方法返回执行结果,如果是执行异常,则get方法抛出异常。测试用例如下:
1 |
|
handle
跟whenComplete基本一致,区别在于handle的回调方法有返回值,且handle方法返回的CompletableFuture的result是回调方法的执行结果或者回调方法执行期间抛出的异常,与原始CompletableFuture的result无关了。测试用例如下:
1 |
|
组合处理
thenCombine / thenAcceptBoth / runAfterBoth
这三个方法都是将两个CompletableFuture组合起来,只有这两个都正常执行完了才会执行某个任务,区别在于,thenCombine会将两个任务的执行结果作为方法入参传递到指定方法中,且该方法有返回值;
thenAcceptBoth同样将两个任务的执行结果作为方法入参,但是无返回值;runAfterBoth没有入参,也没有返回值。
注意两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果。测试用例如下:
1 |
|
applyToEither / acceptEither / runAfterEither
这三个方法都是将两个CompletableFuture组合起来,只要其中一个执行完了就会执行某个任务,其区别在于applyToEither会将已经执行完成的任务的执行结果作为方法入参,并有返回值;
acceptEither同样将已经执行完成的任务的执行结果作为方法入参,但是没有返回值;runAfterEither没有方法入参,也没有返回值。
注意两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果。测试用例如下:
1 |
|
thenCompose
thenCompose方法会在某个任务执行完成后,将该任务的执行结果作为方法入参然后执行指定的方法,该方法会返回一个新的CompletableFuture实例,如果该CompletableFuture实例的result不为null,则返回一个基于该result的新的CompletableFuture实例;如果该CompletableFuture实例为null,则,然后执行这个新任务,测试用例如下:
1 |
|
allOf / anyOf
allOf返回的CompletableFuture是多个任务都执行完成后才会执行,只有有一个任务执行异常,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回null。
1 |
|
其他示例
线程池处理任务
1 | public class ThreadDemo { |
使用CompletableFuture重构任务处理
1 | public class ThreadDemo { |
如果你还想同步获取结果,可以使用whenComplete()方法,或者单独调用join()方法。
join()方法配合Stream流是这样用的:
1 | public class ThreadDemo { |
等待所有任务执行完成
线程池+CountDownLatch
1 | public class ThreadDemo { |
用CompletableFuture重构:
1 | public class ThreadDemo { |
遍历list集合,提交CompletableFuture任务,把结果转换成数组
再把数组放到CompletableFuture的allOf()方法里面
最后调用join()方法阻塞等待所有任务执行完成
CompletableFuture的allOf()方法的作用就是,等待所有任务处理完成。
任何一个任务处理完成就返回
如果要实现这样一个需求,往线程池提交一批任务,只要有其中一个任务处理完成就返回。
如果你手动实现这个逻辑的话,代码肯定复杂且低效,有了CompletableFuture就非常简单了,只需调用anyOf()方法就行了。
一个线程执行完成,交给另一个线程接着执行
1 | public class ThreadDemo { |
1 | public class ThreadDemo { |
总结
1 | 带run的方法,无入参,无返回值。 |
1 | //获取结果 |