Usage and code examples of the eu.toolchain.async.AsyncFuture class

x33g5p2x, reposted on 2022-01-17 under Other

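This article collects code examples for the Java class eu.toolchain.async.AsyncFuture and shows how the class is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the AsyncFuture class:
Package path: eu.toolchain.async.AsyncFuture
Class name: AsyncFuture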

Introduction to AsyncFuture

An interface that defines a contract with a computation that could be asynchronous.

Thread Safety

All public methods exposed by AsyncFuture are fully thread-safe, guaranteeing that interactions with the future are atomic.
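
The atomicity guarantee can be illustrated with a small race. The sketch below is not tiny-async code: it uses java.util.concurrent.CompletableFuture from the JDK as a stand-in so that it runs without the library, and the class name AtomicCompletionSketch and the method raceToComplete are hypothetical. Several threads try to complete the same future at once, and exactly one of them performs the transition.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration; CompletableFuture stands in for AsyncFuture.
class AtomicCompletionSketch {
  /** Races the given number of threads to complete one future; returns how many succeeded. */
  static int raceToComplete(int threads) {
    final CompletableFuture<Integer> future = new CompletableFuture<>();
    final AtomicInteger winners = new AtomicInteger();
    final CountDownLatch start = new CountDownLatch(1);
    final ExecutorService pool = Executors.newFixedThreadPool(threads);

    for (int i = 0; i < threads; i++) {
      final int id = i;
      pool.execute(() -> {
        try {
          start.await(); // hold every worker until the race begins
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
        // complete() returns true only for the call that moved the future to its end state
        if (future.complete(id)) {
          winners.incrementAndGet();
        }
      });
    }

    start.countDown();
    pool.shutdown();
    try {
      if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
        throw new IllegalStateException("workers did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return winners.get();
  }

  public static void main(String[] args) {
    System.out.println("winners: " + raceToComplete(8)); // prints "winners: 1"
  }
}
```

Whichever thread wins, the other callers see the future as already completed, which is the kind of behaviour the thread-safety statement above describes.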

States

A future has four states.

  • running, which indicates that the future is currently active, and has not reached an end-state.
  • resolved, which indicates that the computation was successful, and produced a result.
  • failed, which indicates that the computation failed through an exception, which can be fetched for inspection.
  • cancelled, which indicates that the computation was cancelled.

The last three states are end states. A future can transition into only one of them, and once it is in an end state it never moves to another state. A future in an end state is considered done, as indicated by the #isDone() method.
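
The state model can be sketched in a few lines. As an assumption for illustration only, the example uses the JDK's java.util.concurrent.CompletableFuture rather than AsyncFuture itself, and the names FutureStateSketch, State, and stateOf are hypothetical. It maps a future onto the four states and shows that a future which has reached an end state rejects later transitions.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration; CompletableFuture stands in for AsyncFuture.
class FutureStateSketch {
  enum State { RUNNING, RESOLVED, FAILED, CANCELLED }

  /** Classifies a future into one of the four states described above. */
  static State stateOf(CompletableFuture<?> future) {
    if (!future.isDone()) {
      return State.RUNNING;
    }
    // Check cancellation first: a cancelled CompletableFuture also counts as
    // completed exceptionally.
    if (future.isCancelled()) {
      return State.CANCELLED;
    }
    if (future.isCompletedExceptionally()) {
      return State.FAILED;
    }
    return State.RESOLVED;
  }

  public static void main(String[] args) {
    final CompletableFuture<String> future = new CompletableFuture<>();
    System.out.println(stateOf(future)); // RUNNING

    future.complete("ok");
    System.out.println(stateOf(future)); // RESOLVED

    // End states are final: both attempts below are rejected and return false.
    System.out.println(future.completeExceptionally(new RuntimeException("late")));
    System.out.println(future.cancel(true));
    System.out.println(stateOf(future)); // still RESOLVED
  }
}
```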

Code examples

Code example source: com.spotify.ffwd/ffwd-api

@Override
public AsyncFuture<Void> start() {
  return servers
    .bind(log, protocol, server, retry)
    .transform(new Transform<ProtocolConnection, Void>() {
      @Override
      public Void transform(ProtocolConnection c) throws Exception {
        if (!connection.compareAndSet(null, c)) {
          c.stop();
        }
        return null;
      }
    });
}

Code example source: eu.toolchain.async/tiny-async-core

@Override
public AsyncFuture<T> bind(AsyncFuture<?> other) {
  other.cancel();
  return this;
}

Code example source: spotify/heroic

return result.getReturnValue().catchFailed(throwable -> {
    reporter.reportClusterNodeRpcError();
  }).catchCancelled(ignore -> {
    reporter.reportClusterNodeRpcCancellation();
  });
}, iteratorPolicy)
.directTransform(retryResult -> handleRetryTraceFn.apply(retryResult.getResult(),
  queryTracesFromRetries(retryResult.getErrors(), retryResult.getBackoffTimings())));

Code example source: spotify/ffwd

private <T> Observable<T> toObservable(final AsyncFuture<T> future) {
  return Observable.create(observer -> {
    future.onDone(new FutureDone<T>() {
      @Override
      public void failed(final Throwable throwable) throws Exception {
        observer.onError(throwable);
      }
      @Override
      public void resolved(final T v) throws Exception {
        observer.onNext(v);
        observer.onCompleted();
      }
      @Override
      public void cancelled() throws Exception {
        observer.onCompleted();
      }
    });
  });
}

Code example source: com.spotify.ffwd/ffwd-api

@Override
public AsyncFuture<Void> start() {
  return clients
    .connect(log, protocol, client, retry)
    .lazyTransform(new LazyTransform<ProtocolConnection, Void>() {
      @Override
      public AsyncFuture<Void> transform(ProtocolConnection result) throws Exception {
        if (!connection.compareAndSet(null, result)) {
          return result.stop();
        }
        return async.resolved(null);
      }
    });
}

Code example source: eu.toolchain.async/tiny-async-core

// Excerpt: the enclosing call and anonymous class header are cut off in the source.
  @Override
  public AsyncFuture<Void> transform(Void result) throws Exception {
    while (true) {
      final Managed<T> old = current.get();
      // We are stopping:
      // block old from successfully stopping until this one has been cleaned up.
      if (old == null) {
        return next.stop().onFinished(b.releasing());
      }
      if (!current.compareAndSet(old, next)) {
        continue;
      }
      // Swap successful; we can now safely release old.
      b.release();
      // Stop old.
      return old.stop();
    }
  }
});

Code example source: spotify/ffwd

private void stop(final Injector primary) throws Exception {
  final InputManager input = primary.getInstance(InputManager.class);
  final OutputManager output = primary.getInstance(OutputManager.class);
  final DebugServer debug = primary.getInstance(DebugServer.class);
  final AsyncFramework async = primary.getInstance(AsyncFramework.class);
  final ArrayList<AsyncFuture<Void>> shutdown = Lists.newArrayList();
  log.info("Waiting for all components to stop...");
  shutdown.add(input.stop());
  shutdown.add(output.stop());
  shutdown.add(debug.stop());
  AsyncFuture<Void> all = async.collectAndDiscard(shutdown);
  try {
    all.get(10, TimeUnit.SECONDS);
  } catch (final Exception e) {
    log.error("All components did not stop in a timely fashion", e);
    all.cancel();
  }
}

Code example source: eu.toolchain.async/tiny-async-core

return constructor.directTransform(new Transform<T, Void>() {
  @Override
  public Void transform(T result) throws Exception {
    return null;
  }
}).onDone(new FutureDone<Void>() {
  @Override
  public void failed(Throwable cause) throws Exception {
    // ... (body cut off in the source excerpt)
  }
  // resolved(...) and cancelled() are not shown in the source excerpt
});

Code example source: com.spotify.ffwd/ffwd-api

if (!flush.isDone()) {
  synchronized (pendingLock) {
    log.debug("Adding pending flush (size: {})", pending.size());
    // ... (rest of the synchronized block is cut off in the source excerpt)
  }
  flush.on(new FutureFinished() {
    @Override
    public void finished() throws Exception {
      // ... (body cut off in the source excerpt)
    }
  });
}

Code example source: spotify/ffwd

@Override
public boolean isAlive(final Server server) {
  try {
    // Block until the ping future completes; any failure marks the server as not alive.
    clientFactory.newClient(server).ping().get();
  } catch (Exception e) {
    log.warn("Error when pinging server ({}): {}", server, e.getMessage());
    log.trace("Error when pinging server ({}): {}", server, e.getMessage(), e);
    return false;
  }

  return true;
}

Code example source: spotify/ffwd

future.on(new FutureFailed() {
  @Override
  public void failed(Throwable throwable) throws Exception {
    // ... (body cut off in the source excerpt)
  }
});

Code example source: eu.toolchain.async/tiny-async-core

private void setupNext(final Callable<? extends AsyncFuture<? extends S>> next) {
  final AsyncFuture<? extends S> f;
  pending.incrementAndGet();
  try {
    f = next.call();
  } catch (final Exception e) {
    failed(e);
    return;
  }
  f.onDone(this);
}

Code example source: com.spotify.ffwd/ffwd-api

futures.add(sink
  .sendEvents(batch.events)
  .onFinished(() -> batchingStatistics.reportSentEvents(batch.events.size())));
futures.add(sink
  .sendMetrics(batch.metrics)
  .onFinished(() -> batchingStatistics.reportSentMetrics(batch.metrics.size())));
futures.add(sink
  .sendBatches(batch.batches)
  .onFinished(() -> batchingStatistics.reportSentBatches(batch.batches.size(),
    batch.size())));
return async.collectAndDiscard(futures).onFinished(writeMonitor).onFinished(() -> {
  batchingStatistics.reportQueueSizeDec(batch.size());
  batchingStatistics.reportInternalBatchWrite(batch.size());
  // ... (rest of the callback is cut off in the source excerpt)
});

Code example source: spotify/ffwd

private void start(final Injector primary) throws Exception {
  final InputManager input = primary.getInstance(InputManager.class);
  final OutputManager output = primary.getInstance(OutputManager.class);
  final DebugServer debug = primary.getInstance(DebugServer.class);
  final AsyncFramework async = primary.getInstance(AsyncFramework.class);
  final ArrayList<AsyncFuture<Void>> startup = Lists.newArrayList();
  log.info("Waiting for all components to start...");
  startup.add(output.start());
  startup.add(input.start());
  startup.add(debug.start());
  async.collectAndDiscard(startup).get(10, TimeUnit.SECONDS);
  input.init();
  output.init();
}

Code example source: eu.toolchain.async/tiny-async-core

protected <C> AsyncFuture<Void> doCollectAndDiscard(
  Collection<? extends AsyncFuture<C>> futures
) {
  final ResolvableFuture<Void> target = future();
  final FutureDone<C> done = new CollectAndDiscardHelper<>(futures.size(), target);
  for (final AsyncFuture<C> q : futures) {
    q.onDone(done);
  }
  bindSignals(target, futures);
  return target;
}

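Code example source: spotify/ffwd

// Excerpt: the surrounding call chain and both callback bodies are cut off in the source.
}).lazyTransform(new LazyTransform<Void, Void>() {
  // ...
}).lazyTransform(new LazyTransform<Void, Void>() {
  // ...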
