大家好,我是阿晶,今天我们来介绍一下在 Java 项目中线程池的使用,线程池的使用也有正确的姿势,有了正确的姿势后,可以让你的项目更加的持久稳定。
Spring中提供了对ThreadPoolExecutor
封装的线程池ThreadPoolTaskExecutor
。
使用步骤
配置类
先创建线程池的配置,让SpringBoot将其加载进来。
使用@Configuration
和@EnableAsync
这两个注解,表示这是个配置类,并且是线程池的配置类
package com.itjing.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author lijing
* @date 2022年05月30日 19:37
* @description 线程池配置类
*/
@Configuration
@EnableAsync
@Slf4j
public class ExecutorConfig {
@Value("${async.executor.thread.core_pool_size}")
private int corePoolSize;
@Value("${async.executor.thread.max_pool_size}")
private int maxPoolSize;
@Value("${async.executor.thread.queue_capacity}")
private int queueCapacity;
@Value("${async.executor.thread.name.prefix}")
private String namePrefix;
@Bean(name = "asyncServiceExecutor")
public Executor asyncServiceExecutor() {
log.info("start asyncServiceExecutor");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 配置核心线程数
executor.setCorePoolSize(corePoolSize);
// 配置最大线程数
executor.setMaxPoolSize(maxPoolSize);
// 配置队列大小
executor.setQueueCapacity(queueCapacity);
// 配置线程池中的线程的名称前缀
executor.setThreadNamePrefix(namePrefix);
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 执行初始化
executor.initialize();
return executor;
}
}
配置文件
# 异步线程配置
async:
executor:
thread:
# 配置核心线程数
core_pool_size: 10
# 配置最大线程数
max_pool_size: 10
# 配置队列大小
queue_capacity: 1000
# 配置线程池中的线程的名称前缀
name:
prefix: async-service-
业务类
创建一个Service接口,是异步线程的接口
package com.itjing.service;
/**
* @author lijing
* @date 2022年05月30日 19:50
* @description
*/
public interface AsyncService {
/**
* 执行异步任务
* 可以根据需求,自己加参数拟定,我这里就做个测试演示
*/
void executeAsync();
}
实现类
package com.itjing.service.impl;
import com.itjing.service.AsyncService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
/**
* @author lijing
* @date 2022年05月30日 19:53
* @description
*/
@Service
@Slf4j
public class AsyncServiceImpl implements AsyncService {
@Override
@Async("asyncServiceExecutor")
public void executeAsync() {
log.info("start executeAsync");
// 异步线程要做的事情 TODO
// 可以在这里执行操作比较耗时的事情
log.info("do something......");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("end executeAsync");
}
}
将Service层的方法异步化,在executeAsync()
方法上增加注解@Async("asyncServiceExecutor")
,asyncServiceExecutor
方法是前面ExecutorConfig.java
中的方法名,表明executeAsync
方法进入的线程池是asyncServiceExecutor
方法创建的。
控制器
接下来就是在Controller里或者是哪里通过注解@Autowired
注入这个Service
package com.itjing.controller;
import com.itjing.service.AsyncService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author lijing
* @date 2022年05月30日 19:59
* @description
*/
@RestController
@RequestMapping("/async")
public class AsyncController {
@Autowired
private AsyncService asyncService;
@GetMapping("/async")
public void async() {
asyncService.executeAsync();
}
}
测试
用ApiPost或者其他工具来多次测试请求一下
2022-05-30 20:02:52.656 INFO 20804 --- [async-service-1] c.itjing.service.impl.AsyncServiceImpl : start executeAsync
2022-05-30 20:02:52.656 INFO 20804 --- [async-service-1] c.itjing.service.impl.AsyncServiceImpl : do something......
2022-05-30 20:02:52.656 INFO 20804 --- [async-service-1] c.itjing.service.impl.AsyncServiceImpl : end executeAsync
2022-05-30 20:03:46.919 INFO 20804 --- [async-service-2] c.itjing.service.impl.AsyncServiceImpl : start executeAsync
2022-05-30 20:03:46.919 INFO 20804 --- [async-service-2] c.itjing.service.impl.AsyncServiceImpl : do something......
2022-05-30 20:03:46.919 INFO 20804 --- [async-service-2] c.itjing.service.impl.AsyncServiceImpl : end executeAsync
2022-05-30 20:03:47.412 INFO 20804 --- [async-service-3] c.itjing.service.impl.AsyncServiceImpl : start executeAsync
2022-05-30 20:03:47.412 INFO 20804 --- [async-service-3] c.itjing.service.impl.AsyncServiceImpl : do something......
2022-05-30 20:03:47.412 INFO 20804 --- [async-service-3] c.itjing.service.impl.AsyncServiceImpl : end executeAsync
通过以上的日志可以发现,[async-service-]有多个线程,显然已在配置的线程池中执行了。
虽然已经用上了线程池,但是我们并不清楚线程池当时情况,比如有多少线程在执行,有多少线程在队列等待呢?
重写线程池
这里创建了一个ThreadPoolTaskExecutor
的子类,在每次提交线程的时候都会将当前线程池的运行状况打印出来。
package com.itjing.config.executor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author lijing
* @date 2022年05月30日 20:10
* @description 创建了一个ThreadPoolTaskExecutor的子类,在每次提交线程的时候都会将当前线程池的运行状况打印出来
*/
@Slf4j
public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
private void showThreadPoolInfo(String prefix) {
ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
if (Objects.isNull(threadPoolExecutor)) {
return;
}
log.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",
getThreadNamePrefix() + System.currentTimeMillis(),
prefix,
threadPoolExecutor.getTaskCount(),
threadPoolExecutor.getCompletedTaskCount(),
threadPoolExecutor.getActiveCount(),
threadPoolExecutor.getQueue().size()
);
}
@Override
public void execute(Runnable task) {
showThreadPoolInfo("do execute Runnable");
super.execute(task);
}
@Override
public void execute(Runnable task, long startTimeout) {
showThreadPoolInfo("do execute Runnable and StartTimeout");
super.execute(task, startTimeout);
}
@Override
public Future<?> submit(Runnable task) {
showThreadPoolInfo("do submit Runnable");
return super.submit(task);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
showThreadPoolInfo("do submit Callable");
return super.submit(task);
}
@Override
public ListenableFuture<?> submitListenable(Runnable task) {
showThreadPoolInfo("do submitListenable Runnable");
return super.submitListenable(task);
}
@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
showThreadPoolInfo("do submitListenable Callable");
return super.submitListenable(task);
}
}
如上所示,showThreadPoolInfo
方法中将任务总数、已完成数、活跃线程数,队列大小都打印出来了,然后Override了父类的execute
、submit
等方法,在里面调用showThreadPoolInfo
方法,这样每次有任务被提交到线程池的时候,都会将当前线程池的基本情况打印到日志中;
修改 ExecutorConfig.java
的asyncServiceExecutor
方法,将ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor()
改为ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor()
package com.itjing.config;
import com.itjing.config.executor.VisiableThreadPoolTaskExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author lijing
* @date 2022年05月30日 19:37
* @description 线程池配置类
*/
@Configuration
@EnableAsync
@Slf4j
public class ExecutorConfig {
@Value("${async.executor.thread.core_pool_size}")
private int corePoolSize;
@Value("${async.executor.thread.max_pool_size}")
private int maxPoolSize;
@Value("${async.executor.thread.queue_capacity}")
private int queueCapacity;
@Value("${async.executor.thread.name.prefix}")
private String namePrefix;
@Bean(name = "asyncServiceExecutor")
public Executor asyncServiceExecutor() {
log.info("start asyncServiceExecutor");
ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
// 配置核心线程数
executor.setCorePoolSize(corePoolSize);
// 配置最大线程数
executor.setMaxPoolSize(maxPoolSize);
// 配置队列大小
executor.setQueueCapacity(queueCapacity);
// 配置线程池中的线程的名称前缀
executor.setThreadNamePrefix(namePrefix);
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 执行初始化
executor.initialize();
return executor;
}
}
再次启动服务测试
2022-05-30 20:35:15.965 INFO 14384 --- [nio-8080-exec-1] c.i.c.e.VisiableThreadPoolTaskExecutor : async-service-1653903315965, do submit Callable,taskCount [0], completedTaskCount [0], activeCount [0], queueSize [0]
2022-05-30 20:35:15.966 INFO 14384 --- [async-service-1] c.itjing.service.impl.AsyncServiceImpl : start executeAsync
2022-05-30 20:35:15.966 INFO 14384 --- [async-service-1] c.itjing.service.impl.AsyncServiceImpl : do something......
2022-05-30 20:35:19.455 INFO 14384 --- [nio-8080-exec-2] c.i.c.e.VisiableThreadPoolTaskExecutor : async-service-1653903319455, do submit Callable,taskCount [1], completedTaskCount [0], activeCount [1], queueSize [0]
2022-05-30 20:35:19.456 INFO 14384 --- [async-service-2] c.itjing.service.impl.AsyncServiceImpl : start executeAsync
2022-05-30 20:35:19.456 INFO 14384 --- [async-service-2] c.itjing.service.impl.AsyncServiceImpl : do something......
2022-05-30 20:35:20.966 INFO 14384 --- [async-service-1] c.itjing.service.impl.AsyncServiceImpl : end executeAsync
2022-05-30 20:35:24.456 INFO 14384 --- [async-service-2] c.itjing.service.impl.AsyncServiceImpl : end executeAsync
注意这一行日志:
2022-05-30 20:35:19.455 INFO 14384 --- [nio-8080-exec-2] c.i.c.e.VisiableThreadPoolTaskExecutor : async-service-1653903319455, do submit Callable,taskCount [1], completedTaskCount [0], activeCount [1], queueSize [0]
这说明提交任务到线程池的时候,调用的是submit(Callable task)
这个方法,当前已经提交了1个任务,完成了0个,当前有1个线程在处理任务,还剩0个任务在队列中等待,线程池的基本情况一目了然。
这样操作的话更便于我们进行线程池监视和编程。
原文始发于微信公众号(程序员阿晶):SpringBoot中线程池的正确使用姿势
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/19608.html