
x33g5p2x  于2022-01-19 转载在 其他  



[英]Eliminates the boilerplate code of catching InterruptedException and re-interrupting the thread.

NOTE: This method can currently only handle functions that throw at most one additional exception besides InterruptedException. This is a limitation of the compiler.


代码示例来源:origin: pravega/pravega

private void stopProcess(AtomicReference<Process> processReference) {
  Process p = processReference.getAndSet(null);
  if (p != null) {
    Exceptions.handleInterrupted(() -> p.waitFor(PROCESS_SHUTDOWN_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));

代码示例来源:origin: pravega/pravega

ControllerEvent take = Exceptions.handleInterruptedCall(() -> requestStream.take());
processor.process((TestBase) take, null);
Exceptions.handleInterrupted(() -> Thread.sleep(100));

代码示例来源:origin: pravega/pravega

  public void close() {
    // Wait until the server socket is closed.
    Exceptions.handleInterrupted(() -> {
    // Shut down all event loops to terminate all threads.

代码示例来源:origin: pravega/pravega

  public void close() {
    // There is no need to close the BK Admin object since it doesn't own anything; however it does have a close()
    // method and it's a good idea to invoke it.

代码示例来源:origin: pravega/pravega

 * Block until all events are acked by the server.
private void waitForInflight() {
  Exceptions.handleInterrupted(() -> waitingInflight.await());

代码示例来源:origin: pravega/pravega

private int stopProcesses(Collection<Process> processList) { -> {
    Exceptions.handleInterrupted(() -> p.waitFor(PROCESS_SHUTDOWN_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
  int count = processList.size();
  return count;

代码示例来源:origin: pravega/pravega

 * Waits for the provided future to be complete, and returns true if it was successful, false if it failed
 * or did not complete.
 * @param timeout The maximum number of milliseconds to block
 * @param f       The future to wait for.
 * @param <T>     The Type of the future's result.
 * @return True if the given CompletableFuture is completed and successful within the given timeout.
public static <T> boolean await(CompletableFuture<T> f, long timeout) {
  Exceptions.handleInterrupted(() -> {
    try {
      f.get(timeout, TimeUnit.MILLISECONDS);
    } catch (TimeoutException | ExecutionException e) {
      // Not handled here.
  return isSuccessful(f);

代码示例来源:origin: pravega/pravega

 * Closes the given LedgerHandle.
 * @param handle The LedgerHandle to close.
 * @throws DurableDataLogException If an exception occurred. The causing exception is wrapped inside it.
static void close(LedgerHandle handle) throws DurableDataLogException {
  try {
  } catch (BKException bkEx) {
    throw new DurableDataLogException(String.format("Unable to close ledger %d.", handle.getId()), bkEx);

代码示例来源:origin: pravega/pravega

 * Deletes the Ledger with given LedgerId.
 * @param ledgerId   The Id of the Ledger to delete.
 * @param bookKeeper A reference to the BookKeeper client to use.
 * @throws DurableDataLogException If an exception occurred. The causing exception is wrapped inside it.
static void delete(long ledgerId, BookKeeper bookKeeper) throws DurableDataLogException {
  try {
    Exceptions.handleInterrupted(() -> bookKeeper.deleteLedger(ledgerId));
  } catch (BKException bkEx) {
    throw new DurableDataLogException(String.format("Unable to delete Ledger %d.", ledgerId), bkEx);

代码示例来源:origin: pravega/pravega

Exceptions.handleInterrupted(() -> Thread.sleep(250));

代码示例来源:origin: pravega/pravega

public <RetryT extends Exception, ReturnT> ReturnT run(Retryable<ReturnT, RetryT, ThrowsT> r) throws ThrowsT {
  long delay = params.initialMillis;
  Exception last = null;
  for (int attemptNumber = 1; attemptNumber <= params.attempts; attemptNumber++) {
    try {
      return r.attempt();
    } catch (Exception e) {
      if (canRetry(e)) {
        last = e;
      } else if (e instanceof RuntimeException) {
        throw (RuntimeException) e;
      } else {
        throw (ThrowsT) e;
    if (attemptNumber < params.attempts) {
      // no need to sleep if it is the last attempt
      final long sleepFor = delay;
      Exceptions.handleInterrupted(() -> Thread.sleep(sleepFor));
      delay = Math.min(params.maxDelay, params.multiplier * delay);
      log.debug("Retrying command. Retry #{}, timestamp={}", attemptNumber,;
  throw new RetriesExhaustedException(last);

代码示例来源:origin: pravega/pravega

private void deleteCandidates(List<Long> deletionCandidates, Collection<Long> referencedLedgerIds, Context context) {
  for (long ledgerId : deletionCandidates) {
    if (referencedLedgerIds.contains(ledgerId)) {
      output("Not deleting Ledger %d because is is now referenced.", ledgerId);
    try {
      Exceptions.handleInterrupted(() -> context.logFactory.getBookKeeperClient().deleteLedger(ledgerId));
      output("Deleted Ledger %d.", ledgerId);
    } catch (Exception ex) {
      output("FAILED to delete Ledger %d: %s.", ledgerId, ex.getMessage());

代码示例来源:origin: pravega/pravega

private Set<PravegaNodeUri> fetchFromServers(ControllerImpl client, int numServers) {
  Set<PravegaNodeUri> uris = new HashSet<>();
  // Reading multiple times to ensure round robin policy gets a chance to read from all available servers.
  // Reading more than the number of servers since on failover request might fail intermittently due to
  // client-server connection timing issues.
  while (uris.size() < numServers) {
    try {
    } catch (Exception e) {
      // Ignore temporary exceptions which happens due to failover.
    // Adding a small delay to avoid busy cpu loop.
    Exceptions.handleInterrupted(() -> Thread.sleep(10));
  return uris;

代码示例来源:origin: pravega/pravega

 * Verifies that a Segment does not exist in Storage, with a reasonable delay. This can be used to verify that
 * a delayed delete is actually processed by the Storage Writer.
private void checkNotExistsInStorage(String segmentName, TestContext context) {
  int attemptsLeft = 100;
  final long delay = DEFAULT_WRITER_CONFIG.getFlushThresholdTime().toMillis();
  while (attemptsLeft >= 0 &&, TIMEOUT).join()) {
    Exceptions.handleInterrupted(() -> Thread.sleep(delay));
  Assert.assertTrue("Segment '" + segmentName + "' still exists in Storage.", attemptsLeft >= 0);

代码示例来源:origin: pravega/pravega

while (!stopFlag.get()) {
  try {
    Exceptions.handleInterrupted(() -> Thread.sleep(100));

代码示例来源:origin: pravega/pravega

public void flush() throws SegmentSealedException {
  //flushLatch is used to simulate a blocking Flush().
  Exceptions.handleInterrupted(() -> flushLatch.await());
  throw new SegmentSealedException(segment.toString());

代码示例来源:origin: pravega/pravega

void waitForScaling(String scope, String stream, StreamConfiguration initialConfig) {
    int initialMaxSegmentNumber = initialConfig.getScalingPolicy().getMinNumSegments() - 1;
    boolean scaled = false;
    for (int waitCounter = 0; waitCounter < SCALE_WAIT_ITERATIONS; waitCounter++) {
      StreamSegments streamSegments = controller.getCurrentSegments(scope, stream).join();
      if (streamSegments.getSegments().stream().mapToLong(Segment::getSegmentId).max().orElse(-1) > initialMaxSegmentNumber) {
        scaled = true;
      //Scaling operation did not happen, wait
      Exceptions.handleInterrupted(() -> Thread.sleep(10000));

    assertTrue("Scaling did not happen within desired time", scaled);

代码示例来源:origin: pravega/pravega

private static <T> int readEvents(EventStreamReader<T> reader, int limit) {
  final int timeout = 1000;
  final int interReadWait = 50;
  EventRead<T> event;
  int validEvents = 0;
  try {
    do {
      event = reader.readNextEvent(timeout);
      Exceptions.handleInterrupted(() -> Thread.sleep(interReadWait));
      if (event.getEvent() != null) {
    } while ((event.getEvent() != null || event.isCheckpoint()) && validEvents < limit);
  } catch (TruncatedDataException e) {
    throw new TruncatedDataException(e.getCause());
  } catch (RuntimeException e) {
    if (e.getCause() instanceof RetriesExhaustedException) {
      throw new RetriesExhaustedException(e.getCause());
    } else {
      throw e;
  return validEvents;

代码示例来源:origin: pravega/pravega

private Map<Stream, StreamCut> generateStreamCuts(final ReaderGroup readerGroup) {"Generate StreamCuts");
  String readerId = "streamCut";
  CompletableFuture<Map<, StreamCut>> streamCuts = null;
  try (EventStreamClientFactory clientFactory = EventStreamClientFactory.withScope(SCOPE_2, ClientConfig.builder().controllerURI(controllerURI).build());
     EventStreamReader<Integer> reader = clientFactory.createReader(readerId, READER_GROUP_NAME,
                                    new JavaSerializer<Integer>(), readerConfig)) {
    streamCuts = readerGroup.generateStreamCuts(executor); //create checkpoint
    Exceptions.handleInterrupted(() -> TimeUnit.MILLISECONDS.sleep(GROUP_REFRESH_TIME_MILLIS)); // sleep for group refresh.
    //read the next event, this causes the reader to update its latest offset.
    EventRead<Integer> event = reader.readNextEvent(READ_TIMEOUT);
    assertTrue("No events expected as all events are read", (event.getEvent() == null) && (!event.isCheckpoint()));
    Futures.exceptionListener(streamCuts, t -> log.error("StreamCut generation failed", t));
    assertTrue("Stream cut generation should be completed", Futures.await(streamCuts));
  } catch (ReinitializationRequiredException e) {
    log.error("Exception while reading event using readerId: {}", readerId, e);
    fail("Reinitialization Exception is not expected");
  return streamCuts.join();

代码示例来源:origin: pravega/pravega

final Consumer<Segment> segmentSealedCallback = segment ->  Exceptions.handleInterrupted(() -> latch.await());
