本文整理了Java中com.twitter.util.Future.ensure()
方法的一些代码示例,展示了Future.ensure()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Future.ensure()
方法的具体详情如下:
包路径:com.twitter.util.Future
类名称:Future
方法名:ensure
暂无
代码示例来源:origin: twitter/distributedlog
@Override
public void onFailure(final Throwable cause) {
writeHandler.asyncClose().ensure(new AbstractFunction0<BoxedUnit>() {
@Override
public BoxedUnit apply() {
FutureUtils.setException(createPromise, cause);
return BoxedUnit.UNIT;
}
});
}
});
代码示例来源:origin: twitter/distributedlog
private void complete(final Throwable cause) {
closeNoThrow().ensure(new AbstractFunction0<BoxedUnit>() {
@Override
public BoxedUnit apply() {
if (null != cause && shouldThrow) {
FutureUtils.setException(completePromise, cause);
} else {
FutureUtils.setValue(completePromise, null);
}
return BoxedUnit.UNIT;
}
});
}
});
代码示例来源:origin: twitter/distributedlog
protected Future<List<LogSegmentMetadata>> getLogSegmentsAsync() {
final BKLogReadHandler readHandler = createReadHandler();
return readHandler.asyncGetFullLedgerList(true, false).ensure(new AbstractFunction0<BoxedUnit>() {
@Override
public BoxedUnit apply() {
readHandler.asyncClose();
return BoxedUnit.UNIT;
}
});
}
代码示例来源:origin: twitter/distributedlog
synchronized void unlockInternalLock(final Promise<Void> closePromise) {
if (internalLock == null) {
FutureUtils.setValue(closePromise, null);
} else {
internalLock.asyncUnlock().ensure(new AbstractFunction0<BoxedUnit>() {
@Override
public BoxedUnit apply() {
FutureUtils.setValue(closePromise, null);
return BoxedUnit.UNIT;
}
});
}
}
代码示例来源:origin: twitter/distributedlog
public synchronized Future<T> apply(final Function0<T> fn) {
Preconditions.checkNotNull(fn);
if (closed) {
return Future.exception(new RejectedExecutionException("Operation submitted to closed SafeQueueingFuturePool"));
}
++outstanding;
queue.add(fn);
Future<T> result = orderedFuturePool.apply(new Function0<T>() {
@Override
public T apply() {
return queue.poll().apply();
}
@Override
public String toString() {
return fn.toString();
}
}).ensure(new Function0<BoxedUnit>() {
public BoxedUnit apply() {
if (decrOutstandingAndCheckDone()) {
applyAll();
}
return null;
}
});
return result;
}
代码示例来源:origin: twitter/distributedlog
void deleteLedger(final long ledgerId) {
final Future<Void> deleteFuture = bkc.deleteLedger(ledgerId, true);
synchronized (ledgerDeletions) {
ledgerDeletions.add(deleteFuture);
}
deleteFuture.onFailure(new AbstractFunction1<Throwable, BoxedUnit>() {
@Override
public BoxedUnit apply(Throwable cause) {
LOG.error("Error deleting ledger {} for ledger allocator {}, retrying : ",
new Object[] { ledgerId, allocatePath, cause });
if (!isClosing()) {
deleteLedger(ledgerId);
}
return BoxedUnit.UNIT;
}
}).ensure(new AbstractFunction0<BoxedUnit>() {
@Override
public BoxedUnit apply() {
synchronized (ledgerDeletions) {
ledgerDeletions.remove(deleteFuture);
}
return BoxedUnit.UNIT;
}
});
}
代码示例来源:origin: twitter/distributedlog
void scheduleTimeout(final StreamOp op) {
final Timeout timeout = requestTimer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
if (!timeout.isCancelled()) {
serviceTimeout.inc();
handleServiceTimeout("Operation " + op.getClass().getName() + " timeout");
}
}
}, serviceTimeoutMs, TimeUnit.MILLISECONDS);
op.responseHeader().ensure(new Function0<BoxedUnit>() {
@Override
public BoxedUnit apply() {
timeout.cancel();
return null;
}
});
}
代码示例来源:origin: twitter/distributedlog
@Override
public <T> Future<T> apply(Function0<T> function0) {
if (traceTaskExecution) {
taskPendingCounter.inc();
Stopwatch taskEnqueueStopwatch = Stopwatch.createStarted();
Future<T> futureResult = futurePool.apply(new TimedFunction0<T>(function0));
taskEnqueueTime.registerSuccessfulEvent(taskEnqueueStopwatch.elapsed(TimeUnit.MICROSECONDS));
futureResult.ensure(new com.twitter.util.Function0<BoxedUnit>() {
@Override
public BoxedUnit apply() {
taskPendingCounter.dec();
return null;
}
});
return futureResult;
} else {
return futurePool.apply(function0);
}
}
}
代码示例来源:origin: twitter/distributedlog
@Override
public Future<BulkWriteResponse> writeBulkWithContext(final String stream, List<ByteBuffer> data, WriteContext ctx) {
bulkWritePendingStat.inc();
receivedRecordCounter.add(data.size());
BulkWriteOp op = new BulkWriteOp(stream, data, statsLogger, perStreamStatsLogger, getChecksum(ctx),
featureChecksumDisabled, accessControlManager);
executeStreamOp(op);
return op.result().ensure(new Function0<BoxedUnit>() {
public BoxedUnit apply() {
bulkWritePendingStat.dec();
return null;
}
});
}
代码示例来源:origin: twitter/distributedlog
private Future<WriteResponse> doWrite(final String name,
ByteBuffer data,
Long checksum,
boolean isRecordSet) {
writePendingStat.inc();
receivedRecordCounter.inc();
WriteOp op = newWriteOp(name, data, checksum, isRecordSet);
executeStreamOp(op);
return op.result().ensure(new Function0<BoxedUnit>() {
public BoxedUnit apply() {
writePendingStat.dec();
return null;
}
});
}
代码示例来源:origin: twitter/distributedlog
private Future<LogRecordWithDLSN> asyncReadFirstUserRecord(LogSegmentMetadata ledger, DLSN beginDLSN) {
final LedgerHandleCache handleCache =
LedgerHandleCache.newBuilder().bkc(bookKeeperClient).conf(conf).build();
return ReadUtils.asyncReadFirstUserRecord(
getFullyQualifiedName(),
ledger,
firstNumEntriesPerReadLastRecordScan,
maxNumEntriesPerReadLastRecordScan,
new AtomicInteger(0),
scheduler,
handleCache,
beginDLSN
).ensure(new AbstractFunction0<BoxedUnit>() {
@Override
public BoxedUnit apply() {
handleCache.clear();
return BoxedUnit.UNIT;
}
});
}
代码示例来源:origin: twitter/distributedlog
recoverLastEntryStats.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
}).ensure(new AbstractFunction0<BoxedUnit>() {
@Override
public BoxedUnit apply() {
代码示例来源:origin: twitter/distributedlog
private Future<DLSN> getDLSNNotLessThanTxId(long fromTxnId,
final List<LogSegmentMetadata> segments) {
if (segments.isEmpty()) {
return getLastDLSNAsync();
}
final int segmentIdx = DLUtils.findLogSegmentNotLessThanTxnId(segments, fromTxnId);
if (segmentIdx < 0) {
return Future.value(new DLSN(segments.get(0).getLogSegmentSequenceNumber(), 0L, 0L));
}
final LedgerHandleCache handleCache =
LedgerHandleCache.newBuilder().bkc(readerBKC).conf(conf).build();
return getDLSNNotLessThanTxIdInSegment(
fromTxnId,
segmentIdx,
segments,
handleCache
).ensure(new AbstractFunction0<BoxedUnit>() {
@Override
public BoxedUnit apply() {
handleCache.clear();
return BoxedUnit.UNIT;
}
});
}
代码示例来源:origin: twitter/distributedlog
private Future<LogRecordWithDLSN> getFirstGreaterThanRecord(BKDistributedLogManager bkdlm, int ledgerNo, DLSN dlsn) throws Exception {
List<LogSegmentMetadata> ledgerList = bkdlm.getLogSegments();
final LedgerHandleCache handleCache = LedgerHandleCache.newBuilder()
.bkc(bkdlm.getWriterBKC())
.conf(conf)
.build();
return ReadUtils.asyncReadFirstUserRecord(
bkdlm.getStreamName(), ledgerList.get(ledgerNo), 2, 16, new AtomicInteger(0), Executors.newFixedThreadPool(1),
handleCache, dlsn
).ensure(new AbstractFunction0<BoxedUnit>() {
@Override
public BoxedUnit apply() {
handleCache.clear();
return BoxedUnit.UNIT;
}
});
}
代码示例来源:origin: twitter/distributedlog
private Future<LogRecordWithDLSN> getLastUserRecord(BKDistributedLogManager bkdlm, int ledgerNo) throws Exception {
BKLogReadHandler readHandler = bkdlm.createReadHandler();
List<LogSegmentMetadata> ledgerList = readHandler.getLedgerList(false, false, LogSegmentMetadata.COMPARATOR, false);
final LedgerHandleCache handleCache = LedgerHandleCache.newBuilder()
.bkc(bkdlm.getWriterBKC())
.conf(conf)
.build();
return ReadUtils.asyncReadLastRecord(
bkdlm.getStreamName(), ledgerList.get(ledgerNo), false, false, false, 2, 16, new AtomicInteger(0), Executors.newFixedThreadPool(1),
handleCache
).ensure(new AbstractFunction0<BoxedUnit>() {
@Override
public BoxedUnit apply() {
handleCache.clear();
return BoxedUnit.UNIT;
}
});
}
代码示例来源:origin: twitter/distributedlog
}).ensure(new AbstractFunction0<BoxedUnit>() {
@Override
public BoxedUnit apply() {
代码示例来源:origin: twitter/distributedlog
private Future<Optional<LogRecordWithDLSN>> getLogRecordNotLessThanTxId(
BKDistributedLogManager bkdlm, int logsegmentIdx, long transactionId) throws Exception {
List<LogSegmentMetadata> logSegments = bkdlm.getLogSegments();
final LedgerHandleCache handleCache = LedgerHandleCache.newBuilder()
.bkc(bkdlm.getWriterBKC())
.conf(conf)
.build();
return ReadUtils.getLogRecordNotLessThanTxId(
bkdlm.getStreamName(),
logSegments.get(logsegmentIdx),
transactionId,
Executors.newSingleThreadExecutor(),
handleCache,
10
).ensure(new AbstractFunction0<BoxedUnit>() {
@Override
public BoxedUnit apply() {
handleCache.clear();
return BoxedUnit.UNIT;
}
});
}
代码示例来源:origin: twitter/distributedlog
}).ensure(new AbstractFunction0<BoxedUnit>() {
@Override
public BoxedUnit apply() {
代码示例来源:origin: twitter/distributedlog
return future.ensure(new Function0<BoxedUnit>() {
public BoxedUnit apply() {
pendingWrites.dec();
代码示例来源:origin: traneio/future
@Benchmark
public Void ensureConstN() throws Exception {
Future<Void> f = constVoidFuture;
for (int i = 0; i < N.n; i++)
f = f.ensure(ensureF);
return Await.result(f);
}
内容来源于网络,如有侵权,请联系作者删除!