线程同步工具主要包括:
可以让其他线程先执行,等执行完成之后再执行当前线程。CountDownLatch
的构造方法传入了一个整数,相当于计数器,每次调用CountDownLatch
的countDown()
方法,计数器减一,而调用await()
会阻塞当前线程,直到计数器为0后,才执行await()
之后的代码。
统计报表的优化,一般统计报表都需要从多个表中将数据统一展示出来,有时候遇上数据量比较大的情况,统计指标涉及到的业务范围也比较多,这个时候接口响应速度就会很慢,然后就可以使用CountDownLatch
,将每个查询任务拆分到单独的线程中去执行,执行完成后调用countDown()
方法,主线程调用await()
等待所有任务完成之后进行汇总,这样使得原来串行化的任务变成了并行,效率自然提高上去了。
下面的代码初始化了一个线程数为10的线程池,并且将CountDownLatch
的计数器设置为相同大小,当线程池中的任务执行完毕之后,主线程才开始往下执行:
public class CountDownLatchTest {
public static void main(String[] args) {
int count = 10;
CountDownLatch countDownLatch = new CountDownLatch(count);
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(count);
for (int i = 0; i < count; i++) {
CountDownLatchRunnable runnable = new CountDownLatchRunnable((i + 1) * 1000,countDownLatch);
fixedThreadPool.execute(runnable);
}
try {
countDownLatch.await();
System.out.println(\"所有runnable执行完毕,开始向下执行\");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
fixedThreadPool.shutdown();
}
}
public static class CountDownLatchRunnable implements Runnable {
private long sleepTime;
private CountDownLatch latch;
public CountDownLatchRunnable(long sleepTime, CountDownLatch countDownLatch) {
this.sleepTime = sleepTime;
this.latch = countDownLatch;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName()+\"线程开始执行\");
TimeUnit.MILLISECONDS.sleep(sleepTime);
latch.countDown();
System.out.println(Thread.currentThread().getName()+\"线程执行完毕\");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
执行结果:
pool-1-thread-1线程开始执行
pool-1-thread-2线程开始执行
pool-1-thread-3线程开始执行
pool-1-thread-8线程开始执行
pool-1-thread-4线程开始执行
pool-1-thread-9线程开始执行
pool-1-thread-5线程开始执行
pool-1-thread-10线程开始执行
pool-1-thread-6线程开始执行
pool-1-thread-7线程开始执行
pool-1-thread-1线程执行完毕
pool-1-thread-2线程执行完毕
pool-1-thread-3线程执行完毕
pool-1-thread-4线程执行完毕
pool-1-thread-5线程执行完毕
pool-1-thread-6线程执行完毕
pool-1-thread-7线程执行完毕
pool-1-thread-8线程执行完毕
pool-1-thread-9线程执行完毕
pool-1-thread-10线程执行完毕
所有runnable执行完毕,开始向下执行
等待其他线程同时到达(会阻塞当前线程),所有线程同时到达后,才继续往下执行。CyclicBarrier
有两个构造方法:
public CyclicBarrier(int parties)
public CyclicBarrier(int parties, Runnable barrierAction)
第一个参数是线程的数量,也就是说当有这么多个线程到达之后,所有线程才继续往下执行。第二个参数是一个Runnable
,也就是说,所有线程到达之后,会执行这个Runnable
,你可以在它的run方法里做一些处理。
不难看出,CyclicBarrier
可以实现和CountDownLatch
一样的效果,不过CyclicBarrier
有一个reset
方法,可以重复利用,而CountDownLatch
只能用一次。而且CyclicBarrier
可以在做了一些操作之后各个线程继续往下执行。
比如说,你和几个朋友约好了,明天早上集合到你家,开车出去玩,第二天早上你得等你的朋友们都到齐上车了,才能开车出发。出发之前还可以一起吃个早饭,而CountDownLatch
是做不到的。
下面的代码初始化了一个线程数为10的线程池,将CyclicBarrier
的线程数大小也设置为10,这样线程到达cyclicBarrier.await()
的时候都会被阻塞,直到最后一个线程到达:
public class CyclicBarrierTest {
public static void main(String[] args) {
int count = 10;
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(count);
CyclicBarrier cyclicBarrier = new CyclicBarrier(count, new Runnable() {
@Override
public void run() {
System.out.println(\"所有线程已经到达,可以在这里面做一些东西\");
}
});
for (int i = 0; i < count; i++) {
CyclicBarrierRunnable runnable = new CyclicBarrierRunnable((i + 1) * 1000,cyclicBarrier);
fixedThreadPool.execute(runnable);
}
}
public static class CyclicBarrierRunnable implements Runnable{
private CyclicBarrier cyclicBarrier;
private long sleepTime;
public CyclicBarrierRunnable(long sleepTime, CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
this.sleepTime = sleepTime;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + \" 线程开始执行任务\");
TimeUnit.MILLISECONDS.sleep(sleepTime);
System.out.println(Thread.currentThread().getName() + \" 到达珊阑处,等待其他线程到达\");
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() + \" 所有线程到达,开始向下执行\");
} catch (BrokenBarrierException e) {
e.printStackTrace();
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
}
执行结果:
pool-1-thread-2 线程开始执行任务
pool-1-thread-3 线程开始执行任务
pool-1-thread-4 线程开始执行任务
pool-1-thread-1 线程开始执行任务
pool-1-thread-6 线程开始执行任务
pool-1-thread-5 线程开始执行任务
pool-1-thread-8 线程开始执行任务
pool-1-thread-9 线程开始执行任务
pool-1-thread-7 线程开始执行任务
pool-1-thread-10 线程开始执行任务
pool-1-thread-1 到达珊阑处,等待其他线程到达
pool-1-thread-2 到达珊阑处,等待其他线程到达
pool-1-thread-3 到达珊阑处,等待其他线程到达
pool-1-thread-4 到达珊阑处,等待其他线程到达
pool-1-thread-5 到达珊阑处,等待其他线程到达
pool-1-thread-6 到达珊阑处,等待其他线程到达
pool-1-thread-7 到达珊阑处,等待其他线程到达
pool-1-thread-8 到达珊阑处,等待其他线程到达
pool-1-thread-9 到达珊阑处,等待其他线程到达
pool-1-thread-10 到达珊阑处,等待其他线程到达
所有线程已经到达,可以在这里面做一些东西
pool-1-thread-10 所有线程到达,开始向下执行
pool-1-thread-1 所有线程到达,开始向下执行
pool-1-thread-2 所有线程到达,开始向下执行
pool-1-thread-3 所有线程到达,开始向下执行
pool-1-thread-4 所有线程到达,开始向下执行
pool-1-thread-5 所有线程到达,开始向下执行
pool-1-thread-6 所有线程到达,开始向下执行
pool-1-thread-7 所有线程到达,开始向下执行
pool-1-thread-9 所有线程到达,开始向下执行
pool-1-thread-8 所有线程到达,开始向下执行
通过构造方法可以指定可用资源数量,线程在执行的时候,通过semaphore.acquire()
获取资源,如果获取到,继续执行,如果获取不到,就会阻塞当前线程,直到其他线程通过semaphore.release()
释放资源。
池化技术 比如对象池 连接池 缓存池等。多个线程获取有限资源的情况
下面的代码,十个线程抢占三个可用资源,剩下的7个线程会被阻塞住,直到资源使用完毕:
public class SemphoreTest {
public static void main(String[] args) {
int count = 10;
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(count);
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < count; i++) {
SemphoreRunnable runnable = new SemphoreRunnable((i + 1) * 1000,semaphore);
fixedThreadPool.execute(runnable);
}
}
public static class SemphoreRunnable implements Runnable{
private Semaphore semaphore;
private long sleepTime;
public SemphoreRunnable(long sleepTime, Semaphore semaphore) {
this.semaphore = semaphore;
this.sleepTime = sleepTime;
}
@Override
public void run() {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + \" 占用一个资源\");
TimeUnit.MILLISECONDS.sleep(sleepTime);
System.out.println(Thread.currentThread().getName() + \"资源使用完毕,释放资源\");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
执行结果:
pool-1-thread-2 占用一个资源
pool-1-thread-3 占用一个资源
pool-1-thread-6 占用一个资源
pool-1-thread-2资源使用完毕,释放资源
pool-1-thread-4 占用一个资源
pool-1-thread-3资源使用完毕,释放资源
pool-1-thread-5 占用一个资源
pool-1-thread-6资源使用完毕,释放资源
pool-1-thread-4资源使用完毕,释放资源
pool-1-thread-1 占用一个资源
pool-1-thread-7 占用一个资源
pool-1-thread-1资源使用完毕,释放资源
pool-1-thread-8 占用一个资源
阻塞队列提供可阻塞的入队和出队操作,队列满了,入队操作就会阻塞直到有其他元素出队,队列空了,出队操作会阻塞直到有新的元素可用。
BlockingQueue
提供了一下几种操作:
boolean add(E e); //增加一个元素 如果队列已满,则抛出一个IIIegaISlabEepeplian异常
boolean offer(E e); //添加一个元素,成功返回true,失败返回false,不会阻塞
void put(E e); //添加一个元素,如果队列已满,则阻塞
E take(); //移除并返回队列头部的元素,如果队列为空,则阻塞
E poll(); //移除并返回队列头部的元素,如果队列为空,则直接返回null,不会阻塞
boolean remove(Object o); //移除并返回队列头部的元素,如果队列为空,则抛出一个NoSuchElementException异常
其中,实现了BlockingQueue
的接口有下面几个队列:
下面使用ArrayBlockingQueue
实现生产者-消费者模型,生产者每隔1秒生产一个产品,10个消费者消费产品,每隔1秒,只有一个消费者获取产品消费,其他消费者只能等待:
public class BlockQueueTest {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(10);
Executors.newSingleThreadExecutor().submit(new Producer(blockingQueue));
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
executorService.submit(new Consumer(blockingQueue));
}
}
public static class Consumer implements Runnable{
private BlockingQueue<String> blockingQueue;
public Consumer(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
try {
while (true){
TimeUnit.MILLISECONDS.sleep(1000);
System.out.println(\"准备消费产品\");
System.out.println(Thread.currentThread().getName() + \" 开始消费产品 \" + blockingQueue.take());
System.out.println(Thread.currentThread().getName() + \"消费结束\");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static class Producer implements Runnable{
private BlockingQueue<String> blockingQueue;
public Producer(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
try {
while (true){
TimeUnit.MILLISECONDS.sleep(1000);
SimpleDateFormat dateFormat = new SimpleDateFormat(\"HH:mm:ss\");
String product = dateFormat.format(new Date());
System.out.println(Thread.currentThread().getName() + \" 生产产品\");
blockingQueue.put(product);
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
}
}
执行结果:
准备消费产品
准备消费产品
准备消费产品
准备消费产品
准备消费产品
准备消费产品
准备消费产品
准备消费产品
准备消费产品
准备消费产品
pool-1-thread-1 生产产品
pool-2-thread-3 开始消费产品 00:10:40
pool-2-thread-3消费结束
准备消费产品
pool-1-thread-1 生产产品
pool-2-thread-6 开始消费产品 00:10:41
pool-2-thread-6消费结束
准备消费产品
pool-1-thread-1 生产产品
pool-2-thread-1 开始消费产品 00:10:42
pool-2-thread-1消费结束
用于执行一个可返回结果的长任务,任务在单独的线程中执行,其他线程可以用 get 方法取任务结果,如果任务尚未完成,线程会阻塞。
public class FutureTaskTest {
public static void main(String[] args) {
FutureTask<String> futureTask = new FutureTask((Callable<String>) () -> {
TimeUnit.MILLISECONDS.sleep(1000);
Random random = new Random();
boolean b = random.nextBoolean();
return b?\"success\":\"fail\";
});
//开一个子线程运行任务
new Thread(futureTask).start();
try {
System.out.println(\"主线程执行其他业务。。。\");
TimeUnit.MILLISECONDS.sleep(1000);
System.out.println(\"开始获取子线程运行结果。。。\");
String result = futureTask.get();
if (\"success\".equals(result)){
System.out.println(\"任务执行成功\");
}else {
System.out.println(\"任务执行失败\");
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
执行结果:
主线程执行其他业务。。。
开始获取子线程运行结果。。。
任务执行失败
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.lingxiaomz.top/articleContent/?id=104726259129858198
内容来源于网络,如有侵权,请联系作者删除!