java.util.concurrent.LinkedBlockingDeque.size()方法的使用及代码示例

x33g5p2x  于2022-01-23 转载在 其他  
字(9.0k)|赞(0)|评价(0)|浏览(196)

本文整理了Java中java.util.concurrent.LinkedBlockingDeque.size()方法的一些代码示例,展示了LinkedBlockingDeque.size()的具体用法。这些代码示例主要来源于GitHub/Stack Overflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。LinkedBlockingDeque.size()方法的具体详情如下:
包路径:java.util.concurrent.LinkedBlockingDeque
类名称:LinkedBlockingDeque
方法名:size

LinkedBlockingDeque.size介绍

[英]Returns the number of elements in this deque.
[中]返回此双端队列中的元素数。

代码示例

代码示例来源:origin: alibaba/jstorm

/**
 * Counts the elements held across all buckets.
 *
 * <p>The whole traversal of {@code _buckets} runs while holding {@code _lock}.
 *
 * @return the sum of {@code size()} over every bucket in {@code _buckets}
 */
public int size() {
  synchronized (_lock) {
    int total = 0;
    for (LinkedBlockingDeque<K> current : _buckets) {
      total = total + current.size();
    }
    return total;
  }
}

代码示例来源:origin: apache/hbase

/**
 * Reports how many elements the backing {@code queue} currently holds.
 *
 * @return the value returned by {@code queue.size()}
 */
@Override
public int size() {
 final int current = queue.size();
 return current;
}

代码示例来源:origin: lealone/Lealone

/**
 * Reports how many elements the backing {@code deque} currently holds.
 *
 * @return the value returned by {@code deque.size()}
 */
public int size() {
  final int current = deque.size();
  return current;
}

代码示例来源:origin: loklak/loklak_server

/**
 * Reports how many entries are currently in {@code messageQueue}.
 *
 * @return the value returned by {@code messageQueue.size()}
 */
public static int getMessageQueueSize() {
  final int pending = messageQueue.size();
  return pending;
}

代码示例来源:origin: h2oai/h2o-2

/**
 * Bumps the given counter and periodically logs the buffer statistics.
 *
 * <p>Does nothing at all (the counter is not touched) unless {@code DEBUG} is set.
 * Otherwise the counter is incremented, and whenever the low nine bits of the
 * new value are all ones (i.e. once every 512 increments) a warning line with
 * the {@code BBMAKE}, {@code BBFREE}, {@code BBCACHE} values and the size of
 * {@code BBS} is written.
 *
 * @param ai the counter to increment
 */
private static void bbstats( AtomicInteger ai ) {
  if (DEBUG) {
    final int count = ai.incrementAndGet();
    final boolean timeToReport = (count & 511) == 511;
    if (timeToReport) {
      Log.warn("BB make=" + BBMAKE.get() + " free=" + BBFREE.get() + " cache=" + BBCACHE.get() + " size=" + BBS.size());
    }
  }
}

代码示例来源:origin: loklak/loklak_server

/**
 * Writes a bulk of messages through {@code DAO.writeMessageBulk}, updates the
 * two counters, logs a summary line and then resets both counters to zero.
 *
 * <p>The number of entries returned by the write is added to
 * {@code newMessageCounter}; the remainder of the bulk
 * ({@code bulk.size()} minus that number) is added to {@code doubleMessageCounter}.
 *
 * @param bulk the messages to write
 * @param newMessageCounter accumulates the count of newly written messages; reset to 0 on exit
 * @param doubleMessageCounter accumulates the count of messages not newly written; reset to 0 on exit
 */
private void dumpMessageBulk(List<DAO.MessageWrapper> bulk, AtomicInteger newMessageCounter, AtomicInteger doubleMessageCounter) {
  final long startedAt = System.currentTimeMillis();

  final int written = DAO.writeMessageBulk(bulk).size();
  final int notWritten = bulk.size() - written;
  doubleMessageCounter.addAndGet(notWritten);
  newMessageCounter.addAndGet(written);

  // Storage time covers the write and the counter updates only, not the logging below.
  final long storageMillis = System.currentTimeMillis() - startedAt;
  DAO.log("dumped timelines: " + newMessageCounter
      + " new, " + doubleMessageCounter
      + " known from cache, storage time: " + storageMillis
      + " ms, remaining messages: " + messageQueue.size());

  newMessageCounter.set(0);
  doubleMessageCounter.set(0);
}

代码示例来源:origin: prestodb/presto

/**
 * Builds a diagnostic snapshot of this exchange client.
 *
 * <p>The per-client statuses are collected before taking this object's monitor,
 * so the snapshot is for diagnostics only: it does not guarantee a consistent
 * view between different exchange clients, because guaranteeing one would
 * introduce significant lock contention.
 *
 * @return a status object whose buffered-page count excludes a trailing
 *         {@code NO_MORE_PAGES} marker, if one is at the end of {@code pageBuffer}
 */
public ExchangeClientStatus getStatus()
{
  ImmutableList.Builder<PageBufferClientStatus> statuses = ImmutableList.builder();
  for (HttpPageBufferClient bufferClient : allClients.values()) {
    PageBufferClientStatus clientStatus = bufferClient.getStatus();
    statuses.add(clientStatus);
  }
  List<PageBufferClientStatus> pageBufferClientStatus = statuses.build();

  synchronized (this) {
    int queued = pageBuffer.size();
    // The end marker sits in the buffer like a page but must not be counted as one.
    boolean endsWithMarker = queued > 0 && pageBuffer.peekLast() == NO_MORE_PAGES;
    int bufferedPages = endsWithMarker ? queued - 1 : queued;
    return new ExchangeClientStatus(
        bufferRetainedSizeInBytes,
        maxBufferRetainedSizeInBytes,
        averageBytesPerRequest,
        successfulRequests,
        bufferedPages,
        noMoreLocations,
        pageBufferClientStatus);
  }
}

代码示例来源:origin: apache/zookeeper

/**
 * Renders a one-line summary of this connection: session id (hex), local and
 * remote socket addresses, {@code lastZxid}, {@code xid}, sent/received counts
 * reported by the client socket, and the sizes of the outgoing, pending and
 * waiting-event queues.
 *
 * @return the summary as space-separated {@code key:value} pairs
 */
@Override
public String toString() {
  final SocketAddress localAddress = sendThread.getClientCnxnSocket().getLocalSocketAddress();
  final SocketAddress remoteAddress = sendThread.getClientCnxnSocket().getRemoteSocketAddress();

  final StringBuilder text = new StringBuilder();
  text.append("sessionid:0x").append(Long.toHexString(getSessionId()));
  text.append(" local:").append(localAddress);
  text.append(" remoteserver:").append(remoteAddress);
  text.append(" lastZxid:").append(lastZxid);
  text.append(" xid:").append(xid);
  text.append(" sent:").append(sendThread.getClientCnxnSocket().getSentCount());
  text.append(" recv:").append(sendThread.getClientCnxnSocket().getRecvCount());
  text.append(" queuedpkts:").append(outgoingQueue.size());
  text.append(" pendingresp:").append(pendingQueue.size());
  text.append(" queuedevents:").append(eventThread.waitingEvents.size());
  return text.toString();
}

代码示例来源:origin: alibaba/nacos

/**
 * Offers a health-check result to {@code healthCheckResults}.
 *
 * <p>Results whose {@code getDom()} value is not contained in
 * {@code Switch.getIncrementalList()} are ignored. If the offer is rejected,
 * a warning including the current queue size is logged and the result is dropped.
 *
 * @param result the health-check result to enqueue
 */
private void addResult(HealthCheckResult result) {
  final boolean listed = Switch.getIncrementalList().contains(result.getDom());
  if (listed) {
    final boolean accepted = healthCheckResults.offer(result);
    if (!accepted) {
      Loggers.EVT_LOG.warn("[HEALTH-CHECK-SYNC] failed to add check result to queue, queue size: {}", healthCheckResults.size());
    }
  }
}

代码示例来源:origin: apache/flume

/**
 * Starts the channel counter, publishes the current queue size and total
 * capacity to it, and then delegates to {@code super.start()}.
 *
 * <p>Capacity is reported as the queue's current size plus its remaining capacity.
 */
@Override
public synchronized void start() {
 channelCounter.start();

 final int stored = queue.size();
 channelCounter.setChannelSize(stored);

 final long totalCapacity = queue.size() + queue.remainingCapacity();
 channelCounter.setChannelCapacity(Long.valueOf(totalCapacity));

 super.start();
}

代码示例来源:origin: apache/flume

/**
 * Rolls back this transaction: every event in {@code takeList} is moved back
 * to the front of {@code queue}, {@code putList} is discarded, both byte
 * counters are reset, and the {@code queueStored} permits for the returned
 * events are released.
 *
 * @throws IllegalStateException if {@code queue} has less remaining capacity
 *         than there are events in {@code takeList}
 */
@Override
protected void doRollback() {
 // Remember the count up front: takeList is emptied below, but the same
 // number of permits has to be handed back afterwards.
 int takes = takeList.size();
 synchronized (queueLock) {
  Preconditions.checkState(queue.remainingCapacity() >= takeList.size(),
    "Not enough space in memory channel " +
    "queue to rollback takes. This should never happen, please report");
  // Move events from the tail of takeList to the head of queue, one at a
  // time, so the element that was first in takeList ends up first in queue.
  while (!takeList.isEmpty()) {
   queue.addFirst(takeList.removeLast());
  }
  putList.clear();
 }
 putByteCounter = 0;
 takeByteCounter = 0;
 // NOTE(review): presumably each permit stands for one event available in
 // queue (doTake acquires one per event) -- confirm against the declaration.
 queueStored.release(takes);
 channelCounter.setChannelSize(queue.size());
}

代码示例来源:origin: apache/flume

/**
 * Records a put attempt and stages the event in {@code putList}.
 *
 * <p>The event's size, expressed in slots of {@code byteCapacitySlotSize}, is
 * added to {@code putByteCounter} only when the event was accepted.
 *
 * @param event the event to stage
 * @throws ChannelException if {@code putList} rejects the event
 * @throws InterruptedException declared by the signature; nothing visible in
 *         this method blocks
 */
@Override
protected void doPut(Event event) throws InterruptedException {
 channelCounter.incrementEventPutAttemptCount();
 final int slots = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);

 final boolean staged = putList.offer(event);
 if (staged) {
  putByteCounter += slots;
  return;
 }

 throw new ChannelException("Put queue for MemoryTransaction of capacity "
   + putList.size()
   + " full, consider committing more frequently, increasing capacity or increasing thread count");
}

代码示例来源:origin: apache/flume

/**
 * Replaces {@code queue} with a new {@code LinkedBlockingDeque} of the given
 * capacity that contains the elements of the old one, and adjusts the
 * {@code queueRemaining} permits by the capacity difference.
 *
 * <p>Nothing happens when the capacity is unchanged. When shrinking, the
 * surplus permits must first be acquired within {@code keepAlive} seconds;
 * if they cannot be, a warning is logged and the queue is left untouched.
 *
 * @param capacity the new total capacity of the queue
 * @throws InterruptedException presumably when interrupted while waiting in
 *         {@code queueRemaining.tryAcquire} -- its declaration is not visible here
 */
private void resizeQueue(int capacity) throws InterruptedException {
 int oldCapacity;
 synchronized (queueLock) {
  // Current total capacity = occupied slots + free slots.
  oldCapacity = queue.size() + queue.remainingCapacity();
 }
 if (oldCapacity == capacity) {
  return;
 } else if (oldCapacity > capacity) {
  // Shrinking: take the permits for the slots being removed before swapping
  // the queue; give up if they do not become available in time.
  if (!queueRemaining.tryAcquire(oldCapacity - capacity, keepAlive, TimeUnit.SECONDS)) {
   LOGGER.warn("Couldn't acquire permits to downsize the queue, resizing has been aborted");
  } else {
   synchronized (queueLock) {
    LinkedBlockingDeque<Event> newQueue = new LinkedBlockingDeque<Event>(capacity);
    newQueue.addAll(queue);
    queue = newQueue;
   }
  }
 } else {
  // Growing: swap the queue first, then publish the additional permits.
  synchronized (queueLock) {
   LinkedBlockingDeque<Event> newQueue = new LinkedBlockingDeque<Event>(capacity);
   newQueue.addAll(queue);
   queue = newQueue;
  }
  queueRemaining.release(capacity - oldCapacity);
 }
}

代码示例来源:origin: apache/flume

/**
 * Publishes the current queue size to the channel counter, stops the counter,
 * and then delegates to {@code super.stop()}.
 */
@Override
public synchronized void stop() {
 final int stored = queue.size();
 channelCounter.setChannelSize(stored);
 channelCounter.stop();
 super.stop();
}

代码示例来源:origin: apache/hbase

/**
 * Blocking take that silently discards every call {@code needToDrop} rejects.
 *
 * <p>On each attempt the fill ratio ({@code queue.size() / maxCapacity}) is
 * compared with {@code lifoThreshold}: above it the newest element is taken
 * from the tail (and {@code numLifoModeSwitches} is incremented), otherwise
 * the oldest element is taken from the head. Dropped calls are counted in
 * {@code numGeneralCallsDropped} and the loop tries again.
 *
 * @return the first taken call that does not need to be dropped
 * @throws InterruptedException if interrupted while waiting
 */
@Override
public CallRunner take() throws InterruptedException {
 for (;;) {
  final double fillRatio = (double) queue.size() / this.maxCapacity;
  final CallRunner candidate;
  if (fillRatio > lifoThreshold) {
   numLifoModeSwitches.increment();
   candidate = queue.takeLast();
  } else {
   candidate = queue.takeFirst();
  }

  if (!needToDrop(candidate)) {
   return candidate;
  }
  numGeneralCallsDropped.increment();
  candidate.drop();
 }
}

代码示例来源:origin: apache/hbase

/**
 * Non-blocking counterpart of the take loop: polls calls until one is found
 * that {@code needToDrop} accepts, or the queue yields nothing.
 *
 * <p>While the fill ratio ({@code queue.size() / maxCapacity}) exceeds
 * {@code lifoThreshold}, elements are polled from the tail; otherwise from the
 * head. {@code numLifoModeSwitches} is incremented only once for each
 * consecutive run of tail polls within a single invocation.
 *
 * @return the first polled call that does not need to be dropped, or
 *         {@code null} if a poll returned nothing
 */
@Override
public CallRunner poll() {
 boolean countedThisRun = false;
 for (;;) {
  final boolean overThreshold = ((double) queue.size() / this.maxCapacity) > lifoThreshold;
  final CallRunner candidate;
  if (overThreshold) {
   // Only count once per switch.
   if (!countedThisRun) {
    countedThisRun = true;
    numLifoModeSwitches.increment();
   }
   candidate = queue.pollLast();
  } else {
   countedThisRun = false;
   candidate = queue.pollFirst();
  }

  if (candidate == null) {
   return null;
  }
  if (!needToDrop(candidate)) {
   return candidate;
  }
  numGeneralCallsDropped.increment();
  candidate.drop();
 }
}

代码示例来源:origin: apache/flume

/**
 * Records a take attempt and moves one event from {@code queue} into
 * {@code takeList}.
 *
 * <p>A {@code queueStored} permit must be obtained within {@code keepAlive}
 * seconds before the queue is polled; the poll itself happens under
 * {@code queueLock}. The event's size in slots of {@code byteCapacitySlotSize}
 * is added to {@code takeByteCounter}.
 *
 * @return the event taken, or {@code null} if no permit became available in time
 * @throws ChannelException if {@code takeList} has no remaining capacity
 * @throws InterruptedException declared by the signature; presumably raised by
 *         the blocking calls used here
 */
@Override
protected Event doTake() throws InterruptedException {
 channelCounter.incrementEventTakeAttemptCount();
 if (takeList.remainingCapacity() == 0) {
  throw new ChannelException("Take list for MemoryTransaction, capacity "
    + takeList.size()
    + " full, consider committing more frequently, increasing capacity, or increasing thread count");
 }

 final boolean permitObtained = queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS);
 if (!permitObtained) {
  return null;
 }

 final Event taken;
 synchronized (queueLock) {
  taken = queue.poll();
 }
 Preconditions.checkNotNull(taken,
   "Queue.poll returned NULL despite semaphore signalling existence of entry");
 takeList.put(taken);

 takeByteCounter += (int) Math.ceil(estimateEventSize(taken) / byteCapacitySlotSize);
 return taken;
}

代码示例来源:origin: apache/drill

/**
 * Closes this listener: marks it closed, stops throttling if it was active,
 * releases the data of every batch still in {@code batchQueue}, counts down
 * {@code firstMessageReceived}, and finally marks the listener completed.
 */
void close() {
 logger.debug( "[#{}] Query listener closing.", instanceId );
 closed = true;

 final boolean throttlingStopped = stopThrottlingIfSo();
 if ( throttlingStopped ) {
  logger.debug( "[#{}] Throttling stopped at close() (at queue size {}).",
         instanceId, batchQueue.size() );
 }

 // Drain whatever is left; query timeout is irrelevant since the cursor is closing.
 while (!batchQueue.isEmpty()) {
  QueryDataBatch leftover = batchQueue.poll();
  if (leftover == null || leftover.getData() == null) {
   continue;
  }
  leftover.getData().release();
 }

 // close() can run before the first result has arrived, i.e. while the main
 // thread is still blocked waiting for it; counting down unblocks that thread.
 firstMessageReceived.countDown(); // TODO:  Why not call releaseIfFirst as used elsewhere?
 completed = true;
}

代码示例来源:origin: jersey/jersey

/**
 * Appends the {@code VOID} marker to {@code queue} and flushes {@code ctx}.
 *
 * <p>If the queue is at {@code CAPACITY}, the marker is offered with a timeout
 * of {@code WRITE_TIMEOUT} milliseconds. When that offer does not succeed
 * (timeout or interruption), the last queued element is discarded to make room
 * and the marker is added in its place.
 *
 * <p>If the thread is interrupted while waiting, the interrupt status is
 * restored so that callers can still observe it.
 *
 * @throws IOException declared by the signature; nothing visible in this
 *         method throws it directly
 */
@Override
public void close() throws IOException {
  if (queue.size() == CAPACITY) {
    boolean offered = false;
    try {
      offered = queue.offer(VOID, WRITE_TIMEOUT, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      // Do not abort the close, but do not lose the interrupt either:
      // re-assert the flag for code further up the stack.
      Thread.currentThread().interrupt();
    }
    if (!offered) {
      // pollLast() instead of removeLast(): if the consumer drained the queue
      // while we were waiting, there is nothing to discard and removeLast()
      // would throw NoSuchElementException.
      queue.pollLast();
      queue.add(VOID);
    }
  } else {
    queue.add(VOID);
  }
  ctx.flush();
}

代码示例来源:origin: jankotek/mapdb

/**
 * Returns a new deque of capacity {@code n} containing the consecutive
 * Integers 0 ... n - 1, head to tail.
 *
 * <p>Along the way it asserts that the deque starts empty, accepts every
 * offer, ends up full ({@code remainingCapacity() == 0}, {@code size() == n})
 * and has 0 at the head and n - 1 at the tail. Because the deque is asserted
 * to be non-empty afterwards, {@code n} must be at least 1.
 *
 * @param n the capacity and the number of elements to insert
 * @return the populated deque
 */
private static LinkedBlockingDeque<Integer> populatedDeque(int n) {
  LinkedBlockingDeque<Integer> q =
    new LinkedBlockingDeque<Integer>(n);
  assertTrue(q.isEmpty());
  for (int i = 0; i < n; i++) {
    // Integer.valueOf replaces the deprecated Integer(int) constructor; the
    // checks below compare by value, so instance identity does not matter.
    assertTrue(q.offer(Integer.valueOf(i)));
  }
  assertFalse(q.isEmpty());
  assertEquals(0, q.remainingCapacity());
  assertEquals(n, q.size());
  assertEquals((Integer) 0, q.peekFirst());
  assertEquals((Integer) (n - 1), q.peekLast());
  return q;
}

相关文章

微信公众号

最新文章

更多