本文整理了 Java 中 java.util.concurrent.LinkedBlockingDeque.size() 方法的一些代码示例,展示了 LinkedBlockingDeque.size() 的具体用法。这些代码示例主要来源于 GitHub、Stack Overflow、Maven 等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。LinkedBlockingDeque.size() 方法的具体详情如下:
包路径:java.util.concurrent.LinkedBlockingDeque
类名称:LinkedBlockingDeque
方法名:size
[英]Returns the number of elements in this deque.
[中]返回此双端队列中的元素个数。
代码示例来源:origin: alibaba/jstorm
/**
 * Returns the combined element count of all buckets.
 *
 * <p>The buckets are traversed while holding {@code _lock}.
 *
 * @return the sum of {@code size()} over every deque in {@code _buckets}
 */
public int size() {
    int total = 0;
    synchronized (_lock) {
        for (LinkedBlockingDeque<K> deque : _buckets) {
            total += deque.size();
        }
    }
    return total;
}
代码示例来源:origin: apache/hbase
/**
 * Delegates to the backing {@code queue}.
 *
 * @return the value reported by {@code queue.size()}
 */
@Override
public int size() {
    final int count = queue.size();
    return count;
}
代码示例来源:origin: lealone/Lealone
/**
 * Delegates to the backing {@code deque}.
 *
 * @return the value reported by {@code deque.size()}
 */
public int size() {
    final int count = deque.size();
    return count;
}
代码示例来源:origin: loklak/loklak_server
/**
 * Reports the size of the shared {@code messageQueue}.
 *
 * @return the value reported by {@code messageQueue.size()}
 */
public static int getMessageQueueSize() {
    final int queued = messageQueue.size();
    return queued;
}
代码示例来源:origin: h2oai/h2o-2
/**
 * Occasionally logs buffer statistics when {@code DEBUG} is enabled.
 *
 * <p>When {@code DEBUG} is off nothing happens and {@code ai} is left untouched.
 * Otherwise {@code ai} is incremented, and a warning is logged whenever the low
 * nine bits of the new value are all set (one out of every 512 values).
 *
 * @param ai counter that is bumped on each debug-enabled call
 */
private static void bbstats( AtomicInteger ai ) {
    // Short-circuit keeps the counter unchanged when DEBUG is off.
    if( DEBUG && (ai.incrementAndGet()&511)==511 ) {
        Log.warn("BB make="+BBMAKE.get()+" free="+BBFREE.get()+" cache="+BBCACHE.get()+" size="+BBS.size());
    }
}
代码示例来源:origin: loklak/loklak_server
/**
 * Writes a bulk of messages through {@code DAO.writeMessageBulk}, logs a summary and
 * resets both counters.
 *
 * @param bulk                 messages to write
 * @param newMessageCounter    incremented by the size of the collection returned by the write
 * @param doubleMessageCounter incremented by {@code bulk.size()} minus that size
 */
private void dumpMessageBulk(List<DAO.MessageWrapper> bulk, AtomicInteger newMessageCounter, AtomicInteger doubleMessageCounter) {
    final long startedAt = System.currentTimeMillis();
    final int writtenCount = DAO.writeMessageBulk(bulk).size();
    final int notWrittenCount = bulk.size() - writtenCount;
    doubleMessageCounter.addAndGet(notWrittenCount);
    newMessageCounter.addAndGet(writtenCount);
    final long finishedAt = System.currentTimeMillis();
    final long elapsedMillis = finishedAt - startedAt;
    DAO.log("dumped timelines: " + newMessageCounter + " new, " + doubleMessageCounter + " known from cache, storage time: " + elapsedMillis + " ms, remaining messages: " + messageQueue.size());
    // Counters are reset once their values have been logged.
    newMessageCounter.set(0);
    doubleMessageCounter.set(0);
}
代码示例来源:origin: prestodb/presto
/**
 * Builds a diagnostic status snapshot of this exchange client.
 *
 * <p>Per-client statuses are collected without holding this object's monitor; only the
 * page-buffer inspection and the construction of the result are synchronized on {@code this}.
 *
 * @return a new {@code ExchangeClientStatus} describing the current state
 */
public ExchangeClientStatus getStatus()
{
    // These stats are for diagnostics only. A consistent view across different
    // exchange clients is not guaranteed, because providing one would introduce
    // significant lock contention.
    ImmutableList.Builder<PageBufferClientStatus> statuses = ImmutableList.builder();
    for (HttpPageBufferClient pageBufferClient : allClients.values()) {
        statuses.add(pageBufferClient.getStatus());
    }
    List<PageBufferClientStatus> pageBufferClientStatus = statuses.build();

    synchronized (this) {
        int queuedEntries = pageBuffer.size();
        // A trailing NO_MORE_PAGES entry is excluded from the reported count.
        boolean endMarkerQueued = queuedEntries > 0 && pageBuffer.peekLast() == NO_MORE_PAGES;
        int bufferedPages = endMarkerQueued ? queuedEntries - 1 : queuedEntries;
        return new ExchangeClientStatus(bufferRetainedSizeInBytes, maxBufferRetainedSizeInBytes, averageBytesPerRequest, successfulRequests, bufferedPages, noMoreLocations, pageBufferClientStatus);
    }
}
代码示例来源:origin: apache/zookeeper
/**
 * Renders a one-line summary: session id (hex), local and remote socket addresses,
 * last zxid, xid, sent/received counts and the sizes of the outgoing, pending and
 * waiting-event queues.
 *
 * @return the formatted summary string
 */
@Override
public String toString() {
    SocketAddress localAddress = sendThread.getClientCnxnSocket().getLocalSocketAddress();
    SocketAddress remoteAddress = sendThread.getClientCnxnSocket().getRemoteSocketAddress();

    StringBuilder text = new StringBuilder();
    text.append("sessionid:0x").append(Long.toHexString(getSessionId()));
    text.append(" local:").append(localAddress);
    text.append(" remoteserver:").append(remoteAddress);
    text.append(" lastZxid:").append(lastZxid);
    text.append(" xid:").append(xid);
    text.append(" sent:").append(sendThread.getClientCnxnSocket().getSentCount());
    text.append(" recv:").append(sendThread.getClientCnxnSocket().getRecvCount());
    text.append(" queuedpkts:").append(outgoingQueue.size());
    text.append(" pendingresp:").append(pendingQueue.size());
    text.append(" queuedevents:").append(eventThread.waitingEvents.size());
    return text.toString();
}
代码示例来源:origin: alibaba/nacos
/**
 * Offers a health-check result to {@code healthCheckResults}.
 *
 * <p>Results whose dom is not contained in {@code Switch.getIncrementalList()} are
 * ignored. If the offer is rejected, a warning with the current queue size is logged.
 *
 * @param result the health-check result to enqueue
 */
private void addResult(HealthCheckResult result) {
    final boolean listed = Switch.getIncrementalList().contains(result.getDom());
    if (listed) {
        final boolean accepted = healthCheckResults.offer(result);
        if (!accepted) {
            Loggers.EVT_LOG.warn("[HEALTH-CHECK-SYNC] failed to add check result to queue, queue size: {}", healthCheckResults.size());
        }
    }
}
代码示例来源:origin: apache/flume
/**
 * Starts the channel counter, publishes the queue's size and total capacity to it,
 * then delegates to {@code super.start()}.
 */
@Override
public synchronized void start() {
    channelCounter.start();
    channelCounter.setChannelSize(queue.size());
    // Total capacity = elements currently held + free slots.
    final int totalCapacity = queue.size() + queue.remainingCapacity();
    channelCounter.setChannelCapacity(Long.valueOf(totalCapacity));
    super.start();
}
代码示例来源:origin: apache/flume
/**
 * Rolls the transaction back: every element of {@code takeList} is moved back to the
 * front of {@code queue}, {@code putList} is discarded, the byte counters are zeroed,
 * and as many {@code queueStored} permits as there were taken elements are released.
 *
 * @throws IllegalStateException via {@code Preconditions.checkState} if the queue lacks
 *         the remaining capacity to receive the taken elements
 */
@Override
protected void doRollback() {
    final int returnedCount = takeList.size();
    synchronized (queueLock) {
        final boolean roomAvailable = queue.remainingCapacity() >= takeList.size();
        Preconditions.checkState(roomAvailable,
            "Not enough space in memory channel " +
            "queue to rollback takes. This should never happen, please report");
        // The take list is drained from its tail, each element going to the queue's head.
        for (; !takeList.isEmpty(); ) {
            queue.addFirst(takeList.removeLast());
        }
        putList.clear();
    }
    putByteCounter = 0;
    takeByteCounter = 0;

    queueStored.release(returnedCount);
    channelCounter.setChannelSize(queue.size());
}
代码示例来源:origin: apache/flume
/**
 * Stages an event in {@code putList} and adds its size, measured in byte-capacity
 * slots (rounded up), to {@code putByteCounter}.
 *
 * @param event the event to stage
 * @throws ChannelException if {@code putList} does not accept the event
 * @throws InterruptedException declared by the method signature
 */
@Override
protected void doPut(Event event) throws InterruptedException {
    channelCounter.incrementEventPutAttemptCount();
    final int slotsNeeded = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);

    final boolean staged = putList.offer(event);
    if (staged) {
        putByteCounter += slotsNeeded;
        return;
    }
    throw new ChannelException(
        "Put queue for MemoryTransaction of capacity " +
        putList.size() + " full, consider committing more frequently, " +
        "increasing capacity or increasing thread count");
}
代码示例来源:origin: apache/flume
/**
 * Replaces {@code queue} with a new deque of the requested capacity, copying all
 * current elements, and adjusts the {@code queueRemaining} permits by the difference.
 *
 * <p>When shrinking, the surplus permits must first be acquired within
 * {@code keepAlive} seconds; otherwise a warning is logged and nothing changes.
 *
 * @param capacity the desired total capacity
 * @throws InterruptedException if interrupted while waiting for permits
 */
private void resizeQueue(int capacity) throws InterruptedException {
    final int previousCapacity;
    synchronized (queueLock) {
        previousCapacity = queue.size() + queue.remainingCapacity();
    }

    if (previousCapacity == capacity) {
        return;
    }

    if (previousCapacity < capacity) {
        // Growing: swap the queue first, then hand out the extra permits.
        synchronized (queueLock) {
            LinkedBlockingDeque<Event> enlarged = new LinkedBlockingDeque<Event>(capacity);
            enlarged.addAll(queue);
            queue = enlarged;
        }
        queueRemaining.release(capacity - previousCapacity);
        return;
    }

    // Shrinking: the permits being removed must be reclaimed before the swap.
    final boolean reclaimed =
        queueRemaining.tryAcquire(previousCapacity - capacity, keepAlive, TimeUnit.SECONDS);
    if (!reclaimed) {
        LOGGER.warn("Couldn't acquire permits to downsize the queue, resizing has been aborted");
        return;
    }
    synchronized (queueLock) {
        LinkedBlockingDeque<Event> reduced = new LinkedBlockingDeque<Event>(capacity);
        reduced.addAll(queue);
        queue = reduced;
    }
}
代码示例来源:origin: apache/flume
/**
 * Publishes the queue's size to the channel counter, stops the counter, then
 * delegates to {@code super.stop()}.
 */
@Override
public synchronized void stop() {
    final int sizeAtStop = queue.size();
    channelCounter.setChannelSize(sizeAtStop);
    channelCounter.stop();
    super.stop();
}
代码示例来源:origin: apache/hbase
/**
 * Behaves as {@link LinkedBlockingQueue#take()}, except it will silently
 * skip all calls which it thinks should be dropped.
 *
 * <p>While the queue's fill ratio ({@code queue.size() / maxCapacity}) exceeds
 * {@code lifoThreshold}, elements are taken from the tail (LIFO); otherwise from the
 * head (FIFO). {@code numLifoModeSwitches} is incremented once per transition into
 * LIFO mode within this call, not once per loop iteration, so a run of dropped
 * calls does not inflate the metric.
 *
 * @return the head of this queue
 * @throws InterruptedException if interrupted while waiting
 */
@Override
public CallRunner take() throws InterruptedException {
    CallRunner cr;
    boolean switched = false;
    while (true) {
        if (((double) queue.size() / this.maxCapacity) > lifoThreshold) {
            // Only count once per switch.
            if (!switched) {
                switched = true;
                numLifoModeSwitches.increment();
            }
            cr = queue.takeLast();
        } else {
            switched = false;
            cr = queue.takeFirst();
        }
        if (needToDrop(cr)) {
            // Dropped calls are counted and discarded; keep looking for a usable one.
            numGeneralCallsDropped.increment();
            cr.drop();
        } else {
            return cr;
        }
    }
}
代码示例来源:origin: apache/hbase
/**
 * Non-blocking retrieval that skips calls which should be dropped.
 *
 * <p>While the queue's fill ratio ({@code queue.size() / maxCapacity}) exceeds
 * {@code lifoThreshold}, elements are polled from the tail; otherwise from the head.
 * {@code numLifoModeSwitches} is incremented once per transition into tail-polling
 * within this call.
 *
 * @return the first call that does not need to be dropped, or {@code null} when the
 *         queue yields no element
 */
@Override
public CallRunner poll() {
    boolean lifoActive = false;
    for (;;) {
        final boolean overThreshold =
            ((double) queue.size() / this.maxCapacity) > lifoThreshold;
        // Only count once per switch.
        if (overThreshold && !lifoActive) {
            numLifoModeSwitches.increment();
        }
        lifoActive = overThreshold;

        final CallRunner candidate = overThreshold ? queue.pollLast() : queue.pollFirst();
        if (candidate == null) {
            return null;
        }
        if (!needToDrop(candidate)) {
            return candidate;
        }
        numGeneralCallsDropped.increment();
        candidate.drop();
    }
}
代码示例来源:origin: apache/flume
/**
 * Takes one event from {@code queue} into this transaction.
 *
 * <p>Waits up to {@code keepAlive} seconds for a {@code queueStored} permit. On success
 * the event is polled under {@code queueLock}, recorded in {@code takeList}, and its
 * size in byte-capacity slots (rounded up) is added to {@code takeByteCounter}.
 *
 * @return the taken event, or {@code null} if no permit was acquired in time
 * @throws ChannelException if {@code takeList} has no remaining capacity
 * @throws InterruptedException if interrupted while waiting
 */
@Override
protected Event doTake() throws InterruptedException {
    channelCounter.incrementEventTakeAttemptCount();
    if (takeList.remainingCapacity() == 0) {
        throw new ChannelException("Take list for MemoryTransaction, capacity " +
            takeList.size() + " full, consider committing more frequently, " +
            "increasing capacity, or increasing thread count");
    }

    final boolean permitAcquired = queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS);
    if (!permitAcquired) {
        return null;
    }

    final Event taken;
    synchronized (queueLock) {
        taken = queue.poll();
    }
    // A permit was granted, so the queue is expected to have produced an element.
    Preconditions.checkNotNull(taken, "Queue.poll returned NULL despite semaphore " +
        "signalling existence of entry");
    takeList.put(taken);

    final int slotsUsed = (int) Math.ceil(estimateEventSize(taken) / byteCapacitySlotSize);
    takeByteCounter += slotsUsed;
    return taken;
}
代码示例来源:origin: apache/drill
/**
 * Closes this listener: marks it closed, stops throttling if it was active, releases
 * the data of every batch still queued, counts down {@code firstMessageReceived} and
 * finally marks the listener completed.
 */
void close() {
    logger.debug( "[#{}] Query listener closing.", instanceId );
    closed = true;

    final boolean throttlingWasStopped = stopThrottlingIfSo();
    if ( throttlingWasStopped ) {
        logger.debug( "[#{}] Throttling stopped at close() (at queue size {}).",
                      instanceId, batchQueue.size() );
    }

    while ( !batchQueue.isEmpty() ) {
        // Don't bother with query timeout, we're closing the cursor
        final QueryDataBatch pending = batchQueue.poll();
        if ( pending == null || pending.getData() == null ) {
            continue;
        }
        pending.getData().release();
    }

    // Close may be called before the first result is received and therefore
    // when the main thread is blocked waiting for the result. In that case
    // we want to unblock the main thread.
    firstMessageReceived.countDown(); // TODO: Why not call releaseIfFirst as used elsewhere?
    completed = true;
}
代码示例来源:origin: jersey/jersey
/**
 * Closes this input by enqueuing the {@code VOID} marker and flushing {@code ctx}.
 *
 * <p>If the queue is at {@code CAPACITY}, the marker is offered with a timeout of
 * {@code WRITE_TIMEOUT} milliseconds; if that fails (timeout or interruption), the last
 * queued element is removed to make room for the marker.
 *
 * <p>If the thread is interrupted while waiting, the interrupt status is restored so
 * callers can still observe it; closing nevertheless proceeds.
 *
 * @throws IOException declared by the method signature
 */
@Override
public void close() throws IOException {
    if (queue.size() == CAPACITY) {
        boolean offer = false;
        try {
            offer = queue.offer(VOID, WRITE_TIMEOUT, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Do not swallow the interruption: re-assert the flag and fall through to
            // the forced-insert path below so the marker is still enqueued.
            Thread.currentThread().interrupt();
        }
        if (!offer) {
            // Sacrifice the newest queued element so the marker fits.
            queue.removeLast();
            queue.add(VOID);
        }
    } else {
        queue.add(VOID);
    }
    ctx.flush();
}
代码示例来源:origin: jankotek/mapdb
/**
 * Returns a new deque of given size containing consecutive
 * Integers 0 ... n - 1.
 *
 * <p>The deque is created with capacity {@code n} and filled completely; the helper
 * asserts along the way that it starts empty, ends full, and has the expected first
 * and last elements.
 *
 * @param n the number of elements (and the capacity) of the returned deque
 * @return a full deque holding {@code 0 .. n - 1} in order
 */
private static LinkedBlockingDeque<Integer> populatedDeque(int n) {
    LinkedBlockingDeque<Integer> q =
        new LinkedBlockingDeque<Integer>(n);
    assertTrue(q.isEmpty());
    for (int i = 0; i < n; i++) {
        // Integer.valueOf replaces the deprecated Integer(int) constructor.
        assertTrue(q.offer(Integer.valueOf(i)));
    }
    assertFalse(q.isEmpty());
    assertEquals(0, q.remainingCapacity());
    assertEquals(n, q.size());
    assertEquals((Integer) 0, q.peekFirst());
    assertEquals((Integer) (n - 1), q.peekLast());
    return q;
}
内容来源于网络,如有侵权,请联系作者删除!