Java多线程-线程池

x33g5p2x  于9个月前 转载在 Java  
字(8.3k)|赞(0)|评价(0)|浏览(107)

线程池的作用

线程池作用就是限制系统中执行线程的数量。
根 据系统的环境情况,可以自动或手动设置线程数量,达到运行的最佳效果;少了浪费了系统资源,多了造成系统拥挤效率不高。用线程池控制线程数量,其他线程排 队等候。一个任务执行完毕,再从队列的中取最前面的任务开始执行。若队列中没有等待进程,线程池的这一资源处于等待。当一个新任务需要运行时,如果线程池 中有等待的工作线程,就可以开始运行了;否则进入等待队列。

我们先来了解一下Executors 和 ExecutorService

下表列出了 Executor 和 ExecutorService 的区别:

ExecutorExecutorService
Executor 是 Java 线程池的核心接口,用来并发执行提交的任务ExecutorService 是 Executor 接口的扩展,提供了异步执行和关闭线程池的方法
提供execute()方法用来提交任务提供submit()方法用来提交任务
execute()方法无返回值submit()方法返回Future对象,可用来获取任务执行结果
不能取消任务可以通过Future.cancel()取消pending中的任务
没有提供和关闭线程池有关的方法提供了关闭线程池的方法

简单来说ExecutorService 继承了 Executors 多了些东西 那么我们在下面创建线程池时候使用的对象记得是 ExecutorService

ExecutorService对象内常用的方法

void execute(Runnable task); 提交线程到线程池无返回值 (最常用)

executorService.submit() 提交线程到线程池有返回值

注意提交线程到线程池后会自动运行线程了

void shutdown(); 关闭线程池 在关闭前创建的线程会继续执行 而关闭后创建的线程将不会执行将会报错 (一般不建议使用)

还有其他的方法 比如 shutdownNow(); 关闭线程池不管是否有线程还未执行完成

注意的是shutdownNow这个方法底层会将所有线程的中断标识符设置为中断状态(true) 可以自己看源码

…自己看ExecutorService 类里的源码

要配置一个线程池是比较复杂的,尤其是对于线程池的原理不是很清楚的情况下,很有可能配置的线程池不是较优的,因此在Executors类里面提供了一些静态工厂,生成一些常用的线程池。

通过Executors提供四种线程池,分别为:

FiexedThreadPool

public static ExecutorService newFiexedThreadPool(int Threads)

创建一个固定长度的线程池,超出的线程会在队列中等待

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 10; i++) {
            final int index = i;
            fixedThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(index);
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }

CachedThreadPool

public static ExecutorService newCachedThreadPool():

线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。

ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            //添加一个线程
            int finalI = i;
            cachedThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(finalI);
                }
            });
        }

SingleThreadExecutor

//Executors.newSingleThreadExecutor() //

创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行

简单来说就是保证所有使用SingleThreadExecutor创建的线程的执行顺序

ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 10; i++) {
            final int index = i;
            singleThreadExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(index);

                }
            });
        }

结果依次输出,相当于顺序执行各个任务。

ScheduledThreadPool

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

创建一个定长线程池,支持定时及周期性任务执行。延迟执行示例代码如下:

延迟执行示例代码如下:

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
        scheduledThreadPool.schedule(new Runnable() {

            @Override
            public void run() {
                System.out.println("delay 3 seconds");
            }
        }, 3, TimeUnit.SECONDS); //延迟3秒后执行

定期执行示例代码如下:

public static void main(String[] args)  {
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
        scheduledThreadPool.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                System.out.println("delay 1 seconds, and excute every 3 seconds");
            }
        }, 1, 3, TimeUnit.SECONDS); //表示延迟1秒后每3秒执行一次。 无限执行下去
    }

手动创建线程池

一、为什么要手动创建线程池?

我们之所以要手动创建线程池,是因为 JDK 自带的工具类所创建的线程池存在一定的弊端

/** * 各参数含义 * corePoolSize : 线程池中常驻的线程数量。核心线程数,默认情况下核心线程会一直存活,即使处于闲置状态也不会 * 受存活时间 keepAliveTime 的限制,除非将 allowCoreThreadTimeOut 设置为 true。 * maximumPoolSize : 线程池所能容纳的最大线程数。超过这个数的线程将被阻塞。当任务队列为没有设置大小的 * LinkedBlockingQueue时,这个值无效。 * keepAliveTime : 当线程数量多于 corePoolSize 时,空闲线程的存活时长,超过这个时间就会被回收 * unit : keepAliveTime 的时间单位 * workQueue : 存放待处理任务的队列 * threadFactory : 线程工厂 * handler : 拒绝策略,拒绝无法接收添加的任务 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) { ... ... }

使用 JDK 自带的 Executors工具类 类似于 Collections 和 Arrays) 可以直接创建以上面几种线程池:

JDK 自带工具类创建的线程池存在的问题

有的线程池可以无限添加任务或线程,容易导致 OOM;就拿我们最常用FixedThreadPool和 CachedThreadPool来说,

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

可见其任务队列用的是LinkedBlockingQueue,且没有指定容量,相当于无界队列,这种情况下就可以添加大量的任务,甚至达到Integer.MAX_VALUE的数量级,如果任务大量堆积,可能会导致 OOM。

还有一个问题就是这些线程池的线程都是使用 JDK 自带的线程工厂 (ThreadFactory)创建的,线程名称都是类似pool-1-thread-1的形式,第一个数字是线程池编号,第二个数字是线程编号,这样很不利于系统异常时排查问题。

如果你安装了“阿里编码规约”的插件,在使用Executors创建线程池时会出现以下爆红警告信息:
为避免这些问题,我们最好还是手动创建线程池。

二、 如何手动创建线程池

首先要说明一点,定制线程池的线程数并不是多么高深的学问,也不是说一旦线程数设定不合理,你的程序就无法运行,而是要尽量避免以下两种极端条件:

  1. 线程数量过大这会导致过多的线程竞争稀缺的 CPU 和内存资源。CPU 核心的数量和计算能力是有限的,在分配不到 CPU 执行时间的情况下,线程只能处于空闲状态。而在JVM 中,线程本身也是对象,也会占用内存,过多的空闲线程自然会浪费宝贵的内存空间。
  2. 线程数量过小线程池存在的意义,或者说并发编程的意义就是为了“压榨”计算机的运算能力,说白了就是别让 CPU 闲着。如果线程数量比 CPU 核心数量还小的话,那么必定有 CPU 核心将处于空闲状态,这是极大的浪费。

所以在实际开发中我们需要根据实际的业务场景合理设定线程池的线程数量,那又如何分析业务场景呢?我们的业务场景大致可以分为以下两大类:

  1. CPU (计算)密集型这种场景需要大量的 CPU 计算,比如加密、计算 hash 等,最佳线程数为 (CPU 核心数 + 1)。比如8核 CPU,可以把线程数设置为 9,这样就足够了,因为在 CPU 密集型的场景中,每个线程都会在比较大的负荷下工作,很少出现空闲的情况,正好每个线程对应一个 CPU 核心,然后不停地工作,这样就实现了最优利用率。多出的一个线程起到了备胎的作用,在其他线程意外中断时顶替上去,确保 CPU 不中断工作。其实也大可不必这么死板,线程数量设置为 CPU 核心数的 1 到 2 倍都是可以接受的。
  2. I/O 密集型比如读写数据库,读写文件或者网络读写等场景。各种 I/O 设备 (比如磁盘)的速度是远低于 CPU 执行速度的,所以在 I/O 密集型的场景下,线程大部分时间都在等待资源而非 CPU 时间片,这样的话一个 CPU 核心就可以应付很多线程了,也就可以把线程数量设置大一点。线程具体数量的计算方法可以参考 Brain Goetz 的建议:假设有以下变量:Nthreads = 线程数量Ncpu = CPU 核心数Ucpu = 期望的CPU 的使用率 ,因为 CPU 可能还要执行其他任务W = 线程的平均等待资源时间C = 线程平均使用 CPU 的计算时间W / C = 线程等待时间与计算时间的比率

这样为了让 CPU 达到期望的使用率,最优的线程数量计算公式如下:

Nthreads = Ncpu Ucpu ( 1 + W / C )

CPU 核心数可以通过以下方法获取:

int N_CPUS = Runtime.getRuntime().availableProcessors();

当然,除了 CPU,线程数量还会受到很多其他因素的影响,比如内存和数据库连接等,需要具体问题具体分析。

使用可自定义线程名称的线程工厂

这个就简单多了,可以借助大名鼎鼎的谷歌开源工具库 Guava,首先引入如下依赖:

<dependency>
  <groupId>com.google.guava</groupId>
  <artifactId>guava</artifactId>
  <version>28.2-jre</version>
</dependency>

然后我就可以使用其提供的ThreadFactoryBuilder类来创建线程工厂了,Demo 如下:

public class ThreadPoolDemo {

    // 线程数
    public static final int THREAD_POOL_SIZE = 16;

    public static void main(String[] args) throws InterruptedException {
        // 使用 ThreadFactoryBuilder 创建自定义线程名称的 ThreadFactory
        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
                .setNameFormat("hyn-demo-pool-%d").build();
        
        // 创建线程池,其中任务队列需要结合实际情况设置合理的容量
        ThreadPoolExecutor executor = new ThreadPoolExecutor(THREAD_POOL_SIZE,
                THREAD_POOL_SIZE,
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1024),
                namedThreadFactory,
                new ThreadPoolExecutor.AbortPolicy());
        
        // 新建 1000 个任务,每个任务是打印当前线程名称
        for (int i = 0; i < 1000; i++) {
            executor.execute(() -> System.out.println(Thread.currentThread().getName()));
        }
        // 优雅关闭线程池
        executor.shutdown();
        executor.awaitTermination(1000L, TimeUnit.SECONDS);
        // 任务执行完毕后打印"Done"
        System.out.println("Done");
    }
}

案例:RabbitMq连接池设计

package cn.mq.rabbitmqconfig;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.Getter;
import lombok.Setter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.concurrent.*;

/** * @author HuAnmin * @version 1.0 * @email 3426154361@qq.com * @date 2021/4/15-3:41 * @description 类描述.... */
@Component
@Getter
@Setter
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitMqConnection {

    // 线程数/连接池数量
    public static final int THREAD_POOL_SIZE = 100;
    ArrayBlockingQueue<Connection> list = new ArrayBlockingQueue<Connection>(THREAD_POOL_SIZE);
    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
            .setNameFormat("hyn-demo-pool-%d").build();
    // 创建线程池,其中任务队列需要结合实际情况设置合理的容量
    ThreadPoolExecutor executor = new ThreadPoolExecutor(THREAD_POOL_SIZE,
            THREAD_POOL_SIZE,
            0L,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(Math.round(THREAD_POOL_SIZE/2)),
            namedThreadFactory,
            new ThreadPoolExecutor.AbortPolicy());

    public String host;
    private int port;
    private String virtualHost;
    private String username;
    private String password;

    @PostConstruct //加上该注解表明该方法会在bean初始化后调用
    private void init() throws InterruptedException {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //主机地址 如果是本机就是localhost 如果在其他 地方比如:虚拟机中 那么就是ip地址
        connectionFactory.setHost(host);
        //连接端口;默认为 5672
        connectionFactory.setPort(port);
        //虚拟主机名称 就是和你用户绑定的虚拟机 在创建用户时候就指定了
        connectionFactory.setVirtualHost(virtualHost);
        //连接用户名
        connectionFactory.setUsername(username);
        //连接密码
        connectionFactory.setPassword(password);

        for (int i = 0; i < THREAD_POOL_SIZE - 1; i++) {
            executor.execute(() -> {
                while (true) {
                    try {
                        list.put(connectionFactory.newConnection());
// System.out.println("创建连接到连接池中:"+list.size());
                    } catch (InterruptedException | TimeoutException | IOException e) {
                        e.printStackTrace();
                    }

                }
            });
        }
    }

        public Connection getNewConnection() throws InterruptedException {
            Connection take = list.take();
            while (true) {
                if (take.isOpen()) {
                    return take;
                } else {
                    take = list.take();
                }
            }
        }

}

相关文章