Jdk源码分析

文章40 |   阅读 10529 |   点赞0

来源:https://yumbo.blog.csdn.net/category_10384063.html

以ReentrantLock的非公平锁为例深入解读AbstractQueuedSynchronizer源码

x33g5p2x  于2021-12-18 转载在 其他  
字(9.9k)|赞(0)|评价(0)|浏览(263)

以下面这段代码为例,我们分析以下ReentrantLock的工作原理,聊一聊,ReentrantLock到底做了哪些事情!

public class ReentrantLockTest {

    static ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) {

        new Thread(()->{
        	lock.lock();
            try {
                System.out.println("A:这是一段加锁代码段");
            } finally {
                //lock.unlock();
            }
        },"A").start();

        new Thread(()->{
        	lock.lock();
            try {
                System.out.println("B:这是一段加锁代码段");
            } finally {
                lock.unlock();
            }
        },"B").start();
    }

}

在上面的代码你是不是疑惑为什么线程A的lock.unlock()是被注释掉的。是不是我写错了?
这样不是会导致死锁?

这是我有意为之的,因为如题,今天我们是为了学习AQS而来的。

为了学习效果好,最后将上面的代码复制粘贴到IDEA / eclipse中我们调试一下这段程序。

先看下线程A它在做什么

首先在类的初始化时成员属性lock就被赋值new ReentrantLock();得到的是一个ReentrantLock实例,并且成员属性sync被赋值为非公平锁NonfairSync实例
对应源码

然后当我们让线程start()时,在执行lock.lock()方法时
调用的时ReentrantLock中的lock()

由于我们前面将sync = new NonfairSync();因此本质上调用的是NonfairSync继承父类Sync.lock(),
代码就执行到了

这里面有一个判断,判断中的方法在子类NonfairSync中有实现Sync中定义的abstract方法。
这个时候需要到Sync的子类NonfairSync中找这个方法。如下

final boolean initialTryLock() {
    Thread current = Thread.currentThread();        // 获取当前线程
    if (compareAndSetState(0, 1)) {   // 之前没有加锁,则将state更新为1,并进行加锁 setExclusiveOwnerThread(current);
        setExclusiveOwnerThread(current);           // 将当前线程设置为独占
        return true;                                // 加锁成功
    } else if (getExclusiveOwnerThread() == current) {// 之前加过锁,相当于重入锁(也意味着当前线程本来就获得到了锁),第二次进入则进入代码块
        int c = getState() + 1;                       // 将锁计数+1,相当于多次加锁,每加一次锁就会+1
        if (c < 0) // 小于0,可能是超过int的上限导致变成负数抛异常
            throw new Error("Maximum lock count exceeded");
        setState(c);//更新state值
        return true;
    } else //这个分支说明当前线程所操作的资源已经被加锁了,需要等待释放锁后获得锁。所以返回false
        return false;
}

阅读这段代码,会发现第一个判断调用的则是AQS的final方法

// CAS操作对state设置新值,(如果state的值为expect则将state更新为update)
protected final boolean compareAndSetState(int expect, int update) {
    return U.compareAndSetInt(this, STATE, expect, update);
}

而这个U则是封装好的一个支持cpu指令操作的一个工具类,目的就是支持CAS操作

private static final Unsafe U = Unsafe.getUnsafe(); //用的都是最底层的操作,里面有很多关于cas操作的native方法

实际上就是将state值由0,变成1
只是这种操作更安全,因为是原子性的一个操作更新值。如果更新成功则进入第一个代码块
执行 setExclusiveOwnerThread(current);将当前线程设置为独占线程,本质上就是将当前线程存起来,追踪过去又跑到了AQS父类AOS的方法原方法的源码

private transient Thread exclusiveOwnerThread; //成员属性
protected final void setExclusiveOwnerThread(Thread thread) {
    exclusiveOwnerThread = thread; //给成员属性赋值
}

所以这个方法也就没啥好讲的,意思就是将当前线程设置为独占排他线程。也就是一个象征性的意义没有多大作用,真正控制线程的在后面,我们继续深入。

执行initialTryLock()方法时,根据方法内的3个分支
1.第一次调用lock
2.同一个线程重复调用了lock
3.不同线程使用同一把锁调用了lock

三个分支会导致3种情况
1、2分支则初始化锁成功,返回true
3则会返回false表示初始化锁失败

回到之前Sync中调用initialTryLock()的代码,由于是第一次加锁因此执行的是第一个分支,返回true

下面是我加上注释后的源码

@ReservedStackAccess
final void lock() {

    //initialTryLock()本质上调用的是子类重写的方法例如:NonfairSync和FairSync内的initialTryLock()
    // 只有已经上锁了,当前线程没有获得到锁才会进入if
    if (!initialTryLock()){
        /** * 如果是不同的线程则返回false,就执行if内的 acquire(1); * 又去执行AQS的 public final void acquire(int arg)方法, * 意思就是需要将其加入线程等待队列中(判断中initialTryLock()就已经知道这个线程没有获得到锁返回了false) */
        acquire(1);// AQS的线程队列是在这个方法内部形成的,需求去父类AQS查看这个final修饰方法
    }

}

会发现!true返回false则if内的acquire(1);不会执行就返回了。
然后对应我们线程A的lock方法也就执行完了。

new Thread(()->{
    try {
        lock.lock();
        System.out.println("A:这是一段加锁代码段");
    } finally {
        //lock.unlock();
    }
},"A").start();

然后就会执行打印输出语句,在控制台中打印出A:这是一段加锁代码段
由于解锁语句被注释了,因此这个线程内的代码就执行完了。

接下来我们看下线程B它的过程

B和A的逻辑差不多,但线程B如何执行的?
同样会进入 lock,本质调用Sync类中定义的 lock
也会初始化锁执行initialTryLock()但是由于此时的线程是另一个线程我们粘贴一下源码

final boolean initialTryLock() {
    Thread current = Thread.currentThread();        // 获取当前线程
    if (compareAndSetState(0, 1)) {   // 之前没有加锁,则将state更新为1,并进行加锁 setExclusiveOwnerThread(current);
        setExclusiveOwnerThread(current);           // 将当前线程设置为独占
        return true;                                // 加锁成功
    } else if (getExclusiveOwnerThread() == current) {// 之前加过锁,相当于重入锁(也意味着当前线程本来就获得到了锁),第二次进入则进入代码块
        int c = getState() + 1;                        // 将锁计数+1,相当于多次加锁,每加一次锁就会+1
        if (c < 0) // 小于0,可能是超过int的上限导致变成负数抛异常
            throw new Error("Maximum lock count exceeded");
        setState(c);//更新state值
        return true;
    } else //这个分支说明当前线程所操作的资源已经被加锁了,需要等待释放锁后获得锁。所以返回false
        return false;
}

会发现此时的state state是AQS类的成员属性 之前被上一个线程更新为了1,因为使用的是同一把锁。
第一个分支不走
又因为第二个判断发现当前线程B,和独占线程A不是同一个线程因此分支2也不走

getExclusiveOwnerThread() // 返回独占线程,来自AQS父类AOS方法

因此最终返回的是false

因此下面 if 中的代码需要执行

@ReservedStackAccess
final void lock() {
    if (!initialTryLock()){
        acquire(1);// AQS的线程队列是在这个方法内部形成的,需求去父类AQS查看这个final修饰方法
    }
}

acquire(int);方法是来自非公平锁的父类的父类AQS内部定义的final方法

重点来了,需要集中注意!这里是分水岭

在AQS中源码是这样定义的,下面有两个acquire上面的会调用下面更多参数的acquire方法

下面的代码可以先不看,先拉到后面看我的解说,然后对照着我的解说看代码

/** * 尝试获得到锁,实际调用下面多个参数的acquire()方法 */
public final void acquire(int arg) {
    /** * tryAcquire(arg)调用的是子类实现的tryAcquire(int),本身在AQS则是一个抽象方法 * 需要在子类中查看具体实现,例如:ReentrantLock中的内部类NonfairSync、FairSync中的tryAcquire(int) * 因此需要去子类查看这个方法的实现只有失败了才会进一步调用acquire(null, 1, false, false, false, 0L); */
    if (!tryAcquire(arg)) {
        // 注意参数值除了arg的值是变量,其它都是0会false或null,一遍而言传入的是1,除非一次性加了多次锁
        acquire(null, arg, false, false, false, 0L);
    }
}
/** * 抢占锁的方法,加锁的时候除了arg=1其它都是null或false * * @param node * @param arg 加锁次数 * @param shared 控制是否时共享线程队列也就是SharedNode的布尔值 * @param interruptible 是否时可中断线程 * @param timed 是否由最长等待时间 * @param time 中断超时时间 */
final int acquire(Node node, int arg, boolean shared, boolean interruptible, boolean timed, long time) {
    Thread current = Thread.currentThread();    // 获取当前线程
    byte spins = 0, postSpins = 0;              //
    boolean interrupted = false, first = false; // 中断变量值interrupted,first表示第一次进入方法
    Node pred = null;                           //
    // 自旋获取锁
    for (; ; ) {
        // 循环的第一次判断为 true && false (node为null pred=null,null!=null。返回false就不执行后面的赋值和判断)
        // 自旋后的第二次由于是刚创建的则prev为null因此还是false,pred=null,还是false不执行
        // 当node!=null且node.prev!=null说明节点已经入队了,因此第二个判断返回true需要判断!(first = (head == pred))返回的是false
        if (!first && (pred = (node == null) ? null : node.prev) != null && !(first = (head == pred))) {
            // 进入这个代码块说明队列不为空且不止一个线程在等待
            if (pred.status < 0) {  // 实际就是传入的node的前一个节点的status是否<0
                cleanQueue();           // 清空队列
                continue;
            } else if (pred.prev == null) {
                Thread.onSpinWait();    // 确保序列化
                continue;
            }
        }
        //循环第一次 false || true,需要进入代码块。pred在第一次判断就被赋值为null,循环进入前也是null
        if (first || pred == null) {
            boolean acquired;
            try {
                if (shared) {
                    // 循环第一次false进入else分支
                    acquired = (tryAcquireShared(arg) >= 0);
                } else {
                    /** * 尝试抢占锁,如果没有抢到则会自旋,tryAcquire(arg);会一直重复调用,直到抢占成功 * 子类实现的方法。例如非公平锁的tryAcquired(1); * 如果state为0,则acquired则会变成true。将当前线程设置为独占并更新state为arg * 如果state不为0,则acquired=false,说明被其它线程上锁了 * * 自旋的起点是后面 * if (node == null) { * if (shared)// 如果是共享队列节点则创建SharedNode,然后由于后面没有代码则会自旋for循环重新执行一遍只是这个时候node不为null * node = new SharedNode(); //这个是线程队列的头节点,用来标识这个队列是一个共享锁线程等待队列 * else // 如果是一个排他锁创建一个排他节点 * node = new ExclusiveNode();// 线程队列的头节点,标识是一个排他锁线程队列 * } * 然后到这里的tryAcquire(arg);一直原地踏步,直到抢占到锁 * acquired更新为true * 进入catch后面的if * */
                    acquired = tryAcquire(arg); // 会回到子类(NonfairSync、FairSync等)的实现的方法尝试获得锁,如果失败则还是false,直到成功true
                }
            } catch (Throwable ex) {
                cancelAcquire(node, interrupted, false);
                throw ex;
            }
            // true说明当前线程抢占到锁了
            if (acquired) {
                if (first) {
                    node.prev = null;
                    head = node;
                    pred.next = null;
                    node.waiter = null;
                    if (shared)
                        signalNextIfShared(node);
                    if (interrupted)
                        current.interrupt();
                }
                return 1; // 返回1标签抢占到锁了这是这个方法的唯一结束点,其它分支始终都会死循环
            }
        }
        /** * AQS队列的形成起点,头节点就是下面的SharedNode或ExclusiveNode,然后自旋 */
        // 第一次进入node传入的是null因此进入
        // 第二次由于第一次进入后node=new SharedNode();或node = new ExclusiveNode();则这个if就不会在进入,会进入第二个if因为pred=null
        if (node == null) {
            if (shared)// 如果是共享队列节点则创建SharedNode,然后由于后面没有代码则会自旋for循环重新执行一遍只是这个时候node不为null
                node = new SharedNode(); //这个是线程队列的头节点,用来标识这个队列是一个共享锁线程等待队列
            else       // 如果是一个排他锁创建一个排他节点
                node = new ExclusiveNode();// 线程队列的头节点,标识是一个排他锁线程队列
        } else if (pred == null) {          // 尝试将当前线程入队
            node.waiter = current;          // 将当前线程存入ExclusiveNode节点的waiter成员变量
            Node t = tail;                  // 一开始tail=null
            node.setPrevRelaxed(t);         // 将node的prev指向tail,而此时node.next还是null,相当于加入到了队列的末尾,后面需要将队列末尾指向node形成双向的队列
            if (t == null)                  // 第一次队列还没有形成因此t是null需要将头节点初始化
                tryInitializeHead();        // 初始化头节点,并且在内部将tail也执行了这个初始的头节点
            else if (!casTail(t, node))     // casTail(t, node)会将tail变成node
                node.setPrevRelaxed(null);  // 如果tail更新失败。则node.prev=null则又会重新进入if进行更新,直到更新成功。
            else
                t.next = node;              // 将队列的末尾指向node形成双向队列(node.next则是null,也就是说aqs队列的末尾元素的next始终为null,prev会指向前一个节点)
        } else if (first && spins != 0) {   //
            --spins;                        // 让出cpu使用权,减少线程调度得不公平性
            Thread.onSpinWait();            // 让出cpu使用全和Thread.sleep(0)差不多的作用,但是Thread.onSpinWait();更高效,使用的是cpu指令
        } else if (node.status == 0) {      // 0是初始化赋得值,这里aqs队列得节点自然是要要让线程等待,因此更新status值为1(WAITING得值就是常量1)
            node.status = WAITING;          // 如果status为0更新status值为常量WAITING=1,1表示等待
        } else {
            long nanos;
            spins = postSpins = (byte) ((postSpins << 1) | 1);// spins!=0则会调用Thread.onSpinWait();,让当前线程让出cpu的使用权
            if (!timed)//如果没有设置阻塞时间
                LockSupport.park(this);// 阻塞当前线程
            else if ((nanos = time - System.nanoTime()) > 0L)//如果设置了阻塞时长且时间nanos > 0
                LockSupport.parkNanos(this, nanos);  //阻塞nanos纳秒
            else // 如果时间不合法 则break
                break;
            node.clearStatus();// 将status重新更新为0
            if ((interrupted |= Thread.interrupted()) && interruptible)
                break;
        }
    }
    return cancelAcquire(node, interrupted, interruptible);//返回0 或者 返回CANCELLED常量负数
}

会发现执行acquire(1);
内部有一个 if 判断
判断内容是,将acquires的值设置为1

/** * 代码来自非公平锁内部 * 尝试获取锁(加锁)acquires次 */
protected final boolean tryAcquire(int acquires) {
    // 如果加锁线程已经释放了锁,也就是state=0那么就将当前线程设置为独占线程并更新state值为1,表示抢占锁成功
    if (getState() == 0 && compareAndSetState(0, acquires)) {
        setExclusiveOwnerThread(Thread.currentThread());// 将当前线程设置为独占线程,表示当前线程获得到锁
        return true;
    }
    return false;//如果已经加过锁了由于state则不为0则会返回false,表示抢占锁失败
}

这里面的逻辑和前面我们分析过的差不多,很明显会返回false,因为state!=0
因为返回false,所以导致内部代码块需要执行也就是

if (!tryAcquire(1)) {
    acquire(null, 1, false, false, false, 0L);
}

然后我们将这些参数值带入去看下多个参数的acquire方法
这个时候拉到上面我粘贴出来的代码,结合我的注释分析一遍流程。
如果我的注释中有错误的地方,可以在评论区给我指出,我会更新文章改正过来。
关于AQS的源码我也有点疑惑,那就是关于这个方法的spins、postSpins这个参数的具体作用是干什么的。从子面量的意思来说分别表示spins和过期的spins,分别用这个两个变量存。

经过这个方法,会形成如下的队列
线程A是持有锁的线程因此不在aqs队列中,而B则因为没有获得锁,就进入了等待队列。
B指我上面的线程B产生的节点,在B之前会创建一个节点,然后利用这个节点作为头,真正的B却是head.next得到的aqs队列实际意义上的队列头。
这么做的目的就是为了方便利用AQS的成员属性快速找到头。相当于始终有一个head指针指向了aqs队列的头,始终有一个tail指针指向队列末尾节点。

通过源码我们很容易看出aqs利用自旋的方式创建了一个CLH队列,然后利用LockSupport.park()将线程进行阻塞

LockSupport源码

而释放锁在我们熟悉了AQS的数据结构后,以及前面的基础后很容易定位到源码
源码依此是
ReentrantLock类

public void unlock() {
    sync.release(1);
}

AQS类

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        signalNext(head);//唤醒head.next所指向的节点线程
        return true;
    }
    return false;
}

Sync类

// 尝试释放锁
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;//计算释放后的state值
    if (getExclusiveOwnerThread() != Thread.currentThread())// 如果当前线程不是和独占线程同一个线程抛异常IllegalMonitorStateException
        throw new IllegalMonitorStateException();
    boolean free = (c == 0);// 计算是否要清除独占先(计算释放后是否还有线程持有锁)
    if (free)
        setExclusiveOwnerThread(null);//清除独占线程
    setState(c);//更新state值
    return free;
}
/** * 唤醒线程队列节点中的线程 * 传入的是head,通过head.next得到等待线程的第一个节点将其唤醒 */
private static void signalNext(Node h) {
    Node s;
    if (h != null && (s = h.next) != null && s.status != 0) {
        s.getAndUnsetStatus(WAITING); // 更新线程状态值为常量1
        LockSupport.unpark(s.waiter); // 给当前节点的线程发放一个许可,唤醒该线程
    }
}

执行完signalNext后就会将队列第一个节点中的线程唤醒,仔细观察if中的判断,会发现这个方法中的h应该传入的是head的意思,也就是说将真正意义的线程队列第一个节点(实际通过head.next得到)然后通过LockSupport唤醒该节点中的线程

相关文章