Usage and code examples for the java.util.concurrent.CyclicBarrier.await() method

x33g5p2x, reposted on 2022-01-18 under Other

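This article collects code examples for the Java method java.util.concurrent.CyclicBarrier.await() and shows how CyclicBarrier.await() is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven, and similar platforms and were extracted from selected projects, so they should be useful as references. The details of CyclicBarrier.await() are as follows:
Package: java.util.concurrent
Class name: CyclicBarrier
Method name: await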

Introduction to CyclicBarrier.await

Waits until all parties (see getParties()) have invoked await on this barrier.

If the current thread is not the last to arrive then it is disabled for thread scheduling purposes and lies dormant until one of the following things happens:

  • The last thread arrives; or
  • Some other thread interrupts (Thread.interrupt()) the current thread; or
  • Some other thread interrupts (Thread.interrupt()) one of the other waiting threads; or
  • Some other thread times out while waiting for the barrier; or
  • Some other thread invokes reset() on this barrier.
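
The timeout case in the list above can be sketched as follows. This is a minimal illustration, not code from any of the quoted projects: the class name BarrierTimeoutDemo, the method name, and the 200 ms timeout are assumptions made for the example. The barrier has three parties but only two ever arrive, so the timed waiter gives up and the untimed waiter is released with a BrokenBarrierException.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; names and timings are illustrative assumptions.
class BarrierTimeoutDemo {

    /** Returns the simple name of the exception seen by the untimed waiter. */
    static String untimedWaiterOutcome() {
        // Three parties, but only two ever arrive, so the barrier cannot trip.
        CyclicBarrier barrier = new CyclicBarrier(3);

        Thread timedWaiter = new Thread(() -> {
            try {
                barrier.await(200, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                // Expected here: this thread gave up, which breaks the barrier.
            } catch (InterruptedException | BrokenBarrierException e) {
                // Not expected in this sketch.
            }
        });
        timedWaiter.start();

        String outcome;
        try {
            barrier.await(); // no timeout of its own
            outcome = "tripped";
        } catch (InterruptedException | BrokenBarrierException e) {
            outcome = e.getClass().getSimpleName();
        }
        try {
            timedWaiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return outcome;
    }

    public static void main(String[] args) {
        System.out.println(untimedWaiterOutcome());
    }
}
```

The thread that times out receives a TimeoutException; the other waiter only learns that the barrier was broken.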

If the current thread:

  • has its interrupted status set on entry to this method; or
  • is interrupted (Thread.interrupt()) while waiting

then InterruptedException is thrown and the current thread's interrupted status is cleared.
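
The first case (interrupted status already set on entry) can be checked on a single thread. The sketch below is illustrative only; the class and method names are assumptions. It sets the interrupt flag, calls await(), and then reports whether the exception was thrown and whether the flag is still set.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class; names are illustrative assumptions.
class InterruptOnEntryDemo {

    /** Returns {InterruptedException thrown?, interrupt flag still set afterwards?}. */
    static boolean[] awaitWithInterruptFlagSet() {
        CyclicBarrier barrier = new CyclicBarrier(2);
        boolean thrown = false;

        // Set the interrupted status before entering await().
        Thread.currentThread().interrupt();
        try {
            barrier.await();
        } catch (InterruptedException e) {
            thrown = true; // thrown immediately, without waiting for the other party
        } catch (BrokenBarrierException e) {
            // Not expected in this sketch.
        }

        // Thread.interrupted() reads the flag (and clears it if it were still set).
        boolean stillInterrupted = Thread.interrupted();
        return new boolean[] {thrown, stillInterrupted};
    }

    public static void main(String[] args) {
        boolean[] r = awaitWithInterruptFlagSet();
        System.out.println("thrown=" + r[0] + ", stillInterrupted=" + r[1]);
    }
}
```

Code that needs the interrupt to stay visible to callers usually calls Thread.currentThread().interrupt() again inside the catch block.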

If the barrier is reset (reset()) while any thread is waiting, or if the barrier is broken (isBroken()) when await is invoked, or while any thread is waiting, then BrokenBarrierException is thrown.
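
A minimal sketch of the reset case, under the assumption that polling getNumberWaiting() is an acceptable way to know the waiter has arrived (fine for a demo, not a pattern taken from the quoted projects). The class and method names are hypothetical.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; names are illustrative assumptions.
class BarrierResetDemo {

    /** Returns what happened to a thread that was waiting when reset() was called. */
    static String waiterOutcomeAfterReset() {
        CyclicBarrier barrier = new CyclicBarrier(2);
        AtomicReference<String> outcome = new AtomicReference<>("none");

        Thread waiter = new Thread(() -> {
            try {
                barrier.await();
                outcome.set("tripped");
            } catch (InterruptedException | BrokenBarrierException e) {
                outcome.set(e.getClass().getSimpleName());
            }
        });
        waiter.start();

        try {
            // Poll until the waiter is actually parked at the barrier.
            while (barrier.getNumberWaiting() < 1) {
                Thread.sleep(10);
            }
            barrier.reset(); // releases the waiter with BrokenBarrierException
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println(waiterOutcomeAfterReset());
    }
}
```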

If any thread is interrupted (Thread.interrupt()) while waiting, then all other waiting threads will throw BrokenBarrierException and the barrier is placed in the broken state.
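
The following sketch illustrates this all-or-none behavior with two waiters on a three-party barrier. It is an illustration only; the class name, method name, and the polling loop are assumptions. One waiter is interrupted, and the example records what each waiter sees and whether the barrier ends up broken.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class; names are illustrative assumptions.
class InterruptedWaiterDemo {

    /** Returns {outcome of interrupted waiter, outcome of other waiter, isBroken()}. */
    static String[] run() {
        CyclicBarrier barrier = new CyclicBarrier(3); // the third party never arrives
        String[] outcomes = new String[3];
        Thread[] waiters = new Thread[2];

        for (int i = 0; i < waiters.length; i++) {
            final int id = i;
            waiters[i] = new Thread(() -> {
                try {
                    barrier.await();
                    outcomes[id] = "tripped";
                } catch (InterruptedException | BrokenBarrierException e) {
                    outcomes[id] = e.getClass().getSimpleName();
                }
            });
            waiters[i].start();
        }

        try {
            // Wait until both threads are parked at the barrier.
            while (barrier.getNumberWaiting() < 2) {
                Thread.sleep(10);
            }
            waiters[0].interrupt(); // interrupt exactly one waiter
            for (Thread t : waiters) {
                t.join(); // join() makes the writes to outcomes visible here
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        outcomes[2] = String.valueOf(barrier.isBroken());
        return outcomes;
    }

    public static void main(String[] args) {
        System.out.println(String.join(", ", run()));
    }
}
```

A broken barrier stays broken until reset() is called, so later await() calls on it fail immediately.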

If the current thread is the last thread to arrive, and a non-null barrier action was supplied in the constructor, then the current thread runs the action before allowing the other threads to continue. If an exception occurs during the barrier action then that exception will be propagated in the current thread and the barrier is placed in the broken state.
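
To make the barrier-action failure case concrete, here is a small sketch under stated assumptions: the class name, the "boom" message, and the use of IllegalStateException are all invented for illustration. The calling thread arrives last, so it runs the action and sees the action's exception; the thread that was already waiting sees BrokenBarrierException.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class; names are illustrative assumptions.
class FailingBarrierActionDemo {

    /** Returns {outcome of earlier waiter, outcome of last arriver, isBroken()}. */
    static String[] run() {
        // The barrier action always fails in this sketch.
        CyclicBarrier barrier = new CyclicBarrier(2, () -> {
            throw new IllegalStateException("boom");
        });
        String[] outcomes = {"none", "none", "none"};

        Thread waiter = new Thread(() -> {
            try {
                barrier.await();
                outcomes[0] = "tripped";
            } catch (InterruptedException | BrokenBarrierException e) {
                outcomes[0] = e.getClass().getSimpleName();
            }
        });
        waiter.start();

        try {
            // Make sure the other thread arrived first, so this thread is the last one.
            while (barrier.getNumberWaiting() < 1) {
                Thread.sleep(10);
            }
            barrier.await(); // last to arrive: runs the barrier action in this thread
            outcomes[1] = "tripped";
        } catch (IllegalStateException e) {
            outcomes[1] = "IllegalStateException: " + e.getMessage();
        } catch (InterruptedException | BrokenBarrierException e) {
            outcomes[1] = e.getClass().getSimpleName();
        }

        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        outcomes[2] = String.valueOf(barrier.isBroken());
        return outcomes;
    }

    public static void main(String[] args) {
        System.out.println(String.join(", ", run()));
    }
}
```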

Code examples

Code example source: stackoverflow.com

private CyclicBarrier barrier = new CyclicBarrier(2);

// await() declares both InterruptedException and BrokenBarrierException
void main() throws InterruptedException, BrokenBarrierException {
 asyncDoSomething();
 // wait until the other party calls barrier.await()
 barrier.await();
}

void onFinishDoSomething() throws InterruptedException, BrokenBarrierException {
 // do something ...
 // then signal the end of work
 barrier.await();
}

Code example source: stackoverflow.com

// We want to start just 2 threads at the same time, but let's control that 
// timing from the main thread. That's why we have 3 "parties" instead of 2.
final CyclicBarrier gate = new CyclicBarrier(3);

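Thread t1 = new Thread(){
  public void run(){
    try {
      gate.await();
    } catch (InterruptedException | BrokenBarrierException e) {
      return; // interrupted or barrier broken: do not proceed
    }
    //do stuff
  }};
Thread t2 = new Thread(){
  public void run(){
    try {
      gate.await();
    } catch (InterruptedException | BrokenBarrierException e) {
      return; // interrupted or barrier broken: do not proceed
    }
    //do stuff
  }};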

t1.start();
t2.start();

// At this point, t1 and t2 are blocking on the gate. 
// Since we gave "3" as the argument, gate is not opened yet.
// Now if we block on the gate from the main thread, it will open
// and all threads will start to do stuff!

gate.await();
System.out.println("all threads started");

Code example source: LeonardoZ/java-concurrency-patterns

public static void main(String[] args) {
  ExecutorService executor = Executors.newCachedThreadPool();
  Runnable barrierAction = () -> System.out.println("Well done, guys!");
  CyclicBarrier barrier = new CyclicBarrier(10, barrierAction);
  Runnable task = () -> {
    try {
      // simulating a task that can take at most 1sec to run
      System.out.println("Doing task for " + Thread.currentThread().getName());
      Thread.sleep(new Random().nextInt(10) * 100);
      System.out.println("Done for " + Thread.currentThread().getName());
      barrier.await();
    } catch (InterruptedException | BrokenBarrierException e) {
      e.printStackTrace();
    }
  };
  for (int i = 0; i < 10; i++) {
    executor.execute(task);
  }
  executor.shutdown();
}

Code example source: killme2008/Metamorphosis

public void start() {
  this.barrier = new CyclicBarrier(this.threadCount + 1, this.watch);
  for (int i = 0; i < this.threadCount; i++) {
    new Thread(new ConcurrentTestRunner(this.barrier, this.task, this.repeatCount, i)).start();
  }
  try {
    this.watch.start();
    this.barrier.await();
    this.barrier.await();
  }
  catch (Exception e) {
    throw new RuntimeException(e);
  }
}

Code example source: wildfly/wildfly

protected void sendMessages() {
  final AtomicInteger num_msgs_sent=new AtomicInteger(0); // all threads will increment this
  final AtomicInteger actually_sent=new AtomicInteger(0); // incremented *after* sending a message
  final AtomicLong    seqno=new AtomicLong(1); // monotonically increasing seqno, to be used by all threads
  final Sender[]      senders=new Sender[num_threads];
  final CyclicBarrier barrier=new CyclicBarrier(num_threads +1);
  final byte[]        payload=new byte[msg_size];
  for(int i=0; i < num_threads; i++) {
    senders[i]=new Sender(barrier, num_msgs_sent, actually_sent, seqno, payload);
    senders[i].setName("sender-" + i);
    senders[i].start();
  }
  try {
    System.out.printf("-- sending %,d %s\n", num_msgs, (oob? " OOB msgs" : " msgs"));
    barrier.await();
  }
  catch(Exception e) {
    System.err.println("failed triggering send threads: " + e);
  }
}

Code example source: wildfly/wildfly

protected void sendMessages() {
  final AtomicInteger num_msgs_sent=new AtomicInteger(0); // all threads will increment this
  final AtomicLong    seqno=new AtomicLong(1); // monotonically increasing seqno, to be used by all threads
  final Sender[]      senders=new Sender[num_threads];
  final CyclicBarrier barrier=new CyclicBarrier(num_threads +1);
  final byte[]        payload=new byte[msg_size];
  for(int i=0; i < num_threads; i++) {
    senders[i]=new Sender(barrier, num_msgs_sent, seqno, payload);
    senders[i].setName("invoker-" + i);
    senders[i].start();
  }
  try {
    System.out.println("-- invoking " + num_msgs + " msgs");
    barrier.await();
  }
  catch(Exception e) {
    System.err.println("failed triggering send threads: " + e);
  }
}

Code example source: google/guava

public void testCustomSchedule_startStop() throws Exception {
 final CyclicBarrier firstBarrier = new CyclicBarrier(2);
 final CyclicBarrier secondBarrier = new CyclicBarrier(2);
 final AtomicBoolean shouldWait = new AtomicBoolean(true);
 Runnable task =
   new Runnable() {
    @Override
    public void run() {
     try {
      if (shouldWait.get()) {
       firstBarrier.await();
       secondBarrier.await();
      }
     } catch (Exception e) {
      throw new RuntimeException(e);
     }
    }
   };
 TestCustomScheduler scheduler = new TestCustomScheduler();
 Future<?> future = scheduler.schedule(null, Executors.newScheduledThreadPool(10), task);
 firstBarrier.await();
 assertEquals(1, scheduler.scheduleCounter.get());
 secondBarrier.await();
 firstBarrier.await();
 assertEquals(2, scheduler.scheduleCounter.get());
 shouldWait.set(false);
 secondBarrier.await();
 future.cancel(false);
}

Code example source: wildfly/wildfly

protected void sendMessages() {
  final AtomicInteger num_msgs_sent=new AtomicInteger(0); // all threads will increment this
  final Sender[]      senders=new Sender[num_threads];
  final CyclicBarrier barrier=new CyclicBarrier(num_threads +1);
  final byte[]        payload=new byte[msg_size];
  reset();
  for(int i=0; i < num_threads; i++) {
    senders[i]=new Sender(barrier, num_msgs_sent, payload);
    senders[i].setName("sender-" + i);
    senders[i].start();
  }
  try {
    System.out.println("-- sending " + num_msgs + " msgs");
    barrier.await();
  }
  catch(Exception e) {
    System.err.println("failed triggering send threads: " + e);
  }
}

Code example source: LMAX-Exchange/disruptor

private Future<List<StubEvent>> getMessages(final long initial, final long toWaitFor) throws InterruptedException,
  BrokenBarrierException
{
  final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
  final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
  final Future<List<StubEvent>> f = executor.submit(
    new TestWaiter(
      cyclicBarrier, sequenceBarrier, ringBuffer,
      initial, toWaitFor));
  cyclicBarrier.await();
  return f;
}

Code example source: google/guava

public void testTaskThrowsError() throws Exception {
 class MyError extends Error {}
 final CyclicBarrier barrier = new CyclicBarrier(2);
  barrier.await(1, TimeUnit.SECONDS);
  executor.execute(barrierTask);
  barrier.await(1, TimeUnit.SECONDS);
 } finally {
  service.shutdown();

Code example source: google/guava

final CyclicBarrier barrier = new CyclicBarrier(numThreads + 1);
Runnable wrapper =
  new Runnable() {
 executor.execute(wrapper);
barrier.await(); // release the threads!
barrier.await(); // wait for them all to complete
assertEquals(1, task.get().intValue());
assertEquals(1, counter.get());

Code example source: Netflix/servo

@Test
 public void testConcurrentTagList() throws Exception {
  final int count = 10;
  final CountDownLatch latch = new CountDownLatch(count);
  final Set<BasicTagList> tagLists = Collections
    .newSetFromMap(new ConcurrentHashMap<>());

  final CyclicBarrier barrier = new CyclicBarrier(count);

  for (int i = 0; i < count; i++) {
   new Thread(() -> {
    try {
     barrier.await();
     tagLists.add(BasicTagList.of("id", "1", "color",
       "green"));
    } catch (Exception e) {
     e.printStackTrace(System.out);
    } finally {
     latch.countDown();
    }
   }).start();
  }
  latch.await();
  assertEquals(tagLists.size(), 1);
 }
}

Code example source: google/guava

public void testCustomScheduler_deadlock() throws InterruptedException, BrokenBarrierException {
 final CyclicBarrier inGetNextSchedule = new CyclicBarrier(2);
 // This will flakily deadlock, so run it multiple times to increase the flake likelihood
 for (int i = 0; i < 1000; i++) {
  Service service =
    new AbstractScheduledService() {
     @Override
     protected void runOneIteration() {}
     @Override
     protected Scheduler scheduler() {
      return new CustomScheduler() {
       @Override
       protected Schedule getNextSchedule() throws Exception {
        if (state() != State.STARTING) {
         inGetNextSchedule.await();
         Thread.yield();
         throw new RuntimeException("boom");
        }
        return new Schedule(0, TimeUnit.NANOSECONDS);
       }
      };
     }
    };
  service.startAsync().awaitRunning();
  inGetNextSchedule.await();
  service.stopAsync();
 }
}

Code example source: ehcache/ehcache3

@Test
public void CachingTierDoesNotSeeAnyOperationDuringClear() throws StoreAccessException, BrokenBarrierException, InterruptedException {
 final TieredStore<String, String> tieredStore = new TieredStore<>(stringCachingTier, stringAuthoritativeTier);
 final CyclicBarrier barrier = new CyclicBarrier(2);
 doAnswer((Answer<Void>) invocation -> {
  barrier.await();
  barrier.await();
  return null;
 }).when(stringAuthoritativeTier).clear();
 Thread t = new Thread(() -> {
  try {
   tieredStore.clear();
  } catch (Exception e) {
   throw new RuntimeException(e);
  }
 });
 t.start();
 barrier.await();
 tieredStore.get("foo");
 barrier.await();
 t.join();
 verify(stringCachingTier, never()).getOrComputeIfAbsent(
  ArgumentMatchers.any(), ArgumentMatchers.any());
}

Code example source: google/guava

public void testDirectExecutorServiceServiceTermination() throws Exception {
 final ExecutorService executor = newDirectExecutorService();
 final CyclicBarrier barrier = new CyclicBarrier(2);
 final AtomicReference<Throwable> throwableFromOtherThread = new AtomicReference<>(null);
 final Runnable doNothingRunnable =
 barrier.await(1, TimeUnit.SECONDS);
 assertFalse(executor.isShutdown());
 assertFalse(executor.isTerminated());
 barrier.await(1, TimeUnit.SECONDS);
 assertFalse(executor.awaitTermination(20, TimeUnit.MILLISECONDS));
 barrier.await(1, TimeUnit.SECONDS);
 assertTrue(executor.awaitTermination(1, TimeUnit.SECONDS));
 assertTrue(executor.awaitTermination(0, TimeUnit.SECONDS));

Code example source: apache/pulsar

final CyclicBarrier barrier = new CyclicBarrier(Consumers + 1);
barrier.await();
for (int i = 0; i < Messages; i++) {
  ledger.addEntry("test".getBytes());

Code example source: apache/ignite

/**
 *
 */
@Test
public void testDoInParallel() throws Throwable {
  CyclicBarrier barrier = new CyclicBarrier(3);
  ExecutorService executorService = Executors.newFixedThreadPool(3);
  try {
    IgniteUtils.doInParallel(3,
      executorService,
      asList(1, 2, 3),
      i -> {
        try {
          barrier.await(1, TimeUnit.SECONDS);
        }
        catch (Exception e) {
          throw new IgniteCheckedException(e);
        }
        return null;
      }
    );
  } finally {
    executorService.shutdownNow();
  }
}

Code example source: apache/incubator-druid

Assert.assertFalse(q.offer(obj, delayMS, TimeUnit.MILLISECONDS));
Assert.assertFalse(q.offer(obj));
final CyclicBarrier barrier = new CyclicBarrier(2);
barrier.await();
q.take();
barrier.await();
q.take();
Assert.assertTrue(future.get());

Code example source: apache/pulsar

final CyclicBarrier barrier = new CyclicBarrier(Threads);
final CountDownLatch counter = new CountDownLatch(Threads);
final AtomicBoolean gotException = new AtomicBoolean(false);
  cachedExecutor.execute(() -> {
    try {
      barrier.await();

Code example source: ReactiveX/RxJava

@Test(timeout = 5000)
public void testConcurrentSizeAndHasAnyValueBounded() throws InterruptedException {
  final ReplayProcessor<Object> rs = ReplayProcessor.createWithSize(3);
  final CyclicBarrier cb = new CyclicBarrier(2);
  t.start();
  try {
    cb.await();
  } catch (InterruptedException e) {
    return;
