本文整理了Java中java.util.concurrent.LinkedBlockingQueue.drainTo()
方法的一些代码示例,展示了LinkedBlockingQueue.drainTo()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。LinkedBlockingQueue.drainTo()
方法的具体详情如下:
包路径:java.util.concurrent.LinkedBlockingQueue
类名称:LinkedBlockingQueue
方法名:drainTo
暂无
代码示例来源:origin: robovm/robovm
/**
* @throws UnsupportedOperationException {@inheritDoc}
* @throws ClassCastException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
*/
public int drainTo(Collection<? super E> c) {
return drainTo(c, Integer.MAX_VALUE);
}
代码示例来源:origin: apache/storm
private void flushPending() {
IConnectionCallback serverCb = _server._cb;
if (serverCb != null && !_pendingDueToUnregisteredServer.isEmpty()) {
ArrayList<TaskMessage> ret = new ArrayList<>();
_pendingDueToUnregisteredServer.drainTo(ret);
serverCb.recv(ret);
}
}
代码示例来源:origin: apache/nifi
@Override
public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
// resize the receive buffer, but preserve data
if (descriptor == PROP_MAX_QUEUE_SIZE) {
// it's a mandatory integer, never null
int newSize = Integer.valueOf(newValue);
if (mqttQueue != null) {
int msgPending = mqttQueue.size();
if (msgPending > newSize) {
logger.warn("New receive buffer size ({}) is smaller than the number of messages pending ({}), ignoring resize request. Processor will be invalid.",
new Object[]{newSize, msgPending});
return;
}
LinkedBlockingQueue<MQTTQueueMessage> newBuffer = new LinkedBlockingQueue<>(newSize);
mqttQueue.drainTo(newBuffer);
mqttQueue = newBuffer;
}
}
}
代码示例来源:origin: xuxueli/xxl-job
int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
callbackParamList.add(callback);
int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
if (callbackParamList!=null && callbackParamList.size()>0) {
doCallback(callbackParamList);
代码示例来源:origin: redisson/redisson
private static void replayEvents() {
final LinkedBlockingQueue<SubstituteLoggingEvent> queue = SUBST_FACTORY.getEventQueue();
final int queueSize = queue.size();
int count = 0;
final int maxDrain = 128;
List<SubstituteLoggingEvent> eventList = new ArrayList<SubstituteLoggingEvent>(maxDrain);
while (true) {
int numDrained = queue.drainTo(eventList, maxDrain);
if (numDrained == 0)
break;
for (SubstituteLoggingEvent event : eventList) {
replaySingleEvent(event);
if (count++ == 0)
emitReplayOrSubstituionWarning(event, queueSize);
}
eventList.clear();
}
}
代码示例来源:origin: apache/incubator-druid
@Override
public void run()
{
synchronized (lock) {
try {
if (!(finished && queue.isEmpty())) {
final List<DataSegment> segments = new ArrayList<>();
queue.drainTo(segments);
try {
announcer.announceSegments(segments);
nextAnnoucement = exec.schedule(this, intervalMillis, TimeUnit.MILLISECONDS);
}
catch (IOException e) {
doneAnnouncing.setException(
new SegmentLoadingException(e, "Failed to announce segments[%s]", segments)
);
}
} else {
doneAnnouncing.set(true);
}
}
catch (Exception e) {
doneAnnouncing.setException(e);
}
}
}
},
代码示例来源:origin: googleapis/google-cloud-java
@InternalApi
void processOutstandingAckOperations() {
List<PendingModifyAckDeadline> modifyAckDeadlinesToSend = new ArrayList<>();
List<String> acksToSend = new ArrayList<>();
pendingAcks.drainTo(acksToSend);
logger.log(Level.FINER, "Sending {0} acks", acksToSend.size());
PendingModifyAckDeadline nacksToSend = new PendingModifyAckDeadline(0);
pendingNacks.drainTo(nacksToSend.ackIds);
logger.log(Level.FINER, "Sending {0} nacks", nacksToSend.ackIds.size());
if (!nacksToSend.ackIds.isEmpty()) {
modifyAckDeadlinesToSend.add(nacksToSend);
}
PendingModifyAckDeadline receiptsToSend =
new PendingModifyAckDeadline(getMessageDeadlineSeconds());
pendingReceipts.drainTo(receiptsToSend.ackIds);
logger.log(Level.FINER, "Sending {0} receipts", receiptsToSend.ackIds.size());
if (!receiptsToSend.ackIds.isEmpty()) {
modifyAckDeadlinesToSend.add(receiptsToSend);
}
ackProcessor.sendAckOperations(acksToSend, modifyAckDeadlinesToSend);
}
代码示例来源:origin: apache/geode
@Override
public int drainTo(Collection<? super E> c, int maxElements) {
int result = super.drainTo(c, maxElements);
if (result > 0) {
this.stats.remove(result);
postDrain(c);
}
return result;
}
代码示例来源:origin: apache/geode
@Override
public int drainTo(Collection<? super E> c) {
int result = super.drainTo(c);
if (result > 0) {
this.stats.remove(result);
postDrain(c);
}
return result;
}
代码示例来源:origin: prestodb/presto
private synchronized List<ConnectorSplit> getBatch(int maxSize)
{
// take up to maxSize elements from the queue
List<ConnectorSplit> elements = new ArrayList<>(maxSize);
queue.drainTo(elements, maxSize);
// if the queue is empty and the current future is finished, create a new one so
// a new readers can be notified when the queue has elements to read
if (queue.isEmpty() && !closed) {
if (notEmptyFuture.isDone()) {
notEmptyFuture = new CompletableFuture<>();
}
}
return ImmutableList.copyOf(elements);
}
代码示例来源:origin: wildfly/wildfly
private static void replayEvents() {
final LinkedBlockingQueue<SubstituteLoggingEvent> queue = SUBST_FACTORY.getEventQueue();
final int queueSize = queue.size();
int count = 0;
final int maxDrain = 128;
List<SubstituteLoggingEvent> eventList = new ArrayList<SubstituteLoggingEvent>(maxDrain);
while (true) {
int numDrained = queue.drainTo(eventList, maxDrain);
if (numDrained == 0)
break;
for (SubstituteLoggingEvent event : eventList) {
replaySingleEvent(event);
if (count++ == 0)
emitReplayOrSubstituionWarning(event, queueSize);
}
eventList.clear();
}
}
代码示例来源:origin: lingochamp/FileDownloader
private void push() {
final int delayMillis;
synchronized (queueLock) {
if (!disposingList.isEmpty()) {
// is disposing.
return;
}
if (waitingQueue.isEmpty()) {
// not messenger need be handled.
return;
}
if (!isIntervalValid()) {
waitingQueue.drainTo(disposingList);
delayMillis = 0;
} else {
delayMillis = INTERVAL;
final int size = Math.min(waitingQueue.size(), SUB_PACKAGE_SIZE);
for (int i = 0; i < size; i++) {
disposingList.add(waitingQueue.remove());
}
}
}
handler.sendMessageDelayed(handler.obtainMessage(DISPOSE_MESSENGER_LIST, disposingList),
delayMillis);
}
代码示例来源:origin: lealone/Lealone
private void commitTransactions() {
if (transactions.isEmpty())
return;
ArrayList<MVCCTransaction> oldTransactions = new ArrayList<>(transactions.size());
transactions.drainTo(oldTransactions);
for (MVCCTransaction t : oldTransactions) {
if (t.getSession() != null)
t.getSession().commit(null);
else
t.commit();
}
}
代码示例来源:origin: apache/incubator-druid
queue.drainTo(segments);
announcer.announceSegments(segments);
代码示例来源:origin: jankotek/mapdb
/**
* drainTo empties full queue, unblocking a waiting put.
*/
public void testDrainToWithActivePut() throws InterruptedException {
final LinkedBlockingQueue q = populatedQueue(SIZE);
Thread t = new Thread(new CheckedRunnable() {
public void realRun() throws InterruptedException {
q.put(new Integer(SIZE + 1));
}});
t.start();
ArrayList l = new ArrayList();
q.drainTo(l);
assertTrue(l.size() >= SIZE);
for (int i = 0; i < SIZE; ++i)
assertEquals(l.get(i), new Integer(i));
t.join();
assertTrue(q.size() + l.size() >= SIZE);
}
代码示例来源:origin: Red5/red5-server
pendingOutMessages.drainTo(sendList, Math.min(164, available));
result = IoBuffer.allocate(targetSize).setAutoExpand(true);
for (PendingData pendingMessage : sendList) {
代码示例来源:origin: qunarcorp/qmq
private void setResult(PullMessageFuture future, List<Message> messages) {
int expectedSize = future.getExpectedSize();
if (expectedSize <= 0) {
localBuffer.addAll(messages);
future.set(Collections.<Message>emptyList());
return;
}
List<Message> result = new ArrayList<>(expectedSize);
int bufferSize = localBuffer.size();
if (bufferSize > 0) {
localBuffer.drainTo(result, Math.min(expectedSize, bufferSize));
}
int need = expectedSize - result.size();
if (need <= 0) {
localBuffer.addAll(messages);
future.set(result);
return;
}
result.addAll(head(messages, need));
localBuffer.addAll(tail(messages, need));
future.set(result);
}
代码示例来源:origin: jankotek/mapdb
/**
* drainTo(c, n) empties first min(n, size) elements of queue into c
*/
public void testDrainToN() {
LinkedBlockingQueue q = new LinkedBlockingQueue();
for (int i = 0; i < SIZE + 2; ++i) {
for (int j = 0; j < SIZE; j++)
assertTrue(q.offer(new Integer(j)));
ArrayList l = new ArrayList();
q.drainTo(l, i);
int k = (i < SIZE) ? i : SIZE;
assertEquals(k, l.size());
assertEquals(SIZE - k, q.size());
for (int j = 0; j < k; ++j)
assertEquals(l.get(j), new Integer(j));
do {} while (q.poll() != null);
}
}
代码示例来源:origin: apache/storm
public void prepareAndExecuteStatement() {
int size = queue.size();
if (size > 0) {
List<Tuple> inputs = new ArrayList<>(size);
queue.drainTo(inputs);
try {
List<PairStatementTuple> psl = buildStatement(inputs);
int sinceLastModified = updateAndGetSecondsSinceLastModified();
LOG.debug(logPrefix() + "Execute cql batches with {} statements after {} seconds", size, sinceLastModified);
checkTimeElapsedSinceLastExec(sinceLastModified);
GroupingBatchBuilder batchBuilder = new GroupingBatchBuilder(cassandraConf.getBatchSizeRows(), psl);
int batchSize = 0;
for (PairBatchStatementTuples batch : batchBuilder) {
LOG.debug(logPrefix() + "Writing data to {} in batches of {} rows.", cassandraConf.getKeyspace(),
batch.getInputs().size());
getAsyncExecutor().execAsync(batch.getStatement(), batch.getInputs());
batchSize++;
}
int pending = getAsyncExecutor().getPendingTasksSize();
if (pending > batchSize) {
LOG.warn(logPrefix() + "Currently pending tasks is superior to the number of submit batches({}) : {}", batchSize,
pending);
}
} catch (Throwable r) {
LOG.error(logPrefix() + "Error(s) occurred while preparing batch statements");
getAsyncHandler().failure(r, inputs);
}
}
}
代码示例来源:origin: jankotek/mapdb
/**
* drainTo(c) empties queue into another collection c
*/
public void testDrainTo() {
LinkedBlockingQueue q = populatedQueue(SIZE);
ArrayList l = new ArrayList();
q.drainTo(l);
assertEquals(0, q.size());
assertEquals(SIZE, l.size());
for (int i = 0; i < SIZE; ++i)
assertEquals(l.get(i), new Integer(i));
q.add(zero);
q.add(one);
assertFalse(q.isEmpty());
assertTrue(q.contains(zero));
assertTrue(q.contains(one));
l.clear();
q.drainTo(l);
assertEquals(0, q.size());
assertEquals(2, l.size());
for (int i = 0; i < 2; ++i)
assertEquals(l.get(i), new Integer(i));
}
内容来源于网络,如有侵权,请联系作者删除!