SpringBoot2.x整合线程池(ThreadPoolTaskExecutor) 您所在的位置:网站首页 springboot调度器池详解 SpringBoot2.x整合线程池(ThreadPoolTaskExecutor)

SpringBoot2.x整合线程池(ThreadPoolTaskExecutor)

2023-08-05 12:25| 来源: 网络整理| 查看: 265

JAVA && Spring && SpringBoot2.x — 学习目录

我们在JDK中,可以使用ThreadPoolExecutor提供线程池服务,相关理论,可以在多线程——线程池ThreadPoolExecutor了解。但是SpringBoot提供了@Async [鹅神可]注解,帮助我们更方便的将业务逻辑提交到线程池中异步处理。

1. SpringBoot对线程池的自动装载

源代码:org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration

@Bean @ConditionalOnMissingBean public TaskExecutorBuilder taskExecutorBuilder() { TaskExecutionProperties.Pool pool = this.properties.getPool(); TaskExecutorBuilder builder = new TaskExecutorBuilder(); builder = builder.queueCapacity(pool.getQueueCapacity()); builder = builder.corePoolSize(pool.getCoreSize()); builder = builder.maxPoolSize(pool.getMaxSize()); builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout()); builder = builder.keepAlive(pool.getKeepAlive()); builder = builder.threadNamePrefix(this.properties.getThreadNamePrefix()); builder = builder.customizers(this.taskExecutorCustomizers); builder = builder.taskDecorator(this.taskDecorator.getIfUnique()); return builder; } @Lazy @Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME, AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME }) @ConditionalOnMissingBean(Executor.class) public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) { return builder.build(); }

我们可以在配置文件中配置连接池的相关参数。

2. 自定义线程池 2.1 根据业务配置不同的线程池

我们不推荐一个项目配置一个线程池,这样若是某些业务出现异常时,会影响到整个项目的健壮性。故我们可以根据业务,为不同的业务配置不同参数的数据库连接池。

@Configuration @EnableAsync @Slf4j public class ExecutorConfig { @Bean public Executor asyncServiceExecutor() { ThreadPoolTaskExecutor threadPoolTaskExecutor = new VisiableThreadPoolTaskExecutor(); //核心线程数 threadPoolTaskExecutor.setCorePoolSize(5); threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true); //最大线程数 threadPoolTaskExecutor.setMaxPoolSize(5); //配置队列大小 threadPoolTaskExecutor.setQueueCapacity(50); //配置线程池前缀 threadPoolTaskExecutor.setThreadNamePrefix("async-service-"); //拒绝策略 // threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); threadPoolTaskExecutor.setRejectedExecutionHandler(new PrintingPolicy()); threadPoolTaskExecutor.initialize(); return threadPoolTaskExecutor; } @Bean public Executor customServiceExecutor(){ ThreadPoolTaskExecutor threadPoolTaskExecutor=new ThreadPoolTaskExecutor(); //线程核心数目 threadPoolTaskExecutor.setCorePoolSize(10); threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true); //最大线程数 threadPoolTaskExecutor.setMaxPoolSize(10); //配置队列大小 threadPoolTaskExecutor.setQueueCapacity(50); //配置线程池前缀 threadPoolTaskExecutor.setThreadNamePrefix("custom-service-"); //配置拒绝策略 threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); //数据初始化 threadPoolTaskExecutor.initialize(); return threadPoolTaskExecutor; } }

若是想在使用连接池的时候,打印出连接池的各项参数,应当如何设置:

@Slf4j public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor { //打印队列的详细信息 private void showThreadPoolInfo(String prefix){ ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor(); if(null==threadPoolExecutor){ return; } log.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]", this.getThreadNamePrefix(), prefix, threadPoolExecutor.getTaskCount(), threadPoolExecutor.getCompletedTaskCount(), threadPoolExecutor.getActiveCount(), threadPoolExecutor.getQueue().size()); } @Override public void execute(Runnable task) { showThreadPoolInfo("1. do execute"); super.execute(task); } @Override public void execute(Runnable task, long startTimeout) { showThreadPoolInfo("2. do execute"); super.execute(task, startTimeout); } @Override public Future submit(Runnable task) { showThreadPoolInfo("1. do submit"); return super.submit(task); } @Override public Future submit(Callable task) { showThreadPoolInfo("2. do submit"); return super.submit(task); } @Override public ListenableFuture submitListenable(Runnable task) { showThreadPoolInfo("1. do submitListenable"); return super.submitListenable(task); } @Override public ListenableFuture submitListenable(Callable task) { showThreadPoolInfo("2. do submitListenable"); return super.submitListenable(task); } } 2.2 如何使用连接池

在业务方法中使用@Async注解,并且可以选择使用的连接池。来启动一个异步任务。

若是想获取到任务返回值,可创建Callable任务 //带返回值的任务 @Async("asyncServiceExecutor") public Future doTask1() throws InterruptedException{ log.info("Task1 started."); long start = System.currentTimeMillis(); Thread.sleep(5000); long end = System.currentTimeMillis(); log.info("Task1 finished, time elapsed: {} ms.", end-start); return new AsyncResult("Task1 accomplished!"); } @Async("customServiceExecutor") public Future doTask2() throws InterruptedException{ log.info("Task2 started."); long start = System.currentTimeMillis(); Thread.sleep(3000); long end = System.currentTimeMillis(); log.info("Task2 finished, time elapsed: {} ms.", end-start); return new AsyncResult("Task2 accomplished!"); } 若是创建的Runnable的异步任务 //创建的是Runnable的任务 @Async("asyncServiceExecutor") public void executeAsync() { log.info("start executeAsync"); try{ Thread.sleep(1000); }catch(Exception e){ e.printStackTrace(); } log.info("end executeAsync"); } 2.3 如何获取任务的返回值

若是我们使用线程池,来并发的执行任务,首先需要考虑的是,如何等待最后一个任务执行完毕,对任务结果进行汇总处理。

方法一:使用自旋操作,等待任务结果返回。

@RequestMapping("/helloFuture") @ResponseBody public String helloFuture() { try { Future future1 = serviceImpl.doTask1(); Future future2 = serviceImpl.doTask2(); //自旋锁,停止等待 while (true) { if (future1.isDone() && future2.isDone()) { log.info("Task1 result:{}", future1.get()); log.info("Task2 result:{}", future2.get()); break; } Thread.sleep(1000); } log.info("All tasks finished."); return "S"; } catch (InterruptedException e) { log.error("错误信息1", e); return "F"; } catch (ExecutionException e) { log.error("错误信息2", e); return "F"; } }

方法二:使用CountDownLatch计数器

相关理论可以参考:多线程——CountDownLatch详解

@RequestMapping("/helloFuture2") @ResponseBody public String helloFuture2() { try { CountDownLatch latch=new CountDownLatch(2); Future future1 = serviceImpl.doTask1(latch); Future future2 = serviceImpl.doTask2(latch); //等待两个线程执行完毕 latch.await(); log.info("All tasks finished!"); String result1 = future1.get(); String result2 = future2.get(); log.info(result1+"--"+result2); return "S"; } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } return "F"; }

每个任务执行完毕,只需要调用latch.countDown();使得计数器-1。

//带返回值的任务 @Async("asyncServiceExecutor") public Future doTask1(CountDownLatch latch) throws InterruptedException{ log.info("Task1 started."); long start = System.currentTimeMillis(); Thread.sleep(5000); long end = System.currentTimeMillis(); log.info("Task1 finished, time elapsed: {} ms.", end-start); latch.countDown(); return new AsyncResult("Task1 accomplished!"); } @Async("customServiceExecutor") public Future doTask2(CountDownLatch latch) throws InterruptedException{ log.info("Task2 started."); long start = System.currentTimeMillis(); Thread.sleep(3000); long end = System.currentTimeMillis(); log.info("Task2 finished, time elapsed: {} ms.", end-start); latch.countDown(); return new AsyncResult("Task2 accomplished!"); }

方式三:使用Future的get方法的阻塞特性

@RequestMapping("/helloFuture2") @ResponseBody public String helloFuture2() { try { List tasks = new ArrayList(); List results = new ArrayList(); tasks.add(serviceImpl.doTask1()); tasks.add(serviceImpl.doTask2()); //各个任务执行完毕 for (Future task : tasks) { //每个任务都会再在此阻塞。 results.add(task.get()); } log.info("All tasks finished!"); log.info("执行结果:{}", JSON.toJSONString(results)); return "S"; } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } return "F"; } 2.4 Runnable异常处理

该配置可与线程池配置在一起,若异步线程抛出异常,会由该类打印。

@Configuration public class ExecutorConfig implements AsyncConfigurer { //配置异常处理机制 @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return (ex,method,params)->{ log.error("异步线程执行失败。方法:[{}],异常信息[{}] : ", method, ex.getMessage(),ex); }; } }

效果图:

2019-12-25 19:14:09.851 ERROR [] --- [async-service-1] c.g.Config.threadPool.ExecutorConfig : 异步线程执行失败。方法:[public void com.galax.bussiness.account.impl.AccountServiceImpl.getAccInfoByTime(java.lang.String,java.lang.String)],异常信息[/ by zero] : java.lang.ArithmeticException: / by zero at com.galax.bussiness.account.impl.AccountServiceImpl.getAccInfoByTime(AccountServiceImpl.java:308) at com.galax.bussiness.account.impl.AccountServiceImpl$$FastClassBySpringCGLIB$$4e0db2a2.invoke() at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:749) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:93) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) at org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:115) at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) at java.util.concurrent.FutureTask.run(FutureTask.java) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 彩蛋——Future使用lambda表达式 public void sendMail(Map model, String title, String templateName, String toMail, String[] ccMail, long timeout) throws Exception { Future submit; submit = emailServiceExecutor.submit(() ->{ try { return "s"; } catch (Exception e) { return "F"; } }); } 彩蛋——若自定义实现线程池,如何获取到各个任务的结果

若是我们自己实现线程池,可以使用java.util.concurrent.AbstractExecutorService#invokeAll(java.util.Collection



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有