Usage and code examples of the io.pravega.common.Exceptions.handleInterrupted() method

Reposted by x33g5p2x on 2022-01-19 in Other

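This article collects Java code examples for the io.pravega.common.Exceptions.handleInterrupted() method and shows how Exceptions.handleInterrupted() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven and were extracted from selected projects, so they should serve as a useful reference. Details of Exceptions.handleInterrupted() are as follows:
Package: io.pravega.common
Class name: Exceptions
Method name: handleInterrupted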

About Exceptions.handleInterrupted

Eliminates the boilerplate code of catching InterruptedException and re-interrupting the thread.

NOTE: This method currently can only handle functions that throw at most one additional exception type besides InterruptedException. This is a limitation of the compiler.
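
The pattern described above can be illustrated without the Pravega dependency. What follows is a minimal sketch, not Pravega's source: the names `InterruptSketch`, `InterruptibleRun`, `flagRestoredAfterInterrupt`, and `messageOfIoFailure` are hypothetical, and wrapping the interruption in an `IllegalStateException` is this sketch's own choice (the description above does not say what the real method does after re-interrupting). The single type parameter `E` is what limits the block to one additional checked exception type.

```java
import java.io.IOException;

// Hypothetical stand-in for the pattern; NOT the actual io.pravega.common.Exceptions code.
final class InterruptSketch {

    /** A block that may throw InterruptedException plus ONE other exception type E. */
    @FunctionalInterface
    interface InterruptibleRun<E extends Exception> {
        void run() throws InterruptedException, E;
    }

    /**
     * Runs the block. On interruption, restores the thread's interrupt flag.
     * Assumption: rethrowing as an unchecked exception is this sketch's choice.
     */
    static <E extends Exception> void handleInterrupted(InterruptibleRun<E> run) throws E {
        try {
            run.run();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the flag visible to callers
            throw new IllegalStateException("interrupted", e);
        }
    }

    /** Returns true if the interrupt flag is set again after handleInterrupted ran. */
    static boolean flagRestoredAfterInterrupt() {
        Thread.currentThread().interrupt(); // simulate an interruption request
        try {
            // sleep() sees the flag, clears it, and throws InterruptedException.
            handleInterrupted(() -> Thread.sleep(1000));
            return false;
        } catch (IllegalStateException expected) {
            // Thread.interrupted() reads AND clears the flag, leaving the thread clean.
            return Thread.interrupted();
        }
    }

    /** Shows the one extra checked exception: E is inferred as IOException here. */
    static String messageOfIoFailure() {
        try {
            handleInterrupted(() -> {
                throw new IOException("disk full");
            });
            return "no failure";
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // No extra checked exception: E is inferred as an unchecked type,
        // so the caller needs neither try/catch nor a throws clause.
        handleInterrupted(() -> Thread.sleep(10));
        System.out.println(flagRestoredAfterInterrupt());
        System.out.println(messageOfIoFailure());
    }
}
```

A block that threw two unrelated checked exceptions would force `E` to their common supertype, which is why callers of such blocks typically catch one of them inside the lambda, as the `await` example further down does.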

Code examples

Code example source: pravega/pravega

private void stopProcess(AtomicReference<Process> processReference) {
  Process p = processReference.getAndSet(null);
  if (p != null) {
    p.destroy();
    Exceptions.handleInterrupted(() -> p.waitFor(PROCESS_SHUTDOWN_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
  }
}

Code example source: pravega/pravega

ControllerEvent take = Exceptions.handleInterruptedCall(() -> requestStream.take());
processor.process((TestBase) take, null);
Exceptions.handleInterrupted(() -> Thread.sleep(100));

Code example source: pravega/pravega

@Override
public void close() {
  // Wait until the server socket is closed.
  Exceptions.handleInterrupted(() -> {
    serverChannel.close();
    serverChannel.closeFuture().sync();
  });
  // Shut down all event loops to terminate all threads.
  bossGroup.shutdownGracefully();
  workerGroup.shutdownGracefully();
}

Code example source: pravega/pravega

@Override
@SneakyThrows(BKException.class)
public void close() {
  this.logFactory.close();
  this.zkClient.close();
  // There is no need to close the BK Admin object since it doesn't own anything; however it does have a close()
  // method and it's a good idea to invoke it.
  Exceptions.handleInterrupted(this.bkAdmin::close);
}

Code example source: pravega/pravega

/**
 * Block until all events are acked by the server.
 */
private void waitForInflight() {
  Exceptions.handleInterrupted(() -> waitingInflight.await());
}

Code example source: pravega/pravega

private int stopProcesses(Collection<Process> processList) {
  processList.stream().filter(Objects::nonNull).forEach(p -> {
    p.destroyForcibly();
    Exceptions.handleInterrupted(() -> p.waitFor(PROCESS_SHUTDOWN_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
  });
  int count = processList.size();
  processList.clear();
  return count;
}

Code example source: pravega/pravega

/**
 * Waits for the provided future to be complete, and returns true if it was successful, false if it failed
 * or did not complete.
 *
 * @param timeout The maximum number of milliseconds to block
 * @param f       The future to wait for.
 * @param <T>     The Type of the future's result.
 * @return True if the given CompletableFuture is completed and successful within the given timeout.
 */
public static <T> boolean await(CompletableFuture<T> f, long timeout) {
  Exceptions.handleInterrupted(() -> {
    try {
      f.get(timeout, TimeUnit.MILLISECONDS);
    } catch (TimeoutException | ExecutionException e) {
      // Not handled here.
    }
  });
  return isSuccessful(f);
}

Code example source: pravega/pravega

/**
 * Closes the given LedgerHandle.
 *
 * @param handle The LedgerHandle to close.
 * @throws DurableDataLogException If an exception occurred. The causing exception is wrapped inside it.
 */
static void close(LedgerHandle handle) throws DurableDataLogException {
  try {
    Exceptions.handleInterrupted(handle::close);
  } catch (BKException bkEx) {
    throw new DurableDataLogException(String.format("Unable to close ledger %d.", handle.getId()), bkEx);
  }
}

Code example source: pravega/pravega

/**
 * Deletes the Ledger with given LedgerId.
 *
 * @param ledgerId   The Id of the Ledger to delete.
 * @param bookKeeper A reference to the BookKeeper client to use.
 * @throws DurableDataLogException If an exception occurred. The causing exception is wrapped inside it.
 */
static void delete(long ledgerId, BookKeeper bookKeeper) throws DurableDataLogException {
  try {
    Exceptions.handleInterrupted(() -> bookKeeper.deleteLedger(ledgerId));
  } catch (BKException bkEx) {
    throw new DurableDataLogException(String.format("Unable to delete Ledger %d.", ledgerId), bkEx);
  }
}

Code example source: pravega/pravega

break;
Exceptions.handleInterrupted(() -> Thread.sleep(250));

Code example source: pravega/pravega

@SuppressWarnings("unchecked")
public <RetryT extends Exception, ReturnT> ReturnT run(Retryable<ReturnT, RetryT, ThrowsT> r) throws ThrowsT {
  Preconditions.checkNotNull(r);
  long delay = params.initialMillis;
  Exception last = null;
  for (int attemptNumber = 1; attemptNumber <= params.attempts; attemptNumber++) {
    try {
      return r.attempt();
    } catch (Exception e) {
      if (canRetry(e)) {
        last = e;
      } else if (e instanceof RuntimeException) {
        throw (RuntimeException) e;
      } else {
        throw (ThrowsT) e;
      }
    }
    if (attemptNumber < params.attempts) {
      // no need to sleep if it is the last attempt
      final long sleepFor = delay;
      Exceptions.handleInterrupted(() -> Thread.sleep(sleepFor));
      delay = Math.min(params.maxDelay, params.multiplier * delay);
      log.debug("Retrying command. Retry #{}, timestamp={}", attemptNumber, Instant.now());
    }
  }
  throw new RetriesExhaustedException(last);
}

Code example source: pravega/pravega

private void deleteCandidates(List<Long> deletionCandidates, Collection<Long> referencedLedgerIds, Context context) {
  for (long ledgerId : deletionCandidates) {
    if (referencedLedgerIds.contains(ledgerId)) {
      output("Not deleting Ledger %d because it is now referenced.", ledgerId);
      continue;
    }
    try {
      Exceptions.handleInterrupted(() -> context.logFactory.getBookKeeperClient().deleteLedger(ledgerId));
      output("Deleted Ledger %d.", ledgerId);
    } catch (Exception ex) {
      output("FAILED to delete Ledger %d: %s.", ledgerId, ex.getMessage());
    }
  }
}

Code example source: pravega/pravega

private Set<PravegaNodeUri> fetchFromServers(ControllerImpl client, int numServers) {
  Set<PravegaNodeUri> uris = new HashSet<>();
  // Reading multiple times to ensure round robin policy gets a chance to read from all available servers.
  // Reading more than the number of servers since on failover request might fail intermittently due to
  // client-server connection timing issues.
  while (uris.size() < numServers) {
    try {
      uris.add(client.getEndpointForSegment("a/b/0").get());
    } catch (Exception e) {
      // Ignore temporary exceptions which happens due to failover.
    }
    // Adding a small delay to avoid busy cpu loop.
    Exceptions.handleInterrupted(() -> Thread.sleep(10));
  }
  return uris;
}

Code example source: pravega/pravega

/**
 * Verifies that a Segment does not exist in Storage, with a reasonable delay. This can be used to verify that
 * a delayed delete is actually processed by the Storage Writer.
 */
private void checkNotExistsInStorage(String segmentName, TestContext context) {
  int attemptsLeft = 100;
  final long delay = DEFAULT_WRITER_CONFIG.getFlushThresholdTime().toMillis();
  while (attemptsLeft >= 0 && context.storage.exists(segmentName, TIMEOUT).join()) {
    Exceptions.handleInterrupted(() -> Thread.sleep(delay));
    attemptsLeft--;
  }
  Assert.assertTrue("Segment '" + segmentName + "' still exists in Storage.", attemptsLeft >= 0);
}

Code example source: pravega/pravega

while (!stopFlag.get()) {
  try {
    Exceptions.handleInterrupted(() -> Thread.sleep(100));

Code example source: pravega/pravega

@Override
public void flush() throws SegmentSealedException {
  // flushLatch is used to simulate a blocking flush().
  Exceptions.handleInterrupted(() -> flushLatch.await());
  throw new SegmentSealedException(segment.toString());
}

Code example source: pravega/pravega

void waitForScaling(String scope, String stream, StreamConfiguration initialConfig) {
  int initialMaxSegmentNumber = initialConfig.getScalingPolicy().getMinNumSegments() - 1;
  boolean scaled = false;
  for (int waitCounter = 0; waitCounter < SCALE_WAIT_ITERATIONS; waitCounter++) {
    StreamSegments streamSegments = controller.getCurrentSegments(scope, stream).join();
    if (streamSegments.getSegments().stream().mapToLong(Segment::getSegmentId).max().orElse(-1) > initialMaxSegmentNumber) {
      scaled = true;
      break;
    }
    // The scaling operation has not happened yet; wait before checking again.
    Exceptions.handleInterrupted(() -> Thread.sleep(10000));
  }

  assertTrue("Scaling did not happen within desired time", scaled);
}

Code example source: pravega/pravega

@SneakyThrows
private static <T> int readEvents(EventStreamReader<T> reader, int limit) {
  final int timeout = 1000;
  final int interReadWait = 50;
  EventRead<T> event;
  int validEvents = 0;
  try {
    do {
      event = reader.readNextEvent(timeout);
      Exceptions.handleInterrupted(() -> Thread.sleep(interReadWait));
      if (event.getEvent() != null) {
        validEvents++;
      }
    } while ((event.getEvent() != null || event.isCheckpoint()) && validEvents < limit);
    reader.close();
  } catch (TruncatedDataException e) {
    reader.close();
    throw new TruncatedDataException(e.getCause());
  } catch (RuntimeException e) {
    if (e.getCause() instanceof RetriesExhaustedException) {
      throw new RetriesExhaustedException(e.getCause());
    } else {
      throw e;
    }
  }
  return validEvents;
}

Code example source: pravega/pravega

private Map<Stream, StreamCut> generateStreamCuts(final ReaderGroup readerGroup) {
  log.info("Generate StreamCuts");
  String readerId = "streamCut";
  CompletableFuture<Map<io.pravega.client.stream.Stream, StreamCut>> streamCuts = null;
  try (EventStreamClientFactory clientFactory = EventStreamClientFactory.withScope(SCOPE_2, ClientConfig.builder().controllerURI(controllerURI).build());
       EventStreamReader<Integer> reader = clientFactory.createReader(readerId, READER_GROUP_NAME,
           new JavaSerializer<Integer>(), readerConfig)) {
    streamCuts = readerGroup.generateStreamCuts(executor); //create checkpoint
    Exceptions.handleInterrupted(() -> TimeUnit.MILLISECONDS.sleep(GROUP_REFRESH_TIME_MILLIS)); // sleep for group refresh.
    //read the next event, this causes the reader to update its latest offset.
    EventRead<Integer> event = reader.readNextEvent(READ_TIMEOUT);
    assertTrue("No events expected as all events are read", (event.getEvent() == null) && (!event.isCheckpoint()));
    Futures.exceptionListener(streamCuts, t -> log.error("StreamCut generation failed", t));
    assertTrue("Stream cut generation should be completed", Futures.await(streamCuts));
  } catch (ReinitializationRequiredException e) {
    log.error("Exception while reading event using readerId: {}", readerId, e);
    fail("Reinitialization Exception is not expected");
  }
  return streamCuts.join();
}

Code example source: pravega/pravega

final Consumer<Segment> segmentSealedCallback = segment -> Exceptions.handleInterrupted(() -> latch.await());
