JDK并发工具类源码学习系列——ConcurrentLinkedQueue

x33g5p2x  于2021-12-18 转载在 其他  
字(6.3k)|赞(0)|评价(0)|浏览(270)

欢迎阅读原文:JDK并发工具类源码学习系列目录

上一篇文章介绍了JDK java.util.concurrent包下很重要的一个类:ConcurrentHashMap,今天来看下另一个重要的类——ConcurrentLinkedQueue。
在多线程编程环境下并发安全队列是不可或缺的一个重要工具类,为了实现并发安全可以有两种方式:一种是阻塞式的,例如:LinkedBlockingQueue;另一种即是我们将要探讨的非阻塞式,例如:ConcurrentLinkedQueue。相比较于阻塞式,非阻塞的最显著的优点就是性能,非阻塞式算法使用CAS来原子性的更新数据,避免了加锁的时间,同时也保证了数据的一致性。

简单介绍

ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部,当我们获取一个元素时,它会返回队列头部的元素。它采用了“wait-free”算法来实现,该算法在Michael & Scott算法上进行了一些修改, Michael & Scott算法的详细信息可以参见参考资料一

结构预览

首先看看结构图:

图1:ConcurrentLinkedQueue结构图:

从图中可以看到ConcurrentLinkedQueue中包含两个内部类:Node<E>和Itr。Node<E>用来表示ConcurrentLinkedQueue链表中的一个节点,通过Node<E>的next字段指向下一个节点,从而形成一个链表结构;Itr实现Iterator<E>接口,用来遍历ConcurrentLinkedQueue。ConcurrentLinkedQueue中的方法不多,其中最主要的两个方法是:offer(E)和poll(),分别实现队列的两个重要的操作:入队和出队。

方法含义
offer(E)插入一个元素到队列尾部
poll()从队列头部取出一个元素
add(E)同offer(E)
peek()获取头部元素,但不删除
isEmpty()判断队列是否为空
size()获取队列长度(元素个数)
contains(Object)判断队列是否包含指定元素
remove(Object)删除队列中指定元素
toArray(T[])将队列的元素复制到一个数组
iterator()返回一个可遍历该队列的迭代器

下面会着重分析offer(E)和poll()两个方法,同时会讲解remove(Object)和iterator()方法。

常用方法解读
入队——offer

首先看看入队操作,由于是无阻塞的队列,所以整个入队操作是在无锁模式下进行的,下面来分析下JDK到底是如何实现无锁并保证安全性的。

/** * Inserts the specified element at the tail of this queue. * * @return <tt>true</tt> (as specified by {@link Queue#offer}) * @throws NullPointerException if the specified element is null */
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    Node<E> n = new Node<E>(e, null);
    for (;;) {//①
        Node<E> t = tail;//②
        Node<E> s = t.getNext();//②
        if (t == tail) {//③
            if (s == null) {//④
                if (t.casNext(s, n)) {//⑥
                    casTail(t, n);//⑦
                    return true;
                }
            } else {
                casTail(t, s);//⑤
            }
        }
    }
}

代码不长,但是思路还是很巧妙的,下面我们逐句深入分析每一行代码。if (e == null) throw new NullPointerException(); Node n = new Node(e, null);检查NULL,避免NullPointerException,然后创建一个Node,该Node的item为传入的参数e,next为NULL。for (;;) {}接着是一个死循环,死循环保证该入队操作能够一直重试直至入队成功。Node t = tail; Node s = t.getNext();使用局部变量t引用tail节点,同时获取tail节点的next节点,赋予变量s。if (t == tail) {}只有在t==tail的情况下才会执行入队操作,否则进行下一轮循环,直到t==tail,因为是无锁模式,所以如果同时有多个线程在执行入队操作,那么在一个线程读取了tail之后,很可能会有其他线程已经修改了tail(此处的修改是指将tail指向另一个节点,所以t还引用着原来的节点,导致t!=tail,而并非是修改了tail所指向的节点的值),此处的判断避免了一开始的错误,但是并不能保证后续的执行过程中不会插入其他线程的操作,其实ConcurrentLinkedQueue的设计使得if内的代码即使在有其他线程插入的情况下依旧能够很好地执行,下面我们接着分析。

if (s == null) {} else { casTail(t, s); }这里判断s(tail的next是否为NULL),如果不为NULL,则直接将tail指向s。这里需要说明一下:由于tail指向的是队列的尾部,所以tail的next应该始终是NULL,那么当发生tail的next不为NULL,则说明当前队列处于不一致状态,这时当前线程需要帮助队列进入一致性状态,这就是ConcurrentLinkedQueue设计的巧妙之处!那么如果帮助队列进入一致性状态呢?这个问题我们先留着,继续看什么情况下会导致队列进入不一致状态!

if (t.casNext(s, n)) {
    casTail(t, n);
    return true;
}

这几句代码完成了入队的操作,第一步CAS的设置t(指向tail)的next为n(新创建的节点),该更新操作能够完成的前提是t的next值==s,即tail的next值在该线程首次读取期间并未发生变化。此处的CAS操作保证了tail的next值更新的原子性,所以不会出现不一致情况。当成功更新了tail的next节点之后,接下来就是原子性的更新tail为n,此处如果更新成功,则入队顺利完成完成,但是奇怪的是如果此处更新失败,入队依旧是成功的!为什么呢?看下文。

我们试想如果一个线程成功的原子性更新了tail的next值为新创建的节点,由于Node的next是volatile修饰的,所以会立即被之后的所有线程可见,那么就会出现tail未变化但是tail的next已经不是NULL了,此时就会出现上面提到的tail的next不为NULL的情况了,现在我们再来看看上面是如何处理这种情况的,casTail(t, s);,从这句可以看出当一个线程看到tail的next不为NULL时就会直接将tail更新成s(tail的next所指向的节点),即将tail指向其next节点,当然这里的更新也是CAS保证的原子性更新。为什么敢这么大胆,正是因为如果当前线程(T1)看到tail的next不为NULL,那么必然是有一个线程(T2)处于入队操作中,且成功执行了t.casNext(s, n)(将新创建的节点赋到tail的next上),正准备执行casTail(t, n);(将tail执行其next指向的节点),那么T1直接将T2准备做的工作完成,然后再进入循环重新进行入队操作,而T2也不在乎自己这一步是否顺利完成,反正只要有人完成了就行,所以T2就直接返回入队成功,最终T1帮助T2顺利完成了入队操作,并且全程无锁,此设计真的是巧妙啊~~~

下面我们使用流程图形象的描绘下入队过程,整个入队方法被划分成7步(见上面的代码中的注释)。说明:虽然入队是在无锁模式下进行,但是由于使用CAS进行原子性更新,所以很多地方其实还是实现了线程安全的,除了⑥->⑦,下面的图描绘的也正是⑥->⑦这一步可能出现的冲突情况。

图2:ConcurrentLinkedQueue入队流程图:

上面介绍了ConcurrentLinkedQueue是如何实现无锁入队的,但是我们只说明了多个线程同时入队操作是线程安全的,但是如果多个线程同时进行入队和出队,以及删除操作呢?这个问题在下面分析另外两个方法时会提到,同时最后也会进行一个总结,下面我们先看看删除操作是如何实现的。

删除——remove

先介绍删除,是因为出队操作有个地方需要在这里提前介绍下。

public boolean remove(Object o) {
    if (o == null) return false;// ①
    for (Node<E> p = first(); p != null; p = p.getNext()) {// ②
        E item = p.getItem();// ③
        if (item != null &&
            o.equals(item) &&
            p.casItem(item, null))// ④
            return true;
    }
    return false;
}

源码中的注释申明了remove方法会使用equals()判断两个节点的值与待删除的值是否相同,同时如果队列有多个与待删除值相同的节点则只删除最前面的一个节点。

同样remove()方法也是无锁模式,①判断是否为NULL,②从队列头部开始查找,③获取每个节点的item值,用于跟o进行equals比较,前面三步都很平常,重点在④,if (item != null && o.equals(item) && p.casItem(item, null))这里首先判断item不为NULL,然后判断item与o相等,前面两个都满足的话,那说明已经查找到一个节点的值与待删除的值一样,后面就是删除该节点,这里删除其实并非真的删除,而只是原子性的将节点的item值设置为NULL。从上面的分析可以看出ConcurrentLinkedQueue的删除只是将队列中的某个节点值置为NULL,由于Node的item是volatile的,所以不存在线程安全问题,同时由于remove并未修改队列的结构,所以多个线程同时进行remove,或者同其他方法一起进行也不会发生线程安全性问题。

出队——poll

出队从逻辑上来说就是从队列的头部往外取出数据并删除,下面看看ConcurrentLinkedQueue是如何实现无锁出队的。

public E poll() {
    for (;;) {// ①
        Node<E> h = head;// ②
        Node<E> t = tail;// ②
        Node<E> first = h.getNext();// ②
        if (h == head) {// ③
            if (h == t) {// ④
                if (first == null)// ⑤
                    return null;
                else
                    casTail(t, first);// ⑥
            } else if (casHead(h, first)) {// ⑦
                E item = first.getItem();// ⑧
                if (item != null) {// ⑨
                    first.setItem(null);// ⑩
                    return item;
                }
                // else skip over deleted item, continue loop,
            }
        }
    }
}

出队的步骤略多些,不过理解了也就很简单了。首先①是一个死循环;②的三步分别是获取head/tail/head.next三个节点;③判断h==head,避免操作过程中已有其他线程移动了head;④判断head是否等于tail,即队列是否为NULL,说到这里我们先来看看head和tail在队列中到底处于什么位置。我们用一个队列入队出队的时序图来描绘下在入队和出队过程中head和tail到底是如何变化的。

图3:ConcurrentLinkedQueue队列时序图:

从图中我们可以看出head的next指向的是队列的第一个元素,我们出队也是将head的next指向的元素出队,同时head==tail说明队列已经没有元素了。明白了这两点我们再接着④分析,如果④这里为真,说明队列已经为NULL,接着⑤判断f(head的next指向的节点)是否为NULL,不为NULL则执行⑥将tail指向f,到这里如果理解了上面入队操作,那么应该是可以理解这一步的用意的——帮助其他线程执行入队操作,跟入队时的⑤是一样的,因为head==tail,head的next不为NULL,则说明tail的next不为NULL,所以要将tail重新指向他的next,帮助正在执行入队的线程完成入队工作。理解了这一步那么出队操作就已经理解了一大半了,下面继续看⑦⑧⑨⑩。

如果head!=tail,则队列不为NULL,那么直接将head指向下一个节点,将当前节点踢出队列即可,当然需要CAS保证原子性更新,然后将踢出队列的节点的item取出返回,并置为NULL即完成了出队操作。这里需要注意的是如果被踢出队列的节点的item是NULL,说明该节点已经被删除了(因为remove()方法只是将节点的item设置为NULL,而不将节点踢出队列),那就只能再次循环了。再提一点,为什么⑦⑧⑨⑩能够被线程安全的执行,因为在⑦这一步是原子更新的,而且更新之后这个节点就立即不会被其他任何线程访问到了,所以后面⑧⑨⑩想怎么处理都是安全的。

到这里出队操作应该很清楚了,下面就来综合分析下为什么针对ConcurrentLinkedQueue的整个入队/出队/删除都是不需要锁的。

  1. 上面已经分析了如果多个线程同时访问其中任一个方法(offer/poll/remove)都是无需加锁而且线程安全的
  2. 由于remove方法不修改ConcurrentLinkedQueue的结构,所以跟其他两个方法都不会有冲突
  3. 如果同时两个线程,一个入队,一个出队,在队列不为NULL的情况下是不是有任何问题的,因为一个操作tail,一个操作head,完全不相关。但是如果队列为NULL时还是会发生冲突的,因为tail==head。这里我们在分析出队时也提到了,如果出队线程发现tail的next不为NULL,那么就会感知到当前有一个线程在执行入队操作,所以出队线程就会帮助入队线程完成入队操作,而且每个操作都是通过CAS保证原子性更新,所以就算同时两个线程,一个入队,一个出队也不会发生冲突。

综上,ConcurrentLinkedQueue最终实现了无锁队列。

使用场景

ConcurrentLinkedQueue适合在对性能要求相对较高,同时对队列的读写存在多个线程同时进行的场景,即如果对队列加锁的成本较高则适合使用无锁的ConcurrentLinkedQueue来替代。下面我们来简单对比下ConcurrentLinkedQueue与我们常用的阻塞队列LinkedBlockingQueue的性能。
表1:入队性能对比

线程数ConcurrentLinkedQueue耗时(ms)LinkedBlockingQueue耗时(ms)
52229
105059
2099112
30139171

测试数据:N个线程,每个线程入队10000个元素。

参考文章

聊聊并发(六)——ConcurrentLinkedQueue的实现原理分析
非阻塞算法在并发容器中的实现

欢迎访问我的个人博客~~~

相关文章