并发编程篇:AQS的原理,有这一篇就够了~~

x33g5p2x  于2021-12-18 转载在 其他  
字(5.6k)|赞(0)|评价(0)|浏览(305)

队列同步器AQS,用来构建锁或者其他同步组件的基础框架,它有int类型的 state变量表示同步状态,以及内置的FIFO队列来完成志愿获取线程的排队工作。

基本组成

队列同步器接口
1、同步状态方法
private volatile int state;//共享变量,使用volatile修饰保证线程可见性

AQS提供了3个修改状态的方法:

  • getState(): 获取同步状态
  • setState(int newState) : 设置同步状态
  • compareAndSetState(int expect, int update) :使用CAS设置当前的状态,保证了状态修改的原子性
2、可以重写的方法
方法作用
protected boolean tryAcquire(int arg)这是独占式获取同步状态的方法,该方法的实现需要查询到当前的同步状态,同时做出相应判断,最后再通过CAS设置同步状态
protected boolean tryRelease(int arg)这是独占式释放同步状态的方法,让那些等待获取同步状态的线程能够有机会获取同步状态
protected int tryAcquireShared(int arg)这是共享式获取同步状态的方法,返回的值大于等于0,说明就获取成功了,否则,就是获取失败
protected boolean tryReleaseShared(int arg)这是共享式释放同步状态的方法,让那些等待获取同步状态的线程能够有机会获取同步状态
protected boolean isHeldExclusively()这个方法用来判断当前同步器是否在独占模式下被线程占用,它会取出占用的线程和当前线程做个比较,看下是否相等
3、AQS的模版方法

模版方法分为3类:独占式获取和释放同步状态、共享式获取和释放同步状态、查询同步队列中的等待线程情况

方法作用
public final void acquire(int arg)这个方法是独占式获取同步状态的方法,该方法会调用重写的tryAcquire()方法来配合做结果判断,如果当前线程获取同步状态是成功的,该方法就会返回,如果不成功就会进入同步队列等待
public final void acquireInterruptibly(int arg)和acquire()方法类似,不过它多了个响应中断的能力,当前线程未获取到同步状态,一样会进入同步队列,但是如果当前线程被中断,那就会抛出InterruptedException异常并返回
public final boolean tryAcquireNanos(int arg, long nanosTimeout)在acquireInterruptibly()方法的基础上又加了超时的限制,这样可以让在规定时间内无法获取到同步器状态的线程直接返回false,当然了如果获取到则返回true。
public final void acquireShared(int arg)这是共享式获取同步状态的方法,该方法会调用重写的tryAcquireShared()方法来配合做结果判断,如果当前线程获取同步状态是成功的,该方法就会返回,如果不成功就会进入同步队列等待,不过这个方法支持同一时刻可以有多个线程获取同步状态。
public final void acquireSharedInterruptibly(int arg)和acquireShared()方法类似,不过它多了个响应中断的能力,当前线程未获取到同步状态,一样会进入同步队列,但是如果当前线程被中断,那就会抛出InterruptedException异常并返回
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)在acquireSharedInterruptibly()方法的基础上又加了超时的限制,这样可以让在规定时间内无法获取到同步器状态的线程直接返回false,当然了如果获取到则返回true。
public final boolean release(int arg)这也是独占式的方法,用来释放同步状态,然后将同步队列中的第一个节点包含的线程唤醒
public final boolean releaseShared(int arg)这也是共享式的方法,用来释放同步状态,然后将同步队列中的第一个节点包含的线程唤醒
public final Collection getQueuedThreads()获取等待在同步队列上的线程集合

同步队列实现

同步队列依赖内部的同步队列来完成状态的管理,当前线程获取同步队列失败时,同步器会将当前线程以及等待状态等信息构造一个节点(Node)并放入同步队列中,同时会阻塞当前线程,当前状态释放时候,会把首节点中的线程唤醒,使其再次获取同步状态

节点(Node)组成
1、int waitStatus

等待状态:包含如下状态

  1. CANCELLED ,值为1,由于在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消 等待,节点进入该状态将不会变化
  2. SIGNAL 值为-1,后继节点的线程处于等待状态,而当前节点的线程若释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行
  3. CONDITION 值为-2,节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()方法后,该节点将会从等待队列中转移到同步队列中,加入到对同步状态的获取中
  4. PROPAGATE 值为-3,表示下一次共享式同步状态获取将会无条件的被传播下去
  5. INITIAL 值为0,初始状态
2、int waitStatus

前驱节点,当节点加入到同步队列时被设置(尾部添加)

3、Node next

后继节点

4、Node nextWaiter

等待队列中的后继节点。若当前节点是共享的,那么这个字段将是一个SHARED常量,也是说节点类别(独占和共享)和等待队列中的后继节点共用同一个字段

5、Thread thread

获取同步状态的线程

队列同步器的基本结构

独占式同步状态获取与释放

1、获取独占式同步状态

A、主要过程

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

如代码所示:

  1. tryAcquire(arg),线程安全获取同步状态;获取成功返回
  2. 如果获取失败,构造同步节点,,通过addWaiter加入同步队列的尾部,最后调用acquireQueued,以死循环的方式获取同步状态

B、addWaiter方法:节点构造并加入同步队列中,构造的节点是独占式的(Node.EXCLUSIVE)

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
 }

C、acquireQueued:节点进入同步队列中自旋,每个节点都在检查,当条件满足了,获取同步状态,就可以自旋过程退出,否则留在字段过程中,并阻塞节点的线程

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
 }

D、acquire流程图如下

2、释放独占式同步状态

当前线程获取同步状态并执行了相应的逻辑之后,就需要释放同步状态,时候去节点获取同步状态

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
private void unparkSuccessor(Node node) {

        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        // 找到第一个节点
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

共享式同步状态获取与释放

1、共享式同步状态获取

主要过程:
A、调用tryAcquireShared尝试获取同步状态、如果返回值>0,则获取成功
B、否则,在自旋过程中,如果是头节点,并尝试同步状态成功,如果获取状态>0,唤醒后继节点,并退出自旋

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
     private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
2、共享式同步状态释放

释放同步状态,并唤醒后继节点

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

独占式超时获取同步状态

获取独占超时同步状态

doAcquireNanos(int arg, long nanosTimeout)可以超时获取同步状态,也就是在固定时间段内获取同步状态,如果获取成功则返回true;发否则返回fase

private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

独占式获取同步状态流程图

欢迎关注公众号"程序员ZZ的源码",一起学习

相关文章