Usage and code examples of the java.util.concurrent.LinkedBlockingDeque class

x33g5p2x, reposted on 2022-01-23 under Other

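This article collects code examples for the java.util.concurrent.LinkedBlockingDeque class in Java and shows how the class is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven, and similar platforms; they were extracted from selected projects and are meant as a practical reference. Details of the LinkedBlockingDeque class are as follows:
Fully qualified name: java.util.concurrent.LinkedBlockingDeque
Class name: LinkedBlockingDeque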

Introduction to LinkedBlockingDeque

An optionally-bounded BlockingDeque based on linked nodes.

The optional capacity bound constructor argument serves as a way to prevent excessive expansion. The capacity, if unspecified, is equal to Integer.MAX_VALUE. Linked nodes are dynamically created upon each insertion unless this would bring the deque above capacity.
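The capacity rule above can be illustrated with a minimal sketch. The class and method names (CapacityBoundSketch, defaultCapacity, thirdOfferAccepted) are hypothetical and chosen only for this illustration; the deque calls themselves are standard java.util.concurrent API.

```java
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical demo class, not taken from any of the quoted projects.
class CapacityBoundSketch {

    // A deque created without a capacity argument reports Integer.MAX_VALUE free slots.
    static int defaultCapacity() {
        return new LinkedBlockingDeque<String>().remainingCapacity();
    }

    // With a capacity of 2, the third non-blocking offer is rejected.
    static boolean thirdOfferAccepted() {
        LinkedBlockingDeque<String> bounded = new LinkedBlockingDeque<>(2);
        bounded.offer("a");
        bounded.offer("b");
        return bounded.offer("c"); // the deque is full, so offer returns false
    }

    public static void main(String[] args) {
        System.out.println(defaultCapacity() == Integer.MAX_VALUE);
        System.out.println(thirdOfferAccepted());
    }
}
```

Note that offer returns false on a full deque, whereas add would throw IllegalStateException and put would block until space becomes available.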

Most operations run in constant time (ignoring time spent blocking). Exceptions include remove(Object), removeFirstOccurrence, removeLastOccurrence, contains, iterator.remove(), and the bulk operations, all of which run in linear time.
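As a rough illustration of the linear-time operations mentioned above, the following sketch removes interior elements, which requires walking the linked nodes. The class name LinearTimeOpsSketch and its method are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical demo class for the linear-time removal operations.
class LinearTimeOpsSketch {

    static List<Integer> removeOccurrences() {
        LinkedBlockingDeque<Integer> deque =
                new LinkedBlockingDeque<>(Arrays.asList(1, 2, 3, 2, 1));
        deque.removeFirstOccurrence(2); // scans from the head: 1, 3, 2, 1
        deque.removeLastOccurrence(1);  // scans from the tail: 1, 3, 2
        return new ArrayList<>(deque);
    }

    public static void main(String[] args) {
        System.out.println(removeOccurrences()); // prints [1, 3, 2]
    }
}
```

Operations at either end (addFirst, pollLast, and so on) stay constant time; only searches and interior removals scan the list.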

This class and its iterator implement all of the optional methods of the Collection and Iterator interfaces.

Code examples

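Code example source: springside/springside4

/**
 * Creates an unbounded double-ended queue for concurrent, blocking use.
 * 
 * Because the length is unbounded, producers never block on a full deque, but consumers block while it is empty.
 */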
public static <E> LinkedBlockingDeque<E> newBlockingUnlimitDeque() {
  return new LinkedBlockingDeque<E>();
}
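The behavior described in that comment can be sketched as follows: a consumer blocks in take() until a producer thread puts an element, and a timed poll on an empty deque gives up with null. The class name UnboundedDequeSketch, its methods, and the 100 ms and 50 ms delays are assumptions made for this illustration only.

```java
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of springside4.
class UnboundedDequeSketch {

    // Returns the element handed over by the producer, or null if interrupted.
    static String handOff() {
        LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<>();
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(100);   // let the consumer reach take() first
                deque.put("job");    // does not wait for space on an unbounded deque
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            String taken = deque.take(); // blocks while the deque is empty
            producer.join();
            return taken;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // A timed poll on an empty deque returns null once the timeout elapses.
    static boolean timedPollOnEmptyReturnsNull() {
        try {
            return new LinkedBlockingDeque<String>().poll(50, TimeUnit.MILLISECONDS) == null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(handOff());
        System.out.println(timedPollOnEmptyReturnsNull());
    }
}
```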

Code example source: jersey/jersey

@Override
public void close() throws IOException {
  if (queue.size() == CAPACITY) {
    boolean offer = false;
    try {
      offer = queue.offer(VOID, WRITE_TIMEOUT, TimeUnit.MILLISECONDS);
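    } catch (InterruptedException e) {
      // Restore the interrupt flag rather than silently swallowing the interruption.
      Thread.currentThread().interrupt();
    }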
    if (!offer) {
      queue.removeLast();
      queue.add(VOID);
    }
  } else {
    queue.add(VOID);
  }
  ctx.flush();
}
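The pattern in this excerpt (try a timed offer, and if the bounded deque stays full, evict the newest element to make room for an end marker) can be tried in isolation. The sketch below is a single-threaded simplification; the class name, the "END" marker, the capacity of 2, and the 10 ms timeout are all assumptions for illustration and are not taken from Jersey.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, loosely modeled on the excerpt above.
class OfferOrEvictSketch {

    static List<String> appendEndMarker() {
        LinkedBlockingDeque<String> queue = new LinkedBlockingDeque<>(2);
        queue.add("a");
        queue.add("b"); // the deque is now full

        boolean offered = false;
        try {
            // Wait briefly for a consumer to free a slot (none exists in this sketch).
            offered = queue.offer("END", 10, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        if (!offered) {
            queue.removeLast(); // drop the newest element to make room
            queue.add("END");
        }
        return new ArrayList<>(queue);
    }

    public static void main(String[] args) {
        System.out.println(appendEndMarker()); // prints [a, END]
    }
}
```

With concurrent producers the removeLast/add pair is not atomic, so real code would need external coordination if the marker must never be lost.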

Code example source: alibaba/nacos

public void addUpdatedDom2Queue(String namespaceId, String domName, String serverIP, String checksum) {
  lock.lock();
  try {
    toBeUpdatedDomsQueue.offer(new DomainKey(namespaceId, domName, serverIP, checksum), 5, TimeUnit.MILLISECONDS);
  } catch (Exception e) {
    toBeUpdatedDomsQueue.poll();
    toBeUpdatedDomsQueue.add(new DomainKey(namespaceId, domName, serverIP, checksum));
    Loggers.SRV_LOG.error("[DOMAIN-STATUS] Failed to add domain to be updated to queue.", e);
  } finally {
    lock.unlock();
  }
}

Code example source: wildfly/wildfly

// ... (the start of the enclosing anonymous class is elided in the source)
  @Override
  public void run() {
    while (!taskRunQueue.isEmpty()) {
      taskRunQueue.poll().run();
    }
  }
};

Code example source: apache/flume

@Override
protected void doRollback() {
 int takes = takeList.size();
 synchronized (queueLock) {
  Preconditions.checkState(queue.remainingCapacity() >= takeList.size(),
    "Not enough space in memory channel " +
    "queue to rollback takes. This should never happen, please report");
  while (!takeList.isEmpty()) {
   queue.addFirst(takeList.removeLast());
  }
  putList.clear();
 }
 putByteCounter = 0;
 takeByteCounter = 0;
 queueStored.release(takes);
 channelCounter.setChannelSize(queue.size());
}
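The rollback loop above relies on a small ordering trick: elements taken from the head are pushed back with addFirst in reverse order of removal, which restores the original sequence. A minimal sketch, assuming a plain ArrayDeque as the take list and hypothetical names throughout:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical demo class; it omits the locking and counters of the Flume code.
class RollbackSketch {

    static List<Integer> takeTwoThenRollback() {
        LinkedBlockingDeque<Integer> queue =
                new LinkedBlockingDeque<>(Arrays.asList(1, 2, 3, 4));
        Deque<Integer> takeList = new ArrayDeque<>();

        // Simulate a transaction that takes two events from the head.
        takeList.addLast(queue.pollFirst());
        takeList.addLast(queue.pollFirst());

        // Roll back: return the most recently taken event first.
        while (!takeList.isEmpty()) {
            queue.addFirst(takeList.removeLast());
        }
        return new ArrayList<>(queue);
    }

    public static void main(String[] args) {
        System.out.println(takeTwoThenRollback()); // prints [1, 2, 3, 4]
    }
}
```

On a bounded deque addFirst throws IllegalStateException when full, which is why the original code checks remainingCapacity() first.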

Code example source: blazegraph/database

final long nanos = units.toNanos(timeout);
long remaining = nanos;
// ... (lines elided in the source excerpt)
if (!dirtyListLock.tryLock(remaining, TimeUnit.NANOSECONDS))
  throw new TimeoutException();
try {
  dirtyListChange.signalAll();
  while (!dirtyList.isEmpty() && !halt) {
    if (!dirtyListEmpty.await(remaining, TimeUnit.NANOSECONDS)) {
      throw new TimeoutException();
    }
  }
} finally {
  dirtyListLock.unlock();
}
if (!cleanListLock.tryLock(remaining, TimeUnit.NANOSECONDS))
  throw new TimeoutException();
try {
  while (cleanList.isEmpty() && !halt) {
    // ... (the wait on the clean list is elided in the source excerpt)
  }
  final WriteCache nxt = cleanList.take();
  counters.get().nclean--;
  // ... (truncated in the source excerpt)
} finally {
  cleanListLock.unlock();
}

Code example source: blazegraph/database

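private void runOnce() throws Throwable {
  // ... (declaration of the event variable e is elided in the source excerpt)
  lock.lock();
  try {
    while (deque.isEmpty()) {
      dequeNotEmpty.await();
    }
    if ((e = deque.peek()) == null)
      throw new AssertionError();
    if (log.isInfoEnabled())
      log.info("\n==> Next event: " + e);
  } finally {
    lock.unlock();
  }
  // ... (surrounding lines elided in the source excerpt)
  watcher.eventReady.signalAll();
  eventDone.await();
  // ...
  lock.lock();
  try {
    if (e != deque.take())
      throw new AssertionError();
    if (deque.isEmpty()) {
      // ... (truncated in the source excerpt)
    }
  } finally {
    lock.unlock();
  }
}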

Code example source: stackoverflow.com

private BlockingDeque<Packet> audioQueue = new LinkedBlockingDeque<Packet>(MAX_QUEUE_SIZE);
private BlockingDeque<Packet> videoQueue = new LinkedBlockingDeque<Packet>(MAX_QUEUE_SIZE);
private final static int MAX_QUEUE_SIZE = 1000;
private final ReentrantLock queueLock = new ReentrantLock();
private final Condition morePackets = queueLock.newCondition();
private AtomicBoolean stopped = new AtomicBoolean(true);
private Thread t = null;
  queueLock.lock();
  try {
    morePackets.signalAll();
  } finally {
    queueLock.unlock();
          morePackets.signalAll();
      if (!stopped.get() && !hasMorePackets(mask)) {
        try {
          morePackets.await();
        } catch (InterruptedException e) {

Code example source: io.fabric8.jube.images.fabric8/fabric8-mq

protected void sendToVertx(final Object o, final AsyncCallback asyncCallback) throws IOException {
  final ReentrantLock lock = OpenWireReadStream.this.lock;
  try {
    lock.lockInterruptibly();
    if (paused) {
      if (queue == null) {
        queue = new LinkedBlockingDeque<>();
      }
      // ... (elided in the source excerpt)
    }
    // ... (elided in the source excerpt)
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
  } finally {
    lock.unlock();
  }
}

Code example source: blazegraph/database

private WriteCache takeFromClean() throws InterruptedException {
  cleanListLock.lockInterruptibly();
  try {
    while (true) {
      if (log.isInfoEnabled() && cleanList.isEmpty())
        log.info("Waiting for clean buffer");
      /*
       * Note: We use the [cleanListNotEmpty] Condition so we can
       * notice a [halt].
       */
      while (cleanList.isEmpty() && !halt) {
        cleanListNotEmpty.await();
      }
      if (halt)
        throw new RuntimeException(firstCause.get());
      // Poll() rather than take() since other methods poll() the list
      // unprotected.
      final WriteCache ret = cleanList.poll();
      if (ret != null) {
        return ret;
      }
    }
  } finally {
    cleanListLock.unlock();
  }
}

Code example source: blazegraph/database

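dirtyListLock.lockInterruptibly();
try {
  dirtyList.add(cache);
  dirtyListChange.signalAll();
} finally {
  dirtyListLock.unlock();
}
// ... (lines elided in the source excerpt)
cleanListLock.lockInterruptibly();
try {
  while (cleanList.isEmpty() && !halt) {
    cleanListNotEmpty.await();
  }
  // ... (lines elided in the source excerpt)
  final WriteCache newBuffer = cleanList.take();
  // ... (truncated in the source excerpt)
} finally {
  cleanListLock.unlock();
}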

Code example source: stackoverflow.com

final LinkedBlockingDeque<Object> linkedBlockingDeque = new LinkedBlockingDeque<>();
final ExecutorService executorService = Executors.newFixedThreadPool(4);
final Lock fillLock = new ReentrantLock();
final AtomicBoolean stop = new AtomicBoolean(false);
Thread.sleep(TimeUnit.SECONDS.toMillis(30));
stop.set(true);
executorService.shutdown();
    Object o = linkedBlockingDeque.poll(1, TimeUnit.SECONDS);
    if (o != null) {
      handle(o);
    if (linkedBlockingDeque.size() < 10) {
      tryFill();
    System.out.println("Filling");
    for (int i = 0; i < 100; i++) {
      linkedBlockingDeque.add(new Object());

Code example source: blazegraph/database

dirtyListLock.lockInterruptibly();
try {
  dirtyList.drainTo(c);
  dirtyListEmpty.signalAll();
  dirtyListChange.signalAll(); // NB: you must verify
} finally {
  dirtyListLock.unlock();
}
cleanListLock.lockInterruptibly();
try {
  for (WriteCache x : c) {
    x.resetWith(serviceMap);
    cleanList.addFirst(x);
  }
  assert !cleanList.isEmpty();
  cleanListNotEmpty.signalAll();
  counters.get().nclean = cleanList.size();
} finally {
  cleanListLock.unlock();
}
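The drain-and-recycle step in this excerpt can be reduced to a small sketch: drainTo moves every element of one deque into a plain collection in head-to-tail order, and pushing each drained element with addFirst reverses that order in the target deque. The class name and the string elements are hypothetical, and the locks and conditions of the original are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical demo class; strings stand in for the WriteCache buffers.
class DrainToSketch {

    static List<String> recycle() {
        LinkedBlockingDeque<String> dirty =
                new LinkedBlockingDeque<>(Arrays.asList("a", "b", "c"));
        LinkedBlockingDeque<String> clean = new LinkedBlockingDeque<>();

        List<String> batch = new ArrayList<>();
        dirty.drainTo(batch);      // batch is [a, b, c]; dirty is now empty

        for (String buffer : batch) {
            clean.addFirst(buffer); // each buffer goes to the head
        }
        return new ArrayList<>(clean);
    }

    public static void main(String[] args) {
        System.out.println(recycle()); // prints [c, b, a]
    }
}
```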

Code example source: blazegraph/database

cleanListLock.lockInterruptibly();
try {
  assert cache.isEmpty() || cache.isClosedForWrites();
  if (addFirst) {
    cleanList.addFirst(cache);
  } else {
    cleanList.addLast(cache);
  }
  cleanListNotEmpty.signalAll();
  counters.get().nclean = cleanList.size();
} finally {
  cleanListLock.unlock();
}

Code example source: NationalSecurityAgency/datawave

if (!writeControl.tryLock()) {
  while (!myResultQueue.isEmpty()) {
    results.put(myResultQueue.poll(2, TimeUnit.MILLISECONDS));
    if (log.isTraceEnabled())
      log.trace("status" + Thread.interrupted() + " " + caller.isShutdown() + " " + service.isShutdown());
  throw new RuntimeException(e);
} finally {
  writeControl.unlock();

Code example source: awslabs/mxnet-model-server

public Model(ModelArchive modelArchive, int queueSize) {
  this.modelArchive = modelArchive;
  batchSize = 1;
  maxBatchDelay = 100;
  jobsDb = new ConcurrentHashMap<>();
  // Always have a queue for data
  jobsDb.putIfAbsent(DEFAULT_DATA_QUEUE, new LinkedBlockingDeque<>(queueSize));
  failedInfReqs = new AtomicInteger(0);
  lock = new ReentrantLock();
}

Code example source: apache/drill

throw executionFailureException;
if (completed && batchQueue.isEmpty()) {
 return null;
} else {
 QueryDataBatch qdb = batchQueue.poll(50, TimeUnit.MILLISECONDS);
 if (qdb != null) {
  lastDequeuedBatchNumber++;
  if ( batchQueue.size() < batchQueueThrottlingThreshold / 2
     || batchQueue.size() == 0  // (in case threshold < 2)
     ) {
   if ( stopThrottlingIfSo() ) {
    logger.debug( "[#{}] Throttling stopped at queue size {}.",
           instanceId, batchQueue.size() );
  throw new SqlTimeoutException(TimeUnit.MILLISECONDS.toSeconds(parent.timeoutInMilliseconds));

Code example source: Qihoo360/XLearning

ConcurrentHashMap<String, Object> map = gson.fromJson(cpuMetrics, type);
for (String str : map.keySet()) {
 LinkedBlockingDeque<Object> queue = new LinkedBlockingDeque<>();
 queue.add(map.get(str));
 this.containersCpuMetrics.get(containerId).put(str, queue);
 this.containersCpuStatistics.get(containerId).put(str, new ContainerMetricsStatisticsTuple(Double.parseDouble(new Gson().fromJson((JsonArray)map.get(str), ArrayList.class).get(1).toString())));
for (String str : map.keySet()) {
 if (this.containersCpuMetrics.get(containerId).keySet().contains(str)) {
  if (this.containersCpuMetrics.get(containerId).get(str).size() < 1800) {
   this.containersCpuMetrics.get(containerId).get(str).add(map.get(str));
  } else {
   this.containersCpuMetrics.get(containerId).get(str).poll();
   this.containersCpuMetrics.get(containerId).get(str).add(map.get(str));
  LinkedBlockingDeque<Object> queue = new LinkedBlockingDeque<>();
  queue.add(map.get(str));
  this.containersCpuMetrics.get(containerId).put(str, queue);
  this.containersCpuStatistics.get(containerId).put(str, new ContainerMetricsStatisticsTuple(Double.parseDouble(new Gson().fromJson((JsonArray)map.get(str), ArrayList.class).get(1).toString())));
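The excerpt above keeps at most 1800 samples per metric by polling the oldest entry before adding a new one. A minimal sketch of that capped-history idea follows; the class name, the record helper, and the small limit of 3 are assumptions chosen to keep the demonstration short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical demo class; the excerpt uses a limit of 1800 instead of 3.
class CappedHistorySketch {

    static final int MAX_SAMPLES = 3;

    // Appends a sample, discarding the oldest one once the limit is reached.
    static <T> void record(LinkedBlockingDeque<T> history, T sample) {
        if (history.size() >= MAX_SAMPLES) {
            history.poll(); // removes the head, i.e. the oldest sample
        }
        history.add(sample);
    }

    static List<Integer> demo() {
        LinkedBlockingDeque<Integer> history = new LinkedBlockingDeque<>();
        for (int i = 1; i <= 5; i++) {
            record(history, i);
        }
        return new ArrayList<>(history);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [3, 4, 5]
    }
}
```

The size check and the poll are two separate operations, so with several writers the history can briefly exceed the limit unless callers synchronize externally.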

Code example source: jankotek/mapdb

/**
 * Returns a new deque of given size containing consecutive
 * Integers 0 ... n - 1.
 */
private static LinkedBlockingDeque<Integer> populatedDeque(int n) {
  LinkedBlockingDeque<Integer> q =
    new LinkedBlockingDeque<Integer>(n);
  assertTrue(q.isEmpty());
  for (int i = 0; i < n; i++)
    assertTrue(q.offer(Integer.valueOf(i)));
  assertFalse(q.isEmpty());
  assertEquals(0, q.remainingCapacity());
  assertEquals(n, q.size());
  assertEquals((Integer) 0, q.peekFirst());
  assertEquals((Integer) (n - 1), q.peekLast());
  return q;
}

Code example source: azkaban/azkaban

/**
 * Ingest metric in snapshot data structure while maintaining interval {@inheritDoc}
 *
 * @see azkaban.metric.IMetricEmitter#reportMetric(azkaban.metric.IMetric)
 */
@Override
public void reportMetric(final IMetric<?> metric) throws MetricException {
 final String metricName = metric.getName();
 if (!this.historyListMapping.containsKey(metricName)) {
  logger.info("First time capturing metric: " + metricName);
  this.historyListMapping.put(metricName, new LinkedBlockingDeque<>());
 }
 synchronized (this.historyListMapping.get(metricName)) {
  logger.debug("Ingesting metric: " + metricName);
  this.historyListMapping.get(metricName).add(new InMemoryHistoryNode(metric.getValue()));
  cleanUsingTime(metricName, this.historyListMapping.get(metricName).peekLast().getTimestamp());
 }
}
