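This article collects code examples for the Java class java.util.concurrent.LinkedBlockingDeque and shows how the class is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the LinkedBlockingDeque class are as follows:
Package path: java.util.concurrent.LinkedBlockingDeque
Class name: LinkedBlockingDeque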
An optionally-bounded BlockingDeque based on linked nodes.
The optional capacity bound constructor argument serves as a way to prevent excessive expansion. The capacity, if unspecified, is equal to Integer.MAX_VALUE. Linked nodes are dynamically created upon each insertion unless this would bring the deque above capacity.
Most operations run in constant time (ignoring time spent blocking). Exceptions include remove(Object), removeFirstOccurrence, removeLastOccurrence, contains, iterator.remove(), and the bulk operations, all of which run in linear time.
This class and its iterator implement all of the optional methods of the Collection and Iterator interfaces.
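To make the capacity rule concrete, here is a minimal sketch. The class name CapacityBoundSketch and its helper methods are illustrative assumptions and do not come from any of the projects cited in this article; only the LinkedBlockingDeque calls are standard JDK API.

```java
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical demo class, not taken from any cited project.
class CapacityBoundSketch {

    // Tries to insert `attempts` elements into a deque bounded at `capacity`
    // and returns how many insertions were accepted.
    static int fillBounded(int capacity, int attempts) {
        LinkedBlockingDeque<Integer> deque = new LinkedBlockingDeque<>(capacity);
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            // offer() returns false instead of blocking when the deque is full.
            if (deque.offer(i)) {
                accepted++;
            }
        }
        return accepted;
    }

    // Remaining capacity of an empty deque created without a capacity argument.
    static int defaultCapacity() {
        return new LinkedBlockingDeque<String>().remainingCapacity();
    }

    public static void main(String[] args) {
        System.out.println(fillBounded(3, 5));                      // prints 3
        System.out.println(defaultCapacity() == Integer.MAX_VALUE); // prints true
    }
}
```

The non-blocking offer(e) is used here so the sketch never stalls; put(e) would instead block the calling thread once the bound is reached.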
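Code example source: springside/springside4
/**
 * Creates an unbounded double-ended queue for concurrent, blocking use.
 *
 * Because it is unbounded, a producer never blocks on a full queue, but a consumer blocks when the queue is empty.
 */
public static <E> LinkedBlockingDeque<E> newBlockingUnlimitDeque() {
    return new LinkedBlockingDeque<E>();
}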
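The blocking behaviour described in that comment can be sketched as follows. UnboundedDequeSketch, consumeAfterDelay, and the delay value are hypothetical names chosen for illustration; the sketch only assumes the standard put and take methods.

```java
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical demo class, not part of springside4.
class UnboundedDequeSketch {

    // A consumer blocks on take() until a delayed producer adds an element.
    static String consumeAfterDelay(long delayMillis) {
        // No capacity argument: the bound is Integer.MAX_VALUE.
        LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<>();
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(delayMillis);
                deque.put("hello"); // succeeds at once here: the deque is far below its bound
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            String value = deque.take(); // blocks while the deque is empty
            producer.join();
            return value;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(consumeAfterDelay(100)); // prints hello
    }
}
```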
Code example source: jersey/jersey
@Override
public void close() throws IOException {
    if (queue.size() == CAPACITY) {
        // Queue is full: wait up to WRITE_TIMEOUT ms for space before adding VOID.
        boolean offer = false;
        try {
            offer = queue.offer(VOID, WRITE_TIMEOUT, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // ignore.
        }
        if (!offer) {
            // Still no space: replace the last element with VOID.
            queue.removeLast();
            queue.add(VOID);
        }
    } else {
        queue.add(VOID);
    }
    ctx.flush();
}
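The same "timed offer, then overwrite the tail" idea can be isolated in a small helper. This is a sketch under stated assumptions, not Jersey's implementation: OfferOrReplaceSketch, offerOrReplaceLast, and demo are hypothetical names, and unlike the original it restores the interrupt flag and uses pollLast/offerLast so it never throws on an empty or full deque. The replace step is not atomic, so other threads may interleave between the two calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, not part of Jersey.
class OfferOrReplaceSketch {

    // Waits up to timeoutMillis for space; if none appears, drops the tail
    // element and appends the item in its place.
    static <E> void offerOrReplaceLast(LinkedBlockingDeque<E> deque, E item, long timeoutMillis) {
        boolean accepted = false;
        try {
            accepted = deque.offer(item, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
        }
        if (!accepted) {
            deque.pollLast();      // returns null instead of throwing if the deque is empty
            deque.offerLast(item); // returns false instead of throwing if still full
        }
    }

    // Fills a deque of capacity 2, then forces the marker in.
    static List<String> demo() {
        LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<>(2);
        deque.add("a");
        deque.add("b");
        offerOrReplaceLast(deque, "END", 10);
        return new ArrayList<>(deque);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [a, END]
    }
}
```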
Code example source: alibaba/nacos
public void addUpdatedDom2Queue(String namespaceId, String domName, String serverIP, String checksum) {
    lock.lock();
    try {
        toBeUpdatedDomsQueue.offer(new DomainKey(namespaceId, domName, serverIP, checksum), 5, TimeUnit.MILLISECONDS);
    } catch (Exception e) {
        // On failure, drop the head element and retry the insert.
        toBeUpdatedDomsQueue.poll();
        toBeUpdatedDomsQueue.add(new DomainKey(namespaceId, domName, serverIP, checksum));
        Loggers.SRV_LOG.error("[DOMAIN-STATUS] Failed to add domain to be updated to queue.", e);
    } finally {
        lock.unlock();
    }
}
Code example source: wildfly/wildfly
    @Override
    public void run() {
        while (!taskRunQueue.isEmpty()) {
            taskRunQueue.poll().run();
        }
    }
};
Code example source: apache/flume
@Override
protected void doRollback() {
    int takes = takeList.size();
    synchronized (queueLock) {
        Preconditions.checkState(queue.remainingCapacity() >= takeList.size(),
            "Not enough space in memory channel " +
            "queue to rollback takes. This should never happen, please report");
        // Push the taken events back onto the head of the queue, newest first,
        // so they end up in their original order.
        while (!takeList.isEmpty()) {
            queue.addFirst(takeList.removeLast());
        }
        putList.clear();
    }
    putByteCounter = 0;
    takeByteCounter = 0;
    queueStored.release(takes);
    channelCounter.setChannelSize(queue.size());
}
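Why addFirst combined with removeLast restores the original order is easier to see in isolation. The following sketch is not Flume code: RollbackSketch and takeThenRollback are hypothetical names, and a plain ArrayDeque stands in for the transaction's take list.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical demo class, not part of Flume.
class RollbackSketch {

    // Takes two elements from the head, then rolls the takes back.
    static List<Integer> takeThenRollback() {
        LinkedBlockingDeque<Integer> queue =
            new LinkedBlockingDeque<>(Arrays.asList(1, 2, 3, 4));
        ArrayDeque<Integer> takeList = new ArrayDeque<>();

        takeList.addLast(queue.pollFirst()); // takes 1
        takeList.addLast(queue.pollFirst()); // takes 2

        // Undo newest-first: 2 goes back to the head, then 1 goes in front of it.
        while (!takeList.isEmpty()) {
            queue.addFirst(takeList.removeLast());
        }
        return new ArrayList<>(queue);
    }

    public static void main(String[] args) {
        System.out.println(takeThenRollback()); // prints [1, 2, 3, 4]
    }
}
```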
Code example source: blazegraph/database
final long nanos = units.toNanos(timeout);
long remaining = nanos;
if (!dirtyListLock.tryLock(remaining, TimeUnit.NANOSECONDS))
throw new TimeoutException();
dirtyListChange.signalAll();
while (!dirtyList.isEmpty() && !halt) {
if (!dirtyListEmpty.await(remaining, TimeUnit.NANOSECONDS)) {
throw new TimeoutException();
dirtyListChange.signalAll();
while (!dirtyList.isEmpty() && !halt) {
dirtyListLock.unlock();
if (!cleanListLock.tryLock(remaining, TimeUnit.NANOSECONDS))
throw new TimeoutException();
try {
while (cleanList.isEmpty() && !halt) {
final WriteCache nxt = cleanList.take();
counters.get().nclean--;
Code example source: blazegraph/database
private void runOnce() throws Throwable {
lock.lock();
try {
while(deque.isEmpty()) {
dequeNotEmpty.await();
if ((e = deque.peek()) == null)
throw new AssertionError();
if (log.isInfoEnabled())
log.info("\n==> Next event: " + e);
} finally {
lock.unlock();
watcher.eventReady.signalAll();
eventDone.await();
lock.lock();
try {
if (e != deque.take())
throw new AssertionError();
if (deque.isEmpty()) {
Code example source: stackoverflow.com
private BlockingDeque<Packet> audioQueue = new LinkedBlockingDeque<Packet>(MAX_QUEUE_SIZE);
private BlockingDeque<Packet> videoQueue = new LinkedBlockingDeque<Packet>(MAX_QUEUE_SIZE);
private final static int MAX_QUEUE_SIZE = 1000;
private final ReentrantLock queueLock = new ReentrantLock();
private final Condition morePackets = queueLock.newCondition();
private AtomicBoolean stopped = new AtomicBoolean(true);
private Thread t = null;
queueLock.lock();
try {
morePackets.signalAll();
} finally {
queueLock.unlock();
morePackets.signalAll();
if (!stopped.get() && !hasMorePackets(mask)) {
try {
morePackets.await();
} catch (InterruptedException e) {
Code example source: io.fabric8.jube.images.fabric8/fabric8-mq
protected void sendToVertx(final Object o, final AsyncCallback asyncCallback) throws IOException {
final ReentrantLock lock = OpenWireReadStream.this.lock;
try {
lock.lockInterruptibly();
if (paused) {
if (queue == null) {
queue = new LinkedBlockingDeque<>();
Thread.currentThread().interrupt();
} finally {
lock.unlock();
Code example source: blazegraph/database
private WriteCache takeFromClean() throws InterruptedException {
    cleanListLock.lockInterruptibly();
    try {
        while (true) {
            if (log.isInfoEnabled() && cleanList.isEmpty())
                log.info("Waiting for clean buffer");
            /*
             * Note: We use the [cleanListNotEmpty] Condition so we can
             * notice a [halt].
             */
            while (cleanList.isEmpty() && !halt) {
                cleanListNotEmpty.await();
            }
            if (halt)
                throw new RuntimeException(firstCause.get());
            // Poll() rather than take() since other methods poll() the list
            // unprotected.
            final WriteCache ret = cleanList.poll();
            if (ret != null) {
                return ret;
            }
        }
    } finally {
        cleanListLock.unlock();
    }
}
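A simpler way to notice a halt flag while waiting, when no external lock or Condition is involved, is a timed poll in a loop. This is a different technique from the Condition-based one above, shown only for comparison; HaltAwareTakeSketch, takeUnlessHalted, and the 50 ms poll interval are assumptions made for this sketch.

```java
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper class, not part of blazegraph.
class HaltAwareTakeSketch {

    // Returns the next element, or null once `halt` is set or the thread is interrupted.
    static <E> E takeUnlessHalted(LinkedBlockingDeque<E> deque, AtomicBoolean halt) {
        try {
            while (!halt.get()) {
                // Wake up periodically so a halt request is seen even if nothing arrives.
                E item = deque.poll(50, TimeUnit.MILLISECONDS);
                if (item != null) {
                    return item;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return null;
    }

    public static void main(String[] args) {
        LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<>();
        deque.add("buffer-1");
        System.out.println(takeUnlessHalted(deque, new AtomicBoolean(false))); // prints buffer-1
        System.out.println(takeUnlessHalted(deque, new AtomicBoolean(true)));  // prints null
    }
}
```

The trade-off is latency: a halt is noticed only at the next poll timeout, whereas signalling a Condition wakes the waiter immediately.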
Code example source: blazegraph/database
dirtyListLock.lockInterruptibly();
try {
dirtyList.add(cache);
dirtyListChange.signalAll();
} finally {
dirtyListLock.unlock();
cleanListLock.lockInterruptibly();
while (cleanList.isEmpty() && !halt) {
cleanListNotEmpty.await();
final WriteCache newBuffer = cleanList.take();
Code example source: stackoverflow.com
final LinkedBlockingDeque<Object> linkedBlockingDeque = new LinkedBlockingDeque<>();
final ExecutorService executorService = Executors.newFixedThreadPool(4);
final Lock fillLock = new ReentrantLock();
final AtomicBoolean stop = new AtomicBoolean(false);
Thread.sleep(TimeUnit.SECONDS.toMillis(30));
stop.set(true);
executorService.shutdown();
Object o = linkedBlockingDeque.poll(1, TimeUnit.SECONDS);
if (o != null) {
handle(o);
if (linkedBlockingDeque.size() < 10) {
tryFill();
System.out.println("Filling");
for (int i = 0; i < 100; i++) {
linkedBlockingDeque.add(new Object());
Code example source: blazegraph/database
dirtyListLock.lockInterruptibly();
try {
dirtyList.drainTo(c);
dirtyListEmpty.signalAll();
dirtyListChange.signalAll(); // NB: you must verify
dirtyListLock.unlock();
cleanListLock.lockInterruptibly();
try {
for (WriteCache x : c) {
x.resetWith(serviceMap);
cleanList.addFirst(x);
assert !cleanList.isEmpty();
cleanListNotEmpty.signalAll();
counters.get().nclean = cleanList.size();
} finally {
cleanListLock.unlock();
Code example source: blazegraph/database
cleanListLock.lockInterruptibly();
try {
assert cache.isEmpty() || cache.isClosedForWrites();
if (addFirst) {
cleanList.addFirst(cache);
} else {
cleanList.addLast(cache);
cleanListNotEmpty.signalAll();
counters.get().nclean = cleanList.size();
} finally {
cleanListLock.unlock();
Code example source: NationalSecurityAgency/datawave
if (!writeControl.tryLock()) {
while (!myResultQueue.isEmpty()) {
results.put(myResultQueue.poll(2, TimeUnit.MILLISECONDS));
if (log.isTraceEnabled())
log.trace("status" + Thread.interrupted() + " " + caller.isShutdown() + " " + service.isShutdown());
throw new RuntimeException(e);
} finally {
writeControl.unlock();
Code example source: awslabs/mxnet-model-server
public Model(ModelArchive modelArchive, int queueSize) {
    this.modelArchive = modelArchive;
    batchSize = 1;
    maxBatchDelay = 100;
    jobsDb = new ConcurrentHashMap<>();
    // Always have a queue for data
    jobsDb.putIfAbsent(DEFAULT_DATA_QUEUE, new LinkedBlockingDeque<>(queueSize));
    failedInfReqs = new AtomicInteger(0);
    lock = new ReentrantLock();
}
Code example source: apache/drill
throw executionFailureException;
if (completed && batchQueue.isEmpty()) {
return null;
} else {
QueryDataBatch qdb = batchQueue.poll(50, TimeUnit.MILLISECONDS);
if (qdb != null) {
lastDequeuedBatchNumber++;
if ( batchQueue.size() < batchQueueThrottlingThreshold / 2
|| batchQueue.size() == 0 // (in case threshold < 2)
) {
if ( stopThrottlingIfSo() ) {
logger.debug( "[#{}] Throttling stopped at queue size {}.",
instanceId, batchQueue.size() );
throw new SqlTimeoutException(TimeUnit.MILLISECONDS.toSeconds(parent.timeoutInMilliseconds));
Code example source: Qihoo360/XLearning
ConcurrentHashMap<String, Object> map = gson.fromJson(cpuMetrics, type);
for (String str : map.keySet()) {
LinkedBlockingDeque<Object> queue = new LinkedBlockingDeque<>();
queue.add(map.get(str));
this.containersCpuMetrics.get(containerId).put(str, queue);
this.containersCpuStatistics.get(containerId).put(str, new ContainerMetricsStatisticsTuple(Double.parseDouble(new Gson().fromJson((JsonArray)map.get(str), ArrayList.class).get(1).toString())));
for (String str : map.keySet()) {
if (this.containersCpuMetrics.get(containerId).keySet().contains(str)) {
if (this.containersCpuMetrics.get(containerId).get(str).size() < 1800) {
this.containersCpuMetrics.get(containerId).get(str).add(map.get(str));
} else {
this.containersCpuMetrics.get(containerId).get(str).poll();
this.containersCpuMetrics.get(containerId).get(str).add(map.get(str));
LinkedBlockingDeque<Object> queue = new LinkedBlockingDeque<>();
queue.add(map.get(str));
this.containersCpuMetrics.get(containerId).put(str, queue);
this.containersCpuStatistics.get(containerId).put(str, new ContainerMetricsStatisticsTuple(Double.parseDouble(new Gson().fromJson((JsonArray)map.get(str), ArrayList.class).get(1).toString())));
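The pattern visible in this fragment, dropping the oldest sample once the deque reaches a fixed size and then appending the new one, can be sketched as a small helper. SlidingWindowSketch, addBounded, and demo are hypothetical names, not XLearning code. The size check and the two updates are not atomic, so concurrent writers would need external synchronization.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical helper class, not part of XLearning.
class SlidingWindowSketch {

    // Keeps at most maxSize samples by evicting from the head before appending.
    static <E> void addBounded(LinkedBlockingDeque<E> window, E sample, int maxSize) {
        if (window.size() >= maxSize) {
            window.pollFirst(); // evict the oldest sample
        }
        window.addLast(sample);
    }

    // Feeds the samples 0..4 through a window of size 3.
    static List<Integer> demo() {
        LinkedBlockingDeque<Integer> window = new LinkedBlockingDeque<>();
        for (int i = 0; i < 5; i++) {
            addBounded(window, i, 3);
        }
        return new ArrayList<>(window);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [2, 3, 4]
    }
}
```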
Code example source: jankotek/mapdb
/**
 * Returns a new deque of given size containing consecutive
 * Integers 0 ... n - 1.
 */
private static LinkedBlockingDeque<Integer> populatedDeque(int n) {
    LinkedBlockingDeque<Integer> q =
        new LinkedBlockingDeque<Integer>(n);
    assertTrue(q.isEmpty());
    for (int i = 0; i < n; i++)
        assertTrue(q.offer(Integer.valueOf(i)));
    assertFalse(q.isEmpty());
    assertEquals(0, q.remainingCapacity());
    assertEquals(n, q.size());
    assertEquals((Integer) 0, q.peekFirst());
    assertEquals((Integer) (n - 1), q.peekLast());
    return q;
}
Code example source: azkaban/azkaban
/**
 * Ingest metric in snapshot data structure while maintaining interval {@inheritDoc}
 *
 * @see azkaban.metric.IMetricEmitter#reportMetric(azkaban.metric.IMetric)
 */
@Override
public void reportMetric(final IMetric<?> metric) throws MetricException {
    final String metricName = metric.getName();
    if (!this.historyListMapping.containsKey(metricName)) {
        logger.info("First time capturing metric: " + metricName);
        this.historyListMapping.put(metricName, new LinkedBlockingDeque<>());
    }
    synchronized (this.historyListMapping.get(metricName)) {
        logger.debug("Ingesting metric: " + metricName);
        this.historyListMapping.get(metricName).add(new InMemoryHistoryNode(metric.getValue()));
        cleanUsingTime(metricName, this.historyListMapping.get(metricName).peekLast().getTimestamp());
    }
}