Usage and code examples for the java.util.ArrayDeque.offer() method

x33g5p2x, reposted on 2022-01-15 in Other

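This article collects code examples of the java.util.ArrayDeque.offer() method in Java and shows how ArrayDeque.offer() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven and were extracted from selected projects, so they should serve as useful references. Details of ArrayDeque.offer() are as follows:
Fully qualified name: java.util.ArrayDeque
Class name: ArrayDeque
Method name: offer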

Introduction to ArrayDeque.offer

Inserts the specified element at the end of this deque.

This method is equivalent to offerLast(E). It returns true and throws NullPointerException if the specified element is null.
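
The description above can be checked with a minimal sketch. The class name `OfferBasics` and its helper methods are hypothetical names chosen for illustration; only the `ArrayDeque` calls come from the JDK.

```java
import java.util.ArrayDeque;

class OfferBasics {
    // Fills a deque with offer(), offerLast() and offerFirst(), then reports the result.
    static String fillAndShow() {
        ArrayDeque<String> deque = new ArrayDeque<>();
        boolean added = deque.offer("a"); // appended at the tail; ArrayDeque returns true
        deque.offerLast("b");             // same effect as offer()
        deque.offerFirst("z");            // inserted at the head instead
        return added + " " + deque;
    }

    // ArrayDeque does not permit null elements: offer(null) throws NullPointerException.
    static boolean rejectsNull() {
        try {
            new ArrayDeque<String>().offer(null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(fillAndShow()); // prints "true [z, a, b]"
        System.out.println(rejectsNull()); // prints "true"
    }
}
```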

Code examples

Code example source: ReactiveX/RxJava

/**
 * Returns a flattened list of Throwables from tree-like CompositeException chain.
 * @param t the starting throwable
 * @return the list of Throwables flattened in a depth-first manner
 */
public static List<Throwable> flatten(Throwable t) {
  List<Throwable> list = new ArrayList<Throwable>();
  ArrayDeque<Throwable> deque = new ArrayDeque<Throwable>();
  deque.offer(t);
  while (!deque.isEmpty()) {
    Throwable e = deque.removeFirst();
    if (e instanceof CompositeException) {
      CompositeException ce = (CompositeException) e;
      List<Throwable> exceptions = ce.getExceptions();
      for (int i = exceptions.size() - 1; i >= 0; i--) {
        deque.offerFirst(exceptions.get(i));
      }
    } else {
      list.add(e);
    }
  }
  return list;
}
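
The same traversal pattern can be tried outside RxJava. The sketch below is an assumption-laden stand-in: the `Node` type and the `FlattenSketch` class are hypothetical and replace `Throwable` and `CompositeException`. It seeds the deque with `offer`, then pushes children to the front in reverse order so that `removeFirst` visits them depth-first, left to right.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class FlattenSketch {
    // Hypothetical tree node: a node without children is treated as a leaf.
    static final class Node {
        final String label;
        final List<Node> children;

        Node(String label, Node... children) {
            this.label = label;
            this.children = Arrays.asList(children);
        }
    }

    // Collects leaf labels in depth-first, left-to-right order.
    static List<String> flatten(Node root) {
        List<String> leaves = new ArrayList<>();
        ArrayDeque<Node> deque = new ArrayDeque<>();
        deque.offer(root); // seed the work list at the tail
        while (!deque.isEmpty()) {
            Node n = deque.removeFirst();
            if (n.children.isEmpty()) {
                leaves.add(n.label);
            } else {
                // Push children to the head in reverse so the first child is processed next.
                for (int i = n.children.size() - 1; i >= 0; i--) {
                    deque.offerFirst(n.children.get(i));
                }
            }
        }
        return leaves;
    }

    public static void main(String[] args) {
        Node tree = new Node("root",
            new Node("A"),
            new Node("group", new Node("B"), new Node("C")),
            new Node("D"));
        System.out.println(flatten(tree)); // prints "[A, B, C, D]"
    }
}
```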

Code example source: real-logic/aeron

private void queueResponse(
    final long correlationId, final long relevantId, final ControlResponseCode code, final String message)
{
    // Queue the send as a lambda instead of sending immediately.
    queuedResponses.offer(() -> controlResponseProxy.sendResponse(
        controlSessionId,
        correlationId,
        relevantId,
        code,
        message,
        controlPublication));
}
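
The excerpt above does not show the element type of `queuedResponses` or how the queue is drained. As a sketch under the assumption that the queue holds `java.util.function.BooleanSupplier` actions that return true once they succeed, deferred actions could be retried in FIFO order like this. `DeferredActions` and all of its members are hypothetical names, not part of Aeron.

```java
import java.util.ArrayDeque;
import java.util.function.BooleanSupplier;

class DeferredActions {
    // Assumption: each action returns true when it has completed successfully.
    private final ArrayDeque<BooleanSupplier> queued = new ArrayDeque<>();

    void queue(BooleanSupplier action) {
        queued.offer(action); // appended at the tail, so actions keep their order
    }

    // Runs actions from the head; stops at the first failure and leaves it queued for a retry.
    int drain() {
        int completed = 0;
        while (!queued.isEmpty() && queued.peekFirst().getAsBoolean()) {
            queued.pollFirst();
            completed++;
        }
        return completed;
    }

    int pending() {
        return queued.size();
    }

    // Small demonstration: the second action fails once and succeeds on the retry.
    static int[] demo() {
        DeferredActions actions = new DeferredActions();
        int[] attempts = {0};
        actions.queue(() -> true);
        actions.queue(() -> ++attempts[0] >= 2);
        actions.queue(() -> true);
        int firstPass = actions.drain();
        int pendingAfterFirst = actions.pending();
        int secondPass = actions.drain();
        return new int[] {firstPass, pendingAfterFirst, secondPass};
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints "1 2 2"
    }
}
```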

Code example source: ReactiveX/RxJava

@Override
public void onNext(T t) {
  if (index++ % skip == 0) {
    U b;
    try {
      b = ObjectHelper.requireNonNull(bufferSupplier.call(), "The bufferSupplier returned a null collection. Null values are generally not allowed in 2.x operators and sources.");
    } catch (Throwable e) {
      buffers.clear();
      upstream.dispose();
      downstream.onError(e);
      return;
    }
    buffers.offer(b);
  }
  Iterator<U> it = buffers.iterator();
  while (it.hasNext()) {
    U b = it.next();
    b.add(t);
    if (count <= b.size()) {
      it.remove();
      downstream.onNext(b);
    }
  }
}
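
The buffering logic above depends on RxJava internals, so here is a standalone sketch of the same idea over a plain list. `OverlappingBuffers` and its `buffer` method are hypothetical; a new buffer is opened with `offer` every `skip` items, every open buffer receives each item, and a buffer is emitted once it holds `count` items. Buffers that are still incomplete when the input ends are dropped in this sketch, which is a simplification.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class OverlappingBuffers {
    // Splits the source into buffers of `count` items, opening a new buffer every `skip` items.
    static List<List<Integer>> buffer(List<Integer> source, int count, int skip) {
        ArrayDeque<List<Integer>> open = new ArrayDeque<>();
        List<List<Integer>> emitted = new ArrayList<>();
        long index = 0;
        for (Integer item : source) {
            if (index++ % skip == 0) {
                open.offer(new ArrayList<>()); // newest buffer goes to the tail
            }
            Iterator<List<Integer>> it = open.iterator();
            while (it.hasNext()) {
                List<Integer> b = it.next();
                b.add(item);
                if (count <= b.size()) {
                    it.remove(); // a full buffer leaves the deque and is emitted
                    emitted.add(b);
                }
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // prints "[[1, 2, 3], [2, 3, 4], [3, 4, 5]]"
        System.out.println(buffer(Arrays.asList(1, 2, 3, 4, 5), 3, 1));
    }
}
```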

Code example source: redisson/redisson

/**
 * Returns a flattened list of Throwables from tree-like CompositeException chain.
 * @param t the starting throwable
 * @return the list of Throwables flattened in a depth-first manner
 */
public static List<Throwable> flatten(Throwable t) {
  List<Throwable> list = new ArrayList<Throwable>();
  ArrayDeque<Throwable> deque = new ArrayDeque<Throwable>();
  deque.offer(t);
  while (!deque.isEmpty()) {
    Throwable e = deque.removeFirst();
    if (e instanceof CompositeException) {
      CompositeException ce = (CompositeException) e;
      List<Throwable> exceptions = ce.getExceptions();
      for (int i = exceptions.size() - 1; i >= 0; i--) {
        deque.offerFirst(exceptions.get(i));
      }
    } else {
      list.add(e);
    }
  }
  return list;
}

Code example source: apache/flink

completedQueue.offer(streamElementQueueEntry);
      completedQueue.offer(bufferEntry);
      it.remove();

Code example source: ReactiveX/RxJava

@Override
public void onNext(T t) {
  final ArrayDeque<UnicastSubject<T>> ws = windows;
  long i = index;
  long s = skip;
  if (i % s == 0 && !cancelled) {
    wip.getAndIncrement();
    UnicastSubject<T> w = UnicastSubject.create(capacityHint, this);
    ws.offer(w);
    downstream.onNext(w);
  }
  long c = firstEmission + 1;
  for (UnicastSubject<T> w : ws) {
    w.onNext(t);
  }
  if (c >= count) {
    ws.poll().onComplete();
    if (ws.isEmpty() && cancelled) {
      this.upstream.dispose();
      return;
    }
    firstEmission = c - s;
  } else {
    firstEmission = c;
  }
  index = i + 1;
}

Code example source: apache/flink

Set<StreamElementQueueEntry<?>> watermarkSet = new HashSet<>(1);
    watermarkSet.add(streamElementQueueEntry);
    uncompletedQueue.offer(watermarkSet);
  uncompletedQueue.offer(lastSet);
} else {
  lastSet.add(streamElementQueueEntry);

Code example source: ReactiveX/RxJava

bs.offer(b);

Code example source: redisson/redisson

@Override
public void onNext(T t) {
  if (index++ % skip == 0) {
    U b;
    try {
      b = ObjectHelper.requireNonNull(bufferSupplier.call(), "The bufferSupplier returned a null collection. Null values are generally not allowed in 2.x operators and sources.");
    } catch (Throwable e) {
      buffers.clear();
      s.dispose();
      actual.onError(e);
      return;
    }
    buffers.offer(b);
  }
  Iterator<U> it = buffers.iterator();
  while (it.hasNext()) {
    U b = it.next();
    b.add(t);
    if (count <= b.size()) {
      it.remove();
      actual.onNext(b);
    }
  }
}

Code example source: apache/flume

@Override
protected void doPut(Event event) throws InterruptedException {
 channelCounter.incrementEventPutAttemptCount();
 putCalled = true;
 int eventByteSize = (int) Math.ceil(estimateEventSize(event) / avgEventSize);
 if (!putList.offer(event)) {
  throw new ChannelFullException("Put queue in " + getName() +
    " channel's Transaction having capacity " + putList.size() +
    " full, consider reducing batch size of sources");
 }
 putListByteCount += eventByteSize;
}
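
The excerpt above does not show how `putList` is declared. If it is a plain `java.util.ArrayDeque`, note that `ArrayDeque` has no capacity restriction and its `offer` always returns true, so a `!putList.offer(event)` branch would never run; it can only fire for a capacity-restricted deque. A minimal sketch of enforcing a limit by hand, with the hypothetical class `BoundedPutList`, might look like this:

```java
import java.util.ArrayDeque;

class BoundedPutList {
    private final ArrayDeque<String> putList = new ArrayDeque<>();
    private final int capacity; // hypothetical limit enforced by this wrapper, not by ArrayDeque

    BoundedPutList(int capacity) {
        this.capacity = capacity;
    }

    // Returns false when the list is full; the size check is needed because ArrayDeque grows on demand.
    boolean tryPut(String event) {
        if (putList.size() >= capacity) {
            return false;
        }
        return putList.offer(event);
    }

    int size() {
        return putList.size();
    }

    // Shows that a bare ArrayDeque accepts every offer, however many elements it already holds.
    static boolean unboundedOfferAlwaysTrue(int n) {
        ArrayDeque<Integer> deque = new ArrayDeque<>(1);
        boolean all = true;
        for (int i = 0; i < n; i++) {
            all &= deque.offer(i);
        }
        return all && deque.size() == n;
    }

    public static void main(String[] args) {
        BoundedPutList list = new BoundedPutList(2);
        System.out.println(list.tryPut("e1"));              // prints "true"
        System.out.println(list.tryPut("e2"));              // prints "true"
        System.out.println(list.tryPut("e3"));              // prints "false"
        System.out.println(unboundedOfferAlwaysTrue(1000)); // prints "true"
    }
}
```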

Code example source: redisson/redisson

@Override
public void onNext(T t) {
  final ArrayDeque<UnicastSubject<T>> ws = windows;
  long i = index;
  long s = skip;
  if (i % s == 0 && !cancelled) {
    wip.getAndIncrement();
    UnicastSubject<T> w = UnicastSubject.create(capacityHint, this);
    ws.offer(w);
    actual.onNext(w);
  }
  long c = firstEmission + 1;
  for (UnicastSubject<T> w : ws) {
    w.onNext(t);
  }
  if (c >= count) {
    ws.poll().onComplete();
    if (ws.isEmpty() && cancelled) {
      this.s.dispose();
      return;
    }
    firstEmission = c - s;
  } else {
    firstEmission = c;
  }
  index = i + 1;
}

Code example source: apache/flume

private void commitPutsToPrimary() {
 synchronized (queueLock) {
  for (Event e : putList) {
   if (!memQueue.offer(e)) {
    throw new ChannelException("Unable to insert event into memory " +
      "queue in spite of spare capacity, this is very unexpected");
   }
  }
  drainOrder.putPrimary(putList.size());
  maxMemQueueSize = (memQueue.size() > maxMemQueueSize) ? memQueue.size()
    : maxMemQueueSize;
  channelCounter.setChannelSize(memQueue.size()
    + drainOrder.overflowCounter);
 }
 // update counters and semaphores
 totalStored.release(putList.size());
 channelCounter.addToEventPutSuccessCount(putList.size());
}

Code example source: redisson/redisson

bs.offer(b);

Code example source: ReactiveX/RxJava

windows.offer(w);

Code example source: ReactiveX/RxJava

@Test
public void postCompleteWithRequest() {
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  ArrayDeque<Integer> queue = new ArrayDeque<Integer>();
  AtomicLong state = new AtomicLong();
  BooleanSupplier isCancelled = new BooleanSupplier() {
    @Override
    public boolean getAsBoolean() throws Exception {
      return false;
    }
  };
  ts.onSubscribe(new BooleanSubscription());
  queue.offer(1);
  state.getAndIncrement();
  QueueDrainHelper.postComplete(ts, queue, state, isCancelled);
  ts.assertResult(1);
}

Code example source: ReactiveX/RxJava

@Test
public void postCompleteCancelled() {
  final TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  ArrayDeque<Integer> queue = new ArrayDeque<Integer>();
  AtomicLong state = new AtomicLong();
  BooleanSupplier isCancelled = new BooleanSupplier() {
    @Override
    public boolean getAsBoolean() throws Exception {
      return ts.isCancelled();
    }
  };
  ts.onSubscribe(new BooleanSubscription());
  queue.offer(1);
  state.getAndIncrement();
  ts.cancel();
  QueueDrainHelper.postComplete(ts, queue, state, isCancelled);
  ts.assertEmpty();
}

Code example source: redisson/redisson

windows.offer(w);

Code example source: ReactiveX/RxJava

@Test
public void postCompleteCancelledAfterOne() {
  final TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
    @Override
    public void onNext(Integer t) {
      super.onNext(t);
      cancel();
    }
  };
  ArrayDeque<Integer> queue = new ArrayDeque<Integer>();
  AtomicLong state = new AtomicLong();
  BooleanSupplier isCancelled = new BooleanSupplier() {
    @Override
    public boolean getAsBoolean() throws Exception {
      return ts.isCancelled();
    }
  };
  ts.onSubscribe(new BooleanSubscription());
  queue.offer(1);
  state.getAndIncrement();
  QueueDrainHelper.postComplete(ts, queue, state, isCancelled);
  ts.assertValue(1).assertNoErrors().assertNotComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void completeRequestRace() {
  for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
    final TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
    final ArrayDeque<Integer> queue = new ArrayDeque<Integer>();
    final AtomicLong state = new AtomicLong();
    final BooleanSupplier isCancelled = new BooleanSupplier() {
      @Override
      public boolean getAsBoolean() throws Exception {
        return false;
      }
    };
    ts.onSubscribe(new BooleanSubscription());
    queue.offer(1);
    Runnable r1 = new Runnable() {
      @Override
      public void run() {
        QueueDrainHelper.postCompleteRequest(1, ts, queue, state, isCancelled);
      }
    };
    Runnable r2 = new Runnable() {
      @Override
      public void run() {
        QueueDrainHelper.postComplete(ts, queue, state, isCancelled);
      }
    };
    TestHelper.race(r1, r2);
    ts.assertResult(1);
  }
}

Code example source: ReactiveX/RxJava

observers.offer(inner);
