Future
Callable
接口类似于 Runnable,但是 Runnable 不会返回结果,并且无法抛出返回结果的异常,Callable
功能更强大一些,被线程执行后,可以返回结果。
Callable
有些类似Runnable
, 它们都是接口, 前者需要实现V call()
, 后者需要实现void run()
;Callable
需要用FutureTask
包装一下,FutureTask
提供了get()
方法, 可以获取执行结果;
- FutureTask 需要通过 Thread 执行: 通过构造器
Thread(FutureTask)
创建 Thread 对象
- 结果通过
FutureTask.get
获取,FutureTask
实现了Future
接口, 通过FutureTask
和Future
类型引用都可以调用get()
,cancel()
,isDone()
,isCancelled()
等方法; Callable
也可以放入线程池,ExecutorService.submit(Callable)
把 Callable 提交到线程池并返回 Future ;
public class FutureAndFutureTaskExample { |
问题: FutureTask.cancel()
和 Thread.interrupt()
有什么区别?
通过查看 cancel()
的源码发现, 实际 cancel()
最终还是调用了 Thread.interrupt()
,
所以, 要通过 FutureTask.cancel()
停止异步任务, 那么还需要在 Runnable
或 Callable
的主循环里捕捉 InterruptException 异常.
ListenableFuture(Guava)
Guava 的 Listenable Future 对 Future 做了改进,支持注册一个任务执行结束后回调函数。
// 创建一个 ListenableFuture |
CompletableFuture(Java8)
Future 是 Java 5添加的类,用来描述一个异步计算的结果。你可以使用 isDone
方法检查计算是否完成,或者使用 get
阻塞住调用线程,直到计算完成返回结果,你也可以使用 cancel
方法停止任务的执行。
虽然Future
以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源,而且也不能及时地得到计算结果,为什么不能用观察者设计模式当计算结果完成及时通知监听者呢?
在Java 8中, 新增加了一个包含50个方法左右的类: CompletableFuture,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。
使用示例
Java8 的 CompletableFuture 参考了 Guava 的 ListenableFuture 的思路,CompletableFuture 能够将回调放到与任务不同的线程中执行,也能将回调作为继续执行的同步函数,在与任务相同的线程中执行。
CompletableFuture 弥补了 Future 模式的缺点。在异步的任务完成后,需要用其结果继续操作时,无需等待。可以直接通过 thenAccept、thenApply、thenCompose 等方式将前面异步处理的结果交给另外一个异步事件处理线程来处理。
与 Guava ListenableFuture 相比,CompletableFuture 不仅可以在任务完成时注册回调通知,而且可以指定任意线程,实现了真正的异步非阻塞。
▶ 创建一个 CompletableFuture:public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
runAsync 方法不支持返回值/supplyAsync 可以支持返回值
没有指定 Executor 的方法会使用 ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。以下所有的方法都类同。
▶ 使用 thenApply 串行任务:public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor)
当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化。
T:上一个任务返回结果的类型
U:当前任务的返回值类型
▶使用 thenAccept 消费处理结果:public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
▶ 使用 thenCombine 合并任务:public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
thenCombine 会把两个 CompletionStage 的任务都执行完成后,把两个任务的结果一块交给 thenCombine 来处理。
▶ 使用 thenCompose 流水化处理任务:public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) ;
thenCompose 方法允许你对两个 CompletionStage 进行流水线操作,第一个操作完成时,将其结果作为参数传递给第二个操作。
▶ 代码示例 1: thenApply/whenComplete/exceptionally
public static void example() throws Exception { |
▶ anyOf / allOfpublic static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);
anyOf: 当任意一个 CompletableFuture 完成后,创建一个完成的 CompletableFuture
allOf: 当所有的阶段完成后, 创建一个完成的 CompletableFuture
public static CompletableFuture<List> example() { |
本节参考:
Timeout
@ref Java 8 CompletableFuture中的默认值超时 - Thinbug
似乎是翻译的 Stack Overflow …@ref Asynchronous timeouts with CompletableFutures in Java 8 and Java 9
超时 & CompletableFutures
如果直接对 CompletableFutures 使用 Future.get(1, TimeUnit.SECONDS) 作为超时, 这样做仍旧阻塞 main 线程;
在 Java9 中的支持: completeOnTimeout 或者 orTimeout
vs ParallelStream
@ref: Java8的completablefuture和parallel stream比较 -解道Jdon
- 可以上面的代码, ParallelStream 使用的 forkJoinPool, 处理过程会回到主线程调用 task(因为 forkJoinPool 使用了分治+递归,要回到主线程)
- CompletableFuture 默认也是使用 forkJoinPool 的线程池, 但没有回到主线程
- 因为 CompletableFuture 默认使用 forkJoinPool 线程池,所以线程池大小有限制 = processor 的数量,当然 CompletableFuture 也可以自己指定线程池,如果不是 CPU 密集而是 IO 密集的任务,最好是指定自己的线程池( CPU Processor 的 2~3 倍 )