Jdk源码分析

文章40 |   阅读 10697 |   点赞0

来源:https://yumbo.blog.csdn.net/category_10384063.html

并发工具类 java.util.concurrent.Phaser 移相器 你是否熟练掌握了呢?

x33g5p2x  于2021-12-18 转载在 其他  
字(8.8k)|赞(0)|评价(0)|浏览(205)

下面这张图,看完文章后再看,主要意图在于再次看到文章后能快速回忆

state值的说明

  1. 未到达者(bits 0-15)
  2. 参与者(bits 16-31)
  3. 栅栏(bits 32-62)
  4. 中断标志,也是符号位(bit 63 / sign)

一、Phaser 基本使用—初步认识

能看到这篇文章的,我相信 juc 也有一定的掌握了,如果不是这样的,可以看下我的专栏文章

  1. 《JUC (java.util.concurrent)》
  2. 《jdk源码》

PhaserCyclicBarrier 功能类似。

Phaser主要的应用场景是:需要n个线程同时到达后才可以继续执行,否则就停留在原地等待。也就是phaser.arriveAndAwaitAdvance()方法,自己到达后还需要等待其它人到达。

其它n-1个线程是怎么注册进入Phaser的呢?会不会像CyclicBarrier、CountDownLatch、Semaphore在构造的适合传入数量n呢?

1、注册参与者的方式
1.1、构造方法进行注册参与者

通过观察其构造方法,会发现Phaser提供了4种构造方法
如下:

通过观察第2个构造方法,传入一个 int 值非常的类似于CyclicBarrier、CountDownLatch、Semaphore的用法。

1.2、public 方法进行注册参与者

与此同时无参的构造方法也透露出其不一定只能通过构造方法传入参与者数量。
可以通过提供的

  1. register() 注册单个参与者
  2. bulkRegister(int parties) 批量注册多个参与者

此外,Phaser可以动态的变化参与者数,比CyclicBarrier还要强大哦,如果不变就相当于CyclicBarrier

2、一个完整的使用案例
import java.util.concurrent.Phaser;

public class PhaserDemo {

    public static void main(String[] args) {
        // 创建一个相位器
        Phaser phaser = new Phaser();
        // 模拟10个线程的栅格阻塞
        for (int i = 0; i < 10; i++) {
            phaser.register();                  // 注册1个参与者
            new Thread(() -> {
                // 用于表示线程在执行
                System.out.println(Thread.currentThread().getName() + "已到达===正在等待其它线程");
                if (Thread.currentThread().getName().equals("Thread-1")) {
                    try {
                        // 特意将其中Thread-1线程进行休眠5秒,让其它线程先执行下面的 arriveAndAwaitAdvance 方法
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                int num = phaser.arriveAndAwaitAdvance();     // 等待其它参与者到达

                System.out.println(Thread.currentThread().getName() + ": 执行完任务,当前phase =" + num + "");
            }, "Thread-" + i).start();
        }

    }
}

等待5秒后,所有线程都到达,然后一起从arriveAndAwaitAdvance处继续执行。如下gif

代码逻辑分析

我们通过for创建了10个线程,创建线程前,注册一个参与者。那么我们注册了10个参与者。

现在有10个线程在执行任务,任务如下

Thread-1需要进休眠5秒,其它9个一路执行下去,执行到phaser.arriveAndAwaitAdvance();,我们看下是不是在参与者没有到齐的情况下是不是都进行了阻塞等待。

通过上面录制的gif,可以发现的确是这样的,9个线程到达了并且都在等待Thread-1到达,而Thread-1需要5秒后才执行到phaser.arriveAndAwaitAdvance();,执行完后才表示到达,然后大家一起从阻塞中被唤醒,继续执行后面的逻辑代码。

额外补充 - - - arrivewait 相关的其它几个方法

上述案例中使用的是arriveAndAwaitAdvance();你是否有发现,拥有同样功能的另外几个方法,你知道它们的区别吗?–实际上通过名称和参数也大致就知道怎么用。

解释通过注释即可理解,其中arriveAndDeregister这个方法让Phaser拥有动态修改参与者的数量。
也是比CyclicBarrier更强大的原因

二、Phaser—深挖底层的实现原理

对于大部分人来说,上面的内容已经够用了,但是作为一个喜欢专研的人说,只有知道其底层的实现原理,才可以真正掌握Phaser的用法,以至于在日后的编码中出现问题后能迅速的找到原因并改正。

即,我们需要明白Phaser是如何实现动态修改参与者数量的,也即arriveAndDeregister的底层原理,
如何用于表示上诉描述的信息中的一些关系:参与者、到达者数量
通过观察源码,底层有一个long state变量,怎么表示注册一个参与者取消一个参与者到达一个

另外是否有注意到其底层中有两个队列?evenQ 和 oddQ是干什么用的呢?
这些问题都是我们接下来需要研究的问题。

跟着文章继续走,一起研究一下。
它们是做什么用的:

1、研究state值
1.1、初始值state是多少

假设我们使用前面的测试代码,通过无参构造,看下state值是多少,加一个断点观察一下,会发现state=1

而这个state=1,则是由内部的静态常量EMPTY得来
具体的调用关系是 空参构造调用全参构造即

得到一个信息:初始没有参与者的情况下state=1

得到第二个信息:注册一个参与者的情况下:state=65537
探究一些register做了什么

这里特意将for循环中一些可以帮助我们理解其底层的一些步骤提出来

long s = (parent == null) ? state : reconcileState();// 得到state值,注意是long类型的
int counts = (int)s;// 强转为int,高位的32位被抛弃,因此操作底层很多都是
/* PARTIES_SHIFT=16, 也就是说:参与者parties的数量是高位的前16位,也就是65535个最多容纳65535个参与者*/
int parties = counts >>> PARTIES_SHIFT;
/* UNARRIVED_MASK = 0xffff; 未到达者的数量=后面16位数字 */
int unarrived = counts & UNARRIVED_MASK;

得到第三个信息:state的64位后面32位是真正用到的,而32位前面16位用于表示参与者数量,后面16位用于表示未到达的数量

这里是不是会产生一个问题,初始值state不应该是0?为什么设置为1呢?根据第三个信息,0才是正确的表示啊!

1.3、arrive 的职责

按照第三个信息state=1的含义就是,参与者为0,未到达者为1。
如果不考虑参与者数量,那么未到达者为1是否会出现问题呢,比如我调用一下arrive,实际上会报错因为参与者part为0

前面我们调试过了,注册第一个参与者后,state值变成了65537,也就是1<<16+1表示
那么接着注册一个呢?

会发现变成了131074,也就是 1<<16+1 再加上了 1<<16+1 等同于1<<17 + 2
如上计算器中显示的二进制位
这里,我们明白了register的作用就是每增加一个参与者,对应的到达者应该也要相同(在还没有执行arrive相关的方法前)

看下arrive做了什么,会发现传入了一个1

那么经过一次减后就变成了65536,也就是1 0000 0000 0000 0000
这个时候发现未到达数为0,说明已经到齐了,那么后面是不是应该通知其它正在等待的方法结束等待?

继续看下这个方法

这里最前面的1表示阶段数,每次所有参与者到达时就会进入下一个阶段,这个值加一

得到第四个信息:每次调用arrive会导致未到达数-1
得到第五个信息,高位的1,也就是第33位前面圈起来的那位,表示已经有参与者到达了

探究phaser.awaitAdvance(int phase);的用法

这里需要传入一个int,你知道它的用途吗?根据场景我们该传多少合适呢?

一起来调试一下下面的代码,我们的意图是让所有线程都进行阻塞,不让其继续执行

传入0测试一下
import java.util.concurrent.Phaser;

public class PhaserDemo {

    public static void main(String[] args) {
        // 创建一个相位器
        Phaser phaser = new Phaser();
        // 模拟10个线程的栅格阻塞
        for (int i = 0; i < 10; i++) {
            phaser.register();                  // 注册1个参与者
// phaser.arrive();
// phaser.awaitAdvance(1);
            new Thread(() -> {
                // 用于表示线程在执行
                System.out.println(Thread.currentThread().getName() + "已到达===正在等待其它线程");
                int num = phaser.awaitAdvance(0);     // 等待其它参与者到达

                System.out.println(Thread.currentThread().getName() + ": 执行完任务,当前phase =" + num + "");
            }, "Thread-" + i).start();
        }

    }
}

会发现传入0后,10个线程都进行了阻塞

如果将传入的值改成1呢

传入1测试一下

会发现并没有起到阻塞的作用,一下子就执行完了,为什么呢?

探讨传入1时awaitAdvance(1) 的作用

代码上容易发现,当我们注册了参与者后,假设是65537,当我们调用awaitAdvance(1)时,执行也就执行到了蓝色的地方,会发现p=0,而我们传入phase=1 显然不等,就直接return了

那么我们传入的phase=1有什么含义呢?

经过实验发现,这里的含义是,当有一个参与者到达后进行阻塞,如果传入2,那么含义就是当有2个参与者arrive到达后就会进行阻塞。

以次类推,这个方法就是当有n个参与者到达后才进行阻塞,否则就继续执行。

phaser.register();                  // 注册1个参与者
phaser.arrive();
phaser.awaitAdvance(1);

个人觉得吧,一般的用法都是传入0,因为出于控制的原因,使其进行阻塞,当某个任务完成后,即获取到某些资源后我才继续执行下去。
当然也不排除只有在获取对应的资源后才进行阻塞的场景,这种场景也是存在的。比如接收到电子信号后就让机器停止作业。

如果我们想把Phaser当作CyclicBarrier用,那么需要控制多少个线程就调用多少个register()方法即可

例如下面是一个注册了3个的参与者,但是只有2个线程到达,会导致Thread-Listener会阻塞等待都到齐

import java.util.concurrent.Phaser;

public class PhaserDemo {

    public static void main(String[] args) {
        // 创建一个相位器
        Phaser phaser = new Phaser();
        phaser.register();                  // 注册1个参与者
        phaser.register();                  // 注册1个参与者
        phaser.register();                  // 注册1个参与者
        new Thread(()->{
            final int i = phaser.awaitAdvance(0);
            System.out.println("都到齐了,"+Thread.currentThread().getName()+" 执行任务...");
        },"Thread-Listener").start();
        // 模拟10个线程的栅格阻塞
        for (int i = 0; i < 2; i++) {
            new Thread(() -> {
                // 用于表示线程在执行
                System.out.println(Thread.currentThread().getName() + "已到达===正在等待其它线程");
                int num = phaser.arrive();     // 等待其它参与者到达
            }, "Thread-" + i).start();
        }

    }
}

正常的都到齐的情况下,执行结果如下

三、Phaser 的难点:底层数据结构

1、探究Phaser使用分层时,其底层的数据结构

乍一看,感觉和CyclicBarrier 差不多的功能。
但是试想一下,如果这里的任务有很多,也就是参与者非常多的情况下比如100个线程,会发现如果使用CyclicBarrier的话就意味着这100个线程同时用一个对象,那么显然性能是急剧下降的,而Phaser就是为了解决这个问题的,它的底层为了避免出现很多个线程同时使用CyclicBarrier这种场景,将其形成了一个树形结构

也就是前面我们一直没有讲到的结构

从上面这张图片我们知道每一个Phaser实例都有一个 root ,用于指向root,而parent用于形成单向链表。
evenQ 和 oddQ这是两个栈,根据源码中的注释信息得知这两个的区别是分表是phase栅栏(bits 32-62,从0开始算,也就是0-63)是奇数还是偶数将Phaser分表存入其中。

注释中提到所有的 Phaser 共享 这两个队列

根据源码中的注释以及阅读过程中的推理,推测结构

是不是我们推测的结构呢?

通过源码的阅读,我们发现构造这个结构的地方就是这个全参构造。

请仔细阅读这里的代码,证实我们的推论,上述的数据结构可能错误哦,只是猜测而已。

要形成上述结构,那么肯定先有root,也就是else先执行,后再通过构造的时候会传入Phaser parent
仔细阅读if分支的 // 2.内容。
因为root前面创建好了evenQ 、oddQ 会发现这个节点是引用了root.evenQ 、root.oddQ因此推论:所有的Phaser共用同一个evenQ 、oddQ正确,并且由root创建的。此时我们大致证实了上面的数据结构。

继续深入,这里的内部类QNode底层我们也需要好好的研究一下,看下其作用

public Phaser(Phaser parent, int parties) {
    if (parties >>> PARTIES_SHIFT != 0) throw new IllegalArgumentException("Illegal number of parties");
    int phase = 0;
    this.parent = parent;
    
    if (parent != null) {
        // 2.构造root后面的结构,所有的phaser都直接引用root
        final Phaser root = parent.root;// 得到root
        this.root = root;// 将当前phaser的root更新为传来的phaser.root
        this.evenQ = root.evenQ;// 将root的栈也放到当前的phaser
        this.oddQ = root.oddQ;// 将root的奇数栈也放到这里
        if (parties != 0)
            phase = parent.doRegister(1);
    } else {
        // 1.说明这是一个root节点
        this.root = this;
        this.evenQ = new AtomicReference<QNode>();// 结构从这里开始,偶数队列(栈)
        this.oddQ = new AtomicReference<QNode>();// 奇数队列(栈)
    }
    
    this.state = (parties == 0) ? (long) EMPTY :
            ((long) phase << PHASE_SHIFT) |
                    ((long) parties << PARTIES_SHIFT) |
                    ((long) parties);
}
2、探究QNode节点的底层原理

仔细观察QNode中的数据结构,关键性的信息

  1. 线程
  2. Phaser 相位器
  3. 阶段phase
  4. 中断
  5. 超时
  6. 是否已经被中断
  7. 终结点
  8. 下一个QNode

nanos对于我们理解,没啥作用

会发现底层使用 LockSupport 进行阻塞 和 唤醒

/** * 分层队列底层 */
static final class QNode implements ForkJoinPool.ManagedBlocker {
    volatile Thread thread;     // 对应的线程
    final Phaser phaser;        // 节点Phaser
    final int phase;            // 阶段
    final boolean interruptible;// 是否可以 被中断
    final boolean timed;        // 是否已超时
    boolean wasInterrupted;     // 是否已经被中断
    long nanos;                 // 纳秒
    final long deadline;        // 终结点
    QNode next;                 // 下一个Node

    QNode(Phaser phaser, int phase, boolean interruptible, boolean timed, long nanos) {
        this.phaser = phaser;
        this.phase = phase;
        this.interruptible = interruptible;
        this.nanos = nanos;
        this.timed = timed;
        this.deadline = timed ? System.nanoTime() + nanos : 0L;
        thread = Thread.currentThread();
    }

    // 是否是可释放的,true表示不可以释放
    public boolean isReleasable() {
        if (thread == null)
            return true;
        if (phaser.getPhase() != phase) {
            thread = null;
            return true;
        }
        if (Thread.interrupted())
            wasInterrupted = true;
        if (wasInterrupted && interruptible) {
            thread = null;
            return true;
        }
        if (timed && (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
            thread = null;
            return true;
        }
        return false;
    }

    // 如果超时了,则阻塞线程
    public boolean block() {
        //
        while (!isReleasable()) {
            if (timed)
                LockSupport.parkNanos(this, nanos);
            else
                LockSupport.park(this);
        }
        return true;
    }
}

那么QNode为什么要这么设计呢?
在实际开发过程中,我们该怎么使用分层来提高我们对于程序的并发量?

为什么要设计分层呢?

这里为了更好的理解,我们看下调用QNode构造方法的地方有那些

会发现有3处地方调用了这个构造方法。
第一个和第二个区别在于第4个参数timed不同,显然肯定是这个参数用于设置超时时间的。
然后分别点过去看下是在那个方法的,会发现都是在相关的await方法中

实际上我们这里是想知道evenQ和oddQ它们的区别,只有观察它们的形成,我们才知道这两个队列的区别。

经过追踪源码在internalAwaitAdvance 方法中找到了它们的区别
根据phase是奇数还是偶数区分,然后使用的是头插法

这两个队列本质上没有太多区别,是两个完全相同的含义,处于这样设计的原因可能是为了提高效率

回到前面的问题,Phaser的设计主要是为了解决线程数量非常多的情况下进行通过空间换时间的策略来实现并发。假设有1w个线程共同使用通过一个Phaser或者CyclieBarrier那么会发现效率很低,而Phaser提供了分层的思想去解决多个线程访问同一个阻塞器。有点像 ThreadLocal

对于分层的使用,实际上开发中很少用,首先了解Phaser的人很少,加之应用场景也少,相比使用来说,CyclicBarrier用起来更简单。

如果想要关注分层的代码该怎么写,那么源码中的注释中有案例
Task是一个实现了Runnable接口的自定义类,可以自己写一个。
其大致的作用就是构造前面那张截图中的数据结构

void build(Task[] tasks, int lo, int hi, Phaser ph) {
  if (hi - lo > TASKS_PER_PHASER) {
    for (int i = lo; i < hi; i += TASKS_PER_PHASER) {
      int j = Math.min(i + TASKS_PER_PHASER, hi);
      build(tasks, i, j, new Phaser(ph));
    }
  } else {
    for (int i = lo; i < hi; ++i)
      tasks[i] = new Task(ph);
      // assumes new Task(ph) performs ph.register()
  }
}

相关文章