Skip to content

并发编程-CompletableFuture

API

提交任务

supplyAsync

java
// 使用默认内置线程池ForkJoinPool.commonPool(),根据supplier构建执行任务
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)

// 自定义线程,根据supplier构建执行任务
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

runAsync

java
// 使用默认内置线程池ForkJoinPool.commonPool(),根据runnable构建执行任务
public static CompletableFuture<Void> runAsync(Runnable runnable) 

// 自定义线程,根据runnable构建执行任务
public static CompletableFuture<Void> runAsync(Runnable runnable,  Executor executor)

结果转换

thenRun/thenRunAsync

java
public CompletableFuture<Void> thenRun(Runnable action);

public CompletableFuture<Void> thenRunAsync(Runnable action);

public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)

thenRun 和thenRunAsync有什么区别呢

java
   private static final Executor asyncPool = useCommonPool ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
        
    public CompletableFuture<Void> thenRun(Runnable action) {
        return uniRunStage(null, action);
    }

    public CompletableFuture<Void> thenRunAsync(Runnable action) {
        return uniRunStage(asyncPool, action);
    }

如果你执行第一个任务的时候,传入了一个自定义线程池:

  • 调用thenRun方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池
  • 调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池

TIPS: thenAccept和thenAcceptAsync,thenApply和thenApplyAsync等,它们之间的区别也是这个。

thenApply/thenApplyAsync

java
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

CompletableFuture的thenApply方法表示,第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,并且回调方法是有返回值的。

thenAccept/thenAcceptAsync

java
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)

CompletableFuture的thenAccept方法表示,第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,但是回调方法是没有返回值的。

thenAcceptBoth

java
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)

thenAcceptBoth这一组函数入参包括CompletionStage以及BiConsumer,CompletionStage是JDK1.8新增的接口,在JDK中只有一个实现类:CompletableFuture,所以第一个入参就是CompletableFuture,这一组函数是用来接受两个CompletableFuture的返回值,并将其组合到一起。BiConsumer这个函数式接口有两个入参,并且没有返回值,BiConsumer的第一个入参就是调用方CompletableFuture的执行结果,第二个入参就是thenAcceptBoth接口入参的CompletableFuture的执行结果。所以这一组函数意思是将两个CompletableFuture执行结果合并到一起。

handle

java
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)

CompletableFuture的handle方法表示,某个任务执行完成后,执行回调方法,并且是有返回值的;

并且handle方法返回的CompletableFuture的result是回调方法执行的结果。

同时如果有异常,需要手动处理异常。

thenCombine

java
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)

thenCombine这一组函数和thenAcceptBoth类似,入参都包含一个CompletionStage,也就是CompletableFuture对象,意思也是组合两个CompletableFuture的执行结果,不同的是thenCombine的第二个入参为BiFunction,该函数式接口有两个入参,同时有一个返回值。所以与thenAcceptBoth不同的是,thenCombine将两个任务结果合并后会返回一个全新的值作为出参。

thenCompose

java
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)

thenCompose方法会在某个任务执行完成后,将该任务的执行结果,作为方法入参,去执行指定的方法。

该方法会返回一个新的CompletableFuture实例

  • 如果该CompletableFuture实例的result不为null,则返回一个基于该result新的CompletableFuture实例;
  • 如果该CompletableFuture实例为null,然后就执行这个新任务

回调方法

whenComplete

java
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)

CompletableFuture的whenComplete方法表示,某个任务执行完成后,执行的回调方法,无返回值

并且whenComplete方法返回的CompletableFuture的result是上个任务的结果

异常处理

exceptionally

java
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)

exceptionally是用来处理异常的,当任务抛出异常后,可以通过exceptionally来进行处理,也可以选择使用handle来进行处理,不过两者有些不同,hand是用来处理上一个任务的结果,如果有异常情况,就处理异常。而exceptionally可以放在CompletableFuture处理的最后,作为兜底逻辑来处理未知异常。

获取结果

AllOf

java
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)

所有任务都执行完成后,才执行 allOf返回的CompletableFuture。

如果任意一个任务异常,allOf的CompletableFuture,执行get方法,会抛出异常

AnyOf

java
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

任意一个任务执行完,就执行anyOf返回的CompletableFuture。

如果执行的任务异常,anyOf的CompletableFuture,执行get方法,会抛出异常

get/getNow/join

java
public T get() throws InterruptedException, ExecutionException

public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException

public T getNow(T valueIfAbsent)

public T join()

get方法一个是不带超时时间的,一个是带有超时时间的。

getNow方法则是立即返回结果,如果还没有结果,则返回默认值,也就是该方法的入参。

join方法是不带超时时间的等待任务完成。

使用场景

创建异步任务

描述是否支持返回值
supplyAsync执行CompletableFuture任务支持
runAsync执行CompletableFuture任务不支持

任务异步回调

image-20240622170740957

多任务组合

image-20240622171457127

CompletableFuture作用区别
AND组合关系将两个CompletableFuture组合起来,只有这两个都正常执行完了,才会执行某个任务
thenCombine将两个任务的执行结果作为方法入参,传递到指定方法中,且有返回值
thenAcceptBoth将两个任务的执行结果作为方法入参,传递到指定方法中,且无返回值
runAfterBoth不会把执行结果当做方法入参,且没有返回值。
OR 组合关系将两个CompletableFuture组合起来,只要其中一个执行完了,就会执行某个任务
applyToEither会将已经执行完成的任务,作为方法入参,传递到指定方法中,且有返回值
acceptEither会将已经执行完成的任务,作为方法入参,传递到指定方法中,且无返回值
runAfterEither不会把执行结果当做方法入参,且没有返回值。
AnyOf
AllOf
thenCompose

使用注意事项

1. Future需要获取返回值,才能获取异常信息

java
ExecutorService executorService = new ThreadPoolExecutor(5, 10, 5L,
                                                         TimeUnit.SECONDS, 
                                                         new ArrayBlockingQueue<>(10));
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
    int a = 0;
    int b = 666;
    int c = b / a;
    return true;
},executorService).thenAccept(System.out::println);

//如果不加 get()方法这一行,看不到异常信息
future.get();

Future需要获取返回值,才能获取到异常信息。如果不加 get()/join()方法,看不到异常信息。使用的时候考虑是否

加try...catch...或者使用exceptionally方法。

2. CompletableFuture的get()方法是阻塞的。

CompletableFuture的get()方法是阻塞的,如果使用它来获取异步调用的返回值,需要添加超时时间

java
// 反例
 CompletableFuture.get();

// 正例
CompletableFuture.get(5, TimeUnit.SECONDS);

3. CompletableFuture默认线程池

CompletableFuture代码中又使用了默认的线程池,处理的线程个数是电脑CPU核数-1。大量请求过来的时候,处理逻辑复杂的话,响应会很慢。一般建议使用自定义线程池,优化线程池配置参数。

当自定义线程池拒绝策略是DiscardPolicy或者DiscardOldestPolicy,当线程池饱和时,会直接丢弃任务,不会抛弃异常。

因此建议,CompletableFuture线程池策略最好使用AbortPolicy,然后耗时的异步线程,做好线程池隔离

测试用例

商品详情页

java

@Service
public class ItemServiceImpl implements ItemService {

    @Autowired
    private GmallPmsFeign pmsFeign;
    @Autowired
    private GmallSmsFeign smsFeign;
    @Autowired
    private GmallWmsFeign wmsFeign;
    @Autowired
    private ThreadPoolExecutor threadPoolExecutor;
    @Override
    public ItemVO loadData(Long skuId) throws ExecutionException, InterruptedException {
        ItemVO itemVO = new ItemVO();

        // 1. 获取sku的基本信息
        // 后续获取sku的促销信息、spu的销售属性和spu详情信息(需要sku中的spuId)都需要skuInfoEntity
        // supplyAsync有返回值
        // runAsync无返回值
        // 所以这里需要使用supplyAsync
        CompletableFuture<SkuInfoEntity> skuFuture = CompletableFuture.supplyAsync(() -> {
            Resp<SkuInfoEntity> skuInfoEntityResp = this.pmsFeign.querySkuById(skuId);
            SkuInfoEntity skuInfoEntity = skuInfoEntityResp.getData();
            if (skuInfoEntity != null) {
                BeanUtils.copyProperties(skuInfoEntity, itemVO);
            }
            return skuInfoEntity;
        }, threadPoolExecutor);

        // 2. 获取sku的图片信息
        CompletableFuture<Void> skuImageFuture = CompletableFuture.runAsync(() -> {
            Resp<List<SkuImagesEntity>> listResp = this.pmsFeign.queryImagesBySkuId(skuId);
            List<SkuImagesEntity> images = listResp.getData();
            if (!CollectionUtils.isEmpty(images)) {
                List<String> imageUrls = images.stream().map(image -> image.getImgUrl()).collect(Collectors.toList());
                itemVO.setPics(imageUrls);
            }
        }, threadPoolExecutor);

        // 3. 获取sku的促销信息 TODO

        // 4. 获取spu的所有销售属性
        // thenAcceptAsync:有参数,无返回
        // thenApplyAsync: 有参数,有返回
        // 后续spu详情也需要skuInfoEntity中的spuId,所以这里使用thenApplyAsync
        CompletableFuture<SkuInfoEntity> spuFuture = skuFuture.thenApplyAsync(skuInfoEntity -> {
            Resp<List<SkuSaleAttrValueEntity>> skuSaleAttrValueResp = this.pmsFeign.querySkuSaleAttrValueBySpuId(skuInfoEntity.getSpuId());
            List<SkuSaleAttrValueEntity> skuSaleAttrValueEntities = skuSaleAttrValueResp.getData();
            itemVO.setSaleAttrs(skuSaleAttrValueEntities);
            return skuInfoEntity;
        }, threadPoolExecutor);

        // 5. 获取规格参数组及组下的规格参数 TODO

        // 6. spu详情 TODO

        CompletableFuture<Void> future = CompletableFuture.allOf(skuFuture, skuImageFuture, spuFuture);
        // 阻塞主进程,等待子进程全部执行完毕!
        future.get();
        return itemVO;
    }
}
java
@Service
public class ItemServiceImpl implements ItemService {

    @Autowired
    private GmallPmsFeign pmsFeign;
    @Autowired
    private GmallSmsFeign smsFeign;
    @Autowired
    private GmallWmsFeign wmsFeign;
    @Autowired
    private ThreadPoolExecutor threadPoolExecutor;
    @Override
    public ItemVO loadData(Long skuId) throws ExecutionException, InterruptedException {
        ItemVO itemVO = new ItemVO();

        // 1. 获取sku的基本信息
        // 后续获取sku的促销信息、spu的销售属性和spu详情信息(需要sku中的spuId)都需要skuInfoEntity
        // supplyAsync有返回值
        // runAsync无返回值
        // 所以这里需要使用supplyAsync
        CompletableFuture<SkuInfoEntity> skuFuture = CompletableFuture.supplyAsync(() -> {
            Resp<SkuInfoEntity> skuInfoEntityResp = this.pmsFeign.querySkuById(skuId);
            SkuInfoEntity skuInfoEntity = skuInfoEntityResp.getData();
            if (skuInfoEntity != null) {
                BeanUtils.copyProperties(skuInfoEntity, itemVO);
            }
            return skuInfoEntity;
        }, threadPoolExecutor);

        // 2. 获取sku的图片信息
        CompletableFuture<Void> skuImageFuture = CompletableFuture.runAsync(() -> {
            Resp<List<SkuImagesEntity>> listResp = this.pmsFeign.queryImagesBySkuId(skuId);
            List<SkuImagesEntity> images = listResp.getData();
            if (!CollectionUtils.isEmpty(images)) {
                List<String> imageUrls = images.stream().map(image -> image.getImgUrl()).collect(Collectors.toList());
                itemVO.setPics(imageUrls);
            }
        }, threadPoolExecutor);

        // 3. 获取sku的促销信息 TODO

        // 4. 获取spu的所有销售属性
        // thenAcceptAsync:有参数,无返回
        // thenApplyAsync: 有参数,有返回
        // 后续spu详情也需要skuInfoEntity中的spuId,所以这里使用thenApplyAsync
        CompletableFuture<SkuInfoEntity> spuFuture = skuFuture.thenApplyAsync(skuInfoEntity -> {
            Resp<List<SkuSaleAttrValueEntity>> skuSaleAttrValueResp = this.pmsFeign.querySkuSaleAttrValueBySpuId(skuInfoEntity.getSpuId());
            List<SkuSaleAttrValueEntity> skuSaleAttrValueEntities = skuSaleAttrValueResp.getData();
            itemVO.setSaleAttrs(skuSaleAttrValueEntities);
            return skuInfoEntity;
        }, threadPoolExecutor);

        // 5. 获取规格参数组及组下的规格参数 TODO

        // 6. spu详情 TODO

        CompletableFuture<Void> future = CompletableFuture.allOf(skuFuture, skuImageFuture, spuFuture);
        // 阻塞主进程,等待子进程全部执行完毕!
        future.get();
        return itemVO;
    }
}
java
@Override // SkuInfoServiceImpl
public SkuItemVo item(Long skuId) throws ExecutionException, InterruptedException {
    SkuItemVo skuItemVo = new SkuItemVo();

    CompletableFuture<SkuInfoEntity> infoFutrue = CompletableFuture.supplyAsync(() -> {
        //1 sku基本信息
        SkuInfoEntity info = getById(skuId);
        skuItemVo.setInfo(info);
        return info;
    }, executor);

    CompletableFuture<Void> ImgageFuture = CompletableFuture.runAsync(() -> {
        //2 sku图片信息
        List<SkuImagesEntity> images = imagesService.getImagesBySkuId(skuId);
        skuItemVo.setImages(images);
    }, executor);

    CompletableFuture<Void> saleAttrFuture =infoFutrue.thenAcceptAsync(res -> {
        //3 获取spu销售属性组合 list
        List<ItemSaleAttrVo> saleAttrVos = skuSaleAttrValueService.getSaleAttrsBuSpuId(res.getSpuId());
        skuItemVo.setSaleAttr(saleAttrVos);
    },executor);

    CompletableFuture<Void> descFuture = infoFutrue.thenAcceptAsync(res -> {
        //4 获取spu介绍
        SpuInfoDescEntity spuInfo = spuInfoDescService.getById(res.getSpuId());
        skuItemVo.setDesc(spuInfo);
    },executor);

    CompletableFuture<Void> baseAttrFuture = infoFutrue.thenAcceptAsync(res -> {
        //5 获取spu规格参数信息
        List<SpuItemAttrGroup> attrGroups = attrGroupService.getAttrGroupWithAttrsBySpuId(res.getSpuId(), res.getCatalogId());
        skuItemVo.setGroupAttrs(attrGroups);
    }, executor);

    // 6.查询当前sku是否参与秒杀优惠
    CompletableFuture<Void> secKillFuture = CompletableFuture.runAsync(() -> {
        R skuSeckillInfo = seckillFeignService.getSkuSeckillInfo(skuId);
        if (skuSeckillInfo.getCode() == 0) {
            SeckillInfoVo seckillInfoVo = skuSeckillInfo.getData(new TypeReference<SeckillInfoVo>() {});
            skuItemVo.setSeckillInfoVo(seckillInfoVo);
        }
    }, executor);

    // 等待所有任务都完成再返回
    CompletableFuture.allOf(ImgageFuture,saleAttrFuture,descFuture,baseAttrFuture,secKillFuture).get();
    return skuItemVo;
}

参考文档

Java8 CompletableFuture 用法全解

详解 java CompletableFuture

基础篇:异步编程不会?我教你啊!

编程老司机带你玩转 CompletableFuture 异步编程

CompletableFuture详解 |Java 开发实战

CompletableFuture实现异步编排全面分析和总结

JAVA基于CompletableFuture的流水线并行处理深度实践