线程池实战

x33g5p2x  于2022-04-06 转载在 其他  
字(10.4k)|赞(0)|评价(0)|浏览(189)

一 ThreadPool

package concurrent.threadpool;

/**
* @className: ThreadPool
* @description: 线程池接口
* @date: 2022/4/2
* @author: cakin
*/
public interface ThreadPool {
    // 提交任务到线程池
    void execute(Runnable runnable);

    // 关闭线程池
    void shutdown();

    // 获取线程池的初始化大小
    int getInitSize();

    // 获取线程池的最大线程数
    int getMaxSize();

    // 获取线程池的核心线程数量
    int getCoreSize();

    // 获取线程池中用于获取任务队列的大小
    int getQueueSize();

    // 获取线程池中活跃线程数量
    int getActiveCount();

    // 查看线程池是否已经被 shutdown
    boolean isShutdown();
}

二 RunnableQueue

package concurrent.threadpool;

/**
* @className: RunnableQueue
* @description: 任务队列,主要用于缓存提交到线程池中的任务
* @date: 2022/4/2
* @author: cakin
*/
public interface RunnableQueue {
    // 当有新任务来时首先会 offer 到队列中
    void offer(Runnable runnable);

    // 工作线程通过 take 方法获取 Runnable
    Runnable take() throws InterruptedException; //throws InterruptedException;

    // 获取任务队列中任务的数量
    int size();
}

三 ThreadFactory

package concurrent.threadpool;

/**
* @className: ThreadFactory
* @description: 线程工厂,提供创建线程的接口,以便于个性化地定制 Thread,比如 Thread 应该被放到哪个 Group 中,优先级、线程名字以及是否为守护线程等
* @date: 2022/4/2
* @author: cakin
*/
@FunctionalInterface
public interface ThreadFactory {
    // 用于创建线程
    Thread createThread(Runnable runnable);
}

四 DenyPolicy

package concurrent.threadpool;

/**
* @className: DenyPolicy
* @description: 拒绝策略:主要用于当 Queue 中的 Runnable 达到 limit 上限时,决定采用哪种策略通知提交者。该接口定义了三种默认实现
* @date: 2022/4/2
* @author: cakin
*/
@FunctionalInterface
public interface DenyPolicy {
    void reject(Runnable runnable, ThreadPool threadPool);

    // 该拒绝策略会直接将任务丢弃
    class DiscardDenyPolicy implements DenyPolicy {

        @Override
        public void reject(Runnable runnable, ThreadPool threadPool) {
            // do nothing
        }
    }

    // 该拒绝策略会向任务提交者抛出异常
    class AbortDenyPolicy implements DenyPolicy {

        @Override
        public void reject(Runnable runnable, ThreadPool threadPool) {
            throw new RuntimeDenyException("The runnable " + runnable + " will be abort.");
        }
    }

    // 该拒绝策略会使任务在提交者所在的线程中执行任务
    class RunnerDenyPolicy implements DenyPolicy {

        @Override
        public void reject(Runnable runnable, ThreadPool threadPool) {
            if (!threadPool.isShutdown()) {
                runnable.run();
            }
        }
    }
}

五 RuntimeDenyException

package concurrent.threadpool;

/**
* @className: RuntimeDenyException
* @description: 异常类:通知任务提交者,任务队列已无法再接受新的任务
* @date: 2022/4/2
* @author: cakin
*/
public class RuntimeDenyException extends RuntimeException {
    public RuntimeDenyException(String message) {
        super(message);
    }
}

六 InternalTask

package concurrent.threadpool;

public class InternalTask implements Runnable {
    private final RunnableQueue runnableQueue;

    private volatile boolean running = true;

    public InternalTask(RunnableQueue runnableQueue) {
        this.runnableQueue = runnableQueue;
    }

    @Override
    public void run() {
        // 如果当前任务为 running 并且没有被中断,则其将不断从 queue 中获取 Runnable,然后执行 run 方法
        while (running && !Thread.currentThread().isInterrupted()) {
            try {
                Runnable task = runnableQueue.take();
                task.run();
            } catch (Exception e) {
                running = false;
                break;
            }
        }
    }

    // 停止当前任务,主要会在线程池的 shutdown 方法中使用
    public void stop() {
        this.running = false;
    }
}

七 LinkedRunnableQueue

package concurrent.threadpool;

import java.util.LinkedList;

public class LinkedRunnableQueue implements RunnableQueue {
    // 任务队列的最大容量,在构造时传入
    private final int limit;
    // 若任务队列中的任务已满,则需要执行拒绝策略
    private final DenyPolicy denyPolicy;
    // 存放任务的队列
    private final LinkedList<Runnable> runnableList = new LinkedList<>();
    // 线程池
    private final ThreadPool threadPool;

    // 构造器
    public LinkedRunnableQueue(int limit, DenyPolicy denyPolicy, ThreadPool threadPool) {
        this.limit = limit;
        this.denyPolicy = denyPolicy;
        this.threadPool = threadPool;
    }

    @Override
    public void offer(Runnable runnable) {
        synchronized (runnableList) {
            if (runnableList.size() >= limit) {
                // 无法容纳新的任务时执行拒绝策略
                denyPolicy.reject(runnable, threadPool);
            } else {
                // 将任务加入到队尾,并且唤醒阻塞中的线程
                runnableList.addLast(runnable);
                runnableList.notifyAll();
            }
        }
    }

    @Override
    public Runnable take() throws InterruptedException {
        synchronized (runnableList) {
            while (runnableList.isEmpty()) {
                try {
                    // 如果任务队列中没有任务可执行任务,则当前线程会挂起,进入 runnableList 关联的 monitor waitset 中等待唤醒(新的任务加入)
                    runnableList.wait();
                } catch (InterruptedException e) {
                    throw e;
                }

            }
            // 从任务队列头部移除一个任务
            return runnableList.removeFirst();
        }
    }

    @Override
    public int size() {
        synchronized (runnableList) {
            // 返回当前任务队列中的任务数
            return runnableList.size();
        }
    }
}

八 BasicThreadPool

package concurrent.threadpool;

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BasicThreadPool extends Thread implements ThreadPool {
    // 初始化线程数量
    private final int initSize;
    // 线程池最大线程数量
    private final int maxSize;
    // 线程池核心线程数量
    private final int coreSize;
    // 当前活跃线程数量
    private int activeCount;
    // 创建线程所需工厂
    private final ThreadFactory threadFactory;
    // 任务队列
    private final RunnableQueue runnableQueue;
    // 线程池是否已经被 shutdown
    private volatile boolean isShutdown = false;
    // 工作线程队列,存放活动线程
    private final Queue<ThreadTask> threadQueue = new ArrayDeque<>();
    // 拒绝策略
    private final static DenyPolicy DEFAULT_DENY_POLICY = new DenyPolicy.DiscardDenyPolicy();
    // 线程工厂
    private final static ThreadFactory DEFAULT_THREAD_FACTORY = new DefaultThreadFactory();
    private final long keepAliveTime;
    private final TimeUnit timeUnit;

    // 构造队列时需要传递的参数:初始的线程数量,最大的线程数量,核心线程数量,任务队列的最大数量
    public BasicThreadPool(int initSize, int maxSize, int coreSize, int queueSize) {
        this(initSize, maxSize, coreSize, DEFAULT_THREAD_FACTORY, queueSize, DEFAULT_DENY_POLICY, 10, TimeUnit.SECONDS);
    }

    public BasicThreadPool(int initSize, int maxSize, int coreSize, ThreadFactory threadFactory, int queueSize, DenyPolicy denyPolicy, int keepAliveTime, TimeUnit timeUnit) {
        this.initSize = initSize;
        this.maxSize = maxSize;
        this.coreSize = coreSize;
        this.threadFactory = threadFactory;
        this.runnableQueue = new LinkedRunnableQueue(queueSize, denyPolicy, this);
        this.keepAliveTime = keepAliveTime;
        this.timeUnit = timeUnit;
        this.init();
    }

    // 初始化时,先创建 initSize 个线程
    private void init() {
        // 启动 BasicThreadPool 自身线程
        start();
        for (int i = 0; i < initSize; i++) {
            newThread();
        }
    }

    private void newThread() {
        // 创建任务线程,并且启动
        InternalTask internalTask = new InternalTask(runnableQueue);
        Thread thread = this.threadFactory.createThread(internalTask);
        ThreadTask threadTask = new ThreadTask(thread, internalTask);
        threadQueue.offer(threadTask);
        this.activeCount++;
        thread.start();
    }

    private void removeThread() {
        // 从线程池中移除某个线程
        ThreadTask threadTask = threadQueue.remove();
        threadTask.internalTask.stop();
        this.activeCount--;
    }

    // 将 runnable 插入 runnableQueue 中即可
    @Override
    public void execute(Runnable runnable) {
        if (isShutdown) {
            throw new IllegalStateException("the thread pool is destroy");
        }
        // 提交任务只是简单地将任务队列中插入 runnable
        this.runnableQueue.offer(runnable);
    }

    // 为了停止 BasicThreadPool 线程,停止线程池中的活动线程并且将 isShutdown 开关变量更改为 true
    @Override
    public void shutdown() {
        synchronized (this) {
            if (isShutdown) {
                return;
            }
            isShutdown = true;
            threadQueue.forEach(threadTask -> {
                threadTask.internalTask.stop();
                threadTask.thread.interrupt();
            });
            this.interrupt();
        }
    }

    @Override
    public int getInitSize() {
        if (isShutdown) {
            throw new IllegalStateException("the thread pool is destroy");
        }
        return this.initSize;
    }

    @Override
    public int getMaxSize() {
        if (isShutdown) {
            throw new IllegalStateException("the thread pool is destroy");
        }
        return this.maxSize;
    }

    @Override
    public int getCoreSize() {
        if (isShutdown) {
            throw new IllegalStateException("the thread pool is destroy");
        }
        return this.coreSize;
    }

    @Override
    public int getQueueSize() {
        if (isShutdown) {
            throw new IllegalStateException("the thread pool is destroy");
        }
        return runnableQueue.size();
    }

    @Override
    public int getActiveCount() {
        synchronized (this){
            return this.activeCount;
        }
    }

    @Override
    public boolean isShutdown() {
        return this.isShutdown;
    }

    // 主要用于维护线程数量,比如扩容、回收等工作
    @Override
    public void run() {
        while (!isShutdown && !isInterrupted()) {
            try {
                timeUnit.sleep(keepAliveTime);
            } catch (InterruptedException e) {
                isShutdown = true;
                break;
            }

            synchronized (this) {
                if (isShutdown) {
                    break;
                }
                // 当前队列中有任务尚未处理,并且 activeCount < coreSize 则继续扩容
                if (runnableQueue.size() > 0 && activeCount < coreSize) {
                    for (int i = initSize; i < coreSize; i++) {
                        newThread();
                    }
                    // countinue 的目前在于不想让线程的扩容直接打到 maxSize
                    continue;
                }
                // 当前队列中有任务尚未处理,并且 activeCount < maxSize 则继续扩容
                if (runnableQueue.size() > 0 && activeCount < maxSize) {
                    for (int i = coreSize; i < maxSize; i++) {
                        newThread();
                    }
                }
                // 如果任务队列中没有任务,则需要回收,回收到 coreSize
                if (runnableQueue.size() == 0 && activeCount > coreSize) {
                    for (int i = coreSize; i < activeCount; i++) {
                        removeThread();
                    }
                }
            }
        }
    }

    // ThreadTask 只是 InterTask 和 Thread 的一个组合
    private static class ThreadTask {
        public ThreadTask(Thread thread, InternalTask internalTask) {
            this.thread = thread;
            this.internalTask = internalTask;
        }

        Thread thread;
        InternalTask internalTask;
    }

    private static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger GROUP_COUNTER = new AtomicInteger(1);
        private static final ThreadGroup group = new ThreadGroup("MyThreadPool=" + GROUP_COUNTER.getAndDecrement());
        private static final AtomicInteger COUNTER = new AtomicInteger(0);

        @Override
        public Thread createThread(Runnable runnable) {
            return new Thread(group, runnable, "thread-pool-" + COUNTER.getAndDecrement());
        }
    }
}

九 ThreadPoolTest

package concurrent.threadpool;

import java.util.concurrent.TimeUnit;

public class ThreadPoolTest {
    public static void main(String[] args) {
        // 定义线程池,初始化线程为2,核心线程为4,最大线程为6,任务队列允许1000个任务
        final ThreadPool threadPool = new BasicThreadPool(2, 6, 4, 1000);
        // 定义 20 个任务并且提交线程池
        for (int i = 0; i < 20; i++) {
            threadPool.execute(() -> {
                try {
                    TimeUnit.SECONDS.sleep(10);
                    System.out.println(Thread.currentThread().getName() + " is running and done.");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        for (; ; ) {
            // 不断输出线程池的信息
            System.out.println("getActiveCount:" + threadPool.getActiveCount());
            System.out.println("getQueueSize:" + threadPool.getQueueSize());
            System.out.println("getCoreSize:" + threadPool.getCoreSize());
            System.out.println("getMaxSize:" + threadPool.getMaxSize());
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

十 测试结果

getActiveCount:2
getQueueSize:18
getCoreSize:4
getMaxSize:6
getActiveCount:2
getQueueSize:18
getCoreSize:4
getMaxSize:6
thread-pool--1 is running and done.
thread-pool-0 is running and done.
getActiveCount:4
getQueueSize:14
getCoreSize:4
getMaxSize:6
getActiveCount:4
getQueueSize:14
getCoreSize:4
getMaxSize:6
thread-pool--3 is running and done.
thread-pool--2 is running and done.
thread-pool--1 is running and done.
thread-pool-0 is running and done.
getActiveCount:6
getQueueSize:8
getCoreSize:4
getMaxSize:6
getActiveCount:6
getQueueSize:8
getCoreSize:4
getMaxSize:6
thread-pool--3 is running and done.
thread-pool--4 is running and done.
thread-pool--2 is running and done.
thread-pool--5 is running and done.
thread-pool-0 is running and done.
thread-pool--1 is running and done.
getActiveCount:6
getQueueSize:2
getCoreSize:4
getMaxSize:6
getActiveCount:6
getQueueSize:2
getCoreSize:4
getMaxSize:6
thread-pool--4 is running and done.
thread-pool--3 is running and done.
thread-pool--2 is running and done.
thread-pool--5 is running and done.
thread-pool--1 is running and done.
thread-pool-0 is running and done.
getActiveCount:6
getQueueSize:0
getCoreSize:4
getMaxSize:6
getActiveCount:6
getQueueSize:0
getCoreSize:4
getMaxSize:6
thread-pool--4 is running and done.
thread-pool--3 is running and done.
getActiveCount:5
getQueueSize:0
getCoreSize:4
getMaxSize:6

相关文章