深入理解多线程之线程同步工具

x33g5p2x  于2021-06-23 转载在 Java  
字(9.2k)|赞(0)|评价(0)|浏览(470)

线程同步工具主要包括:

  • 闭锁(CountDownLatch)
  • 栅栏(CyclicBarrier)
  • 信号量(Semaphore)
  • 阻塞队列(BlockingQueue)
  • FutureTask

CountDownLatch

可以让其他线程先执行,等执行完成之后再执行当前线程。CountDownLatch的构造方法传入了一个整数,相当于计数器,每次调用CountDownLatchcountDown()方法,计数器减一,而调用await()会阻塞当前线程,直到计数器为0后,才执行await()之后的代码。

使用场景

统计报表的优化,一般统计报表都需要从多个表中将数据统一展示出来,有时候遇上数据量比较大的情况,统计指标涉及到的业务范围也比较多,这个时候接口响应速度就会很慢,然后就可以使用CountDownLatch,将每个查询任务拆分到单独的线程中去执行,执行完成后调用countDown()方法,主线程调用await()等待所有任务完成之后进行汇总,这样使得原来串行化的任务变成了并行,效率自然提高上去了。

下面的代码初始化了一个线程数为10的线程池,并且将CountDownLatch的计数器设置为相同大小,当线程池中的任务执行完毕之后,主线程才开始往下执行:

public class CountDownLatchTest {
    public static void main(String[] args) {
        int count = 10;
        CountDownLatch countDownLatch = new CountDownLatch(count);
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(count);

        for (int i = 0; i < count; i++) {
            CountDownLatchRunnable runnable = new CountDownLatchRunnable((i + 1) * 1000,countDownLatch);
            fixedThreadPool.execute(runnable);
        }
        try {
            countDownLatch.await();
            System.out.println(\"所有runnable执行完毕,开始向下执行\");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
         fixedThreadPool.shutdown();
        }
    }

    public static class CountDownLatchRunnable implements Runnable {
        private long sleepTime;
        private CountDownLatch latch;
        public CountDownLatchRunnable(long sleepTime, CountDownLatch countDownLatch) {
            this.sleepTime = sleepTime;
            this.latch = countDownLatch;
        }
        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName()+\"线程开始执行\");
                TimeUnit.MILLISECONDS.sleep(sleepTime);
                latch.countDown();
                System.out.println(Thread.currentThread().getName()+\"线程执行完毕\");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

执行结果:

pool-1-thread-1线程开始执行
pool-1-thread-2线程开始执行
pool-1-thread-3线程开始执行
pool-1-thread-8线程开始执行
pool-1-thread-4线程开始执行
pool-1-thread-9线程开始执行
pool-1-thread-5线程开始执行
pool-1-thread-10线程开始执行
pool-1-thread-6线程开始执行
pool-1-thread-7线程开始执行
pool-1-thread-1线程执行完毕
pool-1-thread-2线程执行完毕
pool-1-thread-3线程执行完毕
pool-1-thread-4线程执行完毕
pool-1-thread-5线程执行完毕
pool-1-thread-6线程执行完毕
pool-1-thread-7线程执行完毕
pool-1-thread-8线程执行完毕
pool-1-thread-9线程执行完毕
pool-1-thread-10线程执行完毕
所有runnable执行完毕,开始向下执行

CyclicBarrier

等待其他线程同时到达(会阻塞当前线程),所有线程同时到达后,才继续往下执行。CyclicBarrier有两个构造方法:

public CyclicBarrier(int parties)
public CyclicBarrier(int parties, Runnable barrierAction)

第一个参数是线程的数量,也就是说当有这么多个线程到达之后,所有线程才继续往下执行。第二个参数是一个Runnable,也就是说,所有线程到达之后,会执行这个Runnable,你可以在它的run方法里做一些处理。

使用场景

不难看出,CyclicBarrier可以实现和CountDownLatch一样的效果,不过CyclicBarrier有一个reset方法,可以重复利用,而CountDownLatch只能用一次。而且CyclicBarrier可以在做了一些操作之后各个线程继续往下执行。

比如说,你和几个朋友约好了,明天早上集合到你家,开车出去玩,第二天早上你得等你的朋友们都到齐上车了,才能开车出发。出发之前还可以一起吃个早饭,而CountDownLatch是做不到的。

下面的代码初始化了一个线程数为10的线程池,将CyclicBarrier的线程数大小也设置为10,这样线程到达cyclicBarrier.await()的时候都会被阻塞,直到最后一个线程到达:

public class CyclicBarrierTest {
    public static void main(String[] args) {
        int count = 10;
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(count);
        CyclicBarrier cyclicBarrier = new CyclicBarrier(count, new Runnable() {
            @Override
            public void run() {
                System.out.println(\"所有线程已经到达,可以在这里面做一些东西\");
            }
        });
        for (int i = 0; i < count; i++) {
            CyclicBarrierRunnable runnable = new CyclicBarrierRunnable((i + 1) * 1000,cyclicBarrier);
            fixedThreadPool.execute(runnable);
        }
    }

    public static class CyclicBarrierRunnable implements Runnable{
        private CyclicBarrier cyclicBarrier;
        private long sleepTime;

        public CyclicBarrierRunnable(long sleepTime, CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
            this.sleepTime = sleepTime;
        }

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + \" 线程开始执行任务\");
                TimeUnit.MILLISECONDS.sleep(sleepTime);
                System.out.println(Thread.currentThread().getName() + \" 到达珊阑处,等待其他线程到达\");
                cyclicBarrier.await();
                System.out.println(Thread.currentThread().getName() + \" 所有线程到达,开始向下执行\");
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        }
    }
}

执行结果:

pool-1-thread-2 线程开始执行任务
pool-1-thread-3 线程开始执行任务
pool-1-thread-4 线程开始执行任务
pool-1-thread-1 线程开始执行任务
pool-1-thread-6 线程开始执行任务
pool-1-thread-5 线程开始执行任务
pool-1-thread-8 线程开始执行任务
pool-1-thread-9 线程开始执行任务
pool-1-thread-7 线程开始执行任务
pool-1-thread-10 线程开始执行任务
pool-1-thread-1 到达珊阑处,等待其他线程到达
pool-1-thread-2 到达珊阑处,等待其他线程到达
pool-1-thread-3 到达珊阑处,等待其他线程到达
pool-1-thread-4 到达珊阑处,等待其他线程到达
pool-1-thread-5 到达珊阑处,等待其他线程到达
pool-1-thread-6 到达珊阑处,等待其他线程到达
pool-1-thread-7 到达珊阑处,等待其他线程到达
pool-1-thread-8 到达珊阑处,等待其他线程到达
pool-1-thread-9 到达珊阑处,等待其他线程到达
pool-1-thread-10 到达珊阑处,等待其他线程到达
所有线程已经到达,可以在这里面做一些东西
pool-1-thread-10 所有线程到达,开始向下执行
pool-1-thread-1 所有线程到达,开始向下执行
pool-1-thread-2 所有线程到达,开始向下执行
pool-1-thread-3 所有线程到达,开始向下执行
pool-1-thread-4 所有线程到达,开始向下执行
pool-1-thread-5 所有线程到达,开始向下执行
pool-1-thread-6 所有线程到达,开始向下执行
pool-1-thread-7 所有线程到达,开始向下执行
pool-1-thread-9 所有线程到达,开始向下执行
pool-1-thread-8 所有线程到达,开始向下执行

Semaphore

通过构造方法可以指定可用资源数量,线程在执行的时候,通过semaphore.acquire()获取资源,如果获取到,继续执行,如果获取不到,就会阻塞当前线程,直到其他线程通过semaphore.release()释放资源。

使用场景

池化技术 比如对象池 连接池 缓存池等。多个线程获取有限资源的情况

下面的代码,十个线程抢占三个可用资源,剩下的7个线程会被阻塞住,直到资源使用完毕:

public class SemphoreTest {

    public static void main(String[] args) {
        int count = 10;
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(count);
        Semaphore semaphore = new Semaphore(3);
        for (int i = 0; i < count; i++) {
            SemphoreRunnable runnable = new SemphoreRunnable((i + 1) * 1000,semaphore);
            fixedThreadPool.execute(runnable);
        }
    }

    public static class SemphoreRunnable implements Runnable{
        private Semaphore semaphore;
        private long sleepTime;

        public SemphoreRunnable(long sleepTime, Semaphore semaphore) {
            this.semaphore = semaphore;
            this.sleepTime = sleepTime;
        }

        @Override
        public void run() {
            try {
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName() + \" 占用一个资源\");
                TimeUnit.MILLISECONDS.sleep(sleepTime);
                System.out.println(Thread.currentThread().getName() + \"资源使用完毕,释放资源\");
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

执行结果:

pool-1-thread-2 占用一个资源
pool-1-thread-3 占用一个资源
pool-1-thread-6 占用一个资源
pool-1-thread-2资源使用完毕,释放资源
pool-1-thread-4 占用一个资源
pool-1-thread-3资源使用完毕,释放资源
pool-1-thread-5 占用一个资源
pool-1-thread-6资源使用完毕,释放资源
pool-1-thread-4资源使用完毕,释放资源
pool-1-thread-1 占用一个资源
pool-1-thread-7 占用一个资源
pool-1-thread-1资源使用完毕,释放资源
pool-1-thread-8 占用一个资源

BlockingQueue

阻塞队列提供可阻塞的入队和出队操作,队列满了,入队操作就会阻塞直到有其他元素出队,队列空了,出队操作会阻塞直到有新的元素可用。

BlockingQueue提供了一下几种操作:

boolean add(E e); //增加一个元素 如果队列已满,则抛出一个IIIegaISlabEepeplian异常
boolean offer(E e); //添加一个元素,成功返回true,失败返回false,不会阻塞
void put(E e);  //添加一个元素,如果队列已满,则阻塞
E take();  //移除并返回队列头部的元素,如果队列为空,则阻塞
E poll();  //移除并返回队列头部的元素,如果队列为空,则直接返回null,不会阻塞
boolean remove(Object o); //移除并返回队列头部的元素,如果队列为空,则抛出一个NoSuchElementException异常

其中,实现了BlockingQueue的接口有下面几个队列:

  • ArrayBlockingQueue:数组实现的有界队列,初始化必须制定容量,并且可以选择是否需要公平性(通过设置ReentrantLock来实现公平锁)
  • LinkedBlockingQueue :链表实现的有界队列,在不指定容量的时候,大小为Integer.MAX_VALUE。
  • PriorityBlockingQueue :支持优先级的无界队列,元素按照优先级顺序移除,因为是无界的,所以put不会被阻塞
  • DelayQueue :优先级支持的、基于时间调度的队列,元素只有在延迟期满了,才能从中获取到
  • SynchronousQueue:简单聚集队列

下面使用ArrayBlockingQueue实现生产者-消费者模型,生产者每隔1秒生产一个产品,10个消费者消费产品,每隔1秒,只有一个消费者获取产品消费,其他消费者只能等待:

public class BlockQueueTest {
    public static void main(String[] args) {
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(10);
        Executors.newSingleThreadExecutor().submit(new Producer(blockingQueue));
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 10; i++) {
            executorService.submit(new Consumer(blockingQueue));
        }
    }

    public static class Consumer implements Runnable{
        private BlockingQueue<String> blockingQueue;
        public Consumer(BlockingQueue<String> blockingQueue) {
            this.blockingQueue = blockingQueue;
        }

        @Override
        public void run() {
            try {
                while (true){
                    TimeUnit.MILLISECONDS.sleep(1000);
                    System.out.println(\"准备消费产品\");
                    System.out.println(Thread.currentThread().getName() + \" 开始消费产品 \" + blockingQueue.take());
                    System.out.println(Thread.currentThread().getName() + \"消费结束\");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static class Producer implements Runnable{
        private BlockingQueue<String> blockingQueue;
        public Producer(BlockingQueue<String> blockingQueue) {
            this.blockingQueue = blockingQueue;
        }

        @Override
        public void run() {
            try {
                while (true){
                    TimeUnit.MILLISECONDS.sleep(1000);
                    SimpleDateFormat dateFormat = new SimpleDateFormat(\"HH:mm:ss\");
                    String product = dateFormat.format(new Date());
                    System.out.println(Thread.currentThread().getName() + \" 生产产品\");
                    blockingQueue.put(product);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt();
            }
        }
    }
}

执行结果:

准备消费产品
准备消费产品
准备消费产品
准备消费产品
准备消费产品
准备消费产品
准备消费产品
准备消费产品
准备消费产品
准备消费产品
pool-1-thread-1 生产产品
pool-2-thread-3 开始消费产品 00:10:40
pool-2-thread-3消费结束
准备消费产品
pool-1-thread-1 生产产品
pool-2-thread-6 开始消费产品 00:10:41
pool-2-thread-6消费结束
准备消费产品
pool-1-thread-1 生产产品
pool-2-thread-1 开始消费产品 00:10:42
pool-2-thread-1消费结束

FutureTask

用于执行一个可返回结果的长任务,任务在单独的线程中执行,其他线程可以用 get 方法取任务结果,如果任务尚未完成,线程会阻塞。

public class FutureTaskTest {
    public static void main(String[] args) {
        FutureTask<String> futureTask = new FutureTask((Callable<String>) () -> {
            TimeUnit.MILLISECONDS.sleep(1000);
            Random random = new Random();
            boolean b = random.nextBoolean();
            return b?\"success\":\"fail\";
        });
        //开一个子线程运行任务
        new Thread(futureTask).start();
        try {
            System.out.println(\"主线程执行其他业务。。。\");
            TimeUnit.MILLISECONDS.sleep(1000);
            System.out.println(\"开始获取子线程运行结果。。。\");
            String result = futureTask.get();
            if (\"success\".equals(result)){
                System.out.println(\"任务执行成功\");
            }else {
                System.out.println(\"任务执行失败\");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

执行结果:

主线程执行其他业务。。。
开始获取子线程运行结果。。。
任务执行失败

相关文章

微信公众号

最新文章

更多