java.util.concurrent.ExecutorService: usage and code examples

x33g5p2x, reposted on 2022-01-17 in Other

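This article collects code examples for java.util.concurrent.ExecutorService in Java and shows how it is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven, and similar platforms, extracted from selected projects, and are meant as a practical reference. Details of ExecutorService are as follows:
Package path: java.util.concurrent.ExecutorService
Type name: ExecutorService (an interface)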

About ExecutorService

An Executor that provides methods to manage termination and methods that can produce a Future for tracking progress of one or more asynchronous tasks.

An ExecutorService can be shut down, which will cause it to reject new tasks. Two different methods are provided for shutting down an ExecutorService. The #shutdown method will allow previously submitted tasks to execute before terminating, while the #shutdownNow method prevents waiting tasks from starting and attempts to stop currently executing tasks. Upon termination, an executor has no tasks actively executing, no tasks awaiting execution, and no new tasks can be submitted. An unused ExecutorService should be shut down to allow reclamation of its resources.
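The difference between the two shutdown methods can be sketched as follows. This is a minimal illustration, not part of the original documentation: the class name ShutdownDemo and both helper methods are hypothetical, and the single-thread pool is chosen only so that the queueing behaviour is easy to observe.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the names are illustrative only.
class ShutdownDemo {

    /** shutdown(): already-submitted tasks still run, new ones are rejected. */
    static int orderlyShutdown() throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < 3; i++) {
            pool.execute(completed::incrementAndGet);
        }
        pool.shutdown();
        try {
            pool.execute(completed::incrementAndGet); // submitted too late
        } catch (RejectedExecutionException expected) {
            // the default rejection policy throws once shutdown() was called
        }
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return completed.get(); // the three tasks submitted before shutdown()
    }

    /** shutdownNow(): interrupts the running task and returns the queued ones. */
    static int abruptShutdown() throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        // This task occupies the only worker thread until it is interrupted.
        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        pool.execute(() -> { }); // stays in the queue
        pool.execute(() -> { }); // stays in the queue
        started.await();         // make sure the first task is running
        List<Runnable> neverStarted = pool.shutdownNow();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return neverStarted.size(); // the two tasks that were still queued
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("completed after shutdown(): " + orderlyShutdown());
        System.out.println("returned by shutdownNow(): " + abruptShutdown());
    }
}
```

Note that shutdownNow only interrupts the worker thread; a task that ignores interruption will keep running.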

Method submit extends base method Executor#execute by creating and returning a Future that can be used to cancel execution and/or wait for completion. Methods invokeAny and invokeAll perform the most commonly useful forms of bulk execution, executing a collection of tasks and then waiting for at least one, or all, to complete. (Class ExecutorCompletionService can be used to write customized variants of these methods.)
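A short sketch of the three methods named above may help. The class BulkExecutionDemo and its helper methods are hypothetical names introduced for illustration; only submit, invokeAll, invokeAny and Future.get belong to the JDK API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; the names are illustrative only.
class BulkExecutionDemo {

    /** submit(): one task in, one Future out. */
    static int viaSubmit(ExecutorService pool)
            throws InterruptedException, ExecutionException {
        Future<Integer> future = pool.submit(() -> 6 * 7);
        return future.get(); // blocks until the task has finished
    }

    /** invokeAll(): blocks until every task is done; futures follow task order. */
    static int sumViaInvokeAll(ExecutorService pool)
            throws InterruptedException, ExecutionException {
        List<Callable<Integer>> tasks = new ArrayList<>();
        tasks.add(() -> 1);
        tasks.add(() -> 2);
        tasks.add(() -> 3);
        int sum = 0;
        for (Future<Integer> f : pool.invokeAll(tasks)) {
            sum += f.get();
        }
        return sum;
    }

    /** invokeAny(): returns the result of one task that completed successfully. */
    static String viaInvokeAny(ExecutorService pool)
            throws InterruptedException, ExecutionException {
        List<Callable<String>> tasks = new ArrayList<>();
        tasks.add(() -> { throw new IllegalStateException("failed"); });
        tasks.add(() -> "ok");
        return pool.invokeAny(tasks); // the failing task is skipped
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            System.out.println(viaSubmit(pool));
            System.out.println(sumViaInvokeAll(pool));
            System.out.println(viaInvokeAny(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```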

The Executors class provides factory methods for the executor services provided in this package.
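As a rough illustration of these factory methods, the sketch below runs the same tasks on three differently configured pools and counts how many worker threads took part. FactoryDemo and distinctWorkers are hypothetical names, and the task count of 6 is arbitrary.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; the names are illustrative only.
class FactoryDemo {

    /** Runs taskCount trivial tasks and reports how many distinct threads ran them. */
    static int distinctWorkers(ExecutorService pool, int taskCount)
            throws InterruptedException, ExecutionException {
        Set<String> workerNames = ConcurrentHashMap.newKeySet();
        Runnable recordWorker = () -> workerNames.add(Thread.currentThread().getName());
        List<Future<?>> futures = new ArrayList<>();
        try {
            for (int i = 0; i < taskCount; i++) {
                futures.add(pool.submit(recordWorker));
            }
            for (Future<?> f : futures) {
                f.get(); // wait for each task to finish
            }
        } finally {
            pool.shutdown(); // release the worker threads
        }
        return workerNames.size();
    }

    public static void main(String[] args) throws Exception {
        // One worker thread, tasks run sequentially.
        System.out.println("single:   " + distinctWorkers(Executors.newSingleThreadExecutor(), 6));
        // At most two worker threads, extra tasks wait in a queue.
        System.out.println("fixed(2): " + distinctWorkers(Executors.newFixedThreadPool(2), 6));
        // Threads are created on demand and reused when idle.
        System.out.println("cached:   " + distinctWorkers(Executors.newCachedThreadPool(), 6));
    }
}
```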

Usage Examples

Here is a sketch of a network service in which threads in a thread pool service incoming requests. It uses the preconfigured Executors#newFixedThreadPool factory method:

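class NetworkService implements Runnable {
  private final ServerSocket serverSocket;
  private final ExecutorService pool;

  public NetworkService(int port, int poolSize) throws IOException {
    serverSocket = new ServerSocket(port);
    pool = Executors.newFixedThreadPool(poolSize);
  }

  public void run() { // run the service
    try {
      for (;;) {
        pool.execute(new Handler(serverSocket.accept()));
      }
    } catch (IOException ex) {
      pool.shutdown();
    }
  }
}

class Handler implements Runnable {
  private final Socket socket;
  Handler(Socket socket) { this.socket = socket; }
  public void run() {
    // read and service request on socket
  }
}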

The following method shuts down an ExecutorService in two phases, first by calling shutdown to reject incoming tasks, and then calling shutdownNow, if necessary, to cancel any lingering tasks:

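void shutdownAndAwaitTermination(ExecutorService pool) {
  pool.shutdown(); // Disable new tasks from being submitted
  try {
    if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { // Wait for existing tasks to terminate
      pool.shutdownNow(); // Cancel currently executing tasks
      if (!pool.awaitTermination(60, TimeUnit.SECONDS)) // Wait for tasks to respond to cancellation
        System.err.println("Pool did not terminate");
    }
  } catch (InterruptedException ie) {
    pool.shutdownNow(); // (Re-)Cancel if current thread also interrupted
    Thread.currentThread().interrupt(); // Preserve interrupt status
  }
}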

Memory consistency effects: Actions in a thread prior to the submission of a Runnable or Callable task to an ExecutorService happen-before any actions taken by that task, which in turn happen-before the result is retrieved via Future.get().
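The guarantee can be illustrated with plain, unsynchronized fields. This is a sketch under the assumption that no other thread touches the fields; VisibilityDemo, input and output are hypothetical names used only here.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; the names are illustrative only.
class VisibilityDemo {
    // Deliberately plain fields: neither volatile nor guarded by a lock.
    static int input;
    static int output;

    static int roundTrip() throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            input = 21; // written before submission, so the task must see it
            Future<?> done = pool.submit(() -> {
                output = input * 2; // written inside the task
            });
            done.get();    // returns only after the task has completed
            return output; // the task's write is visible after get()
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip());
    }
}
```

Without the submit and Future.get() calls linking the two threads, reading these fields from another thread would have no such visibility guarantee.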

Code examples

canonical example by Tabnine

public void runThreadTask() {
 ExecutorService service = Executors.newCachedThreadPool();
 service.execute(
   () -> {
    // ... do something inside runnable task
   });
 service.shutdown();
}

Code example source: google/guava

@Override
public void runWithTimeout(Runnable runnable, long timeoutDuration, TimeUnit timeoutUnit)
  throws TimeoutException, InterruptedException {
 checkNotNull(runnable);
 checkNotNull(timeoutUnit);
 checkPositiveTimeout(timeoutDuration);
 Future<?> future = executor.submit(runnable);
 try {
  future.get(timeoutDuration, timeoutUnit);
 } catch (InterruptedException | TimeoutException e) {
  future.cancel(true /* mayInterruptIfRunning */);
  throw e;
 } catch (ExecutionException e) {
  wrapAndThrowRuntimeExecutionExceptionOrError(e.getCause());
  throw new AssertionError();
 }
}

Code example source: stackoverflow.com

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
 taskExecutor.execute(new MyTask());
}
taskExecutor.shutdown();
try {
 taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
 ...
}

Code example source: square/okhttp

private void parallelDrainQueue(int threadCount) {
 ExecutorService executor = Executors.newFixedThreadPool(threadCount);
 for (int i = 0; i < threadCount; i++) {
  executor.execute(new NamedRunnable("Crawler %s", i) {
   @Override protected void execute() {
    try {
     drainQueue();
    } catch (Exception e) {
     e.printStackTrace();
    }
   }
  });
 }
 executor.shutdown();
}

Code example source: iluwatar/java-design-patterns

/**
  * Stops the pool of workers.
  * 
  * @throws InterruptedException if interrupted while stopping pool of workers.
  */
 @Override
 public void stop() throws InterruptedException {
  executorService.shutdown();
  executorService.awaitTermination(4, TimeUnit.SECONDS);
 }
}

Code example source: iluwatar/java-design-patterns

/**
 * Stops logging clients. This is a blocking call.
 */
public void stop() {
 service.shutdown();
 if (!service.isTerminated()) {
  service.shutdownNow();
  try {
   service.awaitTermination(1000, TimeUnit.SECONDS);
  } catch (InterruptedException e) {
   LOGGER.error("exception awaiting termination", e);
  }
 }
 LOGGER.info("Logging clients stopped");
}

Code example source: apache/hive

private void runParallelQueries(String[] queries)
  throws InterruptedException, ExecutionException {
 ExecutorService executor = Executors.newFixedThreadPool(queries.length);
 final CountDownLatch cdlIn = new CountDownLatch(queries.length), cdlOut = new CountDownLatch(1);
 Future<?>[] tasks = new Future[queries.length];
 for (int i = 0; i < tasks.length; ++i) {
  tasks[i] = executor.submit(new QueryRunnable(hiveConf, queries[i], cdlIn, cdlOut));
 }
 cdlIn.await(); // Wait for all threads to be ready.
 cdlOut.countDown(); // Release them at the same time.
 for (int i = 0; i < tasks.length; ++i) {
  tasks[i].get();
 }
}

Code example source: ReactiveX/RxJava

final int NUM_RETRIES = Flowable.bufferSize() * 2;
int ncpu = Runtime.getRuntime().availableProcessors();
ExecutorService exec = Executors.newFixedThreadPool(Math.max(ncpu / 2, 2));
try {
  for (int r = 0; r < NUM_LOOPS; r++) {
    final AtomicInteger timeouts = new AtomicInteger();
    final Map<Integer, List<String>> data = new ConcurrentHashMap<Integer, List<String>>();
    final CountDownLatch cdl = new CountDownLatch(m);
    for (int i = 0; i < m; i++) {
      final int j = i;
      exec.execute(new Runnable() {
        @Override
        public void run() {
          // ... (lines omitted in the source excerpt)
    cdl.await();
    assertEquals(0, timeouts.get());
    if (data.size() > 0) {
      fail("Data content mismatch: " + allSequenceFrequency(data));
  // ... (lines omitted in the source excerpt)
  exec.shutdown();

Code example source: spring-projects/spring-framework

@Test
public void nonSharedEngine() throws Exception {
  int iterations = 20;
  this.view.setEngineName("nashorn");
  this.view.setRenderFunction("render");
  this.view.setSharedEngine(false);
  this.view.setApplicationContext(this.context);
  ExecutorService executor = Executors.newFixedThreadPool(4);
  List<Future<Boolean>> results = new ArrayList<>();
  for (int i = 0; i < iterations; i++) {
    results.add(executor.submit(() -> view.getEngine() != null));
  }
  assertEquals(iterations, results.size());
  for (int i = 0; i < iterations; i++) {
    assertTrue(results.get(i).get());
  }
  executor.shutdown();
}

Code example source: google/guava

@GwtIncompatible // Threads
public void testTransformAsync_functionToString() throws Exception {
 final CountDownLatch functionCalled = new CountDownLatch(1);
 final CountDownLatch functionBlocking = new CountDownLatch(1);
 AsyncFunction<Object, Object> function =
   new AsyncFunction<Object, Object>() {
    @Override
    public ListenableFuture<Object> apply(Object input) throws Exception {
     functionCalled.countDown();
     functionBlocking.await();
     return immediateFuture(null);
    }
    @Override
    public String toString() {
     return "Called my toString";
    }
   };
 ExecutorService executor = Executors.newSingleThreadExecutor();
 try {
  ListenableFuture<?> output =
    Futures.transformAsync(immediateFuture(null), function, executor);
  functionCalled.await();
  assertThat(output.toString()).contains("Called my toString");
 } finally {
  functionBlocking.countDown();
  executor.shutdown();
 }
}

Code example source: ReactiveX/RxJava

@Test
public void disposeRace() {
  ExecutorService exec = Executors.newSingleThreadExecutor();
  final Scheduler s = Schedulers.from(exec, true);
  try {
    for (int i = 0; i < 500; i++) {
      final Worker w = s.createWorker();
      final AtomicInteger c = new AtomicInteger(2);
      w.schedule(new Runnable() {
        @Override
        public void run() {
          c.decrementAndGet();
          while (c.get() != 0) { }
        }
      });
      c.decrementAndGet();
      while (c.get() != 0) { }
      w.dispose();
    }
  } finally {
    exec.shutdownNow();
  }
}

Code example source: pentaho/pentaho-kettle

@Test
 public void concurrentSyslogMessageTest() throws Exception {
 SyslogMessageTask syslogMessage = null;
 ExecutorService service = Executors.newFixedThreadPool( numOfTasks );
 for ( int i = 0; i < numOfTasks; i++ ) {
  syslogMessage = createSyslogMessageTask();
  service.execute( syslogMessage );
 }
 service.shutdown();
 countDownLatch.countDown();
 service.awaitTermination( 10000, TimeUnit.NANOSECONDS );
 Assert.assertTrue( numOfErrors.get() == 0 );
}

Code example source: google/guava

int tasksPerThread = 10;
int nTasks = nThreads * tasksPerThread;
ExecutorService pool = Executors.newFixedThreadPool(nThreads);
ImmutableList<String> keys = ImmutableList.of("a", "b", "c");
try {
 List<Future<int[]>> futures = Lists.newArrayListWithExpectedSize(nTasks);
 for (int i = 0; i < nTasks; i++) {
  futures.add(pool.submit(new MutateTask(multiset, keys)));
  // ... (lines omitted in the source excerpt)
  int[] taskDeltas = future.get();
  for (int i = 0; i < deltas.length; i++) {
   deltas[i] += taskDeltas[i];
 // ... (lines omitted in the source excerpt)
 assertEquals("Counts not as expected", Ints.asList(deltas), actualCounts);
} finally {
 pool.shutdownNow();
 // ... (lines omitted in the source excerpt)
 assertTrue("map should not contain a zero", value.get() != 0);

Code example source: google/guava

final ExecutorService executor = Executors.newFixedThreadPool(numThreads);
for (int i = 0; i < 1000; i++) {
 final AtomicInteger counter = new AtomicInteger();
 final TrustedListenableFutureTask<Integer> task =
   TrustedListenableFutureTask.create(
     // ... (lines omitted in the source excerpt)
 executor.execute(wrapper);
 assertEquals(1, counter.get());
 // ... (lines omitted in the source excerpt)
executor.shutdown();

Code example source: ReactiveX/RxJava

@Test(timeout = 30000)
public void testIssue2890NoStackoverflow() throws InterruptedException {
  final ExecutorService executor = Executors.newFixedThreadPool(2);
  final Scheduler sch = Schedulers.from(executor);
  final AtomicInteger counter = new AtomicInteger();
  // ... (lines omitted in the source excerpt)
  executor.awaitTermination(20000, TimeUnit.MILLISECONDS);
  assertEquals(n, counter.get());
  // ... (lines omitted in the source excerpt)

Code example source: google/guava

public void testEnqueueAndDispatch_multithreaded() throws InterruptedException {
 Object listener = new Object();
 ExecutorService service = Executors.newFixedThreadPool(4);
 ListenerCallQueue<Object> queue = new ListenerCallQueue<>();
 try {
  queue.addListener(listener, service);
  final CountDownLatch latch = new CountDownLatch(1);
  Multiset<Object> counters = ConcurrentHashMultiset.create();
  queue.enqueue(incrementingEvent(counters, listener, 1));
  queue.enqueue(incrementingEvent(counters, listener, 2));
  queue.enqueue(incrementingEvent(counters, listener, 3));
  queue.enqueue(incrementingEvent(counters, listener, 4));
  queue.enqueue(countDownEvent(latch));
  assertEquals(0, counters.size());
  queue.dispatch();
  latch.await();
  assertEquals(multiset(listener, 4), counters);
 } finally {
  service.shutdown();
 }
}

Code example source: google/guava

@GwtIncompatible // threads
public void testSubmitAsync_asyncCallable_cancelledBeforeApplyingFunction()
  throws InterruptedException {
 final AtomicBoolean callableCalled = new AtomicBoolean();
 AsyncCallable<Integer> callable =
   new AsyncCallable<Integer>() {
    @Override
    public ListenableFuture<Integer> call() {
     callableCalled.set(true);
     return immediateFuture(1);
    }
   };
 ExecutorService executor = newSingleThreadExecutor();
 // Pause the executor.
 final CountDownLatch beforeFunction = new CountDownLatch(1);
 executor.execute(
   new Runnable() {
    @Override
    public void run() {
     awaitUninterruptibly(beforeFunction);
    }
   });
 ListenableFuture<Integer> future = submitAsync(callable, executor);
 future.cancel(false);
 // Unpause the executor.
 beforeFunction.countDown();
 executor.shutdown();
 assertTrue(executor.awaitTermination(5, SECONDS));
 assertFalse(callableCalled.get());
}

Code example source: google/guava

public void testRejectedExecutionThrownWithMultipleCalls() throws Exception {
 final CountDownLatch latch = new CountDownLatch(1);
 final SettableFuture<?> future = SettableFuture.create();
 final Executor delegate =
   // ... (lines omitted in the source excerpt)
 final ExecutorService blocked = Executors.newCachedThreadPool();
 Future<?> first =
   blocked.submit(
     new Runnable() {
      @Override
      // ... (lines omitted in the source excerpt)
 } catch (RejectedExecutionException expected) {
 // ... (lines omitted in the source excerpt)
 latch.countDown();
 try {
  first.get(10, TimeUnit.SECONDS);
  fail();
 } catch (ExecutionException expected) {
 // ... (lines omitted in the source excerpt)

Code example source: square/dagger

@Test public void concurrentSingletonAccess() throws Exception {
 final List<Future<Long>> futures = new ArrayList<Future<Long>>();
 final ObjectGraph graph =
   ObjectGraph.createWith(new TestingLoader(), new LatchingModule(latch));
 for (int i = 0; i < THREAD_COUNT; i++) {
  futures.add(es.submit(new Callable<Long>() {
   @Override public Long call() {
    latch.countDown();
    return graph.get(Long.class);
   }
  }));
 }
 latch.countDown();
 for (Future<Long> future : futures) {
  assertThat(future.get(1, TimeUnit.SECONDS))
    .named("Lock failure - count should never increment")
    .isEqualTo(0);
 }
}

Code example source: apache/kafka

@Test
public void testTimeoutAndRetryJoinGroupIfNeeded() throws Exception {
  setupCoordinator();
  mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
  coordinator.ensureCoordinatorReady(mockTime.timer(0));
  ExecutorService executor = Executors.newFixedThreadPool(1);
  try {
    Timer firstAttemptTimer = mockTime.timer(REQUEST_TIMEOUT_MS);
    Future<Boolean> firstAttempt = executor.submit(() -> coordinator.joinGroupIfNeeded(firstAttemptTimer));
    mockTime.sleep(REQUEST_TIMEOUT_MS);
    assertFalse(firstAttempt.get());
    assertTrue(consumerClient.hasPendingRequests(coordinatorNode));
    mockClient.respond(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
    mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
    Timer secondAttemptTimer = mockTime.timer(REQUEST_TIMEOUT_MS);
    Future<Boolean> secondAttempt = executor.submit(() -> coordinator.joinGroupIfNeeded(secondAttemptTimer));
    assertTrue(secondAttempt.get());
  } finally {
    executor.shutdownNow();
    executor.awaitTermination(1000, TimeUnit.MILLISECONDS);
  }
}
