java.util.concurrent.LinkedBlockingQueue.drainTo()方法的使用及代码示例

x33g5p2x  于2022-01-23 转载在 其他  
字(9.8k)|赞(0)|评价(0)|浏览(258)

本文整理了Java中java.util.concurrent.LinkedBlockingQueue.drainTo()方法的一些代码示例,展示了LinkedBlockingQueue.drainTo()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。LinkedBlockingQueue.drainTo()方法的具体详情如下:
包路径:java.util.concurrent.LinkedBlockingQueue
类名称:LinkedBlockingQueue
方法名:drainTo

LinkedBlockingQueue.drainTo介绍

暂无

代码示例

代码示例来源:origin: robovm/robovm

/**
 * @throws UnsupportedOperationException {@inheritDoc}
 * @throws ClassCastException            {@inheritDoc}
 * @throws NullPointerException          {@inheritDoc}
 * @throws IllegalArgumentException      {@inheritDoc}
 */
public int drainTo(Collection<? super E> c) {
  return drainTo(c, Integer.MAX_VALUE);
}

代码示例来源:origin: apache/storm

private void flushPending() {
  IConnectionCallback serverCb = _server._cb;
  if (serverCb != null && !_pendingDueToUnregisteredServer.isEmpty()) {
    ArrayList<TaskMessage> ret = new ArrayList<>();
    _pendingDueToUnregisteredServer.drainTo(ret);
    serverCb.recv(ret);
  }
}

代码示例来源:origin: apache/nifi

@Override
public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
  // resize the receive buffer, but preserve data
  if (descriptor == PROP_MAX_QUEUE_SIZE) {
    // it's a mandatory integer, never null
    int newSize = Integer.valueOf(newValue);
    if (mqttQueue != null) {
      int msgPending = mqttQueue.size();
      if (msgPending > newSize) {
        logger.warn("New receive buffer size ({}) is smaller than the number of messages pending ({}), ignoring resize request. Processor will be invalid.",
            new Object[]{newSize, msgPending});
        return;
      }
      LinkedBlockingQueue<MQTTQueueMessage> newBuffer = new LinkedBlockingQueue<>(newSize);
      mqttQueue.drainTo(newBuffer);
      mqttQueue = newBuffer;
    }
  }
}

代码示例来源:origin: xuxueli/xxl-job

int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
    callbackParamList.add(callback);
int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
if (callbackParamList!=null && callbackParamList.size()>0) {
  doCallback(callbackParamList);

代码示例来源:origin: redisson/redisson

private static void replayEvents() {
  final LinkedBlockingQueue<SubstituteLoggingEvent> queue = SUBST_FACTORY.getEventQueue();
  final int queueSize = queue.size();
  int count = 0;
  final int maxDrain = 128;
  List<SubstituteLoggingEvent> eventList = new ArrayList<SubstituteLoggingEvent>(maxDrain);
  while (true) {
    int numDrained = queue.drainTo(eventList, maxDrain);
    if (numDrained == 0)
      break;
    for (SubstituteLoggingEvent event : eventList) {
      replaySingleEvent(event);
      if (count++ == 0)
        emitReplayOrSubstituionWarning(event, queueSize);
    }
    eventList.clear();
  }
}

代码示例来源:origin: apache/incubator-druid

@Override
 public void run()
 {
  synchronized (lock) {
   try {
    if (!(finished && queue.isEmpty())) {
     final List<DataSegment> segments = new ArrayList<>();
     queue.drainTo(segments);
     try {
      announcer.announceSegments(segments);
      nextAnnoucement = exec.schedule(this, intervalMillis, TimeUnit.MILLISECONDS);
     }
     catch (IOException e) {
      doneAnnouncing.setException(
        new SegmentLoadingException(e, "Failed to announce segments[%s]", segments)
      );
     }
    } else {
     doneAnnouncing.set(true);
    }
   }
   catch (Exception e) {
    doneAnnouncing.setException(e);
   }
  }
 }
},

代码示例来源:origin: googleapis/google-cloud-java

@InternalApi
void processOutstandingAckOperations() {
 List<PendingModifyAckDeadline> modifyAckDeadlinesToSend = new ArrayList<>();
 List<String> acksToSend = new ArrayList<>();
 pendingAcks.drainTo(acksToSend);
 logger.log(Level.FINER, "Sending {0} acks", acksToSend.size());
 PendingModifyAckDeadline nacksToSend = new PendingModifyAckDeadline(0);
 pendingNacks.drainTo(nacksToSend.ackIds);
 logger.log(Level.FINER, "Sending {0} nacks", nacksToSend.ackIds.size());
 if (!nacksToSend.ackIds.isEmpty()) {
  modifyAckDeadlinesToSend.add(nacksToSend);
 }
 PendingModifyAckDeadline receiptsToSend =
   new PendingModifyAckDeadline(getMessageDeadlineSeconds());
 pendingReceipts.drainTo(receiptsToSend.ackIds);
 logger.log(Level.FINER, "Sending {0} receipts", receiptsToSend.ackIds.size());
 if (!receiptsToSend.ackIds.isEmpty()) {
  modifyAckDeadlinesToSend.add(receiptsToSend);
 }
 ackProcessor.sendAckOperations(acksToSend, modifyAckDeadlinesToSend);
}

代码示例来源:origin: apache/geode

@Override
public int drainTo(Collection<? super E> c, int maxElements) {
 int result = super.drainTo(c, maxElements);
 if (result > 0) {
  this.stats.remove(result);
  postDrain(c);
 }
 return result;
}

代码示例来源:origin: apache/geode

@Override
public int drainTo(Collection<? super E> c) {
 int result = super.drainTo(c);
 if (result > 0) {
  this.stats.remove(result);
  postDrain(c);
 }
 return result;
}

代码示例来源:origin: prestodb/presto

private synchronized List<ConnectorSplit> getBatch(int maxSize)
{
  // take up to maxSize elements from the queue
  List<ConnectorSplit> elements = new ArrayList<>(maxSize);
  queue.drainTo(elements, maxSize);
  // if the queue is empty and the current future is finished, create a new one so
  // a new readers can be notified when the queue has elements to read
  if (queue.isEmpty() && !closed) {
    if (notEmptyFuture.isDone()) {
      notEmptyFuture = new CompletableFuture<>();
    }
  }
  return ImmutableList.copyOf(elements);
}

代码示例来源:origin: wildfly/wildfly

private static void replayEvents() {
  final LinkedBlockingQueue<SubstituteLoggingEvent> queue = SUBST_FACTORY.getEventQueue();
  final int queueSize = queue.size();
  int count = 0;
  final int maxDrain = 128;
  List<SubstituteLoggingEvent> eventList = new ArrayList<SubstituteLoggingEvent>(maxDrain);
  while (true) {
    int numDrained = queue.drainTo(eventList, maxDrain);
    if (numDrained == 0)
      break;
    for (SubstituteLoggingEvent event : eventList) {
      replaySingleEvent(event);
      if (count++ == 0)
        emitReplayOrSubstituionWarning(event, queueSize);
    }
    eventList.clear();
  }
}

代码示例来源:origin: lingochamp/FileDownloader

private void push() {
  final int delayMillis;
  synchronized (queueLock) {
    if (!disposingList.isEmpty()) {
      // is disposing.
      return;
    }
    if (waitingQueue.isEmpty()) {
      // not messenger need be handled.
      return;
    }
    if (!isIntervalValid()) {
      waitingQueue.drainTo(disposingList);
      delayMillis = 0;
    } else {
      delayMillis = INTERVAL;
      final int size = Math.min(waitingQueue.size(), SUB_PACKAGE_SIZE);
      for (int i = 0; i < size; i++) {
        disposingList.add(waitingQueue.remove());
      }
    }
  }
  handler.sendMessageDelayed(handler.obtainMessage(DISPOSE_MESSENGER_LIST, disposingList),
      delayMillis);
}

代码示例来源:origin: lealone/Lealone

private void commitTransactions() {
  if (transactions.isEmpty())
    return;
  ArrayList<MVCCTransaction> oldTransactions = new ArrayList<>(transactions.size());
  transactions.drainTo(oldTransactions);
  for (MVCCTransaction t : oldTransactions) {
    if (t.getSession() != null)
      t.getSession().commit(null);
    else
      t.commit();
  }
}

代码示例来源:origin: apache/incubator-druid

queue.drainTo(segments);
announcer.announceSegments(segments);

代码示例来源:origin: jankotek/mapdb

/**
 * drainTo empties full queue, unblocking a waiting put.
 */
public void testDrainToWithActivePut() throws InterruptedException {
  final LinkedBlockingQueue q = populatedQueue(SIZE);
  Thread t = new Thread(new CheckedRunnable() {
    public void realRun() throws InterruptedException {
      q.put(new Integer(SIZE + 1));
    }});
  t.start();
  ArrayList l = new ArrayList();
  q.drainTo(l);
  assertTrue(l.size() >= SIZE);
  for (int i = 0; i < SIZE; ++i)
    assertEquals(l.get(i), new Integer(i));
  t.join();
  assertTrue(q.size() + l.size() >= SIZE);
}

代码示例来源:origin: Red5/red5-server

pendingOutMessages.drainTo(sendList, Math.min(164, available));
result = IoBuffer.allocate(targetSize).setAutoExpand(true);
for (PendingData pendingMessage : sendList) {

代码示例来源:origin: qunarcorp/qmq

private void setResult(PullMessageFuture future, List<Message> messages) {
  int expectedSize = future.getExpectedSize();
  if (expectedSize <= 0) {
    localBuffer.addAll(messages);
    future.set(Collections.<Message>emptyList());
    return;
  }
  List<Message> result = new ArrayList<>(expectedSize);
  int bufferSize = localBuffer.size();
  if (bufferSize > 0) {
    localBuffer.drainTo(result, Math.min(expectedSize, bufferSize));
  }
  int need = expectedSize - result.size();
  if (need <= 0) {
    localBuffer.addAll(messages);
    future.set(result);
    return;
  }
  result.addAll(head(messages, need));
  localBuffer.addAll(tail(messages, need));
  future.set(result);
}

代码示例来源:origin: jankotek/mapdb

/**
 * drainTo(c, n) empties first min(n, size) elements of queue into c
 */
public void testDrainToN() {
  LinkedBlockingQueue q = new LinkedBlockingQueue();
  for (int i = 0; i < SIZE + 2; ++i) {
    for (int j = 0; j < SIZE; j++)
      assertTrue(q.offer(new Integer(j)));
    ArrayList l = new ArrayList();
    q.drainTo(l, i);
    int k = (i < SIZE) ? i : SIZE;
    assertEquals(k, l.size());
    assertEquals(SIZE - k, q.size());
    for (int j = 0; j < k; ++j)
      assertEquals(l.get(j), new Integer(j));
    do {} while (q.poll() != null);
  }
}

代码示例来源:origin: apache/storm

public void prepareAndExecuteStatement() {
  int size = queue.size();
  if (size > 0) {
    List<Tuple> inputs = new ArrayList<>(size);
    queue.drainTo(inputs);
    try {
      List<PairStatementTuple> psl = buildStatement(inputs);
      int sinceLastModified = updateAndGetSecondsSinceLastModified();
      LOG.debug(logPrefix() + "Execute cql batches with {} statements after {} seconds", size, sinceLastModified);
      checkTimeElapsedSinceLastExec(sinceLastModified);
      GroupingBatchBuilder batchBuilder = new GroupingBatchBuilder(cassandraConf.getBatchSizeRows(), psl);
      int batchSize = 0;
      for (PairBatchStatementTuples batch : batchBuilder) {
        LOG.debug(logPrefix() + "Writing data to {} in batches of {} rows.", cassandraConf.getKeyspace(),
             batch.getInputs().size());
        getAsyncExecutor().execAsync(batch.getStatement(), batch.getInputs());
        batchSize++;
      }
      int pending = getAsyncExecutor().getPendingTasksSize();
      if (pending > batchSize) {
        LOG.warn(logPrefix() + "Currently pending tasks is superior to the number of submit batches({}) : {}", batchSize,
             pending);
      }
    } catch (Throwable r) {
      LOG.error(logPrefix() + "Error(s) occurred while preparing batch statements");
      getAsyncHandler().failure(r, inputs);
    }
  }
}

代码示例来源:origin: jankotek/mapdb

/**
 * drainTo(c) empties queue into another collection c
 */
public void testDrainTo() {
  LinkedBlockingQueue q = populatedQueue(SIZE);
  ArrayList l = new ArrayList();
  q.drainTo(l);
  assertEquals(0, q.size());
  assertEquals(SIZE, l.size());
  for (int i = 0; i < SIZE; ++i)
    assertEquals(l.get(i), new Integer(i));
  q.add(zero);
  q.add(one);
  assertFalse(q.isEmpty());
  assertTrue(q.contains(zero));
  assertTrue(q.contains(one));
  l.clear();
  q.drainTo(l);
  assertEquals(0, q.size());
  assertEquals(2, l.size());
  for (int i = 0; i < 2; ++i)
    assertEquals(l.get(i), new Integer(i));
}

相关文章

微信公众号

最新文章

更多