SpringBoot中线程池的正确使用姿势

大家好,我是阿晶,今天我们来介绍一下在 Java 项目中线程池的使用,线程池的使用也有正确的姿势,有了正确的姿势后,可以让你的项目更加的持久稳定。

Spring中提供了对ThreadPoolExecutor封装的线程池ThreadPoolTaskExecutor

使用步骤

配置类

先创建线程池的配置,让SpringBoot将其加载进来。

使用@Configuration@EnableAsync这两个注解,表示这是个配置类,并且是线程池的配置类

package com.itjing.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * @author lijing
 * @date 2022年05月30日 19:37
 * @description 线程池配置类
 */

@Configuration
@EnableAsync
@Slf4j
public class ExecutorConfig {

    @Value("${async.executor.thread.core_pool_size}")
    private int corePoolSize;
    @Value("${async.executor.thread.max_pool_size}")
    private int maxPoolSize;
    @Value("${async.executor.thread.queue_capacity}")
    private int queueCapacity;
    @Value("${async.executor.thread.name.prefix}")
    private String namePrefix;

    @Bean(name = "asyncServiceExecutor")
    public Executor asyncServiceExecutor() {
        log.info("start asyncServiceExecutor");
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 配置核心线程数
        executor.setCorePoolSize(corePoolSize);
        // 配置最大线程数
        executor.setMaxPoolSize(maxPoolSize);
        // 配置队列大小
        executor.setQueueCapacity(queueCapacity);
        // 配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix(namePrefix);
        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 执行初始化
        executor.initialize();
        return executor;
    }
}

配置文件

# 异步线程配置
async:
  executor:
    thread:
      # 配置核心线程数
      core_pool_size: 10
      # 配置最大线程数
      max_pool_size: 10
      # 配置队列大小
      queue_capacity: 1000
      # 配置线程池中的线程的名称前缀
      name:
        prefix: async-service-

业务类

创建一个Service接口,是异步线程的接口

package com.itjing.service;

/**
 * @author lijing
 * @date 2022年05月30日 19:50
 * @description
 */

public interface AsyncService {
    /**
     * 执行异步任务
     * 可以根据需求,自己加参数拟定,我这里就做个测试演示
     */

    void executeAsync();
}

实现类

package com.itjing.service.impl;

import com.itjing.service.AsyncService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

/**
 * @author lijing
 * @date 2022年05月30日 19:53
 * @description
 */

@Service
@Slf4j
public class AsyncServiceImpl implements AsyncService {

    @Override
    @Async("asyncServiceExecutor")
    public void executeAsync() {
        log.info("start executeAsync");
        // 异步线程要做的事情 TODO
        // 可以在这里执行操作比较耗时的事情
        log.info("do something......");
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("end executeAsync");
    }
}

将Service层的方法异步化,在executeAsync()方法上增加注解@Async("asyncServiceExecutor")asyncServiceExecutor方法是前面ExecutorConfig.java中的方法名,表明executeAsync方法进入的线程池是asyncServiceExecutor方法创建的。

控制器

接下来就是在Controller里或者是哪里通过注解@Autowired注入这个Service

package com.itjing.controller;

import com.itjing.service.AsyncService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author lijing
 * @date 2022年05月30日 19:59
 * @description
 */

@RestController
@RequestMapping("/async")
public class AsyncController {

    @Autowired
    private AsyncService asyncService;

    @GetMapping("/async")
    public void async() {
        asyncService.executeAsync();
    }
}

测试

用ApiPost或者其他工具来多次测试请求一下

2022-05-30 20:02:52.656  INFO 20804 --- [async-service-1] c.itjing.service.impl.AsyncServiceImpl   : start executeAsync
2022-05-30 20:02:52.656  INFO 20804 --- [async-service-1] c.itjing.service.impl.AsyncServiceImpl   : do something......
2022-05-30 20:02:52.656  INFO 20804 --- [async-service-1] c.itjing.service.impl.AsyncServiceImpl   : end executeAsync
2022-05-30 20:03:46.919  INFO 20804 --- [async-service-2] c.itjing.service.impl.AsyncServiceImpl   : start executeAsync
2022-05-30 20:03:46.919  INFO 20804 --- [async-service-2] c.itjing.service.impl.AsyncServiceImpl   : do something......
2022-05-30 20:03:46.919  INFO 20804 --- [async-service-2] c.itjing.service.impl.AsyncServiceImpl   : end executeAsync
2022-05-30 20:03:47.412  INFO 20804 --- [async-service-3] c.itjing.service.impl.AsyncServiceImpl   : start executeAsync
2022-05-30 20:03:47.412  INFO 20804 --- [async-service-3] c.itjing.service.impl.AsyncServiceImpl   : do something......
2022-05-30 20:03:47.412  INFO 20804 --- [async-service-3] c.itjing.service.impl.AsyncServiceImpl   : end executeAsync

通过以上的日志可以发现,[async-service-]有多个线程,显然已在配置的线程池中执行了。

虽然已经用上了线程池,但是我们并不清楚线程池当时情况,比如有多少线程在执行,有多少线程在队列等待呢?

重写线程池

这里创建了一个ThreadPoolTaskExecutor的子类,在每次提交线程的时候都会将当前线程池的运行状况打印出来。

package com.itjing.config.executor;

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * @author lijing
 * @date 2022年05月30日 20:10
 * @description 创建了一个ThreadPoolTaskExecutor的子类,在每次提交线程的时候都会将当前线程池的运行状况打印出来
 */

@Slf4j
public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {

    private void showThreadPoolInfo(String prefix) {

        ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();

        if (Objects.isNull(threadPoolExecutor)) {
            return;
        }

        log.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",
                getThreadNamePrefix() + System.currentTimeMillis(),
                prefix,
                threadPoolExecutor.getTaskCount(),
                threadPoolExecutor.getCompletedTaskCount(),
                threadPoolExecutor.getActiveCount(),
                threadPoolExecutor.getQueue().size()
        );
    }

    @Override
    public void execute(Runnable task) {
        showThreadPoolInfo("do execute Runnable");
        super.execute(task);
    }

    @Override
    public void execute(Runnable task, long startTimeout) {
        showThreadPoolInfo("do execute Runnable and StartTimeout");
        super.execute(task, startTimeout);
    }

    @Override
    public Future<?> submit(Runnable task) {
        showThreadPoolInfo("do submit Runnable");
        return super.submit(task);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        showThreadPoolInfo("do submit Callable");
        return super.submit(task);
    }

    @Override
    public ListenableFuture<?> submitListenable(Runnable task) {
        showThreadPoolInfo("do submitListenable Runnable");
        return super.submitListenable(task);
    }

    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        showThreadPoolInfo("do submitListenable Callable");
        return super.submitListenable(task);
    }
}

如上所示,showThreadPoolInfo方法中将任务总数、已完成数、活跃线程数,队列大小都打印出来了,然后Override了父类的executesubmit等方法,在里面调用showThreadPoolInfo方法,这样每次有任务被提交到线程池的时候,都会将当前线程池的基本情况打印到日志中;

修改 ExecutorConfig.javaasyncServiceExecutor方法,将ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor()改为ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor()

package com.itjing.config;

import com.itjing.config.executor.VisiableThreadPoolTaskExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * @author lijing
 * @date 2022年05月30日 19:37
 * @description 线程池配置类
 */

@Configuration
@EnableAsync
@Slf4j
public class ExecutorConfig {

    @Value("${async.executor.thread.core_pool_size}")
    private int corePoolSize;
    @Value("${async.executor.thread.max_pool_size}")
    private int maxPoolSize;
    @Value("${async.executor.thread.queue_capacity}")
    private int queueCapacity;
    @Value("${async.executor.thread.name.prefix}")
    private String namePrefix;

    @Bean(name = "asyncServiceExecutor")
    public Executor asyncServiceExecutor() {
        log.info("start asyncServiceExecutor");
        ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
        // 配置核心线程数
        executor.setCorePoolSize(corePoolSize);
        // 配置最大线程数
        executor.setMaxPoolSize(maxPoolSize);
        // 配置队列大小
        executor.setQueueCapacity(queueCapacity);
        // 配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix(namePrefix);
        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 执行初始化
        executor.initialize();
        return executor;
    }
}

再次启动服务测试

2022-05-30 20:35:15.965  INFO 14384 --- [nio-8080-exec-1] c.i.c.e.VisiableThreadPoolTaskExecutor   : async-service-1653903315965do submit Callable,taskCount [0], completedTaskCount [0], activeCount [0], queueSize [0]
2022-05-30 20:35:15.966  INFO 14384 --- [async-service-1] c.itjing.service.impl.AsyncServiceImpl   : start executeAsync
2022-05-30 20:35:15.966  INFO 14384 --- [async-service-1] c.itjing.service.impl.AsyncServiceImpl   : do something......
2022-05-30 20:35:19.455  INFO 14384 --- [nio-8080-exec-2] c.i.c.e.VisiableThreadPoolTaskExecutor   : async-service-1653903319455do submit Callable,taskCount [1], completedTaskCount [0], activeCount [1], queueSize [0]
2022-05-30 20:35:19.456  INFO 14384 --- [async-service-2] c.itjing.service.impl.AsyncServiceImpl   : start executeAsync
2022-05-30 20:35:19.456  INFO 14384 --- [async-service-2] c.itjing.service.impl.AsyncServiceImpl   : do something......
2022-05-30 20:35:20.966  INFO 14384 --- [async-service-1] c.itjing.service.impl.AsyncServiceImpl   : end executeAsync
2022-05-30 20:35:24.456  INFO 14384 --- [async-service-2] c.itjing.service.impl.AsyncServiceImpl   : end executeAsync

注意这一行日志:

2022-05-30 20:35:19.455  INFO 14384 --- [nio-8080-exec-2] c.i.c.e.VisiableThreadPoolTaskExecutor   : async-service-1653903319455do submit Callable,taskCount [1], completedTaskCount [0], activeCount [1], queueSize [0]

这说明提交任务到线程池的时候,调用的是submit(Callable task)这个方法,当前已经提交了1个任务,完成了0个,当前有1个线程在处理任务,还剩0个任务在队列中等待,线程池的基本情况一目了然。

这样操作的话更便于我们进行线程池监视和编程。


原文始发于微信公众号(程序员阿晶):SpringBoot中线程池的正确使用姿势

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/19608.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!