https://www.bilibili.com/video/BV19EapznEtX
- Future是Java 5引入的接口,用于表示异步计算的结果。它提供了基本的异步操作功能,当只需要简单的异步执行和结果等待时使用
- CompletableFuture是Java 8引入的类,实现了Future和CompletionStage接口,提供了强大的异步编程能力,当需要复杂的异步编程,包括链式操作、异常处理、多任务组合等现代异步编程模式时使用
特性 | Future | CompletableFuture |
---|
版本 | Java 5 | Java 8 |
类型 | 接口 | 类(实现Future接口) |
手动完成 | ❌ 不支持 | ✅ 支持complete() 方法 |
链式操作 | ❌ 不支持 | ✅ 支持thenApply 、thenCompose 等 |
异常处理 | ❌ 只能try-catch | ✅ 支持exceptionally 、handle 等 |
组合操作 | ❌ 不支持 | ✅ 支持allOf 、anyOf 、thenCombine 等 |
回调函数 | ❌ 不支持 | ✅ 支持whenComplete 、thenAccept 等 |
非阻塞获取 | ❌ 只有阻塞的get() | ✅ 支持getNow() 非阻塞获取 |
超时处理 | ✅ get(timeout, unit) | ✅ 更丰富的超时控制 |
取消操作 | ✅ cancel() | ✅ 继承了cancel() 并增强 |
方法名 | 描述 | 示例 | 应用场景 |
---|
supplyAsync() | 异步执行有返回值的任务 | CompletableFuture.supplyAsync(() -> "result") | 异步计算并返回结果 |
runAsync() | 异步执行无返回值的任务 | CompletableFuture.runAsync(() -> System.out.println("done")) | 异步执行副作用操作 |
completedFuture() | 创建已完成的Future | CompletableFuture.completedFuture("value") | 立即返回结果,常用于测试 |
failedFuture() | 创建异常完成的Future | CompletableFuture.failedFuture(new Exception()) | 立即返回异常状态 |
方法名 | 描述 | 示例 | 特点 |
---|
thenApply() | 对结果进行转换 | future.thenApply(s -> s.toUpperCase()) | 同步转换,阻塞当前线程 |
thenAccept() | 消费结果,无返回值 | future.thenAccept(System.out::println) | 同步消费 |
thenRun() | 执行Runnable,不关心结果 | future.thenRun(() -> log.info("done")) | 同步执行 |
thenCompose() | 扁平化嵌套的CompletableFuture | future.thenCompose(this::getUser) | 避免嵌套Future |
方法名 | 描述 | 示例 | 特点 |
---|
thenApplyAsync() | 异步转换结果 | future.thenApplyAsync(s -> s.toUpperCase()) | 使用线程池异步执行 |
thenAcceptAsync() | 异步消费结果 | future.thenAcceptAsync(System.out::println) | 异步消费 |
thenRunAsync() | 异步执行Runnable | future.thenRunAsync(() -> log.info("done")) | 异步执行 |
thenComposeAsync() | 异步扁平化 | future.thenComposeAsync(this::getUserAsync) | 异步组合 |
方法名 | 描述 | 示例 | 应用场景 |
---|
thenCombine() | 组合两个Future的结果 | f1.thenCombine(f2, (a, b) -> a + b) | 需要两个结果进行计算 |
thenCombineAsync() | 异步组合两个Future | f1.thenCombineAsync(f2, (a, b) -> a + b) | 异步组合计算 |
thenAcceptBoth() | 消费两个Future的结果 | f1.thenAcceptBoth(f2, (a, b) -> log(a, b)) | 需要两个结果进行操作 |
thenAcceptBothAsync() | 异步消费两个Future | f1.thenAcceptBothAsync(f2, (a, b) -> log(a, b)) | 异步消费操作 |
runAfterBoth() | 两个Future都完成后执行 | f1.runAfterBoth(f2, () -> cleanup()) | 等待多个任务完成 |
runAfterBothAsync() | 异步等待两个完成后执行 | f1.runAfterBothAsync(f2, () -> cleanup()) | 异步等待执行 |
applyToEither() | 使用最先完成的结果 | f1.applyToEither(f2, result -> process(result)) | 竞争执行,使用最快结果 |
applyToEitherAsync() | 异步使用最先完成的结果 | f1.applyToEitherAsync(f2, result -> process(result)) | 异步竞争执行 |
acceptEither() | 消费最先完成的结果 | f1.acceptEither(f2, System.out::println) | 处理最快完成的任务 |
acceptEitherAsync() | 异步消费最先完成的结果 | f1.acceptEitherAsync(f2, System.out::println) | 异步处理最快任务 |
runAfterEither() | 任一完成后执行 | f1.runAfterEither(f2, () -> notify()) | 任意一个完成即可触发 |
runAfterEitherAsync() | 异步任一完成后执行 | f1.runAfterEitherAsync(f2, () -> notify()) | 异步触发执行 |
方法名 | 描述 | 示例 | 使用场景 |
---|
allOf() | 等待所有Future完成 | CompletableFuture.allOf(f1, f2, f3) | 需要所有任务都完成 |
anyOf() | 等待任一Future完成 | CompletableFuture.anyOf(f1, f2, f3) | 只需要任意一个完成 |
方法名 | 描述 | 示例 | 特点 |
---|
exceptionally() | 异常时提供默认值 | future.exceptionally(ex -> "default") | 只处理异常情况 |
exceptionallyAsync() | 异步异常处理 | future.exceptionallyAsync(ex -> "default") | 异步异常处理 |
handle() | 处理正常结果和异常 | future.handle((result, ex) -> ex != null ? "error" : result) | 统一处理成功和失败 |
handleAsync() | 异步处理结果和异常 | future.handleAsync((result, ex) -> process(result, ex)) | 异步统一处理 |
whenComplete() | 完成时回调(不改变结果) | future.whenComplete((result, ex) -> log(result, ex)) | 用于日志、清理等副作用 |
whenCompleteAsync() | 异步完成回调 | future.whenCompleteAsync((result, ex) -> log(result, ex)) | 异步副作用处理 |
方法名 | 描述 | 示例 | 特点 |
---|
get() | 阻塞获取结果 | future.get() | 会抛出检查异常 |
get(timeout, unit) | 带超时的阻塞获取 | future.get(5, TimeUnit.SECONDS) | 超时抛出TimeoutException |
join() | 阻塞获取结果 | future.join() | 抛出运行时异常 |
getNow(defaultValue) | 非阻塞获取 | future.getNow("default") | 立即返回,未完成则返回默认值 |
resultNow() | 获取已完成的结果 | future.resultNow() | Java 19+,未完成抛异常 |
exceptionNow() | 获取异常 | future.exceptionNow() | Java 19+,获取异常信息 |
方法名 | 描述 | 示例 | 使用场景 |
---|
complete() | 手动完成Future | future.complete("result") | 主动设置结果 |
completeExceptionally() | 手动异常完成 | future.completeExceptionally(new Exception()) | 主动设置异常 |
cancel() | 取消执行 | future.cancel(true) | 取消未完成的任务 |
isDone() | 检查是否完成 | future.isDone() | 状态查询 |
isCompletedExceptionally() | 检查是否异常完成 | future.isCompletedExceptionally() | 异常状态查询 |
isCancelled() | 检查是否被取消 | future.isCancelled() | 取消状态查询 |
obtrudeValue() | 强制设置结果 | future.obtrudeValue("forced") | 强制覆盖结果 |
obtrudeException() | 强制设置异常 | future.obtrudeException(new Exception()) | 强制覆盖为异常 |
方法名 | 描述 | 示例 | 高级特性 |
---|
thenCombineAsync(f, fn, executor) | 指定线程池的组合 | f1.thenCombineAsync(f2, fn, customPool) | 自定义执行环境 |
thenComposeAsync(fn, executor) | 指定线程池的组合 | future.thenComposeAsync(fn, customPool) | 控制执行线程池 |
handleAsync(fn, executor) | 指定线程池的异常处理 | future.handleAsync(fn, customPool) | 异步异常处理 |
whenCompleteAsync(action, executor) | 指定线程池的完成回调 | future.whenCompleteAsync(action, customPool) | 异步完成回调 |
exceptionallyAsync(fn, executor) | 指定线程池的异常处理 | future.exceptionallyAsync(fn, customPool) | 自定义异常处理线程池 |
方法名 | 描述 | 示例 | 应用场景 |
---|
orTimeout(timeout, unit) | 设置超时 | future.orTimeout(5, TimeUnit.SECONDS) | 防止永久等待 |
completeOnTimeout(value, timeout, unit) | 超时时完成 | future.completeOnTimeout("timeout", 5, SECONDS) | 超时提供默认值 |
copy() | 创建副本 | future.copy() | 创建独立的Future副本 |
newIncompleteFuture() | 创建新的未完成Future | future.newIncompleteFuture() | 创建同类型新实例 |
defaultExecutor() | 获取默认执行器 | CompletableFuture.defaultExecutor() | 获取ForkJoinPool |
minimalCompletionStage() | 创建最小完成阶段 | future.minimalCompletionStage() | 限制API访问 |
toCompletableFuture() | 转换为CompletableFuture | stage.toCompletableFuture() | CompletionStage转换 |
CompletableFuture.supplyAsync(() -> "hello")
.thenApply(String::toUpperCase)
.thenApply(s -> s + " WORLD")
.thenAccept(System.out::println);
CompletableFuture.runAsync(() -> {
System.out.println("执行清理任务");
// 清理逻辑
}).thenRun(() -> {
System.out.println("清理完成");
});
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
future1.thenCombine(future2, (a, b) -> a + " " + b)
.thenAccept(System.out::println); // 输出: Hello World
CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("随机异常");
}
return "成功结果";
}).exceptionally(ex -> "异常时的默认值")
.thenAccept(System.out::println);
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> "任务1");
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> "任务2");
CompletableFuture<String> task3 = CompletableFuture.supplyAsync(() -> "任务3");
CompletableFuture.allOf(task1, task2, task3)
.thenRun(() -> {
System.out.println("所有任务完成");
System.out.println(task1.join());
System.out.println(task2.join());
System.out.println(task3.join());
});
CompletableFuture<String> fast = CompletableFuture.supplyAsync(() -> {
sleep(100);
return "快速任务";
});
CompletableFuture<String> slow = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "慢速任务";
});
fast.applyToEither(slow, result -> "最先完成: " + result)
.thenAccept(System.out::println);
CompletableFuture.supplyAsync(() -> {
sleep(2000); // 模拟耗时操作
return "超时测试";
}).orTimeout(1, TimeUnit.SECONDS)
.exceptionally(ex -> "操作超时")
.thenAccept(System.out::println);
CompletableFuture.supplyAsync(() -> "处理数据")
.thenApply(data -> data + " -> 已处理")
.whenComplete((result, ex) -> {
if (ex != null) {
System.out.println("处理失败: " + ex.getMessage());
} else {
System.out.println("处理成功: " + result);
}
});
CompletableFuture.supplyAsync(() -> getUserId())
.thenCompose(userId -> getUserInfo(userId))
.thenCompose(userInfo -> getUserOrders(userInfo.getId()))
.thenAccept(orders -> System.out.println("订单数量: " + orders.size()));
CompletableFuture<Integer> calculateA = CompletableFuture.supplyAsync(() -> {
sleep(100);
return 10;
});
CompletableFuture<Integer> calculateB = CompletableFuture.supplyAsync(() -> {
sleep(200);
return 20;
});
calculateA.thenCombine(calculateB, Integer::sum)
.thenAccept(result -> System.out.println("计算结果: " + result));
CompletableFuture.supplyAsync(() -> 85) // 模拟分数
.thenCompose(score -> {
if (score >= 90) {
return CompletableFuture.completedFuture("优秀");
} else if (score >= 60) {
return CompletableFuture.completedFuture("及格");
} else {
return CompletableFuture.completedFuture("不及格");
}
})
.thenAccept(result -> System.out.println("评级: " + result));
List<String> tasks = Arrays.asList("任务1", "任务2", "任务3");
List<CompletableFuture<String>> futures = tasks.stream()
.map(task -> CompletableFuture.supplyAsync(() -> "完成" + task))
.collect(Collectors.toList());
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()))
.thenAccept(results -> results.forEach(System.out::println));
CompletableFuture<String> userTask = CompletableFuture.supplyAsync(() -> "用户数据");
CompletableFuture<String> configTask = CompletableFuture.supplyAsync(() -> "配置数据");
userTask.thenAcceptBoth(configTask, (user, config) -> {
System.out.println("用户: " + user + ", 配置: " + config);
});
CompletableFuture<Void> init1 = CompletableFuture.runAsync(() -> System.out.println("初始化1"));
CompletableFuture<Void> init2 = CompletableFuture.runAsync(() -> System.out.println("初始化2"));
init1.runAfterBoth(init2, () -> System.out.println("所有初始化完成"));
CompletableFuture<String> manualFuture = new CompletableFuture<>();
// 在其他地方手动完成
CompletableFuture.runAsync(() -> {
sleep(1000);
manualFuture.complete("手动完成的结果");
});
manualFuture.thenAccept(System.out::println);
CompletableFuture.supplyAsync(() -> " hello world ")
.thenApply(String::trim)
.thenApply(String::toUpperCase)
.thenApply(s -> s.replace(" ", "_"))
.thenAccept(System.out::println); // 输出: HELLO_WORLD
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "延迟结果";
});
// 立即获取,如果未完成返回默认值
String result = future.getNow("默认值");
System.out.println("当前结果: " + result);
CompletableFuture<String> retryableFuture = CompletableFuture
.supplyAsync(() -> {
if (Math.random() > 0.7) {
return "成功";
}
throw new RuntimeException("失败");
})
.exceptionally(ex -> {
System.out.println("第一次失败,重试...");
return CompletableFuture.supplyAsync(() -> "重试成功").join();
});
retryableFuture.thenAccept(System.out::println);
CompletableFuture.supplyAsync(() -> {
return 10 / 0; // 故意制造异常
}).handle((result, ex) -> {
if (ex != null) {
System.out.println("处理异常: " + ex.getCause().getMessage());
return 0;
}
return result;
}).thenAccept(result -> System.out.println("最终结果: " + result));
CompletableFuture.supplyAsync(() -> Arrays.asList(1, 2, 3, 4, 5))
.thenApply(list -> list.stream().mapToInt(i -> i * 2).sum())
.thenApply(sum -> "总和: " + sum)
.thenAccept(System.out::println);
private static void sleep(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
业务场景:
- 查询A接口耗时 300ms,B接口耗时 200ms,C接口耗时 300ms,D接口耗时 300ms,E接口耗时200ms
- 任务一需要 A+C,任务二需要 A+B,任务三需要 A+B+C
- 任务四需要任务一二三都完成后执行D接口
- 任务五需要任务三完成后执行E接口
- 要求总耗时不超过 800ms
public class AsyncTaskDemo {
private static final long START_TIME = System.currentTimeMillis();
private static final Executor CUSTOM_EXECUTOR = new ForkJoinPool(8);
public static void main(String[] args) {
System.out.println("=== CompletableFuture 异步任务优化示例 ===\n");
try {
// 关键优化1:校验接口并行执行
System.out.println("🚀 阶段1:并行启动校验接口");
CompletableFuture<String> futureA = callInterfaceA();
CompletableFuture<String> futureB = callInterfaceB();
CompletableFuture<String> futureC = callInterfaceC();
// 等待所有校验接口完成,如果任何一个失败则快速失败
CompletableFuture<Void> allValidations = CompletableFuture.allOf(futureA, futureB, futureC);
allValidations.get(); // 阻塞等待所有校验完成
// 关键优化2:智能依赖任务组合(只有校验通过才执行)
System.out.println("🚀 阶段2:组装依赖任务");
CompletableFuture<String> task1 = futureA.thenCombineAsync(futureC,
(a, c) -> executeTaskOne(a, c), CUSTOM_EXECUTOR);
CompletableFuture<String> task2 = futureA.thenCombineAsync(futureB,
(a, b) -> executeTaskTwo(a, b), CUSTOM_EXECUTOR);
CompletableFuture<String> task3 = CompletableFuture
.allOf(futureA, futureB, futureC)
.thenApplyAsync(v -> executeTaskThree(futureA.join(), futureB.join(), futureC.join()), CUSTOM_EXECUTOR);
// 关键优化3:最终任务并行执行
System.out.println("🚀 阶段3:最终任务并行执行");
// 任务四:等待任务1、2、3完成后调用接口D
CompletableFuture<String> task4 = CompletableFuture
.allOf(task1, task2, task3)
.thenComposeAsync(v ->
callInterfaceD().thenApplyAsync(d ->
executeTaskFour(d, task1.join(), task2.join(), task3.join()),
CUSTOM_EXECUTOR), CUSTOM_EXECUTOR);
// 任务五:等待任务3完成后调用接口E(与任务4并行)
CompletableFuture<String> task5 = task3
.thenComposeAsync(task3Result ->
callInterfaceE().thenApplyAsync(e ->
executeTaskFive(e, task3Result), CUSTOM_EXECUTOR), CUSTOM_EXECUTOR);
// 等待所有任务完成
CompletableFuture.allOf(task4, task5).get(800, TimeUnit.MILLISECONDS);
// 输出结果
long totalTime = getCurrentTime();
System.out.println("\n=== 执行结果 ===");
// System.out.println("任务四结果: " + task4.get());
// System.out.println("任务五结果: " + task5.get());
System.out.println("总耗时: " + totalTime + "ms (目标: <800ms)");
System.out.println("性能状态: " + (totalTime < 800 ? "✅ 达标" : "❌ 超时"));
} catch (Exception e) {
System.err.println("❌ 执行失败: " + e.getMessage());
} finally {
((ForkJoinPool) CUSTOM_EXECUTOR).shutdown();
}
}
/**
* 校验接口A - 耗时300ms
*/
private static CompletableFuture<String> callInterfaceA() {
return createAsyncInterfaceWithFailure("接口A", 300, "校验结果A", 0.1, "接口A校验失败:数据不符合要求");
}
/**
* 校验接口B - 耗时200ms
*/
private static CompletableFuture<String> callInterfaceB() {
return createAsyncInterfaceWithFailure("接口B", 200, "校验结果B", 0.1, "接口B校验失败:数据不符合要求");
}
/**
* 校验接口C - 耗时300ms (带30%失败概率)
*/
private static CompletableFuture<String> callInterfaceC() {
return createAsyncInterfaceWithFailure("接口C", 300, "校验结果C", 0.1, "接口C校验失败:数据不符合要求");
}
/**
* 业务接口D - 耗时300ms
*/
private static CompletableFuture<String> callInterfaceD() {
return createAsyncInterfaceWithFailure("接口D", 300, "结果D", 0.3, "接口D校验失败:数据不符合要求");
}
/**
* 业务接口E - 耗时200ms
*/
private static CompletableFuture<String> callInterfaceE() {
return createAsyncInterfaceWithFailure("接口E", 200, "结果E", 0.1, "接口E校验失败:数据不符合要求");
}
/**
* 执行任务一 - 需要接口A和C的校验结果
*/
private static String executeTaskOne(String resultA, String resultC) {
System.out.printf(" 🎯 执行任务一,输入: %s, %s - %dms%n", resultA, resultC, getCurrentTime());
return "任务一完成";
}
/**
* 执行任务二 - 需要接口A和B的校验结果
*/
private static String executeTaskTwo(String resultA, String resultB) {
System.out.printf(" 🎯 执行任务二,输入: %s, %s - %dms%n", resultA, resultB, getCurrentTime());
return "任务二完成";
}
/**
* 执行任务三 - 需要接口A、B、C的校验结果
*/
private static String executeTaskThree(String resultA, String resultB, String resultC) {
System.out.printf(" 🎯 执行任务三,输入: %s, %s, %s - %dms%n", resultA, resultB, resultC, getCurrentTime());
return "任务三完成";
}
/**
* 执行任务四 - 需要接口D的结果和任务一二三的结果
*/
private static String executeTaskFour(String resultD, String task1Result, String task2Result, String task3Result) {
System.out.printf(" 🎯 执行任务四,输入: %s, %s, %s, %s - %dms%n", resultD, task1Result, task2Result, task3Result, getCurrentTime());
return "任务四完成";
}
/**
* 执行任务五 - 需要接口E的结果和任务三的结果
*/
private static String executeTaskFive(String resultE, String task3Result) {
System.out.printf(" 🎯 执行任务五,输入: %s, %s - %dms%n", resultE, task3Result, getCurrentTime());
return "任务五完成";
}
private static long getCurrentTime() {
return System.currentTimeMillis() - START_TIME;
}
// ==================== 通用工具方法 ====================
/**
* 安全的线程休眠方法,统一处理中断异常
* @param milliseconds 休眠时间(毫秒)
* @param interfaceName 接口名称,用于异常信息
*/
private static void safeSleep(long milliseconds, String interfaceName) {
try {
Thread.sleep(milliseconds);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(interfaceName + "被中断", e);
}
}
/**
* 创建可能失败的异步接口调用
* @param interfaceName 接口名称
* @param delayMs 延迟时间
* @param result 成功时的返回结果
* @param failureRate 失败概率 (0.0 - 1.0)
* @param failureMessage 失败时的异常信息
* @return CompletableFuture
*/
private static CompletableFuture<String> createAsyncInterfaceWithFailure(
String interfaceName,
long delayMs,
String result,
double failureRate,
String failureMessage) {
return CompletableFuture.supplyAsync(() -> {
safeSleep(delayMs, interfaceName);
// 模拟随机失败
if (Math.random() < failureRate) {
System.out.printf(" ❌ %s失败 - %dms%n", interfaceName, getCurrentTime());
throw new RuntimeException(failureMessage);
}
System.out.printf(" ✅ %s完成: %s - %dms%n", interfaceName, result, getCurrentTime());
return result;
}, CUSTOM_EXECUTOR);
}
}