CompletableFuture中方法的各种(多任务并发场景)使用案例----详解

x33g5p2x  于2021-12-18 转载在 其他  
字(8.3k)|赞(0)|评价(0)|浏览(444)

一、简单介绍

CompletableFuture是java8新增的并发工具类,继承了FutureTask的同步任务的特点,同时新增了异步调用的特点(其中异步的方法名称都带有Async),换而言之同步获取方法的返回值的方式可以用CompletableFuture完成,与此同时,想要异步获取方法的返回值也可以使用CompletableFuture来完成。

异步带Async,并且底层执行的线程由ForkJoinPool支持。于此同时还多了异常处理(执行任务的时候可能会发生异常,以前使用FutureTask的同步的方式是需要在执行的方法内处理异常的,而使用CompletableFuture后则可以对异常捕捉,并且修改返回值,受java8的函数式编程的特点)

二、优秀博客推荐

学习是一个不断改进的过程,看了大佬文章,然后自己跟踪源码过去后验证了文章中内容的正确性
掘金:CompletableFuture异步编程

重点知识:

CompletableFuture在执行异步任务的时候默认采用的是ForkJoinPoin线程池(前提是cpu是多核的,也就是Runtime.getRuntime().availableProcessors() - 1,例如cpu是12核心的,那么就是返回11)
如果想不使用ForkJoinPool线程池可以通过下面参数设置

1、参数的方式不启用默认ForkJoinPool连接池
-Djava.util.concurrent.ForkJoinPool.common.parallelism=1
2、CPU线程数<=2也是不启用默认的ForkJoinPool连接池

为了验证,我通过VMware修改CPU核心数,经过实验线程数<=2都是不启用默认连接池的,也就是说如果服务器是单核或者双核的情况并且没有超线程,那么线程数是<=2的也就是说不会启用默认的连接池,而是采用直接创建线程并启动起来。
那么就会使USE_COMMON_POOL的值变为false,表示不使用默认的公共池
源码是这样的,通过判断上面这个值是否大于1来判断是否启用默认ForkJoinPool

如果不使用默认的公共池,则默认采用new Thread(任务).start();方式执行任务。

要提出的问题一、我为什么要修改cpu核心数进行实验呢?

因为在我debug过程中我发现ForkJoinPool.mode的值是11,而我的电脑上的实际线程数是12,也就是Runtime.getRuntime().availableProcessors()返回的值是12。相当于进行了-1。而是否启用默认池则是通过ForkJoinPool.getCommonPoolParallelism() > 1来判断的,那么2个核心数的服务器-1后,也是不大于1的所以应该也是不启用默认ForkJoinPool池,所以进行了验证证明了我的结论。

要提出的问题二、为什么要这样设计?

博主我在看jdk源码的过程中发现,以及在学习 ForkJoinPool 的过程中了解到,这个是由DougLea大神的研究得出来得结论,也是经过实验过的结论,这篇关于ForkJoin框架的文章应该能在官网中能找到,另外ForkJoinPool的注释中也有说明,意思就是说ForkJoin采用先拆分后合并的思想,让一个大任务能快速的能计算完成。点击上面的ForkJoinPool可以链接到我之前做过的一次实验。如果是单核的情况下就不存在多任务并行执行,很明显采用ForkJoin反而大大降低了执行效率,因为要不断的切换cpu并且拆分也需要时间。

如何使用ForkJoin框架实际上是需要根据实际情况进行调整得,例如拆分任务的颗粒度,cpu核心线程等,只有在拆分所消耗的时间小于多线程并行计算所消耗的时间才使用ForkJoin,用的不好的话反而会使程序效率更低

三、CompletableFuture的整体结构和流程

CompletableFuture不需要new得到,起点是由CompletableFuture的public static方法,当我们调用这些方法时底层就会自动创建一个CompletableFuture实例,我们只需要把任务以及自定义的线程传入即可。所以一般的流程是

调用下面这些静态方法,然后进行链式调用

其它的一些public非静态方法则是后续处理,例如一个任务可能失败,可能有返回值,可能需要和其它任务组合完成。这里就不贴了,自己看源码

四、实际的多任务并发组合场景

如果想要写好高并发程序,利用CompletableFuture可以快速解决一些开发中常见的并发场景

CompletableFuture可以进行异步调用,类似于js中的Promise对象

ExecutorService executorService = Executors.newFixedThreadPool(10);//线程池,一般放在静态成员变量中
CompletableFuture<Integer> exceptionally = CompletableFuture.supplyAsync(() -> {
    //返回一个数据
    return 1;
},executorService).whenComplete((res, throwable) -> {
    //完成异步时获取返回值
    System.out.println("返回值是:"+res+",异常是:"+throwable);
}).exceptionally((throwable) -> {
    //接收异常,修改返回值
    System.out.println("获得到的异常是:"+throwable);
    return 10;//返回一个值替代原来的返回值
});
CompletableFuture的具体使用: 创建线程的方式四
handle方法感知返回值,处理异常,并修改返回值

上面的方式通过whenComplete方法感知结果的产生,并且能接收异常,但是不能修改返回值,如果要修改返回值,就得和上面的案例一样结合exceptionally方法才能达到效果。

ExecutorService executorService = Executors.newFixedThreadPool(10);//线程池,一般放在静态成员变量中
CompletableFuture<Integer> exceptionally = CompletableFuture.supplyAsync(() -> {
    //返回一个数据
    return 1;
},executorService).handle((res,throwable)->{
    if (res==1){
        System.out.println("返回的结果是"+res);
        return 1;
    }
    if (throwable!=null){
        System.out.println("异常是"+throwable);
        return 0;
    }
    return 100;
});

多任务组合场景

情景一、异步组合两任务都完成,执行任务三

runAfterBoth方法无法感知任务一任务二是否完成
ExecutorService executorService = Executors.newFixedThreadPool(10);//线程池,一般放在静态成员变量中

CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务一开始");
    return 1;
},executorService);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务二开始");
    return 2;
},executorService);
future1.runAfterBothAsync(future2,()->{
    System.out.println("联合任务1和2");
},executorService);

使用thenAcceptBothAsync会感知任务一和任务二都完成。

ExecutorService executorService = Executors.newFixedThreadPool(10);//线程池,一般放在静态成员变量中

CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务一开始");
    return 1;
},executorService);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务二开始");
    return 2;
},executorService);
future1.thenAcceptBothAsync(future2,(result1,result2)->{
    System.out.println("感知任务一和任务二完成,执行任务三");
},executorService);

合并多个任务使用thenCombineAsync在上面的结果,基础上,如果想要有返回值则使用它

ExecutorService executorService = Executors.newFixedThreadPool(10);//线程池
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务一开始");
    return 1;
},executorService);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务二开始");
    return 2;
});
future1.thenCombineAsync(future2,(res1,res2)->{
    System.out.println("感知任务完成,合并两个任务,返回一个新的结果");
    return res1+","+res2;
},executorService);

情景二、异步两任务只要有一个完成,就执行任务三

两个任务只要有一个完成就执行任务三的方法都是 XXXEither都带有Either

runAfterEitherAsync:不感知结果,并且无返回值
ExecutorService executorService = Executors.newFixedThreadPool(10);//线程池

CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务一开始");
    try {
        TimeUnit.MILLISECONDS.sleep(200);//让任务二晚点执行完,等任务二完成,任务三就会开始
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 1;
},executorService);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务二开始");
    return 2;
});
future1.runAfterEitherAsync(future2,()->{
    System.out.println("任务三开始,任务一任务二只要有一个完成就会开始执行任务三");
},executorService);
acceptEitherAsync:感知结果,接收返回值并消费掉,不产生返回值
ExecutorService executorService = Executors.newFixedThreadPool(10);//线程池
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务一开始");
    try {
        TimeUnit.MILLISECONDS.sleep(200);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 1;
},executorService);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务二开始");
    return 2;
});
future1.acceptEitherAsync(future2,(res)->{
    System.out.println("感知结果执行任务三");//需要注意使用lambda表达式需要future1和future2返回值类型相同
},executorService);
applyToEitherAsync:感知返回值,转换返回值得到一个新的结果
ExecutorService executorService = Executors.newFixedThreadPool(10);//线程池
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务一开始");
    try {
        TimeUnit.MILLISECONDS.sleep(200);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 1;
},executorService);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务二开始");
    return 2;
});
future1.applyToEitherAsync(future2,(res)->{
    System.out.println("感知结果执行任务三");//需要注意使用lambda表达式需要future1和future2返回值类型相同
    return "新的返回值"+res*2;
},executorService);

情景三、多任务组合完成

allOf方法介绍,多任务都完成

如果使用下面的代码等待多任务完成。

问题一:是执行future1.get();时主线程是堵塞的,因此future2.get();future3.get();都不会执行需要等待future1先得到返回值,也就是说会加上多余的堵塞时间。虽然任务是异步的(的确任务已经完成了,但是线程没有交还线程池)

而我们希望的是对于提前完成任务的线程将他交还给线程池,让它可以再次被其它任务执行

例如:future1 耗时4s,future2耗时3秒,future3耗时5s。如果是直接下面代码,则会导致future2在3秒的时候就已经可以完成任务了,但是由于没有进行get,线程一直在等待返回数据,那么future2的线程相当于被占用着。

问题二:就是产生冗余代码
future1.get();
future2.get();
future3.get();
ExecutorService executorService = Executors.newFixedThreadPool(10);//线程池

CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务一开始");
    return 1;
},executorService);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务二开始");
    return 2;
});
CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务三开始");
    return 3;
}, executorService);
CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2, future3);
try {
    allOf.get();//等待所有任务都完成
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}

如果要获得每一个任务的返回结果还是需要使用future1.get();,future2.get(),future3.get()得到返回结果。

情景四、多任务组合只要有一个完成

anyOf
ExecutorService executorService = Executors.newFixedThreadPool(10);//线程池
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务一开始");
    return 1;
}, executorService);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务二开始");
    return 2;
});
CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务三开始");
    return 3;
}, executorService);
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future1, future2, future3);
try {
    anyOf.get();//获得完成的那个任务结果,其它任务的结果就获取不到,想要获取得调用各自得get
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}

相关文章