Executors
创建线程池
Executors.newFixedThreadPool(10);
通过 Executors 可以创建多种类型的线程池
但是这种创建线程池的方式是不推荐的,其代码实际上还是创建了一个 ThreadPoolExecutor 线程池,并且指定该线程池的等待队列大小为 Integer.MAX_VALUE,如果队列中的任务堆积过多,可能会导致内存溢出或者应用程序崩溃。

ThreadPoolTaskExecutor
通过在配置类中注册 bean 的方式创建线程池,并交由 spring Ioc 容器管理
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数(默认线程数)
executor.setCorePoolSize(5);
// 最大线程数
executor.setMaxPoolSize(10);
// 等待队列大小
executor.setQueueCapacity(20);
// 允许线程空闲时间(单位:默认为秒)
executor.setKeepAliveSeconds(10);
// 线程池名前缀
executor.setThreadNamePrefix("Async-Service-");
// 线程池对拒绝任务的处理策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
// 初始化
executor.initialize();
return executor;
}
通过自定义线程池的方式创建线程池,需要手动设置好线程池的参数以及拒绝策略
- 核心线程数:代表了线程池中保持运行的线程数量。当有新任务提交时,如果当前线程数小于核心线程数,线程池会创建新的线程来执行任务。这些核心线程在创建后会保持活动状态,即使它们当前没有任务执行,也不会被销毁,以应对可能的未来任务。
- 最大线程数量:指线程池中允许创建的最大线程数。当任务数量超过核心线程数并且任务队列已满时,线程池会创建非核心线程来处理任务,直到达到最大线程数。当任务执行完成且任务队列为空的情况下,这部分线程会被销毁。
- 等待队列大小:指线程池所包含的任务队列的最大容量。当任务数量超过了线程池的核心线程数和最大线程数,且任务队列也满了之后,新增的任务会根据线程池的拒绝策略进行处理
- 拒绝策略:
- AbortPolicy(默认策略):当任务无法被接受时,直接抛出一个RejectedExecutionException异常
- CallerRunsPolicy:当有新任务提交时,如果线程池没有被关闭且没有能力执行,则把这个任务交于提交任务的线程执行。这意味着如果主线程提交了一个任务,而线程池已经饱和,那么这个任务将由主线程自己执行。这种策略提供了一种反馈机制,能够减缓新任务的提交速度。
- DiscardPolicy:新任务被提交后直接被丢弃掉,并且不会抛出异常。
- DiscardOldestPolicy:丢弃任务队列中的头结点(通常是存活时间最长且未被处理的任务),然后将新的任务添加到队列中。如果队列是一个优先队列,那么丢弃的将是优先级最高的任务。

存在问题
自定义线程池可以手动配置最大线程数量和等待队列大小,可降低内存溢出的风险,但会存在拒绝任务,导致任务丢失的情况,例如
最终执行的结果如下,使用 DiscardPolicy 拒绝策略会丢失大量任务
CompletableFuture
使用示例
@Resource(name = "autoSignAsyncExecutor")
private ThreadPoolTaskExecutor commonAsyncServiceExecutor;
//创建异步方法数组
CompletableFuture[] futures = fileList.stream().map(vo -> CompletableFuture.supplyAsync(() -> {
log.info("vo:{ } ",JsonUtil.convertObj2String(vo));
ProvincialWorkorderDocEntity workorderDocEntity = excuteSealSign(vo,reqVo, reqVo.getStaffUid());
log.info("workorderDocEntity:{ } ",JsonUtil.convertObj2String(workorderDocEntity));
return workorderDocEntity;
} , commonAsyncServiceExecutor)).toArray(size -> new CompletableFuture[size]);
//allOf()为等待所有子任务线程全部执行完毕后返回
CompletableFuture.allOf(futures).join();
for (CompletableFuture f : futures) {
//join()获取子线程运算的结果,不会抛出异常
Object obj = f.join();
if (obj!=null) {
ProvincialWorkorderDocEntity entity = (ProvincialWorkorderDocEntity) obj;
log.info("赋值结果{ } ",JsonUtil.convertObj2String(entity));
result.add(entity);
}
}
谢谢光临~
- 本文链接:https://lxjblog.gitee.io/2024/04/10/ThreadPool/
- 版权声明:本博客所有文章除特别声明外,均默认采用 许可协议。