带超时功能的 Latch 设计模式

x33g5p2x  于2022-04-27 转载在 其他  
字(3.8k)|赞(0)|评价(0)|浏览(308)

一 点睛

为 Latch 设计模式增加超时等待功能:等待线程如果在指定时间内没有等到所有任务线程完成,就抛出 WaitTimeoutException,而不是无限期地阻塞下去。

二 实战

1 Latch

package concurrent.latch;

import java.util.concurrent.TimeUnit;

/**
 * Base type for a latch: a gate that stays closed until a given number of
 * worker threads have reported that they finished their task.
 */
public abstract class Latch {
    /** Number of threads that must still finish before the gate opens. */
    protected int limit;

    /**
     * Creates a latch.
     *
     * @param limit how many threads have to finish before the gate opens
     */
    public Latch(int limit) {
        this.limit = limit;
    }

    /**
     * Blocks the calling thread until every worker thread has finished.
     * The blocked thread may be interrupted while it waits.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public abstract void await() throws InterruptedException;

    /**
     * Blocks like {@link #await()}, but gives up once the timeout has elapsed.
     *
     * @param unit unit in which {@code time} is expressed
     * @param time maximum time to wait, in {@code unit}
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws WaitTimeoutException if the timeout elapses before all workers finished
     */
    public abstract void await(TimeUnit unit, long time) throws InterruptedException, WaitTimeoutException;

    /**
     * Called by a worker thread once it has finished its task; decrements the counter by one.
     */
    public abstract void countDown();

    /**
     * Reports how many threads have not finished their task yet.
     *
     * @return the number of outstanding threads
     */
    public abstract int getUnarriveed();
}

2 CountDownLatch

package concurrent.latch;

import java.util.concurrent.TimeUnit;

/**
 * {@link Latch} implementation built on this object's intrinsic monitor
 * ({@code synchronized} / {@code wait} / {@code notifyAll}). It supports both
 * unbounded waiting and waiting with a timeout.
 *
 * <p>After construction, every access to {@code limit} made by this class
 * happens while holding this object's monitor.
 *
 * <p>Date: 2022/4/25, author: cakin
 */
public class CountDownLatch extends Latch {
    /**
     * Creates a latch that opens after {@code limit} calls to {@link #countDown()}.
     *
     * @param limit number of worker threads to wait for
     */
    public CountDownLatch(int limit) {
        super(limit);
    }

    /**
     * Blocks the calling thread until the counter has reached zero.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override
    public void await() throws InterruptedException {
        synchronized (this) {
            // Loop rather than "if": wait() can return before the counter hits zero
            // (another countDown() notification or a spurious wake-up).
            while (limit > 0) {
                this.wait();
            }
        }
    }

    /**
     * Decrements the counter by one and wakes up every waiting thread so it
     * can re-check the counter.
     *
     * @throws IllegalStateException if the counter has already reached zero
     */
    @Override
    public void countDown() {
        synchronized (this) {
            if (limit <= 0) {
                throw new IllegalStateException("all of task already arrived");
            }
            limit--;
            this.notifyAll();
        }
    }

    /**
     * Returns how many worker threads have not called {@link #countDown()} yet.
     *
     * @return the current counter value
     */
    @Override
    public int getUnarriveed() {
        // Read under the same monitor that guards the writes in countDown(),
        // otherwise the caller has no visibility guarantee for the latest value.
        synchronized (this) {
            return limit;
        }
    }

    /**
     * Blocks the calling thread until the counter has reached zero or the
     * timeout has elapsed, whichever comes first.
     *
     * @param unit unit in which {@code time} is expressed
     * @param time maximum time to wait, in {@code unit}; must be positive
     * @throws IllegalArgumentException if {@code time} is not positive
     * @throws InterruptedException     if the calling thread is interrupted while waiting
     * @throws WaitTimeoutException     if the timeout elapses while the counter is still positive
     */
    @Override
    public void await(TimeUnit unit, long time) throws InterruptedException, WaitTimeoutException {
        if (time <= 0) {
            throw new IllegalArgumentException("The time is invalid");
        }
        // Convert the timeout from the caller's unit to nanoseconds.
        long remainingNanos = unit.toNanos(time);
        // Absolute deadline, so the remaining time can be recomputed after every wake-up.
        final long endNanos = System.nanoTime() + remainingNanos;
        synchronized (this) {
            while (limit > 0) {
                // Compare in nanoseconds: truncating to milliseconds here would make
                // any timeout shorter than 1 ms expire without waiting at all.
                if (remainingNanos <= 0) {
                    throw new WaitTimeoutException("The wait time over specify time。");
                }
                // timedWait splits the value into millis + nanos for Object.wait(long, int),
                // so the sub-millisecond part of the timeout is not dropped.
                TimeUnit.NANOSECONDS.timedWait(this, remainingNanos);
                // The wait may have ended early (notification or spurious wake-up),
                // so work out how much of the timeout is left.
                remainingNanos = endNanos - System.nanoTime();
            }
        }
    }
}

3 ProgrammerTravel

package concurrent.latch;

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

/**
 * Thread that simulates one programmer travelling to a destination and
 * reporting the arrival to a shared {@link Latch}.
 *
 * <p>Date: 2022/4/25, author: cakin
 */
public class ProgrammerTravel extends Thread {
    // Latch that is counted down once this programmer has arrived
    private final Latch latch;
    // Name of the programmer
    private final String programmer;
    // Means of transportation used by the programmer
    private final String transportation;

    /**
     * @param latch          latch to count down on arrival
     * @param programmer     name of the programmer
     * @param transportation means of transportation
     */
    public ProgrammerTravel(Latch latch, String programmer, String transportation) {
        this.latch = latch;
        this.programmer = programmer;
        this.transportation = transportation;
    }

    /**
     * Prints a start message, sleeps for a random 0-9 seconds to simulate the
     * trip, prints an arrival message and counts the latch down.
     */
    @Override
    public void run() {
        System.out.println(programmer + " start take the transportation [" + transportation + "]");
        try {
            TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10));
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Catching InterruptedException clears the interrupt flag; restore it so
            // code running after this point can still see that an interrupt happened.
            Thread.currentThread().interrupt();
        }
        System.out.println(programmer + " arrived by" + transportation);
        // Report completion: decrement the latch counter by one
        latch.countDown();
    }
}

4 WaitTimeoutException

package concurrent.latch;

/**
 * Checked exception signalling that a timed wait ended before the awaited
 * condition was met.
 */
public class WaitTimeoutException extends Exception {
    /**
     * @param message detail message, later available through {@link #getMessage()}
     */
    public WaitTimeoutException(String message) {
        super(message);
    }
}

5 Test1

package concurrent.latch;

import java.util.concurrent.TimeUnit;

/**
 * Demo: four programmers travel concurrently while the main thread waits
 * for all of them on a latch, for at most five seconds.
 */
public class Test1 {
    public static void main(String[] args) {
        // One {programmer, transportation} pair per worker thread, in start order
        String[][] travellers = {
            {"Alex", "Bus"},
            {"Gavin", "Walking"},
            {"Jack", "Subway"},
            {"Dillon", "Bicycle"},
        };

        Latch latch = new CountDownLatch(travellers.length);
        for (String[] traveller : travellers) {
            new ProgrammerTravel(latch, traveller[0], traveller[1]).start();
        }

        // The main thread blocks here until all four programmers have arrived
        // or the 5-second timeout elapses.
        try {
            latch.await(TimeUnit.SECONDS, 5);
            System.out.println("== all of programmer arrvied ==");
        } catch (InterruptedException | WaitTimeoutException e) {
            e.printStackTrace();
        }
    }
}

三 测试结果

Gavin start take the transportation [Walking]

Alex start take the transportation [Bus]

Jack start take the transportation [Subway]

Dillon start take the transportation [Bicycle]

Jack arrived bySubway

Gavin arrived byWalking

Alex arrived byBus

concurrent.latch.WaitTimeoutException: The wait time over specify time。

at concurrent.latch.CountDownLatch.await(CountDownLatch.java:56)

at concurrent.latch.Test1.main(Test1.java:14)

Dillon arrived byBicycle

相关文章