com.twitter.util.Future.collect()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(9.8k)|赞(0)|评价(0)|浏览(166)

本文整理了Java中com.twitter.util.Future.collect()方法的一些代码示例,展示了Future.collect()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Future.collect()方法的具体详情如下:
包路径:com.twitter.util.Future
类名称:Future
方法名:collect

Future.collect介绍

暂无

代码示例

代码示例来源:origin: twitter/distributedlog

/**
 * Calls {@code getLogs()} on every sub namespace and combines the returned
 * futures into a single future.
 *
 * @return a future holding one {@code Set<String>} per sub namespace
 */
private Future<List<Set<String>>> retrieveLogs() {
  final Collection<SubNamespace> namespaces = subNamespaces.values();
  final List<Future<Set<String>>> pendingLogs =
      Lists.newArrayListWithExpectedSize(namespaces.size());
  for (SubNamespace namespace : namespaces) {
    Future<Set<String>> logs = namespace.getLogs();
    pendingLogs.add(logs);
  }
  return Future.collect(pendingLogs);
}

代码示例来源:origin: twitter/distributedlog

/**
 * Requests a close on each of the given streams and gathers the close futures.
 *
 * <p>When a rate limiter is present, {@code acquire()} is called on it before
 * each close request is issued.
 *
 * @param streamsToClose streams to request a close on
 * @param rateLimiter optional limiter consulted once per stream
 * @return a future collecting every close request, or a future of an empty list
 *         when there is nothing to close
 */
private Future<List<Void>> closeStreams(Set<Stream> streamsToClose, Optional<RateLimiter> rateLimiter) {
  if (streamsToClose.isEmpty()) {
    logger.info("No streams to close.");
    List<Void> nothingClosed = new ArrayList<Void>();
    return Future.value(nothingClosed);
  }

  final List<Future<Void>> closeRequests = new ArrayList<Future<Void>>(streamsToClose.size());
  // Presence of the limiter cannot change while iterating, so check it once.
  final boolean throttled = rateLimiter.isPresent();
  for (Stream candidate : streamsToClose) {
    if (throttled) {
      rateLimiter.get().acquire();
    }
    closeRequests.add(candidate.requestClose("Close Streams"));
  }
  return Future.collect(closeRequests);
}

代码示例来源:origin: twitter/distributedlog

/**
 * Forwards the accept-new-stream setting to the service of every client
 * currently returned by the client manager.
 *
 * @param enabled value passed through to each service's {@code setAcceptNewStream}
 * @return a future that yields {@code null} once all forwarded calls have been collected
 */
@Override
public Future<Void> setAcceptNewStream(boolean enabled) {
  final Map<SocketAddress, ProxyClient> clients = clientManager.getAllClients();
  final List<Future<Void>> responses = new ArrayList<Future<Void>>(clients.size());
  for (ProxyClient client : clients.values()) {
    responses.add(client.getService().setAcceptNewStream(enabled));
  }

  // The individual results carry no information; collapse them into a single Void.
  final Function<List<Void>, Void> discardResults = new Function<List<Void>, Void>() {
    @Override
    public Void apply(List<Void> ignored) {
      return null;
    }
  };
  return Future.collect(responses).map(discardResults);
}

代码示例来源:origin: twitter/distributedlog

/**
 * Issues a {@code Utils.zkGetData} read for each metadata path of a log and
 * collects the results into one future.
 *
 * <p>The reads are added in this fixed order: parent of the log root, log root,
 * max txid, version, lock, read lock, log segments, and (only when
 * {@code ownAllocator} is set) allocation.
 *
 * @param zk zookeeper handle passed to every read
 * @param logRootPath root path of the log; all other paths are derived from it
 * @param ownAllocator whether the allocation path is read as well
 * @return a future of the read results, in the order listed above
 */
static Future<List<Versioned<byte[]>>> checkLogMetadataPaths(ZooKeeper zk,
                               String logRootPath,
                               boolean ownAllocator) {
  // Note on persistent lock state initialization: the read lock persistent state (path) is
  // initialized here but only used in the read handler. It is more convenient and less
  // error prone to manage all stream structure in one place.
  final String parentPath = new File(logRootPath).getParent();

  final int expectedPaths;
  if (ownAllocator) {
    expectedPaths = MetadataIndex.ALLOCATION + 1;
  } else {
    expectedPaths = MetadataIndex.LOGSEGMENTS + 1;
  }

  final List<Future<Versioned<byte[]>>> reads = Lists.newArrayListWithExpectedSize(expectedPaths);
  reads.add(Utils.zkGetData(zk, parentPath, false));
  reads.add(Utils.zkGetData(zk, logRootPath, false));
  reads.add(Utils.zkGetData(zk, logRootPath + MAX_TXID_PATH, false));
  reads.add(Utils.zkGetData(zk, logRootPath + VERSION_PATH, false));
  reads.add(Utils.zkGetData(zk, logRootPath + LOCK_PATH, false));
  reads.add(Utils.zkGetData(zk, logRootPath + READ_LOCK_PATH, false));
  reads.add(Utils.zkGetData(zk, logRootPath + LOGSEGMENTS_PATH, false));
  if (ownAllocator) {
    reads.add(Utils.zkGetData(zk, logRootPath + ALLOCATION_PATH, false));
  }
  return Future.collect(reads);
}

代码示例来源:origin: twitter/distributedlog

/**
 * Looks up the last commit position of every given subscriber and completes
 * {@code result} with a subscriber-to-position map.
 *
 * <p>If any lookup fails, {@code result} is failed with the same cause. Without
 * this the promise would never be completed and callers would wait forever.
 *
 * @param result promise completed with the subscriber-to-position map, or failed
 *        with the cause of the first failure reported by the collected future
 * @param subscribers subscriber ids to look up
 */
private void getLastCommitPositions(final Promise<Map<String, DLSN>> result,
                  List<String> subscribers) {
  List<Future<Pair<String, DLSN>>> futures =
      new ArrayList<Future<Pair<String, DLSN>>>(subscribers.size());
  for (final String subscriber : subscribers) {
    Future<Pair<String, DLSN>> future =
      // Get the last commit position from zookeeper
      getSubscriber(subscriber).getLastCommitPositionFromZK().map(
          new AbstractFunction1<DLSN, Pair<String, DLSN>>() {
            @Override
            public Pair<String, DLSN> apply(DLSN dlsn) {
              // Pair the position with its subscriber so the map can be rebuilt later.
              return Pair.of(subscriber, dlsn);
            }
          });
    futures.add(future);
  }
  Future<List<Pair<String, DLSN>>> collected = Future.collect(futures);
  collected.foreach(
    new AbstractFunction1<List<Pair<String, DLSN>>, BoxedUnit>() {
      @Override
      public BoxedUnit apply(List<Pair<String, DLSN>> subscriptions) {
        Map<String, DLSN> subscriptionMap = new HashMap<String, DLSN>();
        for (Pair<String, DLSN> pair : subscriptions) {
          subscriptionMap.put(pair.getLeft(), pair.getRight());
        }
        result.setValue(subscriptionMap);
        return BoxedUnit.UNIT;
      }
    });
  // Propagate failures; foreach above only fires on success.
  collected.onFailure(
    new AbstractFunction1<Throwable, BoxedUnit>() {
      @Override
      public BoxedUnit apply(Throwable cause) {
        result.setException(cause);
        return BoxedUnit.UNIT;
      }
    });
}

代码示例来源:origin: twitter/distributedlog

fetchFutures.add(fetchLogLocation(uri, logName));
Future.collect(fetchFutures).addEventListener(new FutureEventListener<List<Optional<URI>>>() {
  @Override
  public void onSuccess(List<Optional<URI>> fetchResults) {

代码示例来源:origin: twitter/distributedlog

uriList.add(uri);
Future.collect(futureList).addEventListener(new FutureEventListener<List<Set<String>>>() {
  @Override
  public void onSuccess(List<Set<String>> resultList) {

代码示例来源:origin: twitter/distributedlog

segmentCandidates = Await.result(Future.collect(futures));
} catch (Exception e) {
  throw new IOException("Failed on checking stream " + streamName, e);

代码示例来源:origin: twitter/distributedlog

// Combine all search result futures and attach a listener to the combined future.
// NOTE(review): FutureEventListenerRunnable.of presumably runs processSearchResultsListener
// on executorService -- confirm against its definition.
Future.collect(searchResults).addEventListener(
    FutureEventListenerRunnable.of(processSearchResultsListener, executorService));

代码示例来源:origin: twitter/distributedlog

// Resolve the combined write futures before closing the client.
// NOTE(review): FutureUtils.result presumably blocks until the future completes -- confirm.
FutureUtils.result(Future.collect(writeFutures));
client.close();

代码示例来源:origin: twitter/distributedlog

/**
 * Creates ten log segments in one transaction and checks that the store and
 * zookeeper report the same ten names, and that each segment read back by
 * name equals the segment that was created.
 */
@Test(timeout = 60000)
public void testGetLogSegmentNames() throws Exception {
  final int numSegments = 10;

  // Create every segment through a single transaction.
  Transaction<Object> txn = lsmStore.transaction();
  List<LogSegmentMetadata> expectedSegments = Lists.newArrayListWithExpectedSize(numSegments);
  for (int idx = 0; idx < numSegments; idx++) {
    LogSegmentMetadata created = createLogSegment(idx);
    expectedSegments.add(created);
    lsmStore.createLogSegment(txn, created);
  }
  FutureUtils.result(txn.execute());

  // The names listed directly from zookeeper and through the store must agree.
  final String rootPath = "/" + runtime.getMethodName();
  List<String> zkChildren = zkc.get().getChildren(rootPath, false);
  Collections.sort(zkChildren);
  assertEquals("Should find 10 log segments", numSegments, zkChildren.size());

  List<String> storeNames = FutureUtils.result(lsmStore.getLogSegmentNames(rootPath));
  Collections.sort(storeNames);
  assertEquals("Should find 10 log segments", numSegments, storeNames.size());
  assertEquals(zkChildren, storeNames);

  // Read every segment back by name and compare with what was created.
  List<Future<LogSegmentMetadata>> reads = Lists.newArrayListWithExpectedSize(numSegments);
  for (int idx = 0; idx < numSegments; idx++) {
    String segmentPath = rootPath + "/" + storeNames.get(idx);
    reads.add(lsmStore.getLogSegment(segmentPath));
  }
  List<LogSegmentMetadata> actualSegments = FutureUtils.result(Future.collect(reads));
  for (int idx = 0; idx < numSegments; idx++) {
    assertEquals(expectedSegments.get(idx), actualSegments.get(idx));
  }
}

代码示例来源:origin: twitter/distributedlog

List<DLSN> writeResults = Await.result(Future.collect(writePromiseList));
for (int i = 0; i < 5; i++) {
  Assert.assertEquals(new DLSN(1L, 1L, i), writeResults.get(i));
  Assert.assertEquals(new DLSN(1L, 1L, (10 + i)), writeResults.get(6 + i));
List<DLSN> recordSetWriteResults = Await.result(Future.collect(recordSetPromiseList));
for (int i = 0; i < 5; i++) {
  Assert.assertEquals(new DLSN(1L, 1L, (5 + i)), recordSetWriteResults.get(i));

代码示例来源:origin: twitter/distributedlog

// Await the combined write futures and check that numRecords + 1 DLSNs came back.
// NOTE(review): the message says 11, which matches only if numRecords is 10 -- confirm.
List<DLSN> dlsns = Await.result(Future.collect(futureList));
assertEquals("All 11 records should be written",
    numRecords + 1, dlsns.size());

代码示例来源:origin: twitter/distributedlog

// Check the writer's position within the log segment.
// NOTE(review): the message interpolates numRecords while the expected value is the
// literal 10; the two agree only if numRecords == 10 -- confirm.
assertEquals("Position should still be " + numRecords,
    10, writer.getPositionWithinLogSegment());
// Await the combined write futures and check that one DLSN per record came back.
List<DLSN> dlsns = Await.result(Future.collect(futureList));
assertEquals("All records should be written",
    numRecords, dlsns.size());

代码示例来源:origin: twitter/distributedlog

// Await the combined write futures and check that numRecords DLSNs came back.
// NOTE(review): the message says 10, which matches only if numRecords is 10 -- confirm.
List<DLSN> dlsns = Await.result(Future.collect(futureList));
assertEquals("All first 10 records should be written",
    numRecords, dlsns.size());

代码示例来源:origin: twitter/distributedlog

List<DLSN> writeResults = FutureUtils.result(Future.collect(writeFutures));
  Assert.assertEquals(new DLSN(1L, 6L + i, 0L), writeResults.get(6 + i));
List<DLSN> recordSetWriteResults = Await.result(Future.collect(recordSetFutures));
for (int i = 0; i < 5; i++) {
  Assert.assertEquals(new DLSN(1L, 5L, i), recordSetWriteResults.get(i));

代码示例来源:origin: twitter/distributedlog

List<DLSN> writeResults = Await.result(Future.collect(writePromiseList));
for (int i = 0; i < 10; i++) {
  Assert.assertEquals(new DLSN(1L, 1L, i), writeResults.get(i));

代码示例来源:origin: org.apache.distributedlog/distributedlog-service

/**
 * Issues a close request to every stream in the given set and combines the
 * resulting futures.
 *
 * <p>If a rate limiter is supplied, {@code acquire()} is invoked on it ahead of
 * each close request.
 *
 * @param streamsToClose streams that should be asked to close
 * @param rateLimiter optional limiter consulted before each request
 * @return a future of an empty list when the set is empty, otherwise the
 *         collected close futures
 */
private Future<List<Void>> closeStreams(Set<Stream> streamsToClose, Optional<RateLimiter> rateLimiter) {
  if (streamsToClose.isEmpty()) {
    logger.info("No streams to close.");
    List<Void> noResults = new ArrayList<Void>();
    return Future.value(noResults);
  }

  final List<Future<Void>> pendingCloses = new ArrayList<Future<Void>>(streamsToClose.size());
  // Unwrap the optional once; null means requests are not rate limited.
  final RateLimiter limiter = rateLimiter.isPresent() ? rateLimiter.get() : null;
  for (Stream target : streamsToClose) {
    if (limiter != null) {
      limiter.acquire();
    }
    Future<Void> closed = target.requestClose("Close Streams");
    pendingCloses.add(closed);
  }
  return Future.collect(pendingCloses);
}

代码示例来源:origin: com.twitter/distributedlog-client

/**
 * Applies the accept-new-stream flag to the service behind every client
 * returned by the client manager.
 *
 * @param enabled flag handed to each service's {@code setAcceptNewStream}
 * @return a future yielding {@code null} after all per-client calls are collected
 */
@Override
public Future<Void> setAcceptNewStream(boolean enabled) {
  final Map<SocketAddress, ProxyClient> allClients = clientManager.getAllClients();
  final List<Future<Void>> acks = new ArrayList<Future<Void>>(allClients.size());
  for (ProxyClient proxy : allClients.values()) {
    Future<Void> ack = proxy.getService().setAcceptNewStream(enabled);
    acks.add(ack);
  }

  // Callers only need completion, so the list of Void results is dropped.
  final Function<List<Void>, Void> toVoid = new Function<List<Void>, Void>() {
    @Override
    public Void apply(List<Void> unused) {
      return null;
    }
  };
  return Future.collect(acks).map(toVoid);
}

代码示例来源:origin: org.apache.distributedlog/distributedlog-client

/**
 * Sends the accept-new-stream flag to each client's service and reduces the
 * collected acknowledgements to a single {@code Void} future.
 *
 * @param enabled flag passed to every service's {@code setAcceptNewStream}
 * @return a future yielding {@code null} once every call has been collected
 */
@Override
public Future<Void> setAcceptNewStream(boolean enabled) {
  final Map<SocketAddress, ProxyClient> clientsByAddress = clientManager.getAllClients();
  final List<Future<Void>> results = new ArrayList<Future<Void>>(clientsByAddress.size());
  for (ProxyClient proxyClient : clientsByAddress.values()) {
    results.add(proxyClient.getService().setAcceptNewStream(enabled));
  }

  final Future<List<Void>> collected = Future.collect(results);
  // Nothing in the collected list is meaningful; map it down to null.
  return collected.map(new Function<List<Void>, Void>() {
    @Override
    public Void apply(List<Void> ignoredResults) {
      return null;
    }
  });
}

相关文章