首页>>后端>>java->再谈CompletableFuture之循环创建并发线程

再谈CompletableFuture之循环创建并发线程

时间:2023-11-29 本站 点击:17

前言

在之前一篇文章《利用CompletableFuture做多线程并发操作》里,介绍了如何使用CompletableFuture进行多线程并发操作,但是限定了并发的子线程个数为一个确定值,在代码层面就固定了。当并发的子线程数量不固定时,那么,之前的用法就无法继续使用,此时需要换一个用法。

循环创建并发线程

基本思路

基本思路是:将所有的子线程任务通过循环的方式放入到一个List<CompletableFuture>里,根据业务的场景,选择不同的方法:

所有子线程都需要完成后再执行主线程

CompletableFuture.allOf().join()

其中任何一个子线程完成后就执行主线程

ComPletableFuture.anyOf()

上代码

业务场景:根据上传的多个行政区编码(adCode)并发查询天气信息。

因为qWeatherByCode()方法有返回值,所以需要使用CompletableFuture.supplyAsync()方法。

该方法返回一个CompletableFuture对象,然后加入到List<CompletableFuture>对象里。

然后使用CompletableFuture.allOf().join()方法,当调用该方法时,主线程会一直阻塞,直到List<CompletableFuture>里的子线程均已完成(或者超时)。

List<CompletableFuture>futures=newArrayList();for(StringadCode:adCodeList){futures.add(CompletableFuture.supplyAsync(()->qWeatherByCode(adCode)));}CompletableFuture.allOf(futures.toArray(newCompletableFuture[futures.size()])).join();

需要注意的是,上面的代码里CompletableFuture.supplyAsync(()->qWeatherByCode(adCode)),没有指定Executor,所以使用默认的线程池ForkJoinPool.commonPool()

ForkJoinPool.commonPool()是一个共享线程池(基于服务器内核的限制,如果CPU是八核,每次线程只能起八个,不能自定义线程池),如果使用不当,会对性能造成严重的影响。所以一般建议这里使用自定义的Executor:

List<CompletableFuture>futures=newArrayList();for(StringadCode:adCodeList){futures.add(CompletableFuture.supplyAsync(()->qWeatherByCode(adCode),asyncExecutor()));}CompletableFuture.allOf(futures.toArray(newCompletableFuture[futures.size()])).join();

asyncExecutor():

@Bean("asyncExcutor")publicExecutorasyncExecutor(){log.info("startasyncexecutor");ThreadPoolTaskExecutorthreadPoolTaskExecutor=newThreadPoolTaskExecutor();//配置核心线程数threadPoolTaskExecutor.setCorePoolSize(ThreadPoolConstant.CORE_POOL_SIZE);//配置最大线程数threadPoolTaskExecutor.setMaxPoolSize(ThreadPoolConstant.MAX_POOL_SIZE);//配置队列大小threadPoolTaskExecutor.setQueueCapacity(ThreadPoolConstant.QUEUE_CAPACITY);//配置线程池中线程的名称前缀threadPoolTaskExecutor.setThreadNamePrefix(ThreadPoolConstant.THREAD_NAME_PREFIX);//HelloWorldServiceImplrejection-policy:当pool已经达到maxsize时,如何处理新任务://CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行;//AbortPolicy:拒绝执行新任务,并抛出RejectedExecutionException异常;//DiscardPolicy:丢弃当前将要加入队列的任务;//DiscardOldestPolicy:丢弃任务队列中最旧的任务;threadPoolTaskExecutor.setRejectedExecutionHandler(newThreadPoolExecutor.CallerRunsPolicy());threadPoolTaskExecutor.initialize();returnthreadPoolTaskExecutor;}

CompletableFuture的常用场景

packagecom.example.demo;importorg.junit.Test;importjava.util.Arrays;importjava.util.List;importjava.util.Random;importjava.util.concurrent.*;/***Project<demo-project>*Createdbyjorgezhongon2018/9/811:45.*/publicclassCompletableFutureDemo{/***创建CompletableFuture*-runAsync*-supplyAsync*-completedFuture*<p>*异步计算启用的线程池是守护线程*/@Testpublicvoidtest1(){//1、异步计算:无返回值//默认线程池为:ForkJoinPool.commonPool()CompletableFuture.runAsync(()->{//TODO:2018/9/8无返回异步计算System.out.println(Thread.currentThread().isDaemon());});//指定线程池,(到了jdk9CompletableFuture还拓展了延迟的线程池)CompletableFuture.runAsync(()->{//TODO:2018/9/8无返回异步计算},Executors.newFixedThreadPool(2));//2、异步计算:有返回值//使用默认线程池CompletableFuture<String>future1=CompletableFuture.supplyAsync(()->"result1");//getNow指定异步计算抛出异常或结果返回null时替代的的值Stringresult1=future1.getNow(null);//指定线程池CompletableFuture<String>future2=CompletableFuture.supplyAsync(()->"result2",Executors.newFixedThreadPool(2));//getNow指定异步计算抛出异常或结果返回null时替代的的值Stringresult2=future2.getNow(null);//3、初始化一个有结果无计算的CompletableFutureCompletableFuture<String>future=CompletableFuture.completedFuture("result");Stringnow=future.getNow(null);System.out.println("now="+now);}/***计算完成时需要对异常进行处理或者对结果进行处理*-whenComplete:同步处理包括异常*-thenApply:同步处理正常结果(前提是没有异常)*<p>*-whenCompleteAsync:异步处理包括异常*-thenApplyAsync:异步处理正常结果(前提是没有异常)*<p>*-exceptionally:处理异常*/@Testpublicvoidtest2(){CompletableFuture<String>future=CompletableFuture.supplyAsync(()->"result");//whenComplete方法收future的结果和异常,可灵活进行处理//1、同步处理//无返回值:可处理异常future.whenComplete((result,throwable)->System.out.println("result="+result));//有返回值:没有异常处理(前提)CompletableFuture<String>resultFuture1=future.thenApply(result->"result");Stringresult1=resultFuture1.getNow(null);//2、异步处理://无返回值:默认线程池future.whenCompleteAsync((result,throwable)->System.out.println("result="+result));//无返回值:指定线程池future.whenCompleteAsync((result,throwable)->System.out.println("result="+result),Executors.newFixedThreadPool(2));//有返回值:默认线程池CompletableFuture<String>resultFuture2=future.thenApplyAsync(result->"result");Stringresult2=resultFuture2.getNow(null);//有返回值:指定线程池CompletableFuture<String>resultFuture3=future.thenApplyAsync(result->"result",Executors.newFixedThreadPool(2));Stringresult3=resultFuture3.getNow(null);//3、处理异常,处理完之后返回一个结果CompletableFuture<String>exceptionallyFuture=future.whenCompleteAsync((result,throwable)->System.out.println("result="+1/0)).exceptionally(throwable->"发生异常了:"+throwable.getMessage());System.out.println(exceptionallyFuture.getNow(null));}/***异常处理还可以使用以下两个方法*-handle*-handleAsync*<p>*备注:exceptionally同步和异步计算一起用如果出现异常会把异常抛出。用以上的方法可以拦截处理*/@Testpublicvoidtest3(){CompletableFuture<String>exceptionoHandle=CompletableFuture.completedFuture("producemsg").thenApplyAsync(s->"result"+1/0);StringhandleResult1=exceptionoHandle.handle((s,throwable)->{if(throwable!=null){returnthrowable.getMessage();}returns;}).getNow(null);//指定线程池StringhandleResult2=exceptionoHandle.handleAsync((s,throwable)->{if(throwable!=null){returnthrowable.getMessage();}returns;},Executors.newFixedThreadPool(2)).getNow(null);}/***生产--消费*-thenAccept:同步的*-thenAcceptAsync:异步的*<p>*接受上一个处理结果,并实现一个Consumer,消费结果*/@Testpublicvoidtest4(){//同步的CompletableFuture.completedFuture("producemsg").thenAccept(s->System.out.println("syncconsumedmsg:"+s));//异步的//默认线程池CompletableFuture.completedFuture("producemsg").thenAcceptAsync(s->System.out.println("asyncconsumedmsg:"+s));//指定线程池CompletableFuture.completedFuture("producemsg").thenAcceptAsync(s->System.out.println("asyncconsumedmsg:"+s),Executors.newFixedThreadPool(2));}/***取消任务*-cancel*/@Testpublicvoidtest5()throwsInterruptedException{CompletableFuture<String>message=CompletableFuture.completedFuture("message").thenApplyAsync(s->{try{Thread.sleep(800);}catch(InterruptedExceptione){e.printStackTrace();}returns+"result";});Stringnow=message.getNow(null);System.out.println("now="+now);//取消booleancancel=message.cancel(true);System.out.println("cancel="+cancel);//如果这里再去获取,会抛出异常,说明已经取消了//Stringnow1=message.getNow(null);Thread.sleep(1000);}/***两个异步计算*-applyToEither:有返回值,同步*-acceptEither:无返回值,同步*-applyToEitherAsync:有返回值,异步*-*/@Testpublicvoidtest6(){CompletableFuture<String>task1=CompletableFuture.completedFuture("task1").thenApply(s->"task1的计算结果:s1="+s);//同步,有返回值//applyToEither第二个参数接收的值是task1计算的返回值CompletableFuture<String>result1=task1.applyToEither(CompletableFuture.completedFuture("task2").thenApply(s->"task2的计算结果:s2="+s),s->s);System.out.println("task2:"+result1.getNow(null));//同步,无返回值task1.acceptEither(CompletableFuture.completedFuture("task3").thenApply(s->"task3的计算结果:s3="+s),s->System.out.println("task3:"+s));//异步有返回值,默认线程池,也可以指定CompletableFuture<String>result2=task1.applyToEitherAsync(CompletableFuture.completedFuture("task4").thenApply(s->"task4的计算结果:s4="+s),s->s);//由于是异步的,主线程跑的快一点,因此join()之后才能看到跑完的结果System.out.println("task4:"+result2.join());//异步无返回值,指定线程池,也可以使用默认线程池CompletableFuture<Void>task5=task1.acceptEitherAsync(CompletableFuture.completedFuture("task5").thenApply(s->"task5的计算结果:s5="+s),s->System.out.println("task5:"+s),Executors.newFixedThreadPool(2));task5.join();}/***组合计算结果*-runAfterBoth:都计算完之后执行一段代码*-thenAcceptBoth:都计算完之后把结果传入,并执行一段代码*<p>*-thenCombine:组合两个结果*-thenCompose:组合两个结果*/@Testpublicvoidtest7(){//runAfterBoth方式StringBuildermsg=newStringBuilder("jorgeZhong");CompletableFuture.completedFuture(msg).thenApply(s->s.append("task1,")).runAfterBoth(CompletableFuture.completedFuture(msg).thenApply(s->s.append("task2")),()->System.out.println(msg));//thenAcceptBoth方式CompletableFuture.completedFuture("jorgeZhong").thenApplyAsync(String::toLowerCase).thenAcceptBoth(CompletableFuture.completedFuture("jorgeZhong").thenApplyAsync(String::toUpperCase),(s,s2)->System.out.println("s1:"+s+",s2:"+s2));//thenCombine方式CompletableFuture<String>result1=CompletableFuture.completedFuture("jorgeZhong").thenApply(String::toLowerCase).thenCombine(CompletableFuture.completedFuture("jorgeZhong").thenApply(String::toUpperCase),(s,s2)->"s1:"+s+",s2:"+s2);System.out.println("result1:"+result1.getNow(null));//异步CompletableFuture<String>result11=CompletableFuture.completedFuture("jorgeZhong").thenApply(String::toLowerCase).thenCombineAsync(CompletableFuture.completedFuture("jorgeZhong").thenApplyAsync(String::toUpperCase),(s,s2)->"s1:"+s+",s2:"+s2);System.out.println("result11:"+result11.join());//thenCompose方式CompletableFuture<String>result2=CompletableFuture.completedFuture("jorgeZhong").thenApply(String::toLowerCase).thenCompose(s->CompletableFuture.completedFuture("jorgeZhong").thenApply(String::toUpperCase).thenApply(s1->"s:"+s+",s1:"+s1));System.out.println("result2:"+result2.getNow(null));//异步CompletableFuture<String>result22=CompletableFuture.completedFuture("jorgeZhong").thenApply(String::toLowerCase).thenComposeAsync(s->CompletableFuture.completedFuture("jorgeZhong").thenApplyAsync(String::toUpperCase).thenApplyAsync(s1->"s:"+s+",s1:"+s1));System.out.println("result22:"+result22.join());}/***多个CompletableFuture策略*-anyOf:接受一个CompletableFuture数组,任意一个任务执行完返回。都会触发该CompletableFuture*-whenComplete:计算执行完之后执行实现的一段代码,将上一个结果和异常作为参数传入*/@Testpublicvoidtest8()throwsInterruptedException{List<String>messages=Arrays.asList("a","b","c");CompletableFuture.anyOf(messages.stream().map(o->CompletableFuture.completedFuture(o).thenApplyAsync(s->{try{Thread.sleep(newRandom().ints(99,300).findFirst().getAsInt());}catch(InterruptedExceptione){e.printStackTrace();}returns.toUpperCase();})).toArray(CompletableFuture[]::new)).whenComplete((res,throwable)->{if(throwable==null){System.out.println(res.toString());}});Thread.sleep(1000);}/***多个CompletableFuture策略*-allOf:接受一个CompletableFuture数组,所有任务返回后,创建一个CompletableFuture*/@Testpublicvoidtest9(){List<String>messages=Arrays.asList("a","b","c");CompletableFuture[]cfs=messages.stream().map(s->CompletableFuture.completedFuture(s).thenApplyAsync(String::toUpperCase)).toArray(CompletableFuture[]::new);CompletableFuture.allOf(cfs).whenCompleteAsync((aVoid,throwable)->Arrays.stream(cfs).forEach(completableFuture->System.out.println(completableFuture.getNow(null))));}}

参考文章

jdk8特性-CompletableFuture

CompletableFuture的执行线程

作者:arkMon


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:/java/83.html