java-线程池

x33g5p2x  于2021-09-24 转载在 Java  
字(8.2k)|赞(0)|评价(0)|浏览(285)

线程池

基本概念

在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源。在Java中更是如此,虚拟机将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收。所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁。如何利用已有对象来服务就是一个需要解决的关键问题,其实这就是一些"池化资源"技术产生的原因。比如大家所熟悉的数据库连接池正是遵循这一思想而产生的。
线程池也是一样。

应用场景

1、需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
2、对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
3、接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,并出现"OutOfMemory"的错误。

java中的线程池

Java中的ThreadPoolExecutor类

ThreadPoolExecutor结构:

顶层接口:

public interface Executor {
    /** * 提交一个任务 */
    void execute(Runnable command);
}

子接口:

public interface ExecutorService extends Executor {
    // 关闭线程池,尽量让现在已经执行的线程执行结束,不能在向池中提交任务
	void shutdown();
    // 停止所有线程
    List<Runnable> shutdownNow();
    boolean isShutdown();
    // 线程池是否已经完全停止了。
    boolean isTerminated();
    // 父接口中有execute可以提交一个任务到线程池中
    // 提交任务到线程池中。 利用future模式提交任务。
    // execute只管提交任务,不等待任务的执行结果。 
    //submit 提交任务之后,会有一个回调得到任务的执行结果。
    <T> Future<T> submit(Callable<T> task);
    
    <T> Future<T> submit(Runnable task, T result);
    
    Future<?> submit(Runnable task);
}

接口的实现类:

public abstract class AbstractExecutorService implements ExecutorService {
	// 实现了几乎接口中的所有方法,但是定义为抽象的就是不让直接使用这个类。
}

java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类。
 在ThreadPoolExecutor类中提供了四个构造方法:

public class ThreadPoolExecutor extends AbstractExecutorService {
    
        // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;// 运行状态
    private static final int SHUTDOWN   =  0 << COUNT_BITS;// 关闭状态
    private static final int STOP       =  1 << COUNT_BITS;// 停止状态
    private static final int TIDYING    =  2 << COUNT_BITS;// 暂停状态
    private static final int TERMINATED =  3 << COUNT_BITS;// 完全结束状态
    
    // 最常用的构造方法
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
}

下面解释下一下构造器中各个参数的含义:

  • corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
  • maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;
  • keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
  • unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性.
TimeUnit.DAYS;               //天
TimeUnit.HOURS;             //小时
TimeUnit.MINUTES;           //分钟
TimeUnit.SECONDS;           //秒
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //纳秒
  • workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:
ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;
  • threadFactory:线程工厂,主要用来创建线程;
  • handler:表示当拒绝处理任务时的策略,有以下四种取值:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

在ThreadPoolExecutor类中有几个非常重要的方法:

void execute(Runable task)
Future<T> submit(Callable<T> task)
void shutdown()
List<Runnable> shutdownNow()

execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。

submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果(Future相关内容将在下一篇讲述)。

shutdown()和shutdownNow()是用来关闭线程池的。

还有很多其他的方法:

比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等获取与线程池相关属性的方法

案例1使用 execute提交任务
/** * @author 戴着假发的程序 */
public class ThreadPoolExecutorTest {
    public static void main(String[] args) {
        /* corePoolSize:5 maximumPoolSize:10 keepAliveTime:5 timeUnit:SECOND workQueue: LinkedBlockingQueue [threadFactory: 使用默认 Executors.defaultThreadFactory()] [RejectedExecutionHandler:AbortPolicy 抛出异常] */
        // 创建线程池
        ThreadPoolExecutor pool =
                new ThreadPoolExecutor(5,10,200, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        // 使用Execute给线程池中提交任务
        for (int i = 0;i < 21;i++){
            pool.execute(new MyTask(i,pool));
            System.out.println("线程池中线程数目:"+pool.getPoolSize()+",队列中等待执行的任务数目:"+
                    pool.getQueue().size()+",已执行玩别的任务数目:"+pool.getCompletedTaskCount());
        }
        pool.shutdown();
    }
}

// 准备一个任务类
class MyTask implements Runnable{
    private int id;
    ThreadPoolExecutor pool;
    public MyTask(int id,ThreadPoolExecutor pool){
        this.id = id;
        this.pool = pool;
    }
    public void run() {
        myOption();
    }
    private void myOption(){
        System.out.println("开始执行任务["+id+"]....");
        System.out.println("线程池中线程数目:"+pool.getPoolSize()+",队列中等待执行的任务数目:"+
                pool.getQueue().size()+",已执行玩别的任务数目:"+pool.getCompletedTaskCount());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("任务["+id+"]执行结束.....");
    }
}

程序说明:

池中核心线程数是5个,最大线程数10,缓存任务的队列容量是10。

情况1:我们循环放入20个任务。

​ 启动5个线程,执行前五个任务。 将接下来的10个任务缓存到缓存队列中。

​ 我们继续加入新的任务的时候,缓存队列的任务就会放入线程池执行,所以线程池会再启动5个线程执行新放入线程池的人物,将后来的5个任务再次缓存到缓存队列。

​ 最后全部执行结束。

情况2:我们循环的放入21个任务,由于最大线程数是10,缓存队列是10,所以最多同时可以放入20个任务,当第21个任务到大是,如果没有空闲的线程,则直接拒绝处理。(抛出异常)

案例2 使用submit(Callable task) 提交任务

Future模式: 提交任务之后,不会等待任务结束。 等任务结束之后,可以使用一个回调方法获取任务执行的结果。

Callbale+Future模式和Runable模式的主要区别是:Callbale+Future可以获取任务执行的结果。

具体的做法:
在主线程中准备一个接收结果的对象(future)集合。

所有的任务结束之后,都会自动将执行的结果放入future集合。

我们在主线程中就可以获取执行的结果。(获取结果的时候是阻塞的)

Future接口:

public interface Future<V> {

    /** * 取消所有的执行结果 */
    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    /** * 判断这个任务是否结束 */
    boolean isDone();

    /** * 获取返回的结果。 如果没有则阻塞。 */
    V get() throws InterruptedException, ExecutionException;

    /** * 获取返回的结果,等待指定时间,如果获取不到就返回 null; */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
  • 完整案例代码
/** * @author 戴着假发的程序 */
public class ThreadPoolExecutorFutureCallableTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 准备一个Future集合
        List<Future> futures = new ArrayList<>();
        /* corePoolSize:5 maximumPoolSize:10 keepAliveTime:5 timeUnit:SECOND workQueue: LinkedBlockingQueue [threadFactory: 使用默认 Executors.defaultThreadFactory()] [RejectedExecutionHandler:AbortPolicy 抛出异常] */
        // 创建线程池
        ThreadPoolExecutor pool =
                new ThreadPoolExecutor(5,10,200, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        for (int i = 0;i<20;i++){
            //使用submit提交任务
            Future<Integer> futureValue = pool.submit(new MyCallAble(i));
            //将结果放入集合
            futures.add(futureValue);
        }
        //从future中取出结果
        for (Future future : futures){
            System.out.println("执行结果:"+future.get());
        }
    }
}
// callable中有call方法,就类似于Runnable中的run方法。
// <Integer> 表示执行之后的结果类型。
class MyCallAble implements Callable<Integer>{
    private int id;
    public MyCallAble(int id){
        this.id = id;
    }
    public Integer call() throws Exception {
        System.out.println("执行任务:"+id);
        Thread.sleep(2000);
        return id;
    }
}

相关文章