Executor线程池创建方式和OOM内存溢出问题(14)

x33g5p2x  于2021-08-23 转载在 Java  
字(8.7k)|赞(0)|评价(0)|浏览(711)

线程池介绍

线程池是一个线程集合,当有任务到来时线程池会为任务分配一个线程用于执行任务,如果没有任务到来线程池里面的线程就处于空闲状态;

如果不使用线程池,当每个任务来时都会为其创建一个线程:代码如下

public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            // 为每个请求都创建一个新的线程
            final Socket connection = socket.accept();
            Runnable task = () -> handleRequest(connection);
            new Thread(task).start();
        }
    }
    private static void handleRequest(Socket connection) {
        // ...
    }

这种模式在非生产模式中完全没有问题,但缺陷非常明显

  • 线程的生命周期开销非常高: 线程的创建和销毁根据不同的平台开销不同,但每个线程都有自己的生命周期,都需要时间和系统资源,如果是大数据计算消耗的系统资源更加恐怖;
  • 内存消耗大:多个线程运行,对cpu的竞争,内存的分配都非常耗资源,如果无限制的线程系统性能将大幅度降低,得不偿失;
  • 稳定性差:当系统遭到攻击时,或者抛出异常时,系统的恢复能力差;

Executor

Executor介绍

Executor 接口中只定义了一个 execute 方法,用于接收一个 Runnable 对象,Runnable 可以表示任务,适用于生产消费者模式,当任务提交表示生产者,执行任务表示消费者;

public interface Executor {
    void execute(Runnable command);
}

Executor 有许多实现类,子类目录如下;

  • Executor是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开
  • ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务。
  • ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执
    行命令。ScheduledThreadPoolExecutor比Timer更灵活,功能更强大。
  • Future接口和实现Future接口的FutureTask类,代表异步计算的结果。
  • Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutor或Scheduled-ThreadPoolExecutor执行。

我们使用固定大小的线程池就可以避免无限制的创建线程,减少资源浪费;如下所示创建一个50大小的线程池,当任务来时,从线程池中拿线程执行任务;

 private static final int NUMBER = 50;
    private static ExecutorService fixedThreadPool =  Executors.newFixedThreadPool(NUMBER);
    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            // 为每个请求都创建一个新的线程
            final Socket connection = socket.accept();
            Runnable task = () -> handleRequest(connection);
            fixedThreadPool.execute(task);
        }
    }

    private static void handleRequest(Socket connection) {
        // ...
    }

线程池创建方式

线程池的本质目的是代替Threan, 避免无限制创建线程,减少资源开销;可以通过Executors中的静态方法来创建线程池;

  • newFixedThreadPool创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大数量。线程池的大小一旦达到最大值就会保持不变,在提交新任务,任务将会进入等待队列中等待。如果某个线程因为执行异常而结束,那么线程池会补充一个新线程;
  • newCachedThreadPool创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒处于等待任务到来)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池的最大值是Integer的最大值(2^31-1);
  • newSingleThreadExecutor创建一个单线程的线程池。该线程池是单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。该线程所有任务安装顺序执行;
  • newScheduledThreadPool: 创建一个固定长度的线程池,可以延时或者定时执行;

线程池生命周期

executors启动的线程为非守护线程,如果没有及时关闭executors启动的线程,那么JVM将不会自动关闭;在 executors 的扩展类 ExecutorService 中定义了 executors 的生命周期,其有三种状态,当创建任务时,处于运行态,当执行 shutdown时 处于缓慢关闭状态(不再接受提交任务,等队列中任务全部执行完再关闭线程池),当执行 shutdownNow 时 粗暴关闭(不管队列中是否存在任务,马上关闭线程池);

源码如下

public interface ExecutorService extends Executor {

    void shutdown();


    List<Runnable> shutdownNow();


    boolean isShutdown();


    boolean isTerminated();


    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;


    <T> Future<T> submit(Callable<T> task);


    <T> Future<T> submit(Runnable task, T result);

    Future<?> submit(Runnable task);


    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;


    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;


    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;


    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

使用示例如下:

    public static void main(String[] args) {

        ExecutorService executor = Executors.newSingleThreadExecutor();

        IntStream.range(0, 5).forEach(i -> executor.execute(() -> {
            // 执行任务
            String threadName = Thread.currentThread().getName();
            System.out.println("线程名称: " + threadName);
        }));
        // 关闭线程池
        executor.shutdown();
    }

在 ExecutorService中 提交执行任务有2种方式,第一种就是上面代码的excutor(Runable),执行后没有返回值,第二种就是submit(Callable),执行后返回一个Future,即执行后有返回值,表示任务执行完毕;

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future future = executorService.submit(new Callable(){
            @Override
            public Object call() throws Exception {
                System.out.println("执行任务");
                return "执行任务完成";
            }
        });
        executorService.shutdown();
        System.out.println(future.get());
    }

FutureTask除了实现Future接口外,还实现了Runnable接口。因此,FutureTask可以交给Executor执行,也可以由调用线程直接执行(FutureTask.run());

public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<Integer> futureTask = new FutureTask<>(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println("知识追寻者");
                return 2;
            }
        });
        //可以作为Runnable类型对象使用
        Thread t1 = new Thread(futureTask);
        t1.start();
        while (true){
            // 任务执行完成
            if (futureTask.isDone()){
                System.out.println(futureTask.get());
                break;
            }
        }
    }

ScheduledExecutorService

cheduledExecutorServiceExecutorService 接口的扩展,支持延迟执行任务和定时执行任务, 并且是Timer 定时器的代替方案;

源码如下

public interface ScheduledExecutorService extends ExecutorService {

    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);

    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

}
  • schedule 方法可以延时执行一个 Runnable 或者 Callable 任务。
  • scheduleAtFixedRate 方法和 scheduleWithFixedDelay 方法可以按照指定时间间隔定期执行任务。

使用示例

public static void main(String[] args) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(3);
        // 延时1秒,每2秒执行一次
        executor.scheduleAtFixedRate(() -> System.out.println(System.currentTimeMillis()), 1000, 2000, TimeUnit.MILLISECONDS);
        try {
            // 保证任务都执行完毕
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            executor.shutdown();
        }
    }

关注公众知识追寻者,获取原创PDF,面试集;

ThreadPoolExecutor

Executor框架最核心的类是ThreadPoolExecutor, 其继承 AbstractExecutorService , 构造方法如下,总共有4个

public class ThreadPoolExecutor extends AbstractExecutorService {
    // ...
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
            				  BlockingQueue<Runnable> workQueue);
 
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
            				  BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory);
 
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
            				  BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler);
 
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
        					  BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler);
   // ...
}

核心的构造方法如下

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
  • corePoolSize : 线程池大小,即线程数量
  • maximumPool:最大线程池的大小;
  • BlockingQueue:用来暂时保存任务的工作队列;
  • keepAliveTime:表示线程没有任务执行时最多保持多久时间线程会终止。线程池中的线程不会立即销毁,仅当线程池中的线程数大于corePoolSize 时,多余的空闲线程会活跃keepAliveTime时间之后才销毁;
  • unitkeepAliveTime 的时间单位;
TimeUnit.DAYS;               //天
TimeUnit.HOURS;             //小时
TimeUnit.MINUTES;           //分钟
TimeUnit.SECONDS;           //秒
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //纳秒
  • workQueue :等待执行的任务队列
ArrayBlockingQueue // 有界阻塞队列。先进先出队列(FIFO),必须指定大小。
LinkedBlockingQueue // 无界阻塞队列。先进先出队列(FIFO)。如果没有指定队列大小,则默认为 int 最大值
SynchronousQueue // 容量为0的无界队列
PriorityBlockingQueue // 具有优先级的无界阻塞队列。
  • threadFactory:线程工厂,主要用来创建线程;

handler :任务饱和时的处理策略;

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常(默认)。 
ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。 
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:直接调用 run 方法并且阻塞执行;

使用示例

    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 500, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(100),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
        threadPoolExecutor.execute(()->{
            System.out.println("公众号:知识追寻者, 原创PDF,系统教程知识,期待您的关注");
        });
        threadPoolExecutor.shutdown();
    }

Executors引起的OOM

在阿里巴巴规范中建议不要使用Executors的静态方法去创建线程池,原因是线程容量为 **Integer.MAX_VALUE**的无界队列会引起OOM;

设置 jvm 堆内存大小

-Xms5M -Xmx5M 

测试代码

 public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i <Integer.MAX_VALUE ; i++) {
            executorService.submit(()-> {
                // do nothing
                System.out.println("zszxz");
            } );
        }
    }

跑几十秒程序就马上出现OOM异常;

newCachedThreadPool源码如下

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

newSingleThreadExecutor 源码如下

 public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

LinkedBlockingQueue 如果未指定大小,则默认为 Integer.MAX_VALUE,可以认为是一个无界队列,会无限制创建线程;同理 SynchronousQueue 是容量为0的无界队列,来一个任务会创建一个线程,可以创建 Integer.MAX_VALUE个线程,所以也认为 会创建无限个线程;故 推荐使用上一节的 ThreadPoolExecutor 方式创建线程池,并指定队列大小;

相关文章

微信公众号

最新文章

更多