告别回调地狱:Java并发编程的终极武器 CompletableFuture
在 Java 5 引入 Future
和 ExecutorService
后,我们终于有了一种标准的方式来执行异步任务并获取其结果。然而,这种方式并不完美。你是否也曾陷入过这样的窘境:
- 为了拿到结果,不得不调用
future.get()
,让当前线程陷入漫长的阻塞等待。 - 想在一个任务完成后自动触发另一个任务,却发现
Future
接口根本不支持。 - 需要等待多个异步任务全部完成后再进行汇总,只能用
CountDownLatch
之类的同步器写一堆胶水代码。 - 异常处理逻辑和主流程代码混杂在一起,
try-catch(ExecutionException)
的嵌套让代码难以卒读。
如果你对以上场景感同身受,那么恭喜你,CompletableFuture
(CF) 正是你需要的那把瑞士军刀。它在 Java 8 中横空出世,彻底改变了 Java 异步编程的面貌。
本文将带你深入探索 CompletableFuture
的世界,你将学到:
- 为什么需要
CompletableFuture
?Future
的局限性在哪里? - 核心思想: 如何从“管理线程”转变为“编排任务”。
- API 详解: 从创建任务、链式调用到组合多个任务和异常处理。
- 线程池:
CompletableFuture
背后的动力引擎,以及如何正确使用它。 - 实战技巧与最佳实践: 超时处理、避免陷阱,写出优雅高效的并发代码。
1. The “Future” Was Not Enough - Future
的困境
在深入 CompletableFuture
之前,我们必须理解它要解决的问题。java.util.concurrent.Future
是一个简单的异步计算模型,它代表一个未来某个时刻会产生的结果。但它的主要问题在于:它太被动了。
1 | ExecutorService executor = Executors.newSingleThreadExecutor(); |
Future
的核心局限性:
- 无法主动完成:你不能手动设置一个
Future
的结果,它只能由执行它的线程池来完成。 - 无法链式调用:你不能告诉
Future
:“当你完成后,请用你的结果去做另一件事”。 - 没有组合能力:你不能轻松地将两个
Future
合并成一个,或者等待一组Future
中的任何一个或全部完成。 - 没有统一的异常处理:你必须在调用
get()
的地方显式地try-catch
异常。
CompletableFuture
实现了 Future
接口,并新增了 CompletionStage
接口,它带来了函数式编程的理念,让我们能够以一种声明式、非阻塞的方式来编排和组合异步任务。
2. CompletableFuture
核心思想:异步任务编排
想象一下你去一家高级餐厅点餐的流程:
- 下单 (创建任务):你告诉服务员你要一份牛排 (
supplyAsync
),服务员给你一张订单小票 (CompletableFuture
对象)。 - 非阻塞等待:你不会冲进厨房盯着厨师做菜,而是回到座位上玩手机 (
主线程继续执行
)。 - 声明后续操作 (任务编排):你告诉服务员:“牛排好了之后 (
then...
),请帮我配一杯红酒 (thenApply
),然后一起端上来 (thenAccept
)。” - 处理意外 (异常处理):你还告诉服务员:“如果牛排卖完了 (
exceptionally
),就给我换成烤鸡。”
CompletableFuture
就是这张神奇的订单小票,你可以不断地在上面追加指令,而不需要阻塞自己去等待每一步的完成。它的核心就是从管理线程和等待,转向描述任务之间的数据流和依赖关系。
3. 创建 CompletableFuture
:四种主要方式
1 | // 1. 运行一个不返回结果的异步任务 (Runnable) |
注意:不带 Executor
参数的 runAsync
和 supplyAsync
方法会默认使用 ForkJoinPool.commonPool()
,我们将在后面详细讨论这带来的影响。
4. 任务编排:CompletableFuture
的魔法核心 (The then...
Family)
这是 CF 最强大的部分。所有以 then
开头的方法都用于创建一个任务流水线。它们分为三类:
thenApply
/thenCompose
: 对结果进行转换和串联。thenAccept
/thenAcceptBoth
: 对结果进行消费,无返回值。thenRun
/runAfterBoth
: 任务完成后执行一个动作,不关心结果。
4.1 thenApply
vs thenCompose
:转换与串联
这两个方法最容易混淆,但至关重要。
thenApply(Function)
:当上一个任务完成后,将其结果作为输入传递给一个Function
,函数的返回值是下一个任务的结果。类似于 Stream 中的map
。用于同步的转换。1
2
3
4
5CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> "123")
.thenApply(Integer::parseInt) // String -> Integer
.thenApply(i -> i * 10); // Integer -> Integer
System.out.println(future.get()); // 输出 1230thenCompose(Function)
:当上一个任务完成,将其结果传递给一个Function
,但这个Function
的返回值必须是另一个CompletableFuture
。CF 会等待这个新的 CF 完成,并将其结果作为最终结果。类似于 Stream 中的flatMap
。用于异步任务的串联。假设你有两个需要异步调用的方法:
getUserInfo()
和getFriendList(userInfo)
。1
2
3
4
5
6
7// 错误示范:使用 thenApply 会得到一个嵌套的 CompletableFuture
CompletableFuture<CompletableFuture<List<String>>> nestedFuture =
getUserInfo().thenApply(userInfo -> getFriendList(userInfo));
// 正确示范:使用 thenCompose 会得到一个扁平的、最终的结果
CompletableFuture<List<String>> flatFuture =
getUserInfo().thenCompose(userInfo -> getFriendList(userInfo));经验法则:如果你的转换逻辑本身就是一个异步操作,请使用
thenCompose
。
4.2 thenAccept
与 thenRun
:消费结果与执行动作
thenAccept(Consumer)
:接收上一步的结果,并对其进行消费,但没有返回值(Void
)。1
2CompletableFuture.supplyAsync(() -> "Product-123")
.thenAccept(productId -> System.out.println("Received product: " + productId));thenRun(Runnable)
:不关心上一步的结果,只要上一步完成了,就执行一个Runnable
任务。1
2
3
CompletableFuture.supplyAsync(() -> "Some result")
.thenRun(() -> System.out.println("Task finished, cleaning up..."));
4.3 组合两个独立的 Future
thenCombine(other, BiFunction)
: 当两个独立的 Future 都完成时,将它们的结果作为BiFunction
的参数,返回一个新的结果。1
2
3
4
5
6
7
8CompletableFuture<Double> weightInKg = CompletableFuture.supplyAsync(() -> 75.0);
CompletableFuture<Double> heightInM = CompletableFuture.supplyAsync(() -> 1.80);
CompletableFuture<Double> bmiFuture = weightInKg.thenCombine(heightInM, (weight, height) -> {
return weight / (height * height);
});
System.out.println("BMI is: " + bmiFuture.get());
5. 组合多个 CompletableFuture
:allOf
与 anyOf
CompletableFuture.allOf(cfs...)
: 当所有给定的 CF 都完成时,返回的 CF 才会完成。注意,它返回的是CompletableFuture<Void>
,你无法直接从中获取所有结果。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15// 正确获取 allOf 结果的方式
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "Result 1");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "Result 2");
CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> "Result 3");
CompletableFuture<Void> allFutures = CompletableFuture.allOf(f1, f2, f3);
// allOf 完成后,所有的 future 肯定都完成了
CompletableFuture<List<String>> allResultsFuture = allFutures.thenApply(v -> {
return Stream.of(f1, f2, f3)
.map(CompletableFuture::join) // join() 是 get() 的非受检异常版本
.collect(Collectors.toList());
});
System.out.println(allResultsFuture.join()); // ["Result 1", "Result 2", "Result 3"]CompletableFuture.anyOf(cfs...)
: 当任意一个给定的 CF 完成时,返回的 CF 就会完成,并且结果是那个首先完成的 CF 的结果。返回类型是CompletableFuture<Object>
。1
2
3
4
5
6
7
8
9
10
11
12// 谁快用谁
CompletableFuture<String> fastSource = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(100); } catch (Exception e) {}
return "Fastest source";
});
CompletableFuture<String> slowSource = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(500); } catch (Exception e) {}
return "Slow source";
});
CompletableFuture<Object> first = CompletableFuture.anyOf(fastSource, slowSource);
System.out.println("First result: " + first.join()); // 输出 "Fastest source"
6. 优雅的异常处理:exceptionally
与 handle
exceptionally(Function)
: 像try-catch
中的catch
块。当流水线中任何一步出现异常时,它会跳过后续的thenApply
等,直接进入exceptionally
块,你可以在这里提供一个默认值或进行补救。1
2
3
4
5
6
7
8
9
10
11CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() < 0.5) {
throw new RuntimeException("Oops, something went wrong!");
}
return "Success!";
}).exceptionally(ex -> {
System.err.println("CAUGHT EXCEPTION: " + ex.getMessage());
return "Default Value"; // 提供一个备用结果
});
System.out.println("Final result: " + future.join());handle(BiFunction)
: 像try-catch-finally
中的finally
块。无论成功还是失败,它都会被执行。它接收两个参数:result
和exception
。如果成功,exception
为null
;如果失败,result
为null
。这给你一个机会来处理最终状态。1
2
3
4
5
6
7
8
9CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Success")
.handle((result, ex) -> {
if (ex != null) {
return "Error: " + ex.getMessage();
}
return "Result: " + result.toUpperCase();
});
System.out.println(future.join());
7. 线程池:不可忽视的幕后英雄
这是一个至关重要且常常被忽视的点。
CompletableFuture
本身不创建线程,它需要一个 Executor
来执行任务。
- 默认线程池
ForkJoinPool.commonPool()
:所有不带Executor
参数的Async
方法(如supplyAsync
,thenApplyAsync
)都使用这个全局共享的线程池。它的线程数默认为Runtime.getRuntime().availableProcessors() - 1
。 - 风险:
commonPool
是为 CPU 密集型任务设计的。如果你的任务是 I/O 密集型(如数据库查询、HTTP请求),这些任务会长时间阻塞线程,并且不释放 CPU。如果你大量使用这样的任务,会迅速耗尽commonPool
中的所有线程,导致整个 JVM 中所有依赖commonPool
的功能(例如并行流parallelStream
)都发生性能雪崩。
最佳实践:为不同类型的任务创建专用线程池!
1 | // 为 I/O 密集型任务创建一个专用的线程池 |
关键区别:thenApply
会尝试使用上一个阶段的线程。而 thenApplyAsync(..., executor)
会将任务提交到你指定的 executor
中执行,实现线程切换,隔离不同类型的任务。
8. 实战技巧与注意事项
超时处理 (Java 9+)
orTimeout(long timeout, TimeUnit unit)
: 如果在指定时间内未完成,将以TimeoutException
异常完成。completeOnTimeout(T value, long timeout, TimeUnit unit)
: 如果超时,则使用给定的默认值完成。
1
2
3CompletableFuture.supplyAsync(() -> {
// ... long running task ...
}).orTimeout(3, TimeUnit.SECONDS).join(); // 超时会抛出 TimeoutException避免在异步链中调用
join()
/get()
join()
和get()
是阻塞的,它们的存在是为了在异步世界的边界(例如,一个 Web Controller 的入口方法需要等待结果并返回给用户)获取最终结果。在你的业务逻辑流水线中,应全程使用then...
方法保持异步特性。命名你的线程池
使用ThreadFactory
(如 Guava 的ThreadFactoryBuilder
) 给你的线程池命名。当出现问题时,清晰的线程名(如io-pool-1
,rpc-pool-5
)在线程 dump 和日志中会非常有价值。
总结
CompletableFuture
是 Java 并发工具箱中的一把瑞士军刀,它将你从繁琐的线程管理和阻塞等待中解放出来。通过它,你可以用一种优雅的、声明式的、函数式的方式来编排复杂的异步工作流。
掌握 CompletableFuture
的关键在于思维的转变:
- 从命令式到声明式:你不再命令线程“去做这个,然后等待”,而是声明“当A完成后,用它的结果去做B”。
- 从管理线程到编排数据流:你的焦点变成了任务之间的依赖关系和数据如何流动。
- 从阻塞到非阻塞:让你的应用资源得到最大化利用。
尽管它的 API 看起来有些复杂,但一旦你掌握了 thenApply/thenCompose
的区别、allOf/anyOf
的用法以及为 I/O 任务使用专用线程池的最佳实践,你就能写出高质量、高吞吐、易于维护的现代并发代码。现在,是时候在你的下一个项目中拥抱它了!