Java-并发.06b.JUC-执行器-Future

Future

Callable 接口类似于 Runnable,但是 Runnable 不会返回结果,并且无法抛出返回结果的异常,Callable 功能更强大一些,被线程执行后,可以返回结果。

  • Callable 有些类似 Runnable, 它们都是接口, 前者需要实现 V call(), 后者需要实现 void run();
  • Callable 需要用 FutureTask 包装一下, FutureTask 提供了 get() 方法, 可以获取执行结果;
  • FutureTask 需要通过 Thread 执行: 通过构造器 Thread(FutureTask) 创建 Thread 对象
  • 结果通过 FutureTask.get 获取, FutureTask 实现了 Future 接口, 通过 FutureTaskFuture 类型引用都可以调用 get() , cancel() , isDone() , isCancelled() 等方法;
  • Callable 也可以放入线程池, ExecutorService.submit(Callable) 把 Callable 提交到线程池并返回 Future ;
public class FutureAndFutureTaskExample {

/*
* FutureTask 示例1:
* 由FutureTask直接创建线程,并执行
*/
public static void futureTaskExample() {

// 创建Callable实现类
Callable callable = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return new Random().nextInt(100);
}
};

// 创建FutureTask
FutureTask futureTask = new FutureTask(callable);

// 创建Thread 并开始执行
new Thread(futureTask).start();

// 阻塞在此, 直到任务完成:
Integer result = futureTask.get();
}

/*
* Future 示例2:
* Callable 提交到线程池执行
*/
public static void futureExample() {
// Lambda创建Callable实现类
Callable<Integer> callable = () -> {
return new Random().nextInt(100);
} ;

ExecutorService executorService = Executors.newCachedThreadPool();
Future<Integer> future = executorService.submit(callable);

Integer result = future.get();
// 或取消线程
future.cancel(true);
}
}

问题: FutureTask.cancel()Thread.interrupt() 有什么区别?
通过查看 cancel() 的源码发现, 实际 cancel() 最终还是调用了 Thread.interrupt(),
所以, 要通过 FutureTask.cancel() 停止异步任务, 那么还需要在 RunnableCallable 的主循环里捕捉 InterruptException 异常.

ListenableFuture(Guava)

Guava 的 Listenable Future 对 Future 做了改进,支持注册一个任务执行结束后回调函数。

// 创建一个 ListenableFuture
ListenableFuture<String> listenableFuture =
listeningExecutor.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return "";
}
});

// 通过addCallback() 给 ListenableFuture增加回调
Futures.addCallback(ListenableFuture, new FutureCallback<Object>() {
public void onSuccess(Object result) {
// do something on success
}
public void onFailure(Throwable thrown) {
// do something on failure
}
});

CompletableFuture(Java8)

Future 是 Java 5添加的类,用来描述一个异步计算的结果。你可以使用 isDone 方法检查计算是否完成,或者使用 get 阻塞住调用线程,直到计算完成返回结果,你也可以使用 cancel 方法停止任务的执行。

虽然Future以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源,而且也不能及时地得到计算结果,为什么不能用观察者设计模式当计算结果完成及时通知监听者呢?

在Java 8中, 新增加了一个包含50个方法左右的类: CompletableFuture,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。

使用示例

Java8 的 CompletableFuture 参考了 Guava 的 ListenableFuture 的思路,CompletableFuture 能够将回调放到与任务不同的线程中执行,也能将回调作为继续执行的同步函数,在与任务相同的线程中执行。

CompletableFuture 弥补了 Future 模式的缺点。在异步的任务完成后,需要用其结果继续操作时,无需等待。可以直接通过 thenAccept、thenApply、thenCompose 等方式将前面异步处理的结果交给另外一个异步事件处理线程来处理。

与 Guava ListenableFuture 相比,CompletableFuture 不仅可以在任务完成时注册回调通知,而且可以指定任意线程,实现了真正的异步非阻塞。

▶ 创建一个 CompletableFuture:

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

runAsync 方法不支持返回值/supplyAsync 可以支持返回值
没有指定 Executor 的方法会使用 ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。以下所有的方法都类同。

▶ 使用 thenApply 串行任务:

public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor)

当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化。
T:上一个任务返回结果的类型
U:当前任务的返回值类型

▶使用 thenAccept 消费处理结果:

public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);

▶ 使用 thenCombine 合并任务:

public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);

thenCombine 会把两个 CompletionStage 的任务都执行完成后,把两个任务的结果一块交给 thenCombine 来处理。

▶ 使用 thenCompose 流水化处理任务:

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) ;

thenCompose 方法允许你对两个 CompletionStage 进行流水线操作,第一个操作完成时,将其结果作为参数传递给第二个操作。

▶ 代码示例 1: thenApply/whenComplete/exceptionally

public static void example() throws Exception {
CompletableFuture<Long> future = CompletableFuture.supplyAsync(new Supplier<Long>() {
@Override
public Long get() {
long result = new Random().nextInt(100);
return result;
}
}).thenApply(new Function<Long, Long>() {
@Override
public Long apply(Long t) {
long result = t*5;
System.out.println("result2="+result);
return result;
}
});

future.whenComplete(new BiConsumer<Void, Throwable>() {
@Override
public void accept(Void t, Throwable action) {
System.out.println("执行完成!");
}

});
future.exceptionally(new Function<Throwable, Void>() {
@Override
public Void apply(Throwable t) {
System.out.println("执行失败!"+t.getMessage());
return null;
}
});
}

▶ anyOf / allOf

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);

anyOf: 当任意一个 CompletableFuture 完成后,创建一个完成的 CompletableFuture
allOf: 当所有的阶段完成后, 创建一个完成的 CompletableFuture

public static CompletableFuture<List> example() {
CompletableFuture<Double> future1 = rpcService1.invoke();
CompletableFuture<Double> future2 = rpcService2.invoke();
return CompletableFuture
.allOf(future1, future2)
.thenApply(v -> {
Double d1 = future1.get();
Double d2 = future2.get();
return Arrays.asList(d1, d2);
});
}

本节参考:

Timeout

vs ParallelStream

@ref: Java8的completablefuture和parallel stream比较 -解道Jdon

  • 可以上面的代码, ParallelStream 使用的 forkJoinPool, 处理过程会回到主线程调用 task(因为 forkJoinPool 使用了分治+递归,要回到主线程)
  • CompletableFuture 默认也是使用 forkJoinPool 的线程池, 但没有回到主线程
  • 因为 CompletableFuture 默认使用 forkJoinPool 线程池,所以线程池大小有限制 = processor 的数量,当然 CompletableFuture 也可以自己指定线程池,如果不是 CPU 密集而是 IO 密集的任务,最好是指定自己的线程池( CPU Processor 的 2~3 倍 )