本文整理了Java中com.oath.cyclops.async.adapters.Queue
类的一些代码示例,展示了Queue
类的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Queue
类的具体详情如下:
包路径:com.oath.cyclops.async.adapters.Queue
类名称:Queue
[英]Inspired by scalaz-streams async.Queue (functionally similar, but wraps a JDK Queue - wait-free or Blocking) A Queue that takes data from one or more input Streams and provides them to one or more emitted Streams Interface specifies a BlockingQueue, but non-BlockingQueues (such as ConcurrentLinkedQueue can be used in conjunction with an implementation of the Continuation interface
[中]灵感来自scalaz streams async。队列(功能类似,但包装了JDK队列-无等待或阻塞)从一个或多个输入流获取数据并将其提供给一个或多个发出流的队列接口指定了阻塞队列,但是非阻塞队列(例如ConcurrentLinkedQueue)可以与Continuation接口的实现结合使用
代码示例来源:origin: aol/cyclops
@Override
public void onComplete() {
counter.active.decrementAndGet();
counter.subscription.remove(subscription);
if (queue != null && counter.active.get() == 0) {
if (counter.completable) {
if(counter.closing.compareAndSet(false,true)) {
counter.closed = true;
queue.addContinuation(new Continuation(
() -> {
final List current = new ArrayList();
while (queue.size() > 0) {
try {
current.add(queue.get());
}catch(ClosedQueueException e){
break;
}
}
throw new ClosedQueueException(
current);
}));
queue.close();
}
}
}
}
代码示例来源:origin: aol/cyclops
/**
* <pre>
* {@code
* ReactiveSeq.of(1,2,3)
.flatMapP(i->ReactiveSeq.range(i,1500),1000,QueueFactories.unboundedQueue())
.listX()
* }
* </pre>
*
* @return A QueueFactory for unbounded Queues backed by a LinkedBlockingQueue
*/
public static <T> QueueFactory<T> unboundedQueue() {
return () -> new Queue<T>();
}
代码示例来源:origin: aol/cyclops
@Override
public boolean offer(T t) {
return host.offer(t);
}
代码示例来源:origin: aol/cyclops
public void close() {
counter.completable = true;
if (queue != null && counter.active.get() == 0) {
if(counter.closing.compareAndSet(false,true)) {
counter.closed = true;
queue.addContinuation(new Continuation(
() -> {
throw new ClosedQueueException();
}));
queue.close();
}
}
}
代码示例来源:origin: aol/cyclops
default <R> R foldParallel(Function<? super Stream<T>,? extends R> fn){
Queue<T> queue = QueueFactories.<T>unboundedNonBlockingQueue().build().withTimeout(1);
AtomicReference<Continuation> ref = new AtomicReference<>(null);
Continuation cont =
new Continuation(()->{
if(ref.get()==null && ref.compareAndSet(null,Continuation.empty())){
try {
//use the first consuming thread to tell this Stream onto the Queue
this.spliterator().forEachRemaining(queue::offer);
}finally {
queue.close();
}
}
return Continuation.empty();
});
;
queue.addContinuation(cont);
return fn.apply(queue.jdkStream().parallel());
}
default <R> R foldParallel(ForkJoinPool fj,Function<? super Stream<T>,? extends R> fn){
代码示例来源:origin: Nextdoor/bender
@Override
public void run() {
this.input.forEach(ievent -> {
this.output.offer(ievent);
});
this.input.close();
/*
* When all consumers are done have the last one close the queue.
*/
if (countdown.decrementAndGet() <= 0) {
this.output.close();
}
}
}
代码示例来源:origin: aol/cyclops
try {
if (!local.hasNext()) {
queue.close();
return Continuation.empty();
} else {
queue.offer(local.next());
queue.close();
throw ExceptionSoftener.throwSoftenedException(t);
queue.addContinuation(cont);
return queue.stream();
代码示例来源:origin: Nextdoor/bender
new Queue<InternalEvent>(new LinkedBlockingQueue<InternalEvent>(opProcsInFork.size()));
this.queues.add(queue);
Stream<InternalEvent> forkInput = queue.jdkStream();
for (OperationProcessor opProcInFork : opProcsInFork) {
forkInput = opProcInFork.perform(forkInput);
new Queue<InternalEvent>(new LinkedBlockingQueue<InternalEvent>(this.queues.size()));
AtomicInteger lock = new AtomicInteger(forkOutputStreams.size());
return outputQueue.jdkStream();
代码示例来源:origin: aol/cyclops
@Override
public Stream<T> unwrapStream() {
if (async == Type.NO_BACKPRESSURE) {
Queue<T> queue = QueueFactories.<T>unboundedNonBlockingQueue()
.build();
AtomicBoolean wip = new AtomicBoolean(false);
Continuation cont = new Continuation(() -> {
if (wip.compareAndSet(false, true)) {
this.source.subscribeAll(queue::offer, i -> {
queue.close();
}, () -> queue.close());
}
return Continuation.empty();
});
queue.addContinuation(cont);
return queue.stream();
}
return StreamSupport.stream(new OperatorToIterable<>(source, this.defaultErrorHandler, async == BACKPRESSURE).spliterator(), false);
}
代码示例来源:origin: aol/cyclops
@Test
public void queueTest(){
com.oath.cyclops.async.adapters.Queue<Integer> q = new Queue<>();
q.add(1);
q.add(2);
q.add(3);
q.stream().limit(3).forEach(System.out::println);
q.add(4);
q.add(5);
q.stream().limit(2).forEach(System.out::println);
}
@Test
代码示例来源:origin: aol/cyclops
@Test
public void publishTest() {
for (int k = 0; k < ITERATIONS; k++) {
Queue<Integer> queue = QueueFactories.<Integer>boundedNonBlockingQueue(10)
.build();
Thread t = new Thread(() -> {
try {
System.out.println("Sleeping!");
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Waking!");
System.out.println("Closing! " + queue.size());
queue.close();
});
t.start();
of(1, 2, 3).peek(i -> System.out.println("publishing " + i))
.publishTo(queue)
.forEach(System.out::println);
assertThat(queue.stream().collect(Collectors.toList()), equalTo(Arrays.asList(1, 2, 3)));
t = null;
System.gc();
}
}
代码示例来源:origin: aol/cyclops
@Test
public void mergeAdapterTest() {
for (int k = 0; k < ITERATIONS; k++) {
Queue<Integer> queue = QueueFactories.<Integer>boundedNonBlockingQueue(10)
.build();
Thread t = new Thread(() -> {
queue.add(1);
queue.add(2);
queue.add(3);
try {
System.out.println("Sleeping!");
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Waking!");
System.out.println("Closing! " + queue.size());
queue.close();
});
t.start();
assertThat(this.<Integer>of().peek(i -> System.out.println("publishing " + i))
.merge(queue).collect(Collectors.toList()), equalTo(Arrays.asList(1, 2, 3)));
t = null;
System.gc();
}
}
代码示例来源:origin: aol/cyclops
/**
* Close this Topic
*
* @return true if closed
*/
@Override
public boolean close() {
this.distributor.getSubscribers()
.forEach(it -> it.close());
return true;
}
代码示例来源:origin: aol/cyclops
/**
* @return Infinite (until Queue is closed) Stream of CompletableFutures
* that can be used as input into a SimpleReact concurrent dataflow
*
* This Stream itself is Sequential, SimpleReact will applyHKT
* concurrency / parralellism via the constituent CompletableFutures
*
*/
@Override
public ReactiveSeq<CompletableFuture<T>> streamCompletableFutures() {
return stream().map(CompletableFuture::completedFuture);
}
代码示例来源:origin: aol/cyclops
/**
* Return a standard (unextended) JDK Stream connected to this Queue
* To disconnect cleanly close the queue
*
* <pre>
* {@code
* use queue.stream().parallel() to convert to a parallel Stream
* }
* </pre>
* @see Queue#jdkStream(int) for an alternative that sends more poision pills for use with parallel Streams.
*
* @return Java 8 Stream connnected to this Queue
*/
public Stream<T> jdkStream() {
return jdkStream(2);
}
代码示例来源:origin: aol/cyclops
@Test
public void mergeAdapterTest1() {
for (int k = 0; k < ITERATIONS; k++) {
System.out.println("Test iteration " + k);
Queue<Integer> queue = QueueFactories.<Integer>boundedNonBlockingQueue(10)
.build();
Thread t = new Thread(() -> {
queue.add(1);
queue.add(2);
queue.add(3);
try {
// System.out.println("Sleeping!");
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
// System.out.println("Closing! " + queue.size());
queue.close();
});
t.start();
assertThat(this.<Integer>of(10).peek(i -> System.out.println("publishing " + i))
.merge(queue).collect(Collectors.toList()), hasItems(10, 1, 2, 3));
t = null;
System.gc();
}
}
代码示例来源:origin: aol/cyclops
Queue<T> queue = QueueFactories.<T>boundedNonBlockingQueue(1000)
.build()
.withTimeout(1);
queue.addContinuation(cont);
return topic;
.withTimeout(1);
s.request(1000-queue.size());
}finally {
wip.set(false);
queue.addContinuation(cont);
return topic;
代码示例来源:origin: aol/cyclops
@Test
public void publishToAndMerge() {
for (int k = 0; k < ITERATIONS; k++) {
System.out.println("Publish toNested and zip iteration " + k);
com.oath.cyclops.async.adapters.Queue<Integer> queue = QueueFactories.<Integer>boundedNonBlockingQueue(10)
.build();
Thread t = new Thread(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Closing! " + queue.size());
queue.close();
});
t.start();
AtomicBoolean complete = new AtomicBoolean(false);
AtomicBoolean start = new AtomicBoolean(false);
List<Integer> list = of(1, 2, 3)
.publishTo(queue)
.peek(System.out::println)
.merge(queue)
.toList();
assertThat(list, hasItems(1, 2, 3));
assertThat(list.size(), equalTo(6));
System.gc();
}
}
代码示例来源:origin: aol/cyclops
private void closeQueueIfFinished(final Queue queue, final Function<AtomicLong, Long> fn) {
if (queues.size() == 0)
return;
final long queueCount = fn.apply(count.get(queue));
final long limit = valuesToRight(queue).stream()
.reduce((acc, next) -> Math.min(acc, next))
.get();
if (queueCount >= limit) { //last entry - close THIS queue only!
queue.closeAndClear();
closed.set(true);
}
}
代码示例来源:origin: aol/cyclops
public void closeAndClear() {
this.open = false;
add((T) CLEAR_PILL);
}
内容来源于网络,如有侵权,请联系作者删除!