并发编程系列之什么是并发协同?
多个线程并发,协作来完成一件任务的过程。因为任务处理的需要,需控制某些线程等待另外一些线程执行完成任务的某些部分,然后继续执行。
jdk的juc包中除提供了用于专门处理1并发协同的工具类,主要有CountDownLatch、CyclicBarrier、Phaser、Semaphore
CountDownLatch倒计数锁存器
用途:用于协同控制一个或多个线程等待在其他线程中执行的一组操作完成,然后再继续执行
CountDownLatch用法
构造方法:CountDownLatch(int count),count指定等待的条件数(任务数、操作数),不可再更改
等待方法:await(),阻塞等待线程直到count减少为0,count为0时,不会阻塞,继续执行
boolean await(long timeout,TimeUnit unit):可以设置超时时间的await方法,返回true表示等待条件到达;false表示条件未来到达,但超时了
long getCount():获取当前计数值,常用于调试或者测试
CountDownLatch适用场景
countDownLatch(N)
这个多个条件可以是:等待N个线程、等待N个操作、等待某操作的N次执行例子:等待n个线程执行完成,再一起执行
import java.util.Random;
import java.util.concurrent.CountDownLatch;
public class CountDownLatchExample {
public static void main(String[] args) {
final CountDownLatch cdl = new CountDownLatch(1);
int concurrency = 100;
final Random random = new Random();
for (int i = 0; i < concurrency; i++) {
new Thread(()->{
try {
Thread.sleep(random.nextInt(10_000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "准备就绪");
// 让并发线程都等待发出信号
try {
cdl.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "开始工作");
}).start();
}
System.out.println("******************** 发出开始信号***********");
cdl.countDown();
}
}
执行,发现结果不符合我们的要求,虽然也是多个线程等待,再一起无序执行:
******************** 发出开始信号***********
Thread-22准备就绪
Thread-22开始工作
Thread-45准备就绪
Thread-45开始工作
...
因为CountDownLatch不能重用,所以再新加一个CountDownLatch协同N个线程:
import java.util.Random;
import java.util.concurrent.CountDownLatch;
public class StartTogerCountdownLatchExample {
public static void main(String[] args) throws InterruptedException {
final CountDownLatch cdl = new CountDownLatch(1);
int concurrency = 100;
final CountDownLatch cdln = new CountDownLatch(concurrency);
final Random random = new Random();
for (int i = 0;i < concurrency; i++) {
new Thread(()->{
try {
Thread.sleep(random.nextInt(10_000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(
Thread.currentThread().getName() + " 准备就绪");
// 调用countDown()报告完成任务
cdln.countDown();
// 让所有线程都等待发出信号
try {
cdl.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(
Thread.currentThread().getName() + " 开始工作");
}).start();
}
//等待准备完成
cdln.await();
System.out.println("******************** 发出开始信号***********");
cdl.countDown();
}
}
等待N个线程准备就绪,然后一个总的CountDownLatch发出信号量,所有线程一起执行
...
Thread-11 准备就绪
Thread-14 准备就绪
Thread-53 准备就绪
Thread-91 准备就绪
******************** 发出开始信号***********
Thread-97 开始工作
Thread-57 开始工作
...
CyclicBarrier循环屏障定义
定义:协同指定数目的线程,让这些线程都在这个屏障前等待,直到所有的线程都到这个屏障前,再一起继续执行。线程执行完成后,这个屏障可以再次使用,因此被称之为循环屏障。
CyclicBarrier用法以及原理
构造方法,CyclicBarrier(int parties)
:parties指定有多少个部分(线程)参与,称之为参与数。
构造方法,CyclicBarrier(int parties,Runnable barrierAction)
:barrierAction,所有参与者都到达屏障时执行一次的命令。在一组线程中最后一个线程到达之后(但在释放所有线程之前),在该线程中执行改命令,该命令只在每个屏障点运行一次。若要在继续执行所有线程之前更新共享状态,此屏障操作很有用。
int await() throws InterruptedException,BrowkenBarrierException
:线程执行过程会调用await()方法,表明自己已经到达屏障,该线程自己阻塞,等待其它线程也到达屏障;当所有线程都到达屏障,也即线程等待数等于参与数,则释放所有线程,让它们继续执行。返回值int表示到达当前线程的索引号,注意索引号是从parties-1
开始减为0。BrokenBarrierException
,屏障被破坏异常,当调用await时,或等待过程中屏障被破坏,则会抛出BrokenBarrierException
。
int await(long timeout,TimeUnit unit) throws InterruptedException,BrokenBarrierException,TimeoutException
:等待指定时长,如到了时间还不能释放,则将抛出TimeoutException
int getNumberWaiting()
: 获取当前在屏障处的线程数
boolean isBroken()
: 判断屏障是否被破坏
void reset()
:重置屏障为初始化状态。如果当前有线程正在等待,则这些线程将被释放并抛出BrokenBarrierException
CyclicBarrier使用注意事项
一定要确保有足够多的参与者线程,否则会一直阻塞在屏障处。
在线程池中使用要注意,确保线程池的线程数大于等于参与数。
CyclicBarrier适用场景
线程等待一起执行
多次等待一起执行
CountDownLatch和CyclicBarrier对比
CountDownLatch是一部分线程等待另外一部分线程来唤醒
CyclicBarrier是参与线程彼此等待,都到达了,再一起执行
CountDownLatch不可以循环引用,CyclicBarrier可以循环使用
场景:多阶段等待一起出发
参与者不变,多次彼此等待。正好可用CyclicBarrier的循环使用特性
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierExample {
public static void main(String[] args) {
int concurrency = 100;
final CyclicBarrier cyclicBarrier = new CyclicBarrier(concurrency , ()->{
System.out.println("*****************准备完成!************");
});
final Random random = new Random();
for (int i = 0 ; i < concurrency; i++) {
new Thread(() -> {
try {
Thread.sleep(random.nextInt(10_000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "准备就绪");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(
Thread.currentThread().getName() + " 开始工作....");
}).start();
}
}
}
控制台打印:
...
Thread-12准备就绪
Thread-58准备就绪
Thread-75准备就绪
Thread-25准备就绪
*****************准备完成!************
Thread-25 开始工作....
Thread-89 开始工作....
Thread-34 开始工作....
...
jdk7中增加了一个用于多阶段同步控制的工具类,它包含了CyclicBarrier和CountDownLatch的相关功能,比它们更强大灵活。
对Phaser阶段协同器的理解,Phaser适用于多个线程协作的任务,分为多个阶段,每个阶段都可以有任意个参与者,线程可以随时注册并参与某个阶段;当一个阶段中所有任务都成功完成后,Phaser的onAdvance()被调用,然后Phaser释放等待线程,自动进入下个阶段。如此循环,直到Phaser不再包含任何参与者。
Phaser API说明:
构造方法
Phaser()
:参与任务数0
Phaser(int parties)
:指定初始参与任务数
Phaser(Phaser parent)
:指定parent阶段器, 子对象作为一个整体加入parent对象,当子对象中没有参与者时,会自动从parent对象解除注册
Phaser(Phaser parent , int parties)
:集成上面两个方法的
增减参与任务数方法
int register()
:增加一个数,返回当前阶段号
int bulkRegister(int parties)
:增加指定个数,返回当前阶段号
int arriveAndDeregister()
:减少一个任务数,返回当前阶段号
到达等待方法
int arrive()
:到达,任务完成,返回当前阶段号
int arriveAndAwaitAdvance()
:到达后等待其他任务到达,返回到达阶段号
int awaitAdvance(int phase)
:在指定阶段等待(必须是当前阶段才有效)
int awaitAdvanceInterruptibly(int phase)
int awaitAdvanceInterruptibly(int phase , long timeout, TimeUnit unit)
阶段到达触发动作
protected boolean onAdvance(int Phase , int registeredParties)
:类似于CyclicBarrier的触发命令,通过重写该方法来增加阶段到达动作
其它api
void forceTermination()
:强制结束
boolean isTerMinated()
:判断是否结束
void getPhase()
:获取当前阶段号
注意事项 : 单个Phaser实例允许的注册任务数的上限是65535,如果参与任务数超过,可以用父子Phaser树的方式
import java.util.Random;
import java.util.concurrent.Phaser;
public class MultipleStartTogetherPhserDemo {
Random rd = new Random();
int bound = 5000;
public void step1Task() throws InterruptedException {
// 经过一段时间后,到达公司
Thread.sleep(rd.nextInt(bound));
System.out.println(
"员工【" + Thread.currentThread().getName() + "】" + "到达公司!");
}
public void step2Task() throws InterruptedException {
System.out.println(
"员工【" + Thread.currentThread().getName() + "】" + "出发去公园玩。。。");
// 玩了一段时间后,到公园门口集合
Thread.sleep(rd.nextInt(bound));
System.out.println(
"员工【" + Thread.currentThread().getName() + "】" + "完成公园游玩!");
}
public void step3Task() throws InterruptedException {
System.out.println(
"员工【" + Thread.currentThread().getName() + "】" + "出发去餐厅。。。。。。");
// 玩了一段时间后,到公园门口集合
Thread.sleep(rd.nextInt(bound));
System.out.println(
"员工【" + Thread.currentThread().getName() + "】" + "到达餐厅!");
}
public void step4Task() throws InterruptedException {
System.out.println(
"员工【" + Thread.currentThread().getName() + "】" + "开始用餐。。。。。。");
// 玩了一段时间后,到公园门口集合
Thread.sleep(rd.nextInt(bound));
System.out.println(
"员工【" + Thread.currentThread().getName() + "】" + "回家了!");
}
public static void main(String[] args) {
// 创建阶段协同器对象,重写了onAdvance方法,增加阶段到达处理逻辑
final Phaser ph = new Phaser() {
protected boolean onAdvance(int phase, int registeredParties) {
int staffs = registeredParties - 1;
switch (phase) {
case 0:
System.out.println("大家都到公司了,出发去公园!人数:" + staffs);
break;
case 1:
System.out.println("大家都到公园大门,出发去餐厅!人数:" + staffs);
break;
case 2:
System.out.println("大家都到餐厅了,开始用餐!人数:" + staffs);
break;
}
// 判断是否只剩主线程一个参与者,是,则返回true,阶段协同器终止。
return registeredParties == 1;
}
};
// 增加一个任务数,用来让主线程全程参与
ph.register();
final MultipleStartTogetherPhserDemo job = new MultipleStartTogetherPhserDemo();
// 让3个全程参与的线程加入
for (int i = 0; i < 3; i++) {
// 增加参与任务数
ph.register();
new Thread(new Runnable() {
@Override
public void run() {
try {
job.step1Task();
ph.arriveAndAwaitAdvance();
job.step2Task();
System.out.println(
"员工【" + Thread.currentThread().getName() + "】"
+ "到达公园大门集合。");
ph.arriveAndAwaitAdvance();
job.step3Task();
ph.arriveAndAwaitAdvance();
job.step4Task();
// 完成了,注销离开
ph.arriveAndDeregister();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
// 让两个不参加聚餐的员工加入
for (int i = 0; i < 2; i++) {
// 增加参与任务数
ph.register();
new Thread(new Runnable() {
@Override
public void run() {
try {
job.step1Task();
ph.arriveAndAwaitAdvance();
job.step2Task();
System.out.println(
"员工【" + Thread.currentThread().getName() + "】"
+ "回家了!");
// 完成了,注销离开
ph.arriveAndDeregister();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
while (!ph.isTerminated()) {
int phaser = ph.arriveAndAwaitAdvance();
if (phaser == 2) { // 到了去餐厅的阶段,让只参加晚上聚餐的人加入
for (int i = 0; i < 4; i++) {
// 增加参与任务数
ph.register();
new Thread(new Runnable() {
@Override
public void run() {
try {
job.step3Task();
ph.arriveAndAwaitAdvance();
job.step4Task();
// 完成了,注销离开
ph.arriveAndDeregister();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
}
}
}
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/u014427391/article/details/121171874
内容来源于网络,如有侵权,请联系作者删除!