Java线程池,不会不行

x33g5p2x  于10个月前 转载在 Java  
字(10.8k)|赞(0)|评价(0)|浏览(74)

一、线程池介绍

线程池也是一种池化思想的实现,常见的有 线程池、字符串常量池、数据库连接池 等。

线程池就是一个存放线程的容器,使得每次需要使用线程的时候,直接获取到线程对象,执行任务即可。

先看一下没有线程池时使用线程

  1. 手动创建线程对象。
  2. 执行任务。
  3. 执行完毕,释放线程对象。

每一次使用到线程,都需要执行一遍以上的步骤,这使得效率很低下。

这就需要考虑到线程池的使用了,线程池的优点如下(池化容器优点):

  • 降低资源消耗。重复利用已创建的线程,降低线程创建和销毁的造成的消耗,还提升了线程利用率。
  • 提高程序响应速度。有请求时,直接获取到线程对象执行即可,无需执行线程创建操作。
  • 统一管理线程对象。线程池可统一优化和监控线程,还可以控制最大并发数。

二、线程池核心

1.内部原理

先看一个简单的例子:

有客户来到银行办理业务,假设有五个柜台,三个等待位置。当来了一个客户,就开启一个柜台办理业务。当某个柜台一定的时间没有用户来办理业务,就关闭此柜台。

当来了8个客户,5个客户在办理,3个客户在等待区等待,而这时又来了一个客户,银行就告知此客户这里没有位置办理他的业务。

以上例子正是线程池的思想

  1. 接收第一个任务的时候,会创建核心线程数,核心线程直到线程池关闭才会随着关闭。
  2. 一开始使用核心线程执行任务,其他任务进入等待队列等待执行,当有空闲的线程对象的时候,就调用空闲的线程对象执行任务。
  3. 当等待队列满了以后,可以进行线程对象创建,而线程池容量有限,最大线程数就是线程池容量,当线程池满了以后,就不再创建新的线程对象。
  4. 当线程池已满,且没有空闲线程时,此时还有请求,就会抛异常或者拒绝请求(具体策略可以设置)。
  5. 一个线程对象,在一定时间内,没有执行任何任务,就会销毁此空闲线程对象。
  6. 如果队列满了,再加任务的话,会优先执行新的任务,而不是从队列中取。

2.线程池组成

  • 线程池管理器(ThreadPool):用于创建并管理线程池,包括创建线程池、销毁线程池和添加新任务
  • 工作线程(PoolWorker):线程池中的线程对象,在没有任务时处于等待状态,可以循环的执行任务。
  • 任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等。
  • 任务队列(taskQueue):用于存放等待处理的任务。它提供一种缓冲机制。

3.核心参数

线程池位于java.util.concurrent 包下。Executors 的 4 个功能线程池虽然方便,但现在已经不建议使用了,而是建议直接通过使用 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

下面就介绍一下ThreadPoolExecutor类线程池,这一个线程池更加的稳定简明。下面是它的一个构造方法(总共有四个,这个最全)。

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    // 下面是参数判断,可以不看
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

其中:

  • corePoolSize:核心线程数量,核心线程会一直存在,除非allowCoreThreadTimeOut设置为true。
  • maximumPoolSize:线程池最大容量。
  • keepAliveTime:线程数量超过corePoolSize后,空闲线程的最大超时时间。
  • unit:超时时间的单位。
  • workQueue:工作队列,保存未执行的Runnable 任务。
  • threadFactory:创建线程的工厂类。
  • handler:拒绝策略,当线程和工作队列已满的时候调用。

下面进行简单的使用演示。代码测试(使用JUC工具包)

public class ThreadPoolTest {
    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor
                = new ThreadPoolExecutor(
                            2,// 线程初始数
                            4,// 线程池最大容量
                            10,// 线程空闲存活时间
                            TimeUnit.SECONDS,
                            new ArrayBlockingQueue<>(1),// 等待队列
                            Executors.defaultThreadFactory(), // 生成线程的策略
                            new ThreadPoolExecutor.AbortPolicy()// 异常处理
                    );

        // 在短时间内,for循环所消耗时间不记,默认为同时发起指定的请求
        for (int i = 0; i < 5; i++) {
            threadPoolExecutor.execute(() -> {
                System.out.println(Thread.currentThread().getName() + "===> 办理业务");
            });
        }
    }
}

以上代码,核心线程数为2,最大容量是4,等待队列可容纳1,则此线程池一次性可并发执行5个任务,如果有六个任务需要执行,就会走异常。

输出:
pool-1-thread-1===> 办理业务
pool-1-thread-3===> 办理业务
pool-1-thread-1===> 办理业务
pool-1-thread-2===> 办理业务
pool-1-thread-4===> 办理业务

4.核心方法

不看也影响线程池的使用,内部引用二进制运算符,提高了运行效率。

ThreadPoolExecutor类的execute方法(执行任务入口):

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    //获取clt,clt记录着线程池状态和运行线程数。
    int c = ctl.get();
    //运行线程数小于核心线程数时,创建线程放入线程池中,并且运行当前任务。
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        //创建线程失败,重新获取clt。
        c = ctl.get();
    }
    //线程池是运行状态并且运行线程大于核心线程数时,把任务放入队列中。
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        //重新检查线程池不是运行状态时,
        //把任务移除队列,并通过拒绝策略对该任务进行处理。
        if (! isRunning(recheck) && remove(command))
            reject(command);
        //当前运行线程数为0时,创建线程加入线程池中。
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //运行线程大于核心线程数时并且队列已满时,
    //创建线程放入线程池中,并且运行当前任务。
    else if (!addWorker(command, false))
        reject(command);
}

在execute方法中,多次调用的addWorker方法,再看一下这个方法:

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (int c = ctl.get();;) {
            //线程池处于关闭状态,或者当前任务为null
        //或者队列不为空,则直接返回失败。
            if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP)
                    || firstTask != null
                    || workQueue.isEmpty()))
                return false;

            for (;;) {
                //线程数超过线程池最大容量,则返回false;
            //这里的core是addWorker方法的第二个参数,
            //如果为true则根据核心线程数进行比较,
            //如果为false则根据最大线程数进行比较。
                if (workerCountOf(c)
                    >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                    return false;
                //尝试增加线程数,如果成功,则跳出第一个for循环
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                 //如果增加线程数失败,则重新获取ctl
                c = ctl.get();  // Re-read ctl
                //如果当前的运行状态不等于SHUTDOWN,说明状态已被改变,
            //返回第一个for循环继续执行
                if (runStateAtLeast(c, SHUTDOWN))
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();

                    if (isRunning(c) ||
                        (runStateLessThan(c, STOP) && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
}

三、在springBoot中使用线程池

在SpringBoot项目中,可以用Spring提供的对ThreadPoolExecutor封装的线程池ThreadPoolTaskExecutor,直接使用注解启用。它的内部也是通过ThreadPoolExecutor实现。

本地环境:

IDEA、JDK1.8+、Maven、Win10,还有网络。

1.新建立SpringBoot项目

话不多说,【点击我查看如何快速搭建一个SpringBoot项目】

pom.xml导入swagger2、lombok依赖【Maven点击我查询

<!-- https://mvnrepository.com/artifact/io.springfox/springfox-swagger2 -->
<dependency>
    <groupId>io.springfox</groupId>
    <artifactId>springfox-swagger2</artifactId>
    <version>2.9.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.springfox/springfox-swagger-ui -->
<dependency>
    <groupId>io.springfox</groupId>
    <artifactId>springfox-swagger-ui</artifactId>
    <version>2.9.2</version>
</dependency>

<!-- lombok -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>

在application.yml文件中配置参数:

server:
  port: 8082
  
# thread pool
async:
  executor:
    thread:
      core_pool_size: 3
      max_pool_size: 5
      queue_capacity: 3
      name:
        prefix: async-service-

2.线程池配置类

让SpringBoot加载此配置类

package com.pdh.config;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

@Data
@Slf4j
@Configuration
@EnableAsync // 开启多线程
public class ThreadPoolConfig {

    @Value("${async.executor.thread.core_pool_size}")
    private int corePoolSize;
    @Value("${async.executor.thread.max_pool_size}")
    private int maxPoolSize;
    @Value("${async.executor.thread.queue_capacity}")
    private int queueCapacity;
    @Value("${async.executor.thread.name.prefix}")
    private String namePrefix;

    @Bean("asyncTaskExecutor")
    public Executor asyncServiceExecutor() {
        log.info("start asyncServiceExecutor");
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //配置核心线程数
        executor.setCorePoolSize(corePoolSize);
        //配置最大线程数
        executor.setMaxPoolSize(maxPoolSize);
        //配置队列大小
        executor.setQueueCapacity(queueCapacity);
        //配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix(namePrefix);
        // 拒绝策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //执行初始化
        executor.initialize();
        return executor;
    }
}

3.新建Service类

编写两个Service,一个是ThreadService,作为异步线程执行服务。另一个是OtherService,是用户具体的服务类

ThreadService

package com.pdh.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class ThreadService {

    @Async("asyncTaskExecutor") // 在Spring容器中使用 asyncTaskExecutor对象执行以下代码块
    public void executeAsync(){
        log.info("start executeAsync");

        System.out.println("异步线程要做的事情");
        try {
            // 睡眠10s
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("可以在这里执行批量插入等耗时的事情");

        log.info("end executeAsync");
    }
}

OtherService

package com.pdh.service;

import org.springframework.stereotype.Service;

@Service
public class OtherService {
    public String test(){
        return "我是OtherService的test方法";
    }
}

4.编写TestController

TestController响应前端请求

package com.pdh.controller;

import com.pdh.service.OtherService;
import com.pdh.service.ThreadService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/** * @Author: 彭_德华 * @Date: 2021-10-19 16:00 */
@RestController
public class TestController {
    @Autowired
    private ThreadService threadService;

    @Autowired
    private OtherService otherService;

    @GetMapping("/async")
    public String async(){
        String str = otherService.test();
        // 如果是异步执行,就会不会等待10s(因为executeAsync方法设置了sleep 10s)
        threadService.executeAsync();
        return str;
    }
}

5.使用Swagger2测试

使用Swagger2进行接口测试,【点击我跳转 快速使用Swagger2】,编写Swagger2配置类

@Configuration
@EnableSwagger2
public class Swagger2Config {

    @Bean
    public Docket docket(Environment environment){
        // 设置要显示的swagger环境
        Profiles profiles = Profiles.of("dev","test");
        // 判断是否处在自己设定的环境中
        boolean flag = environment.acceptsProfiles(profiles);

        return new Docket(DocumentationType.SWAGGER_2)
                .groupName("ThreadPool")
                .apiInfo(apiInfo2())
                .enable(true)
                .select()
                .apis(RequestHandlerSelectors.basePackage("com.pdh.controller"))
                .paths(PathSelectors.any())
                .build();
    }

    private ApiInfo apiInfo(){
        return new ApiInfoBuilder()
                .title("线程池异步执行测试")
                .description("测试后端移步进行相应的操作")
                .termsOfServiceUrl("https://blog.csdn.net/yeahPeng11")
                .contact(new Contact("彭德华","https://blog.csdn.net/yeahPeng11","pdhcool@163.com"))
                .version("v1.0")
                .build();
    }
}

启动项目后,访问 http://localhost:8082/swagger-ui.html。端口改成application.yml中配置的端口,默认为 8080 端口(我是8082端口)。

swagger2中测试 /async 请求

  • 前端会立马得到结果。
  • 后端在控制台会有输出(前后相差10s,因为sleep 10s)
  • 可以多测试几组,查看后端控制台和前端页面显示,就会发现会有不同的线程执行
2021-10-19 16:58:34.768  INFO 77324 --- [async-service-1] com.pdh.service.ThreadService            : start executeAsync
异步线程要做的事情
可以在这里执行批量插入等耗时的事情
2021-10-19 16:58:44.775  INFO 77324 --- [async-service-1] com.pdh.service.ThreadService            : end executeAsync
2021-10-19 17:15:14.768  INFO 77324 --- [async-service-2] com.pdh.service.ThreadService            : start executeAsync
异步线程要做的事情
可以在这里执行批量插入等耗时的事情
2021-10-19 17:15:24.775  INFO 77324 --- [async-service-2] com.pdh.service.ThreadService            : end executeAsync

至此,线程池的使用都已经涉及到了。

四、问题排查

对于以上所有的功能确实都能实现,那么出现问题了我需要怎么排查?下面提供两种解决思路:

  1. 查看线程池使用情况:创建ThreadPoolTaskExecutor的子类,在每次提交线程的时候都会将当前线程池的运行状况打印出来。
  2. 线程池自定义异常处理方法:在定义 ThreadFactory 的时候调用setUncaughtExceptionHandler方法,自定义异常处理方法。例如:
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
                .setNameFormat("judge-pool-%d")
                .setUncaughtExceptionHandler((thread, throwable)-> logger.error("ThreadPool {} got exception", thread,throwable))
                .build();

相关文章