Java多线程-通讯方式

x33g5p2x  于10个月前 转载在 Java  
字(18.7k)|赞(0)|评价(0)|浏览(120)

线程之间为什么要通信?

通信的目的是为了更好的协作,线程无论是交替式执行,还是接力式执行,都需要进行通信告知。那么java线程是如何通信的呢,大致有以下四种方式。

Java线程的通信方式

首先,要线程间通信的模型有两种:共享内存和消息传递

方式一:使用 volatile 关键字

基于 volatile 关键字来实现线程间相互通信是使用共享内存的思想,大致意思就是多个线程同时监听一个变量,当这个变量发生变化的时候 ,线程能够感知并执行相应的业务。这也是最简单的一种实现方式。代码如下所示:

package duo;

import java.util.ArrayList;
import java.util.List;

public class Mythread {

        // 定义一个共享变量来实现通信,它需要是volatile修饰,否则线程不能及时感知
        static volatile boolean notice = false;

        public static void main(String[] args) {
            List<String> list = new ArrayList<>();

            // 实现线程A
            Thread threadA = new Thread(() -> {
                for (int i = 1; i <= 10; i++) {
                    list.add("abc");
                    System.out.println("线程A向list中添加一个元素,此时list中的元素个数为:" + list.size());
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    if (list.size() == 5)
                        notice = true;
                }
            });

            // 实现线程B
            Thread threadB = new Thread(() -> {
                while (true) {
                    if (notice) {
                        System.out.println("线程B收到通知,开始执行自己的业务...");
                        break;
                    }
                }
            });

            // 需要先启动线程B, B相当于是监听的动作
            threadB.start();

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 再启动线程A
            threadA.start();
        }

}

线程A向list中添加一个元素,此时list中的元素个数为:1
线程A向list中添加一个元素,此时list中的元素个数为:2
线程A向list中添加一个元素,此时list中的元素个数为:3
线程A向list中添加一个元素,此时list中的元素个数为:4
线程A向list中添加一个元素,此时list中的元素个数为:5
线程A向list中添加一个元素,此时list中的元素个数为:6
线程B收到通知,开始执行自己的业务…
线程A向list中添加一个元素,此时list中的元素个数为:7
线程A向list中添加一个元素,此时list中的元素个数为:8
线程A向list中添加一个元素,此时list中的元素个数为:9
线程A向list中添加一个元素,此时list中的元素个数为:10

方式二:使用Object类的wait() 和 notify() 方法

1、wait()、notify()/notifyAll() 方法是Object的本地final方法,无法被重写。

2、wait()使当前线程阻塞,前提是 必须先获得锁,一般配合synchronized 关键字使用,即,一般在synchronized 同步代码块里使用 wait()、notify/notifyAll() 方法。

3、 由于 wait()、notify/notifyAll() 在synchronized 代码块执行,说明当前线程一定是获取了锁的。

当线程执行wait()方法时候,会释放当前的锁,然后让出CPU,进入等待状态。

只有当 notify/notifyAll() 被执行时候,才会唤醒一个或多个正处于等待状态的线程,然后继续往下执行,直到执行完synchronized 代码块的代码或是中途遇到wait() ,再次释放锁。

也就是说,notify/notifyAll() 的执行只是唤醒沉睡的线程,而不会立即释放锁

注意: 所以很多情况下你发现使用了notify进行唤醒了 但是没效果 还记得synchronized是干嘛的吗 是同步线程的作用在同一时间只允许一个线程执行 当这个线程执行完毕后才释放锁对象

所以notify只是唤醒等待池中的线程进入准备状态 但是如果正在执行的线程一直不交出锁的权限那么 处于唤醒准备状态的线程一直不会执行

所以在编程中,尽量在使用了notify/notifyAll() 后立即退出临界区,以唤醒其他线程让其获得锁

问题演示:

A 使用wait() B 使用notify()进行唤醒A 但是这前提条件是B执行完毕将锁对象交出去 其他线程才会执行 结果如下:
A使用wait() 进入等待状态

B

B

B满足条件使用notify()唤醒A

B

B 执行完毕交出锁对象

A接收到锁对象 继续从上次位置执行

可以通过结果发现 当唤醒线程不能立即执行

解决办法就是在使用notify()的时候将当前线程锁对象交出去
A使用wait() 进入等待状态

B

B

B满足条件使用notify()唤醒A 然后B使用wait() 进入等待状态 将当前锁对象释放

A接收到锁对象 继续从上次位置执行

A

A 执行完后 使用 notify() 唤醒B

B

B

AB程序结束

要注意,notify唤醒沉睡的线程后,线程会接着上次的执行继续往下执行。

private  static volatile int num=0;

    private   static Object lock=new Object();
    public static void main(String[] args) {

      Thread a= new Thread(new Runnable() {
            @Override
            public void run() {

                synchronized (lock) {
                    for (int i = 0; i < 10; i++) {
                        System.out.println("A线程");
                        num++;
                        if(num==5){
                            System.out.println("唤醒b线程");
                            lock.notify();

                            System.out.println("A线程释放锁对象 进入等待状态");
                            try {
                                lock.wait();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }
                System.out.println("A 线程执行完毕 AB程序结束");
            }
        });

        Thread b= new Thread(new Runnable() {
            @Override
            public void run() {

                    while (Thread.interrupted()==false){   // 检测是否是中断状态 false 代表没有被中断
                        synchronized (lock) {
                            if(num==5){

                                try {
                                    Thread.sleep(1000);
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                                System.out.println("B");

                                System.out.println("B执行完毕 设置中断");

                                Thread.currentThread().interrupt(); //设置中断 true
                            }else{

                                System.out.println("B线程释放锁对象 进入等待状态");
                                try {
                                    lock.wait();
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                                System.out.println("B线程获取到锁对象 开始执行");
                            }
                        }
                    }

                System.out.println("B 线程执行完成 唤醒A线程");
                synchronized (lock) {
                    lock.notify();
                }

            }
        });

// 先让b线程执行 进行监控
        b.start();

        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        a.start();
    }

B线程释放锁对象 进入等待状态
A线程
A线程
A线程
A线程
A线程
唤醒b线程
A线程释放锁对象 进入等待状态
B线程获取到锁对象 开始执行
B
B执行完毕 设置中断
B 线程执行完成 唤醒A线程
A线程
A线程
A线程
A线程
A线程
A 线程执行完毕 AB程序解决

实现生产者和消费者问题

1、生产者产生资源往池子里添加,前提是池子没有满,如果池子满了,则生产者暂停生产,直到自己的生成能放下池子。

2、消费者消耗池子里的资源,前提是池子的资源不为空,否则消费者暂停消耗,进入等待直到池子里有资源数满足自己的需求。

接口

public interface AbstractStorage {
    void consume(int num); //消费者
    void produce(int num);  //生产者
}

Producer (生产者)

public class Producer extends Thread{
    //每次生产的数量
    private int num ;

    //所属的仓库
    public AbstractStorage abstractStorage;

    public Producer(AbstractStorage abstractStorage){
        this.abstractStorage = abstractStorage;
    }

    public void setNum(int num){
        this.num = num;
    }

    // 线程run函数
    @Override
    public void run()
    {
        produce(num);
    }

    // 调用仓库Storage的生产函数
    public void produce(int num)
    {
        abstractStorage.produce(num);
    }
}

Consumer(消费者)

public class Consumer extends Thread{
    // 每次消费的产品数量
    private int num;

    // 所在放置的仓库
    private AbstractStorage abstractStorage1;

    // 构造函数,设置仓库
    public Consumer(AbstractStorage abstractStorage1)
    {
        this.abstractStorage1 = abstractStorage1;
    }

    // 线程run函数
    public void run()
    {
        consume(num);
    }

    // 调用仓库Storage的生产函数
    public void consume(int num)
    {
        abstractStorage1.consume(num);
    }

    public void setNum(int num){
        this.num = num;
    }
}

Storage(仓库)

import java.util.LinkedList;

public class Storage implements AbstractStorage {
    //仓库最大容量
    private final int MAX_SIZE = 100;
    //仓库存储的载体
    private LinkedList list = new LinkedList();

    //生产产品
    public void produce(int num){
        //同步
        synchronized (list){
            //仓库剩余的容量不足以存放即将要生产的数量,暂停生产
            while(list.size()+num > MAX_SIZE){
                System.out.println("【要生产的产品数量】:" + num + "\t【库存量】:"
                        + list.size() + "\t暂时不能执行生产任务!");

                try {
                    //条件不满足,生产阻塞
                    list.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            for(int i=0;i<num;i++){
                list.add(new Object());
            }

            System.out.println("【已经生产产品数】:" + num + "\t【现仓储量为】:" + list.size());

            list.notifyAll();
        }
    }

    //消费产品
    public void consume(int num){
        synchronized (list){

            //不满足消费条件
            while(num > list.size()){
                System.out.println("【要消费的产品数量】:" + num + "\t【库存量】:"
                        + list.size() + "\t暂时不能执行生产任务!");

                try {
                    list.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            //消费条件满足,开始消费
            for(int i=0;i<num;i++){
                list.remove();
            }

            System.out.println("【已经消费产品数】:" + num + "\t【现仓储量为】:" + list.size());

            list.notifyAll();
        }
    }
}

Test

public class Test{
    public static void main(String[] args) {
        // 仓库对象
        AbstractStorage abstractStorage = new Storage();

     // 创建7生产者对象
        Producer p1 = new Producer(abstractStorage);
        Producer p2 = new Producer(abstractStorage);
        Producer p3 = new Producer(abstractStorage);
        Producer p4 = new Producer(abstractStorage);
        Producer p5 = new Producer(abstractStorage);
        Producer p6 = new Producer(abstractStorage);
        Producer p7 = new Producer(abstractStorage);

        // 消费者对象
        Consumer c1 = new Consumer(abstractStorage);
        Consumer c2 = new Consumer(abstractStorage);
        Consumer c3 = new Consumer(abstractStorage);

        // 设置7个生产者产品生产数量
        p1.setNum(10);
        p2.setNum(10);
        p3.setNum(10);
        p4.setNum(10);
        p5.setNum(10);
        p6.setNum(10);
        p7.setNum(80);

        // 设置消费者产品消费数量
        c1.setNum(50);
        c2.setNum(20);
        c3.setNum(30);

        // 消费者线程开始执行 进行监听
        c1.start();
        c2.start();
        c3.start();

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    //生产者 线程开始
        p1.start();
        p2.start();
        p3.start();
        p4.start();
        p5.start();
        p6.start();
        p7.start();
    }
}

【要消费的产品数量】:50 【库存量】:0 暂时不能执行生产任务!
【要消费的产品数量】:30 【库存量】:0 暂时不能执行生产任务!
【要消费的产品数量】:20 【库存量】:0 暂时不能执行生产任务!
【已经生产产品数】:10 【现仓储量为】:10
【已经生产产品数】:10 【现仓储量为】:20
【已经生产产品数】:80 【现仓储量为】:100
【要生产的产品数量】:10 【库存量】:100 暂时不能执行生产任务!
【要生产的产品数量】:10 【库存量】:100 暂时不能执行生产任务!
【已经消费产品数】:20 【现仓储量为】:80
【已经消费产品数】:30 【现仓储量为】:50
【已经消费产品数】:50 【现仓储量为】:0
【已经生产产品数】:10 【现仓储量为】:10
【已经生产产品数】:10 【现仓储量为】:20
【已经生产产品数】:10 【现仓储量为】:30
【已经生产产品数】:10 【现仓储量为】:40

方式3:使用ReentrantLock结合Condition

和使用Object的wait()、notify 效果差不多

只是前者需要在 synchronized 代码块执行 而后者需要在lock加锁后才能使用

此方法在博客 java多线程-Lock锁 这篇文章里有教程 这里就不说了

方式四:使用JUC工具类

jdk1.5之后在java.util.concurrent包下提供了很多并发编程相关的工具类,简化了我们的并发编程代码的书写,

  • CountDownLatch:用于某个线程A等待若干个其他线程执行完之后,它才执行
  • CyclicBarrier:一组线程等待至某个状态之后再全部同时执行
  • Semaphore:用于控制对某组资源的访问权限

Object和Condition休眠唤醒区别

object wait()必须在synchronized(同步锁)下使用,
*
object wait()必须要通过Nodify()方法进行唤醒
*
condition await() 必须和Lock(互斥锁/共享锁)配合使用
*
condition await() 必须通过 signal() 方法进行唤醒

CountDownLatch

CountDownLatch概念

CountDownLatch是在java1.5被引入的,存在于java.util.concurrent包下。

CountDownLatch这个类能够使一个线程等待其他线程完成各自的工作后再执行。

CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量。

每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。

CountDownLatch的用法

CountDownLatch典型用法:

1、某一线程在开始运行前等待n个线程执行完毕。将CountDownLatch的计数器初始化为new CountDownLatch(n),每当一个任务线程执行完毕,就将计数器减1 countdownLatch.countDown(),当计数器的值变为0时,在CountDownLatch上await()的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。

2、实现多个线程的并行,而不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的CountDownLatch(1),将其计算器初始化为1,多个线程在开始执行任务前首先countdownlatch.await(),当主线程调用countDown()时,计数器变为0,多个线程同时被唤醒。

CountDownLatch的不足

CountDownLatch是一次性的,计算器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当CountDownLatch使用完毕后,它不能再次被使用。

CountDownLatch方法使用说明

//递减锁存器的计数,如果计数到达零,则释放所有等待的线程。如果当前计数大于零,则将计数减少.
public void countDown()
//调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
public void await() throws InterruptedException { };
/* timeout 要等待的最长时间   unit 参数的时间单位   和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行 */

public boolean await(long timeout,TimeUnit unit) throws InterruptedException

案例

例子1: 主线程等待子线程执行完成在执行

public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(3);
        final CountDownLatch latch = new CountDownLatch(3);
        for (int i = 0; i < 3; i++) {
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("子线程" + Thread.currentThread().getName() + "开始执行");
                        Thread.sleep((long) (Math.random() * 10000));
                        System.out.println("子线程"+Thread.currentThread().getName()+"执行完成");
                        latch.countDown();//当前线程调用此方法,则计数减一
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            service.execute(runnable);
        }

        try {
            System.out.println("主线程"+Thread.currentThread().getName()+"等待子线程执行完成...");
            latch.await();//阻塞当前主线程,直到CountDownLatch里计数器的值为0开始释放
            System.out.println("主线程"+Thread.currentThread().getName()+"开始执行...");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

例子2:

百米赛跑,4名运动员选手到达场地等待裁判口令,裁判一声口令,选手听到后同时起跑,当所有选手到达终点,裁判进行汇总排名

public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool();
        final CountDownLatch cdOrder = new CountDownLatch(1); //裁判
        final CountDownLatch cdAnswer = new CountDownLatch(4); //选手
        for (int i = 0; i < 4; i++) {
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("选手" + Thread.currentThread().getName() + "正在等待裁判发布口令");
                        cdOrder.await();
                        System.out.println("选手" + Thread.currentThread().getName() + "已接受裁判口令");
                        Thread.sleep((long) (Math.random() * 10000));
                        System.out.println("选手" + Thread.currentThread().getName() + "到达终点");
                        cdAnswer.countDown();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            service.execute(runnable);
        }
        try {
            Thread.sleep((long) (Math.random() * 10000));
            System.out.println("裁判"+Thread.currentThread().getName()+"即将发布口令");
            cdOrder.countDown();  //裁判发送口令4个选手线程 并行运行 注意不是并发
            System.out.println("裁判"+Thread.currentThread().getName()+"已发送口令,正在等待所有选手到达终点");
            cdAnswer.await();  //阻塞主线程 等待所有子线程运行完
            System.out.println("所有选手都到达终点");
            System.out.println("裁判"+Thread.currentThread().getName()+"汇总成绩排名");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        service.shutdown();//结束线程池
    }
CyclicBarrier

现实生活中我们经常会遇到这样的情景,在进行某个活动前需要等待人全部都齐了才开始。例如吃饭时要等全家人都上座了才动筷子,旅游时要等全部人都到齐了才出发,比赛时要等运动员都上场后才开始。

在JUC包中为我们提供了一个同步工具类能够很好的模拟这类场景,它就是CyclicBarrier类。利用CyclicBarrier类可以实现一组线程相互等待,当所有线程都到达某个屏障点后再进行后续的操作。下图演示了这一过程。

在举个例子,就像生活中我们会约朋友们到某个餐厅一起吃饭,有些朋友可能会早到,有些朋友可能会晚到,但是这个餐厅规定必须等到所有人到齐之后才会让我们进去。这里的朋友们就是各个线程,餐厅就是 CyclicBarrier。

接下来我们看看它的构造器

//构造器1
public CyclicBarrier(int parties, Runnable barrierAction) {
  if (parties <= 0) throw new IllegalArgumentException();
  this.parties = parties;
  this.count = parties;
  this.barrierCommand = barrierAction;
}
 
//构造器2
public CyclicBarrier(int parties) {
  this(parties, null);
}

CyclicBarrier有两个构造器,其中构造器1是它的核心构造器

参数1: 在这里你可以指定本局游戏的参与者数量(要拦截的线程数)

参数2: 本局结束时要执行的任务,(也就是所有线程执行完后执行的线程)

CyclicBarrier供了两种等待的方法,分别是定时等待和非定时等待。

//非定时等待 (比如指定3个线程 每一个线程调用一次内部count-- 当count==0时 释放3个线程 然后count重置为3 )
public int await() throws InterruptedException, BrokenBarrierException {
  try {
    return dowait(false, 0L);
  } catch (TimeoutException toe) {
    throw new Error(toe);
  }
}
 
//定时等待 (就是子一定时间内如果还没有 到时间自动唤醒)
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {
  return dowait(true, unit.toNanos(timeout));
}

参数1 : timeout 时间

参数2: 时间单位 TimeUnit.SECONDS (秒)

案例1:

public static void main(String[] args) {
        // 设置线程个数为2 当个数=0时释放所有等待的线程 然后恢复线程的个数为2
        CyclicBarrier cb = new CyclicBarrier(2);
        new Thread(()->{
            System.out.println("线程1开始.."+new Date());
            try {
                cb.await(); // 2-1=1当个数不足时,等待
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
            System.out.println("线程1继续向下运行..."+new Date());
        }).start();

        new Thread(()->{
            System.out.println("线程2开始.."+new Date());
            try { Thread.sleep(2000); } catch (InterruptedException e) { }

            try {
                cb.await(); // 1-1=0 线程个数够2,释放所有等待的线程
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
            System.out.println("线程2继续向下运行..."+new Date());
        }).start();

    }

看懂这个后那么下面的这个案例你就能看懂了

案例2:

赛马

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MyLock{

    //马类
    static class Horse implements Runnable {

        private static int counter = 0;
        private final int id = counter++;  //每次创建一个新对象的时候都会在原来id基础上+1

        private int strides = 0;   //赛马每次随机跑几步 最多0~3步
        private static Random rand = new Random();//随机数
        private static CyclicBarrier barrier;//获取

        public Horse(CyclicBarrier b) { barrier = b; }

        @Override
        public void run() {
            try {
                while(!Thread.interrupted()) { //!中断标识(false)
                    synchronized(this) {
                        //赛马每次随机跑几步
                        strides += rand.nextInt(3);
                    }
                    barrier.await();//线程进入等待状态
                }
            } catch(Exception e) {
                e.printStackTrace();
            }
        }

        //使用*来模拟马跑的痕迹
        public String tracks() {
            StringBuilder s = new StringBuilder();
            for(int i = 0; i < getStrides(); i++) {
                s.append("*");
            }
            s.append(id);
            return s.toString();
        }

        public synchronized int getStrides() { return strides; }

        public String toString() { return "Horse " + id + " "; }

    }

    //赛马
    public static class HorseRace implements Runnable {

        private static final int FINISH_LINE = 75;//赛道长度
        private static List<Horse> horses = new ArrayList<Horse>();//存储马的线程 (赛道)
        private static ExecutorService exec = Executors.newCachedThreadPool();//线程池

        @Override
        public void run() {
            StringBuilder s = new StringBuilder();
            //打印赛道边界
            for(int i = 0; i < FINISH_LINE; i++) {
                s.append("=");
            }
            System.out.println(s);
            //打印赛马轨迹
            for(Horse horse : horses) {
                System.out.println(horse.tracks());

            }

            //判断是否结束(只要有一匹马跑到终点那么将结束比赛 其他马将不用在跑了)
            for(Horse horse : horses) {
                if(horse.getStrides() >= FINISH_LINE) {
                    System.out.println(horse.toString() + "won!");
                    exec.shutdownNow();//调用线程池的结束全部线程方法不管是否还在运行 中断标识设置为true
                    return;//结束run方法
                }
            }
            //控制台刷新比赛的频率
            try {
                Thread.sleep(200);
            } catch(InterruptedException e) {
                System.out.println("barrier-action sleep interrupted");
            }
        }

        public static void main(String[] args) {
            CyclicBarrier barrier = new CyclicBarrier(7, new HorseRace());
            for(int i = 0; i < 7; i++) {
                Horse horse = new Horse(barrier);
                horses.add(horse);//录入马到赛道里
                exec.execute(horse);//将线程录入线程池里
            }
        }

    }



}
Semaphore

Semaphore 是什么

Semaphore 通常我们叫它信号量, 可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源。

可以把它简单的理解成我们停车场入口立着的那个显示屏,每有一辆车进入停车场显示屏就会显示剩余车位减1,每有一辆车从停车场出去,显示屏上显示的剩余车辆就会加1,当显示屏上的剩余车位为0时,停车场入口的栏杆就不会再打开,车辆就无法进入停车场了,直到有一辆车从停车场出去为止。

使用场景

用于那些资源有明确访问数量限制的场景,常用于限流 。

比如:数据库连接池,同时进行连接的线程有数量限制,连接不能超过一定的数量,当连接达到了限制数量后,后面的线程只能排队等前面的线程释放了数据库连接才能获得数据库连接。

比如:停车场场景,车位数量有限,同时只能容纳多少台车,车位满了之后只有等里面的车离开停车场外面的车才可以进入。

Semaphore常用方法说明

acquire()  
获取一个令牌,在获取到令牌、或者被其他线程调用中断之前线程一直处于阻塞状态。 
(简单来说就是没有令牌就处于阻塞状态)
    
acquire(int permits)  
获取指定几个令牌,在获取到令牌、或者被其他线程调用中断、或超时之前线程一直处于阻塞状态。
    
acquireUninterruptibly() 
获取一个令牌,在获取到令牌之前线程一直处于阻塞状态(忽略中断)。
    
tryAcquire()
尝试获得令牌,返回获取令牌成功或失败,不阻塞线程。
    
tryAcquire(long timeout, TimeUnit unit)
尝试获得令牌,在超时时间内循环尝试获取,直到尝试获取成功或超时返回,不阻塞线程。

release()
释放一个令牌,唤醒一个获取令牌不成功的阻塞线程。 (也就是 被acquire阻塞的线程 )

hasQueuedThreads()
等待队列里是否还存在等待线程。

getQueueLength()
获取等待队列里阻塞的线程数。

drainPermits()
清空令牌把可用令牌数置为0,返回清空令牌的数量。

availablePermits()
返回可用的令牌数量。

在实际中无非就3个步骤 创建令牌(许可) 获取令牌(许可) 释放令牌(许可)

Semaphore实现原理

Semaphore初始化(创建令牌)
Semaphore semaphore=new Semaphore(2);

1、当调用new Semaphore(2) 方法时,默认会创建一个非公平的锁的同步阻塞队列。

2、把初始令牌数量赋值给同步队列的state状态,state的值就代表当前所剩余的令牌数量。

获取令牌
semaphore.acquire();

1、当前线程会尝试去同步队列获取一个令牌,获取令牌的过程也就是使用原子的操作去修改同步队列的state ,获取一个令牌则修改为state=state-1。

2、 当计算出来的state<0,则代表令牌数量不足,此时会创建一个Node节点加入阻塞队列,挂起当前线程。

3、当计算出来的state>=0,则代表获取令牌成功。可以正常运行

释放令牌
semaphore.release();

当调用semaphore.release() 方法时

1、线程会尝试释放一个令牌,释放令牌的过程也就是把同步队列的state修改为state=state+1的过程

2、释放令牌成功之后,同时会唤醒同步队列的所有阻塞节 进行state=state-1 的操作,如果state>=0则获取令牌成功 继续执行

3、而其他的节点也会重新尝试去修改state=state-1 的操作,如果state>=0则获取令牌成功,否则重新进入阻塞队列,挂起线程。

用semaphore 实现停车场提示牌功能。

每个停车场入口都有一个提示牌,上面显示着停车场的剩余车位还有多少,当剩余车位为0时,不允许车辆进入停车场,直到停车场里面有车离开停车场,这时提示牌上会显示新的剩余车位数。

业务场景 :

1、停车场容纳总停车量10。

2、当一辆车进入停车场后,显示牌的剩余车位数响应的减1.

3、每有一辆车驶出停车场后,显示牌的剩余车位数响应的加1。

4、停车场剩余车位不足时,车辆只能在外面等待。

代码:

//停车场同时容纳的车辆10
    private  static  Semaphore semaphore=new Semaphore(10);//一次能停10辆

    public static void main(String[] args) {

        //模拟100辆车进入停车场
        for(int i=0;i<100;i++){

            Thread thread=new Thread(new Runnable() {
                public void run() {
                    try {
                        System.out.println("===="+Thread.currentThread().getName()+"来到停车场");
                        if(semaphore.availablePermits()==0){
                            System.out.println("车位不足,请耐心等待");
                        }
                        semaphore.acquire();//获取令牌尝试进入停车场
                        System.out.println(Thread.currentThread().getName()+"成功进入停车场");
                        Thread.sleep(new Random().nextInt(1000));//模拟每辆车辆在停车场停留的时间
                        System.out.println(Thread.currentThread().getName()+"驶出停车场");
                        semaphore.release();//释放令牌,腾出停车场车位
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            },i+"号车");

            thread.start();

        }

    }
}

大概9~10秒所有车都停过一遍了

方式五:LockSupport实现线程间的阻塞和唤醒

LockSupport 是一种非常灵活的实现线程间阻塞和唤醒的工具,使用它不用关注是等待线程先进行还是唤醒线程先运行,但是得知道线程的名字。

LockSupport是什么

刚刚开头提到过,LockSupport是一个线程工具类,所有的方法都是静态方法,可以让线程在任意位置阻塞,也可以在任意位置唤醒。

它的内部其实两类主要的方法:park(阻塞线程)和unpark(启动唤醒线程)。

注意上面的123方法,都有一个blocker,这个blocker是用来记录线程被阻塞时被谁阻塞的。用于线程监控和分析工具来定位原因的。

现在我们知道了LockSupport是用来阻塞和唤醒线程的,而且之前相信我们都知道wait/notify也是用来阻塞和唤醒线程的,那么它相比,LockSupport有什么优点呢?

2、与wait/notify对比

这里假设你已经了解了wait/notify的机制,如果不了解,上面有自己看 ,很简单。既然学到了这个LockSupport,相信你已经提前已经学了wait/notify。

我们先来举一个使用案例:

上面这段代码的意思是,我们定义一个线程,但是在内部进行了park,因此需要unpark才能唤醒继续执行,不过上面,我们在MyThread进行的park,在main线程进行的unpark。

这样来看,好像和wait/notify没有什么区别。那他的区别到底是什么呢?这个就需要仔细的观察了。这里主要有两点:

(1)wait和notify都是Object中的方法,在调用这两个方法前必须先获得锁对象,但是park不需要获取某个对象的锁就可以锁住线程。

(2)notify只能随机选择一个线程唤醒,无法唤醒指定的线程,unpark却可以唤醒一个指定的线程。

区别就是这俩

LockSupport使用
public static void main(String[] args) {
            List<String> list = new ArrayList<>();
            // 实现线程B
            Thread threadB = new Thread(() -> {
                if (list.size() != 5) {
                    LockSupport.park();
                }
                System.out.println("线程B收到通知,开始执行自己的业务...");
            });

            // 实现线程A
            Thread threadA = new Thread(() -> {
                for (int i = 1; i <= 10; i++) {
                    list.add("abc");
                    System.out.println("线程A向list中添加一个元素,此时list中的元素个数为:" + list.size());
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    if (list.size() == 5)
                        LockSupport.unpark(threadB);
                }
            });

            threadA.start();
            threadB.start();
        }

的锁就可以锁住线程。

(2)notify只能随机选择一个线程唤醒,无法唤醒指定的线程,unpark却可以唤醒一个指定的线程。

区别就是这俩

#### LockSupport使用

```java
  public static void main(String[] args) {
            List<String> list = new ArrayList<>();
            // 实现线程B
            Thread threadB = new Thread(() -> {
                if (list.size() != 5) {
                    LockSupport.park();
                }
                System.out.println("线程B收到通知,开始执行自己的业务...");
            });

            // 实现线程A
            Thread threadA = new Thread(() -> {
                for (int i = 1; i <= 10; i++) {
                    list.add("abc");
                    System.out.println("线程A向list中添加一个元素,此时list中的元素个数为:" + list.size());
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    if (list.size() == 5)
                        LockSupport.unpark(threadB);
                }
            });

            threadA.start();
            threadB.start();
        }

相关文章