java.util.concurrent.CyclicBarrier类的使用及代码示例

x33g5p2x  于2022-01-18 转载在 其他  
字(14.5k)|赞(0)|评价(0)|浏览(104)

本文整理了Java中java.util.concurrent.CyclicBarrier类的一些代码示例,展示了CyclicBarrier类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。CyclicBarrier类的具体详情如下:
包路径:java.util.concurrent.CyclicBarrier
类名称:CyclicBarrier

CyclicBarrier介绍

[英]A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be re-used after the waiting threads are released.

A CyclicBarrier supports an optional Runnable command that is run once per barrier point, after the last thread in the party arrives, but before any threads are released. This barrier action is useful for updating shared-state before any of the parties continue.

Sample usage: Here is an example of using a barrier in a parallel decomposition design:

class Solver Worker(int row) { myRow = row; } 
public void run()  
while (!done())  
processRow(myRow); 
try  
barrier.await(); 
} catch (InterruptedException ex)  
return; 
} catch (BrokenBarrierException ex)  
return; 
} 
} 
} 
} 
public Solver(float[][] matrix)  
data = matrix; 
N = matrix.length; 
barrier = new CyclicBarrier(N, 
new Runnable()  
public void run()  
mergeRows(...); 
} 
}); 
for (int i = 0; i < N; ++i) 
new Thread(new Worker(i)).start(); 
waitUntilDone(); 
} 
}}

Here, each worker thread processes a row of the matrix then waits at the barrier until all rows have been processed. When all rows are processed the supplied Runnable barrier action is executed and merges the rows. If the merger determines that a solution has been found then done() will return true and each worker will terminate.

If the barrier action does not rely on the parties being suspended when it is executed, then any of the threads in the party could execute that action when it is released. To facilitate this, each invocation of #await returns the arrival index of that thread at the barrier. You can then choose which thread should execute the barrier action, for example:

if (barrier.await() == 0)

The CyclicBarrier uses an all-or-none breakage model for failed synchronization attempts: If a thread leaves a barrier point prematurely because of interruption, failure, or timeout, all other threads waiting at that barrier point will also leave abnormally via BrokenBarrierException (or InterruptedException if they too were interrupted at about the same time).

Memory consistency effects: Actions in a thread prior to calling await()happen-before actions that are part of the barrier action, which in turn happen-before actions following a successful return from the corresponding await() in other threads.
[中]一种同步辅助工具,允许一组线程都等待对方到达一个共同的障碍点。CyclicBarrier在涉及固定大小的线程组的程序中很有用,这些线程有时必须互相等待。该屏障被称为“循环”,因为它可以在等待线程释放后重新使用。
CyclicBarrier支持可选的Runnable命令,该命令在参与方中的最后一个线程到达后,但在释放任何线程之前,在每个屏障点运行一次。此“屏障操作”用于在任何一方继续之前更新共享状态。
示例用法:以下是在并行分解设计中使用屏障的示例:

class Solver Worker(int row) { myRow = row; } 
public void run()  
while (!done())  
processRow(myRow); 
try  
barrier.await(); 
} catch (InterruptedException ex)  
return; 
} catch (BrokenBarrierException ex)  
return; 
} 
} 
} 
} 
public Solver(float[][] matrix)  
data = matrix; 
N = matrix.length; 
barrier = new CyclicBarrier(N, 
new Runnable()  
public void run()  
mergeRows(...); 
} 
}); 
for (int i = 0; i < N; ++i) 
new Thread(new Worker(i)).start(); 
waitUntilDone(); 
} 
}}

在这里,每个工作线程处理矩阵中的一行,然后在屏障处等待,直到处理完所有行。处理完所有行后,将执行提供的可运行屏障操作并合并这些行。如果合并确定已找到解决方案,则done()将返回true,并且每个worker将终止。
如果屏障操作在执行时不依赖于被暂停的当事方,则当该操作被释放时,当事方中的任何线程都可以执行该操作。为了便于实现这一点,每次调用#wait都会返回该线程在屏障处的到达索引。然后,您可以选择哪个线程应执行屏障操作,例如:

if (barrier.await() == 0)

CyclicBarrier对失败的同步尝试使用“全部”或“无”中断模型:如果线程由于中断、故障或超时而过早离开一个障碍点,在该屏障点等待的所有其他线程也将通过BrokenBarrierException(或InterruptedException,如果它们也在大约同一时间被中断)异常离开。
内存一致性影响:在调用await()happen-before操作之前线程中的操作,这些操作是barrier操作的一部分,而barrier操作又发生在其他线程中相应的await()成功返回后的操作之前。

代码示例

代码示例来源:origin: google/guava

final ExecutorService executor = Executors.newFixedThreadPool(numThreads);
for (int i = 0; i < 1000; i++) {
 final AtomicInteger counter = new AtomicInteger();
 final TrustedListenableFutureTask<Integer> task =
   TrustedListenableFutureTask.create(
 final CyclicBarrier barrier = new CyclicBarrier(numThreads + 1);
 Runnable wrapper =
   new Runnable() {
  executor.execute(wrapper);
 barrier.await(); // release the threads!
 barrier.await(); // wait for them all to complete
 assertEquals(1, task.get().intValue());
 assertEquals(1, counter.get());
executor.shutdown();

代码示例来源:origin: apache/ignite

boolean warmupFinished0 = false;
bar.await();
  if (done.get())
    break;
    warmupFinished0 = warmupFinished.get();
bar.await();
  bar.reset();

代码示例来源:origin: LeonardoZ/java-concurrency-patterns

public static void main(String[] args) {
  ExecutorService executor = Executors.newCachedThreadPool();
  Runnable barrierAction = () -> System.out.println("Well done, guys!");
  CyclicBarrier barrier = new CyclicBarrier(10, barrierAction);
  Runnable task = () -> {
    try {
      // simulating a task that can take at most 1sec to run
      System.out.println("Doing task for " + Thread.currentThread().getName());
      Thread.sleep(new Random().nextInt(10) * 100);
      System.out.println("Done for " + Thread.currentThread().getName());
      barrier.await();
    } catch (InterruptedException | BrokenBarrierException e) {
      e.printStackTrace();
    }
  };
  for (int i = 0; i < 10; i++) {
    executor.execute(task);
  }
  executor.shutdown();
}

代码示例来源:origin: apache/ignite

@Override public void run() {
    try {
      barrier.await();
      stopGrid(1);
      barrier.await();
      startGrid(1);
      barrier.await();
    }
    catch (Exception e) {
      e.printStackTrace();
      barrier.reset();
    }
  }
});

代码示例来源:origin: google/guava

public void testCustomSchedule_startStop() throws Exception {
 final CyclicBarrier firstBarrier = new CyclicBarrier(2);
 final CyclicBarrier secondBarrier = new CyclicBarrier(2);
 final AtomicBoolean shouldWait = new AtomicBoolean(true);
 Runnable task =
   new Runnable() {
    @Override
    public void run() {
     try {
      if (shouldWait.get()) {
       firstBarrier.await();
       secondBarrier.await();
      }
     } catch (Exception e) {
      throw new RuntimeException(e);
     }
    }
   };
 TestCustomScheduler scheduler = new TestCustomScheduler();
 Future<?> future = scheduler.schedule(null, Executors.newScheduledThreadPool(10), task);
 firstBarrier.await();
 assertEquals(1, scheduler.scheduleCounter.get());
 secondBarrier.await();
 firstBarrier.await();
 assertEquals(2, scheduler.scheduleCounter.get());
 shouldWait.set(false);
 secondBarrier.await();
 future.cancel(false);
}

代码示例来源:origin: bluestreak01/questdb

@Test
public void testLockRace() throws Exception {
  assertWithPool(pool -> {
    AtomicInteger successCount = new AtomicInteger();
    AtomicInteger failureCount = new AtomicInteger();
    AtomicInteger exceptionCount = new AtomicInteger();
    CyclicBarrier barrier = new CyclicBarrier(2);
    CountDownLatch stopLatch = new CountDownLatch(2);
        barrier.await();
        if (pool.lock("xyz")) {
          successCount.incrementAndGet();
        e.printStackTrace();
      } finally {
        stopLatch.countDown();
    new Thread(runnable).start();
    Assert.assertTrue(stopLatch.await(2, TimeUnit.SECONDS));
    Assert.assertEquals(0, exceptionCount.get());
    Assert.assertEquals(1, successCount.get());

代码示例来源:origin: JCTools/JCTools

public NonBlockingHashMapLong<TestKey> getMapMultithreaded() throws InterruptedException, ExecutionException
  {
    final int threadCount = _items.keySet().size();
    final NonBlockingHashMapLong<TestKey> map = new NonBlockingHashMapLong<>();
    // use a barrier to open the gate for all threads at once to avoid rolling start and no actual concurrency
    final CyclicBarrier barrier = new CyclicBarrier(threadCount);
    final ExecutorService ex = Executors.newFixedThreadPool(threadCount);
    final CompletionService<Integer> co = new ExecutorCompletionService<>(ex);
    for (Integer type : _items.keySet())
    {
      // A linked-list of things to insert
      List<TestKey> items = _items.get(type);
      TestKeyFeederThread feeder = new TestKeyFeederThread(items, map, barrier);
      co.submit(feeder);
    }
    // wait for all threads to return
    int itemCount = 0;
    for (int retCount = 0; retCount < threadCount; retCount++)
    {
      final Future<Integer> result = co.take();
      itemCount += result.get();
    }
    ex.shutdown();
    return map;
  }
}

代码示例来源:origin: apache/pulsar

ExecutorService executor = Executors.newCachedThreadPool();
final CyclicBarrier barrier = new CyclicBarrier(Consumers + 1);
  futures.add(executor.submit(new Callable<Void>() {
    @Override
    public Void call() throws Exception {
barrier.await();
for (int i = 0; i < Messages; i++) {
  ledger.addEntry("test".getBytes());
  f.get();

代码示例来源:origin: google/guava

public void testSetFutureCancelBash_withDoneFuture() {
 final CyclicBarrier barrier =
   new CyclicBarrier(
     2 // for the setter threads
       + 1 // for the blocking get thread,
       + 1); // for the main thread
 final ExecutorService executor = Executors.newFixedThreadPool(barrier.getParties());
 final AtomicReference<AbstractFuture<String>> currentFuture = Atomics.newReference();
 final AtomicBoolean setFutureSuccess = new AtomicBoolean();
 final AtomicBoolean cancellationSucess = new AtomicBoolean();
 Callable<Void> cancelRunnable =
   new Callable<Void>() {
 allTasks.add(cancelRunnable);
 allTasks.add(setFutureCompleteSucessFullyRunnable);
 allTasks.add(Executors.callable(collectResultsRunnable));
 assertEquals(allTasks.size() + 1, barrier.getParties()); // sanity check
 for (int i = 0; i < 1000; i++) {
  Collections.shuffle(allTasks);
  for (Callable<?> task : allTasks) {
   @SuppressWarnings("unused") // go/futurereturn-lsc
   Future<?> possiblyIgnoredError = executor.submit(task);
  finalResults.clear();
 executor.shutdown();

代码示例来源:origin: google/guava

new CyclicBarrier(
    6 // for the setter threads
      + 50 // for the listeners
      + 50 // for the blocking get threads,
      + 1); // for the main thread
final ExecutorService executor = Executors.newFixedThreadPool(barrier.getParties());
final AtomicReference<AbstractFuture<String>> currentFuture = Atomics.newReference();
final AtomicInteger numSuccessfulSetCalls = new AtomicInteger();
Callable<Void> completeSucessFullyRunnable =
  new Callable<Void>() {
 allTasks.add(Executors.callable(listener));
 allTasks.add(
   new Callable<Void>() {
assertEquals(allTasks.size() + 1, barrier.getParties());
for (int i = 0; i < 1000; i++) {
 Collections.shuffle(allTasks);
 for (Callable<?> task : allTasks) {
  @SuppressWarnings("unused") // go/futurereturn-lsc
  Future<?> possiblyIgnoredError = executor.submit(task);
  if (future.wasInterrupted()) {
 finalResults.clear();
executor.shutdown();

代码示例来源:origin: google/guava

public void testSetFutureCancelBash() {
 final int size = 50;
 final CyclicBarrier barrier =
   new CyclicBarrier(
     2 // for the setter threads
 final ExecutorService executor = Executors.newFixedThreadPool(barrier.getParties());
 final AtomicReference<AbstractFuture<String>> currentFuture = Atomics.newReference();
 final AtomicReference<AbstractFuture<String>> setFutureFuture = Atomics.newReference();
 final AtomicBoolean setFutureSetSucess = new AtomicBoolean();
 final AtomicBoolean setFutureCompletionSucess = new AtomicBoolean();
 final AtomicBoolean cancellationSucess = new AtomicBoolean();
 Runnable cancelRunnable =
   new Runnable() {
 assertEquals(allTasks.size() + 1, barrier.getParties()); // sanity check
 for (int i = 0; i < 1000; i++) {
  Collections.shuffle(allTasks);
  setFutureFuture.set(setFuture);
  for (Runnable task : allTasks) {
   executor.execute(task);
  finalResults.clear();
 executor.shutdown();

代码示例来源:origin: apache/ignite

final CyclicBarrier barrier = new CyclicBarrier(2);
final CountDownLatch latch = new CountDownLatch(1);
final AtomicInteger threadCnt = new AtomicInteger();
final AtomicBoolean deadlock = new AtomicBoolean();
latch.await();
try (Transaction tx = client.transactions().txStart(PESSIMISTIC, READ_COMMITTED, 500, 0)) {
  clientCache0.put(0, 3);
  clientCache0.put(1, 3);
assertTrue(deadlock.get());

代码示例来源:origin: apache/ignite

final int txCnt = 2;
final CyclicBarrier barrier = new CyclicBarrier(txCnt);
final AtomicInteger threadCnt = new AtomicInteger();
final AtomicBoolean deadlock = new AtomicBoolean();
final AtomicBoolean timeout = new AtomicBoolean();
assertFalse(deadlock.get());
assertTrue(timeout.get());

代码示例来源:origin: apache/ignite

@Override public void run() {
    int num = threadCnt.getAndIncrement();
    Ignite ignite = ignite(num);
    IgniteCache<Object, Integer> cache = ignite.cache(CACHE);
    try (Transaction tx =
         ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, num == 0 ? 500 : 1500, 0)
    ) {
      int key1 = primaryKey(ignite((num + 1) % txCnt).cache(CACHE));
      log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode() +
        ", tx=" + tx + ", key=" + key1 + ']');
      cache.put(new TestKey(key1), 1);
      barrier.await();
      int key2 = primaryKey(cache);
      log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode() +
        ", tx=" + tx + ", key=" + key2 + ']');
      cache.put(new TestKey(key2), 2);
      tx.commit();
    }
    catch (Exception e) {
      timeout.compareAndSet(false, hasCause(e, TransactionTimeoutException.class));
      deadlock.compareAndSet(false, hasCause(e, TransactionDeadlockException.class));
    }
  }
}, txCnt, "tx-thread");

代码示例来源:origin: cdapio/cdap

@Test
public void testConcurrentLimitRate() throws InterruptedException {
 final LogSampler sampler = LogSamplers.limitRate(50, new TimeProvider.IncrementalTimeProvider());
 final AtomicInteger acceptedCount = new AtomicInteger();
 final CyclicBarrier barrier = new CyclicBarrier(threadCount);
 final CountDownLatch completeLatch = new CountDownLatch(threadCount);
 for (int i = 0; i < 5; i++) {
  final int threadId = i;
 Assert.assertTrue(completeLatch.await(10, SECONDS));
 Assert.assertEquals(10, acceptedCount.get());

代码示例来源:origin: google/guava

public void testDirectExecutorServiceServiceTermination() throws Exception {
 final ExecutorService executor = newDirectExecutorService();
 final CyclicBarrier barrier = new CyclicBarrier(2);
 final AtomicReference<Throwable> throwableFromOtherThread = new AtomicReference<>(null);
 final Runnable doNothingRunnable =
 barrier.await(1, TimeUnit.SECONDS);
 assertFalse(executor.isShutdown());
 assertFalse(executor.isTerminated());
 executor.shutdown();
 assertTrue(executor.isShutdown());
 try {
 barrier.await(1, TimeUnit.SECONDS);
 assertFalse(executor.awaitTermination(20, TimeUnit.MILLISECONDS));
 barrier.await(1, TimeUnit.SECONDS);
 assertTrue(executor.awaitTermination(1, TimeUnit.SECONDS));
 assertTrue(executor.awaitTermination(0, TimeUnit.SECONDS));

代码示例来源:origin: LMAX-Exchange/disruptor

int handlerCount = threads;
CyclicBarrier barrier = new CyclicBarrier(publisherCount);
CountDownLatch latch = new CountDownLatch(publisherCount);
  executor.execute(publisher);
latch.await();
while (ringBuffer.getCursor() < (iterations - 1))

代码示例来源:origin: google/guava

public void testTaskThrowsError() throws Exception {
 class MyError extends Error {}
 final CyclicBarrier barrier = new CyclicBarrier(2);
 ExecutorService service = Executors.newSingleThreadExecutor();
 try {
  final SequentialExecutor executor = new SequentialExecutor(service);
  service.execute(barrierTask); // submit directly to the service
  barrier.await(1, TimeUnit.SECONDS);
  executor.execute(barrierTask);
  barrier.await(1, TimeUnit.SECONDS);
 } finally {
  service.shutdown();

代码示例来源:origin: google/guava

public void testRunOneIterationCalledMultipleTimes() throws Exception {
 TestService service = new TestService();
 service.startAsync().awaitRunning();
 for (int i = 1; i < 10; i++) {
  service.runFirstBarrier.await();
  assertEquals(i, service.numberOfTimesRunCalled.get());
  service.runSecondBarrier.await();
 }
 service.runFirstBarrier.await();
 service.stopAsync();
 service.runSecondBarrier.await();
 service.stopAsync().awaitTerminated();
}

代码示例来源:origin: apache/pulsar

public void run() {
    try {
      barrier.await();
      for (Position position : addedEntries) {
        cursor.markDelete(position);
      }
    } catch (Exception e) {
      e.printStackTrace();
      gotException.set(true);
    } finally {
      counter.countDown();
    }
  }
};

相关文章