Jdk源码分析

文章40 |   阅读 10537 |   点赞0

来源:https://yumbo.blog.csdn.net/category_10384063.html

深入解读ReentrantLock中的Condition接口的实现类ConditionObject

x33g5p2x  于2021-12-18 转载在 其他  
字(14.7k)|赞(0)|评价(0)|浏览(226)

前不久在给网友讲JUC源码,我布置了一些作业让他们做,我看到了他们返回给我的作业中在谈到Condition接口的方式对线程对象阻塞和唤醒的理解有点偏差。

我布置的作业内容是让他们回答,超类Object、Condition接口、LockSupport 三种方式对线程进行阻塞和唤醒,它们各自的优点、缺点、特点

其中讲到Condition接口的特点:
  有网友回答我说Condition的使用依赖于ReentrantLock,必须通过ReentrantLock.newCondition()方法获取到Condition接口,并且必须在
下面的代码块中使用

lock.lock();
try {

} catch (Exception e) {
    e.printStackTrace();
} finally {
    lock.unlock();
}

下面我们一起探讨一下这句话的正确性。

一道面试题如下:

要求用三个线程完成如下操作,
线程A打印5次A
线程B打印10次B
线程C打印15次
线程A打印5次A
线程B打印10次B

如此循环打印

下面是一个例子,通过Condition的方式完成3个线程的调度

案例一、使用3个Condition进行线程调度

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class DataClass {
    private int number = 1;
    private ReentrantLock lock = new ReentrantLock();
    private Condition c1 = lock.newCondition();
    private Condition c2 = lock.newCondition();
    private Condition c3 = lock.newCondition();

    public void print5() throws InterruptedException {
    	lock.lock();
        try {
            while (number != 1) { // 多线程的判断使用while进行判断
                c1.await(); // 如果number不为1(说明不是它工作)让当前线程进行等待
            }
            // 打印5次
            for (int i = 1; i <= 5; i++) {
                System.out.println(Thread.currentThread().getName() + "\t" + String.valueOf(i));
            }
            number = 2;// 修改number值为2
            c2.signal();// 通知线程2让他开始工作
        } finally {
            lock.unlock();
        }
    }

    public void print10() throws InterruptedException {
    	lock.lock();
        try {
            while (number != 2) { // 如果number不为2则进行等待,否则执行后面的打印10次逻辑
                c2.await();
            }
            for (int i = 1; i <= 10; i++) {
                System.out.println(Thread.currentThread().getName() + "\t" + String.valueOf(i));
            }
            number = 3; // 修改number的值为3
            c3.signal();// 通知3开始工作
        } finally {
            lock.unlock();
        }
    }

    public void print15() throws InterruptedException {
    	lock.lock();
        try {
            while (number != 3) { // 如果number不为3则进行等待
                c3.await();
            }
            //打印15次
            for (int i = 1; i <= 15; i++) {
                System.out.println(Thread.currentThread().getName() + "\t" + String.valueOf(i));
            }
            number = 1;// 修改为1
            c1.signal();// 通知1工作
        } finally {
            lock.unlock();
        }
    }

}

public class Demo001 {

    public static void main(String[] args) {

        DataClass dataClass = new DataClass();
        new Thread(() -> {
            for (int i = 0;i<10 ; i++) { // 只打印10次防止太多次了,若要一直循环则去掉i的条件判断
                try {
                    dataClass.print5();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "A").start();
        new Thread(() -> {
            for (int i = 0; i<10 ; i++) {
                try {
                    dataClass.print10();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "B").start();
        new Thread(() -> {
            for (int i = 0;i<10 ; i++) {
                try {
                    dataClass.print15();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "C").start();

    }
}

案例二、使用一个Condition完成线程调度

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class DataClass {
    private int number = 1;
    private ReentrantLock lock = new ReentrantLock();
    private Condition c1 = lock.newCondition();

    public void print5() throws InterruptedException {
    	lock.lock();
        try {
            while (number != 1) { // 多线程的判断使用while进行判断
                c1.await(); // 如果number不为1(说明不是它工作)让当前线程进行等待
            }
            // 打印5次
            for (int i = 1; i <= 5; i++) {
                System.out.println(Thread.currentThread().getName() + "\t" + String.valueOf(i));
            }
            number = 2;// 修改number值为2
            c1.signalAll();// 通知线程2让他开始工作
        } finally {
            lock.unlock();
        }
    }

    public void print10() throws InterruptedException {
    	lock.lock();
        try {
            while (number != 2) { // 如果number不为2则进行等待,否则执行后面的打印10次逻辑
                c1.await();
            }
            for (int i = 1; i <= 10; i++) {
                System.out.println(Thread.currentThread().getName() + "\t" + String.valueOf(i));
            }
            number = 3; // 修改number的值为3
            c1.signalAll();// 通知3开始工作
        } finally {
            lock.unlock();
        }
    }

    public void print15() throws InterruptedException {
    	lock.lock();
        try {
            while (number != 3) { // 如果number不为3则进行等待
                c1.await();
            }
            //打印15次
            for (int i = 1; i <= 15; i++) {
                System.out.println(Thread.currentThread().getName() + "\t" + String.valueOf(i));
            }
            number = 1;// 修改为1
            c1.signalAll();// 通知1工作
        } finally {
            lock.unlock();
        }
    }

}

public class Demo001 {

    public static void main(String[] args) {

        DataClass dataClass = new DataClass();
        new Thread(() -> {
            for (int i = 0;i<10 ; i++) { // 只打印10次防止太多次了,若要一直循环则去掉i的条件判断
                try {
                    dataClass.print5();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "A").start();
        new Thread(() -> {
            for (int i = 0; i<10 ; i++) {
                try {
                    dataClass.print10();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "B").start();
        new Thread(() -> {
            for (int i = 0;i<10 ; i++) {
                try {
                    dataClass.print15();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "C").start();

    }
}

在这两个代码中都完成了这道面试题,
第一种方式使用的是不同的Condition对象的signal方法
第二种方式使用的是相同的Condition对象的signalAll方法
这两个案例代码都使用到了一个int型的变量number,通过number的值来判断是谁该执行,谁该阻塞。

实际这道题还可以用volatile声明这个number,然后去掉锁signal、signalAll、await,通过内存可见性+自旋也能完成这道题,但是这种方式不建议,因为一直自旋非常消耗cpu资源,相当于让cpu一直在做无用功。

class DataClass {
    private volatile int number = 1;

    public void print5() throws InterruptedException {

        while (number != 1) { // 多线程的判断使用while进行判断
        }
        // 打印5次
        for (int i = 1; i <= 5; i++) {
            System.out.println(Thread.currentThread().getName() + "\t" + String.valueOf(i));
        }
        number = 2;// 修改number值为2

    }

    public void print10() throws InterruptedException {
        while (number != 2) { // 如果number不为2则进行等待,否则执行后面的打印10次逻辑
        }
        for (int i = 1; i <= 10; i++) {
            System.out.println(Thread.currentThread().getName() + "\t" + String.valueOf(i));
        }
        number = 3; // 修改number的值为3

    }

    public void print15() throws InterruptedException {
        while (number != 3) { // 如果number不为3则进行等待
        }
        //打印15次
        for (int i = 1; i <= 15; i++) {
            System.out.println(Thread.currentThread().getName() + "\t" + String.valueOf(i));
        }
        number = 1;// 修改为1
    }

}

读到这里我们思考一个问题,在这个案例中 Condition 和 ReentrantLock 以及 AQS 它们之间的作用。

以及在这个例子中Condition是如何让线程进行阻塞的?

我们追踪一下Condition的源码:

ReentrantLock
public class ReentrantLock implements Lock, Serializable {
    /** * 得到AQS的内部类ConditionObject的实例 */
    public Condition newCondition() {
        return sync.newCondition(); // 这个方法是Sync类定义的final方法
    }
}
AbstractQueuedSynchronizer

通过阅读源码会清楚的发现Condition接口的方式,底层也采用自旋 + LockSupport的方式对线程进行阻塞和唤醒。

并且阅读await源码会发现,调用这个方法的过程中会有释放锁的一个步骤
也就是int savedState = enableWait(node);这行代码,会将释放所有锁并且将state值返回给savedState

后面完成中断操作后,重新加锁:acquire(node, savedState, false, false, false, 0L);
这个过程经常会被提到,与 synchronized + notify类似,经常会拿来作为面试题。

总结:

完整的看完Condition相关代码后,会发现网友说的那句话是正确的,Condition的使用依赖于AQS,也的确需要在lock代码块中使用。
但有一点需要注意,Condition只是一个接口,报IllegalMonitorStateException异常并非和Condition接口有关,而是在Condition实现类 ===> ConditionObject中的代码中,判断当前线程释放是持有锁的那个线程,如果不是则会报这个异常(不是持锁线程,在排他锁的情况下当前线程应该是被阻塞的,执行了这行判断代码说明当前线程是正常工作的因此需要抛异常)。

部分注释内容我没有完全补全,因为重复或者过分简单就没有加上注释

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    private final Sync sync;
    abstract static class Sync extends AbstractQueuedSynchronizer {
		final ConditionObject newCondition() {
            return new ConditionObject();
        }
    }
    /** * Condition链表结构的数据结构 */
    static final class ConditionNode extends Node implements ForkJoinPool.ManagedBlocker {
        ConditionNode nextWaiter;            // 链接到下一个ConditionNode

        /** * 是否可以中断 */
        public final boolean isReleasable() {
            return status <= 1 || Thread.currentThread().isInterrupted();
        }

        // 阻塞线程,直到判断中返回true结束循环并最终返回true。如果是直接调用该方法那么线程会阻塞
        public final boolean block() {
            while (!isReleasable()) LockSupport.park();
            return true;
        }
    }
	/** * 暴露给外部使用,例如ReentrantLock */
    public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;

        // 形成的是一个ConditionNode的单向链表,下面两个分别记录头和尾节点
        private transient ConditionNode firstWaiter;//Condition队列的第一个节点
        private transient ConditionNode lastWaiter; //Condition队列的最后一个节点

        /** * 构造方法 */
        public ConditionObject() {
        }

        /** * 真正执行唤醒线程的方法 * 如果传入的all=true则相当于遍历链表,将所有节点都唤醒 */
        private void doSignal(ConditionNode first, boolean all) {
            while (first != null) { // 如果有队列头
                ConditionNode next = first.nextWaiter; // 找到下一个节点
                if ((firstWaiter = next) == null) // 如果下一个节点为空,说明到了链表的末尾节点
                    lastWaiter = null; //
                // 如果first移到到了最后一个节点,也就是上面逻辑first.nextWaiter=null的节点
                if ((first.getAndUnsetStatus(COND) & COND) != 0) {
                    enqueue(first);// 入队,将节点
                    if (!all) // 是否唤醒所有节点,如果传入的是true则继续遍历链表
                        break;
                }
                first = next; // 移动到下一个节点重复上面逻辑
            }
        }

        /** * 唤醒一个线程(头节点) */
        public final void signal() {
            ConditionNode first = firstWaiter;// 链表头
            if (!isHeldExclusively()) // 如果当前线程不是持有锁的对象,也就是!false会抛异常
                throw new IllegalMonitorStateException();// 抛非法监视器状态异常
            if (first != null)
                doSignal(first, false);// 调用上面的方法进行唤醒,传入false则只随机唤醒一个
        }

        /** * 唤醒所有被Condition阻塞的线程 */
        public final void signalAll() {
            ConditionNode first = firstWaiter;// 链表头
            if (!isHeldExclusively())// 当前线程对象不是持有锁线程则配移除
                throw new IllegalMonitorStateException();// 抛异常
            if (first != null)
                doSignal(first, true);// 进行唤醒
        }

        /** * 被await方法调用,用于判断是否可以进行阻塞 * node是否可以等待(阻塞),不可以则抛异常 */
        private int enableWait(ConditionNode node) {
            if (isHeldExclusively()) {                  // 如果当前线程是持有锁的线程对象
                node.waiter = Thread.currentThread();   // 获取当前线程对象
                node.setStatusRelaxed(COND | WAITING);  // 得到的是3,设置状态值
                ConditionNode last = lastWaiter;        // 得到链表末尾节点
                if (last == null)                       // 如果链表末尾节点为null,说明是空链表
                    firstWaiter = node;                 // 直接将当前node插入链表的头形成新的头节点,同时也作为链表的末尾(即是头也是尾)
                else
                    last.nextWaiter = node;             // 如果不为null直接将node加入到末尾
                lastWaiter = node;                      // 记录末尾节点是当前node,相当于尾插法队列末尾
                int savedState = getState();// 获取state值
                if (release(savedState))    // 释放锁
                    return savedState;      // 返回释放锁后的state(相当于返回上锁次数)
            }
            node.status = CANCELLED; // 如果当前线程没有抢占到锁,更新节点的状态尾取消状态
            throw new IllegalMonitorStateException(); // 因为当前线程不是持锁线程抛出非法监视器异常
        }

        /** * 能否可以重新获取到锁 */
        private boolean canReacquire(ConditionNode node) {
            return node != null && node.prev != null && isEnqueued(node); // 判断当前node是否已经入队
        }

        /** * 从等待队列中去除当前节点 */
        private void unlinkCancelledWaiters(ConditionNode node) {
            if (node == null || node.nextWaiter != null || node == lastWaiter) {
                ConditionNode w = firstWaiter, trail = null;
                while (w != null) {
                    ConditionNode next = w.nextWaiter;// 得到下一个节点
                    if ((w.status & COND) == 0) {// 找到了这个
                        w.nextWaiter = null; // 置null
                        if (trail == null)
                            firstWaiter = next; // 重新保存队列头节点,因为异常的节点可能就是队列头节点,所以重新得到头节点
                        else
                            trail.nextWaiter = next;// 通过trail删除node节点
                        if (next == null)
                            lastWaiter = trail; // next为null说明上一个节点就是队列尾节点(也就是trail)
                    } else
                        trail = w;// 记录一些
                    w = next;// 移到到下一个节点
                }
            }
        }

        /** * */
        public final void awaitUninterruptibly() {
            ConditionNode node = new ConditionNode();// 创建一个节点
            int savedState = enableWait(node); // 释放锁,返回锁计数
            LockSupport.setCurrentBlocker(this); // 将当前对象设置尾阻塞资源
            boolean interrupted = false; // 不可以中断
            while (!canReacquire(node)) { // 如果不能被重新得到node则进行中断操作
                if (Thread.interrupted()) // 如果已经被中断了则更新interrupted = true;
                    interrupted = true;
                else if ((node.status & COND) != 0) {
                    try {
                        ForkJoinPool.managedBlock(node); // 阻塞当前资源
                    } catch (InterruptedException ie) {
                        interrupted = true; //
                    }
                } else
                    Thread.onSpinWait();    // 入队的时候就会被唤醒
            }
            LockSupport.setCurrentBlocker(null); // 清除阻塞资源
            node.clearStatus();                  // 清除状态值
            // 重新加锁
            acquire(node, savedState, false, false, false, 0L);
            if (interrupted)
                Thread.currentThread().interrupt(); // 中断当前线程
        }

        /** * 进行阻塞 */
        public final void await() throws InterruptedException {
            // 如果线程被中断了抛中断异常
            if (Thread.interrupted()) throw new InterruptedException();

            ConditionNode node = new ConditionNode();   // 创建一个节点
            int savedState = enableWait(node);          // 如果当前线程是持有锁线程,释放所有锁,并且把锁计数返回
            LockSupport.setCurrentBlocker(this);        // 将当前对象作为阻塞资源
            boolean interrupted = false, cancelled = false; // 两个布尔变量分别表示中断和取消
            while (!canReacquire(node)) {               // 自选的方式进行中断当前线程
                if (interrupted |= Thread.interrupted()) { // 或运算,因为interrupted=false所以值取决于Thread.interrupted()
                    if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
                        break;              // 线程被中断了并且state值发送了变化就结束循环
                } else if ((node.status & COND) != 0) {
                    try {
                        ForkJoinPool.managedBlock(node);// 阻塞node资源
                    } catch (InterruptedException ie) {
                        interrupted = true; // 中断设置为true
                    }
                } else
                    Thread.onSpinWait();    // 进行自旋等待
            }
            LockSupport.setCurrentBlocker(null);// 清除阻塞器
            node.clearStatus();// 清除线程状态值
            acquire(node, savedState, false, false, false, 0L);// 重新进行加锁,将之前释放的锁重新进行还原回去
            if (interrupted) {
                if (cancelled) {
                    unlinkCancelledWaiters(node);// 如果当前线程需要进行中断,则从等待队列中去除当前节点
                    throw new InterruptedException();
                }
                Thread.currentThread().interrupt();// 中断当前线程
            }
        }

        /** * */
        public final long awaitNanos(long nanosTimeout) throws InterruptedException {
            if (Thread.interrupted()) throw new InterruptedException();
            ConditionNode node = new ConditionNode(); // 创建一个节点
            int savedState = enableWait(node); // 释放锁,返回上锁的次数
            long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout; // 重新计算时间,<0就赋值为0,否则还是传入的值
            long deadline = System.nanoTime() + nanos; // 计算死亡的时间,纳秒(系统的时间+nanos纳秒)
            boolean cancelled = false, interrupted = false;
            while (!canReacquire(node)) { // 如果不能被中断就进行中断,自选的方式进行中断
                if ((interrupted |= Thread.interrupted()) || (nanos = deadline - System.nanoTime()) <= 0L) {
                    if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
                        break;// 中断完成跳出自旋
                } else
                    LockSupport.parkNanos(this, nanos);
            }
            node.clearStatus(); // 清除状态
            // 重新加锁
            acquire(node, savedState, false, false, false, 0L);
            // 如果线程已经被中断成功了
            if (cancelled) {
                unlinkCancelledWaiters(node);// 从等待队列中去除当前node节点
                if (interrupted)
                    throw new InterruptedException();
            } else if (interrupted)
                Thread.currentThread().interrupt();// 中断当前线程
            long remaining = deadline - System.nanoTime(); // 计算还有多少纳秒的等待
            return (remaining <= nanosTimeout) ? remaining : Long.MIN_VALUE;// 返回剩余时间
        }

        /** * */
        public final boolean awaitUntil(Date deadline) throws InterruptedException {
            long abstime = deadline.getTime(); // 获取阻塞的时间
            if (Thread.interrupted())
                throw new InterruptedException();
            ConditionNode node = new ConditionNode();   // 创建一个节点
            int savedState = enableWait(node);          // 释放锁并返回锁计数
            boolean cancelled = false, interrupted = false;
            while (!canReacquire(node)) { // 自旋的方式中断当前线程
                if ((interrupted |= Thread.interrupted()) || System.currentTimeMillis() >= abstime) {
                    if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
                        break; // 中断成功
                } else
                    LockSupport.parkUntil(this, abstime); // 阻塞当前线程,超过abstime就自动唤醒线程
            }
            node.clearStatus();// 清除状态值,将state置0,方便下面重新加savedState次锁
            acquire(node, savedState, false, false, false, 0L);

            if (cancelled) {
                unlinkCancelledWaiters(node); // 从等待队列中移除当前节点
                if (interrupted)
                    throw new InterruptedException();
            } else if (interrupted)
                Thread.currentThread().interrupt();// 中断当前线程
            return !cancelled;
        }

        /** * 带超时时间的阻塞,java8新的时间api方式 */
        public final boolean await(long time, TimeUnit unit) throws InterruptedException {
            long nanosTimeout = unit.toNanos(time); // 获取阻塞的最长时间
            if (Thread.interrupted())
                throw new InterruptedException();
            ConditionNode node = new ConditionNode();// 创建一个节点
            int savedState = enableWait(node);          // 释放锁并且返回锁计数
            long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout;// 防止负数
            long deadline = System.nanoTime() + nanos;// 计算阻塞最大的时间点(在这个点以前都阻塞,过了就会自动唤醒)
            boolean cancelled = false, interrupted = false;
            while (!canReacquire(node)) { // 自选的方式中断当前线程
                if ((interrupted |= Thread.interrupted()) ||
                        (nanos = deadline - System.nanoTime()) <= 0L) {
                    if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
                        break;
                } else
                    LockSupport.parkNanos(this, nanos);// 带超市的方式阻塞
            }
            node.clearStatus();// 将state置0
            // 重新加锁savedState次
            acquire(node, savedState, false, false, false, 0L);
            if (cancelled) {
                unlinkCancelledWaiters(node);
                if (interrupted)
                    throw new InterruptedException();
            } else if (interrupted)
                Thread.currentThread().interrupt();
            return !cancelled;
        }

        /** * 是否是AQS */
        final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
            return sync == AbstractQueuedSynchronizer.this;
        }

        /** * 是否有等待线程 */
        protected final boolean hasWaiters() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            // 遍历单向链表进行查找
            for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) {
                if ((w.status & COND) != 0)
                    return true;
            }
            return false;
        }

        /** * */
        protected final int getWaitQueueLength() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            int n = 0;
            for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) {
                if ((w.status & COND) != 0) // status是常量3(表示等待状态) 而COND则是2进行&运算则是2,因此2!=0返回true
                    ++n; // 如果符合线程等待状态将计数+1
            }
            return n;//返回aqs队列中状态为waiting的线程个数
        }

        /** * 返回等待线程的集合 */
        protected final Collection<Thread> getWaitingThreads() {
            if (!isHeldExclusively())// 判断是否有线程独占,独占则说明有等待线程,否则说明没有等待线程返回false也就会抛出IllegalMonitorStateException
                throw new IllegalMonitorStateException();
            ArrayList<Thread> list = new ArrayList<>(); //创建一个ArrayList集合类
            for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) { //for遍历AQS队列的节点
                if ((w.status & COND) != 0) { //只将线程状态为WAITING状态的线程存入ArrayList中
                    Thread t = w.waiter;//取出线程
                    if (t != null) // 线程!=null
                        list.add(t);// 添加进集合
                }
            }
            return list; // 返回集合
        }
    }

}

相关文章