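This article collects code examples for the Java method org.apache.edgent.topology.TStream.pipe() and shows how TStream.pipe() is used in practice. The examples were extracted from selected projects published on GitHub, Stack Overflow, and Maven, so they can serve as a practical reference. Details of TStream.pipe() are as follows: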
Package path: org.apache.edgent.topology.TStream
Class name: TStream
Method name: pipe
Description: Declare a stream that contains the output of the specified Pipe oplet applied to this stream.
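To make the description concrete, here is a minimal plain-Java sketch of the idea of piping a stream through an operator. It does not use Edgent at all: `MiniStream`, `MiniPipe`, and `PipeSketch` are hypothetical names invented for this illustration, and the real oplet lifecycle in Edgent is richer than what is modelled here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical operator: receives one tuple and may emit results downstream. */
interface MiniPipe<I, O> {
    void accept(I tuple, Consumer<O> downstream);
}

/** Hypothetical stand-in for a stream; this is NOT Edgent's TStream. */
final class MiniStream<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    /** Declares a new stream that carries whatever the operator emits. */
    <U> MiniStream<U> pipe(MiniPipe<T, U> op) {
        MiniStream<U> out = new MiniStream<>();
        subscribers.add(tuple -> op.accept(tuple, out::emit));
        return out;
    }

    /** Registers a terminal consumer for tuples on this stream. */
    void sink(Consumer<T> consumer) {
        subscribers.add(consumer);
    }

    /** Pushes one tuple to every subscriber. */
    void emit(T tuple) {
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(tuple);
        }
    }
}

final class PipeSketch {
    /** Pipes integers through an operator that prefixes each value with '#'. */
    static List<String> run(int... inputs) {
        MiniStream<Integer> source = new MiniStream<>();
        List<String> seen = new ArrayList<>();
        source.<String>pipe((n, down) -> down.accept("#" + n)).sink(seen::add);
        for (int n : inputs) {
            source.emit(n);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run(1, 2, 3));
    }
}
```

The point of the sketch is only that `pipe` returns a new stream whose contents are decided by the operator, which is why every example below can chain further processing onto the returned stream.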
Code example source: apache/incubator-edgent
/**
 * Increment a counter metric when peeking at each tuple.
 *
 * @param <T>
 *            TStream tuple type
 * @param stream the stream to instrument
 * @return a {@link TStream} containing the input tuples
 */
public static <T> TStream<T> counter(TStream<T> stream) {
    return stream.pipe(new CounterOp<T>());
}
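The Javadoc above describes a pass-through operator: every tuple is forwarded unchanged and a counter is bumped as a side effect. A minimal sketch of that pattern in plain Java follows. `CountingPeek` is a hypothetical class written for this illustration, not Edgent's `CounterOp`, and it uses a plain `AtomicLong` rather than a real metrics registry.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/** Hypothetical pass-through counter; NOT Edgent's CounterOp. */
final class CountingPeek<T> implements Consumer<T> {
    private final AtomicLong count = new AtomicLong();
    private final Consumer<T> downstream;

    CountingPeek(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    @Override
    public void accept(T tuple) {
        count.incrementAndGet();   // side effect only: update the metric
        downstream.accept(tuple);  // the tuple itself is forwarded unchanged
    }

    long count() {
        return count.get();
    }
}

final class CounterSketch {
    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        CountingPeek<String> counter = new CountingPeek<>(out::add);
        for (String s : new String[] {"a", "b", "c"}) {
            counter.accept(s);
        }
        System.out.println(counter.count() + " " + out);
    }
}
```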
Code example source: apache/incubator-edgent
/**
 * Measure current tuple throughput and calculate one-, five-, and
 * fifteen-minute exponentially-weighted moving averages.
 *
 * @param <T>
 *            TStream tuple type
 * @param stream the stream to instrument
 * @return a {@link TStream} containing the input tuples
 */
public static <T> TStream<T> rateMeter(TStream<T> stream) {
    return stream.pipe(new RateMeter<T>());
}
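For readers unfamiliar with exponentially-weighted moving averages, the following sketch shows one common formulation, the same shape as the UNIX load average. It is an assumption that this matches what `RateMeter` computes internally; the class `EwmaRate`, the 5-second tick, and the 60-second window in the demo are all chosen here for illustration only.

```java
/** Hypothetical EWMA of a tuple rate; NOT taken from Edgent's RateMeter. */
final class EwmaRate {
    private final double tickSeconds;
    private final double alpha;
    private double rate;          // smoothed rate in tuples per second
    private boolean initialized;

    EwmaRate(double tickSeconds, double windowSeconds) {
        this.tickSeconds = tickSeconds;
        // Smoothing factor: a longer window gives a smaller alpha and slower reaction.
        this.alpha = 1.0 - Math.exp(-tickSeconds / windowSeconds);
    }

    /** Folds in the tuples seen during one tick and returns the smoothed rate. */
    double tick(long tuplesInTick) {
        double instantRate = tuplesInTick / tickSeconds;
        if (initialized) {
            rate += alpha * (instantRate - rate);
        } else {
            rate = instantRate;   // seed with the first observation
            initialized = true;
        }
        return rate;
    }
}

final class EwmaSketch {
    public static void main(String[] args) {
        EwmaRate oneMinute = new EwmaRate(5.0, 60.0);
        System.out.println(oneMinute.tick(10)); // steady input
        System.out.println(oneMinute.tick(10)); // unchanged while input is steady
        System.out.println(oneMinute.tick(0));  // decays once input stops
    }
}
```

With a steady input the smoothed rate stays at the instantaneous rate, and once the input stops it decays by a factor of `exp(-tick / window)` per tick.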
Code example source: apache/incubator-edgent
/**
 * Isolate upstream processing from downstream processing.
 * <P>
 * If the processing against the returned stream cannot keep up
 * with the arrival rate of tuples on {@code stream}, upstream
 * processing will block until there is space in the queue between
 * the streams.
 * </P><P>
 * Processing of tuples occurs in the order they were received.
 * </P>
 *
 * @param <T> Tuple type
 * @param stream Stream to be isolated from downstream processing.
 * @param queueCapacity size of the queue between {@code stream} and
 *        the returned stream.
 * @return Stream that is isolated from {@code stream}.
 * @see #pressureReliever(TStream, Function, int) pressureReliever
 */
public static <T> TStream<T> isolate(TStream<T> stream, int queueCapacity) {
    return stream.pipe(new Isolate<T>(queueCapacity));
}
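The behaviour described in the Javadoc above (a bounded queue between two threads, upstream blocking when it is full, tuples delivered in arrival order) can be sketched with `java.util.concurrent.ArrayBlockingQueue`. This is only an illustration of the technique under the assumption that a bounded blocking queue is a fair model; `IsolateSketch` is a hypothetical class and is not how Edgent's `Isolate` oplet is necessarily implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical model of queue-based isolation; NOT Edgent's Isolate oplet. */
final class IsolateSketch {
    /** Sends 0..n-1 through a bounded queue and returns what downstream received. */
    static List<Integer> run(int queueCapacity, int n) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(queueCapacity);
        List<Integer> received = new ArrayList<>();

        // Downstream runs on its own thread and drains the queue.
        Thread downstream = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) {
                    received.add(queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        downstream.start();

        try {
            for (int i = 0; i < n; i++) {
                queue.put(i);      // upstream blocks here while the queue is full
            }
            downstream.join();     // join makes the received list safely visible
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(2, 5));
    }
}
```

A small capacity keeps memory bounded at the cost of stalling the producer, which is the trade-off the `queueCapacity` parameter exposes.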
Code example source: org.apache.edgent/edgent-api-topology
return stream.pipe(new PressureReliever<>(count, keyFunction));
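The fragment above only shows the constructor arguments `count` and `keyFunction`. Assuming the operator retains at most `count` of the most recent tuples per key and discards older ones while downstream is busy, the retention policy could be sketched as below. `LastNPerKey` is a hypothetical class written for this illustration and leaves out the threading that a real pressure reliever needs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical "keep the newest N per key" buffer; NOT Edgent's PressureReliever. */
final class LastNPerKey<T, K> {
    private final int count;
    private final Function<T, K> keyFunction;
    private final Map<K, Deque<T>> pending = new HashMap<>();

    LastNPerKey(int count, Function<T, K> keyFunction) {
        this.count = count;
        this.keyFunction = keyFunction;
    }

    /** Buffers a tuple, discarding the oldest one for its key when the buffer is full. */
    void offer(T tuple) {
        Deque<T> queue =
                pending.computeIfAbsent(keyFunction.apply(tuple), k -> new ArrayDeque<>());
        if (queue.size() == count) {
            queue.removeFirst();
        }
        queue.addLast(tuple);
    }

    /** Hands the buffered tuples for one key to downstream, oldest first. */
    List<T> drain(K key) {
        Deque<T> queue = pending.remove(key);
        return queue == null ? new ArrayList<T>() : new ArrayList<T>(queue);
    }
}

final class PressureSketch {
    public static void main(String[] args) {
        LastNPerKey<Integer, String> buffer =
                new LastNPerKey<Integer, String>(2, n -> n % 2 == 0 ? "even" : "odd");
        for (int i = 1; i <= 6; i++) {
            buffer.offer(i);
        }
        System.out.println(buffer.drain("odd") + " " + buffer.drain("even"));
    }
}
```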
Code example source: org.apache.edgent/edgent-api-topology
/**
 * Isolate upstream processing from downstream processing.
 * <BR>
 * Implementations may throw {@code OutOfMemoryError}
 * if the processing against the returned stream cannot keep up
 * with the arrival rate of tuples on {@code stream}.
 *
 * @param <T> Tuple type
 * @param stream Stream to be isolated from downstream processing.
 * @param ordered {@code true} to maintain arrival order on the returned stream,
 *        {@code false} if arrival order need not be guaranteed.
 * @return Stream that is isolated from {@code stream}.
 */
public static <T> TStream<T> isolate(TStream<T> stream, boolean ordered) {
    return stream.pipe(
            ordered ? new Isolate<T>() : new UnorderedIsolate<T>());
}
Code example source: apache/incubator-edgent
@Override
public <U> TStream<U> aggregate(BiFunction<List<T>,K, U> processor) {
    // Guard the user function against concurrent invocation
    processor = Functions.synchronizedBiFunction(processor);
    // Keep the last `size` tuples per key and process on every insert
    Window<T, K, LinkedList<T>> window = Windows.lastNProcessOnInsert(size, getKeyFunction());
    Aggregate<T,U,K> op = new Aggregate<T,U,K>(window, processor);
    return feeder().pipe(op);
}
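The name `lastNProcessOnInsert` suggests a sliding window that holds the most recent N tuples and invokes the processor after every insertion. Under that assumption, the behaviour can be sketched in plain Java as follows. `LastNWindowSketch` is a hypothetical class, and it models a single partition only, ignoring the key function used in the real code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical single-partition "last N, process on insert" window. */
final class LastNWindowSketch {
    /** Returns one aggregate per input tuple, computed over the last `size` tuples. */
    static <T, U> List<U> aggregate(List<T> input, int size, Function<List<T>, U> processor) {
        LinkedList<T> window = new LinkedList<>();
        List<U> out = new ArrayList<>();
        for (T tuple : input) {
            if (window.size() == size) {
                window.removeFirst();          // evict the oldest tuple
            }
            window.addLast(tuple);
            // Process a snapshot so the processor cannot mutate the window.
            out.add(processor.apply(new ArrayList<>(window)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> sums = aggregate(Arrays.asList(1, 2, 3, 4, 5), 3,
                w -> w.stream().mapToInt(Integer::intValue).sum());
        System.out.println(sums);
    }
}
```

Unlike a batch, this kind of window emits one result per incoming tuple, so the output stream has the same rate as the input stream.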
Code example source: org.apache.edgent/edgent-spi-topology
@Override
public <J, U, K> TStream<J> joinLast(Function<T, K> keyer,
        TStream<U> lastStream, Function<U, K> lastStreamKeyer, BiFunction<T, U, J> joiner) {
    // The aggregate result is unused; the window only tracks the last tuple per key
    BiFunction<List<U>,K, Object> processor = Functions.synchronizedBiFunction((list, key) -> null);
    Window<U, K, LinkedList<U>> window = Windows.lastNProcessOnInsert(1, lastStreamKeyer);
    Aggregate<U,Object,K> op = new Aggregate<U,Object,K>(window, processor);
    lastStream.pipe(op);
    return this.map((tuple) -> {
        Partition<U, K, ? extends List<U>> part = window.getPartitions().get(keyer.apply(tuple));
        // No tuple seen yet on lastStream for this key
        if (part == null)
            return null;
        J ret;
        synchronized (part) {
            U last = part.getContents().get(0);
            ret = joiner.apply(tuple, last);
        }
        return ret;
    });
}
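Stripped of the window machinery, the join above amounts to remembering the most recent tuple per key from one stream and combining it with each tuple from the other. The sketch below shows that idea with a `HashMap`. `JoinLastSketch` is a hypothetical class; it is single-threaded, whereas the source synchronizes on the partition, and it returns `null` for a missing key to mirror the `return null` branch in the source.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

/** Hypothetical single-threaded model of a "join with last value per key". */
final class JoinLastSketch<T, U, K, J> {
    private final Map<K, U> lastByKey = new HashMap<>();
    private final Function<T, K> keyer;
    private final Function<U, K> lastKeyer;
    private final BiFunction<T, U, J> joiner;

    JoinLastSketch(Function<T, K> keyer, Function<U, K> lastKeyer, BiFunction<T, U, J> joiner) {
        this.keyer = keyer;
        this.lastKeyer = lastKeyer;
        this.joiner = joiner;
    }

    /** Records a tuple from the "last" stream, replacing any earlier one with the same key. */
    void acceptLast(U tuple) {
        lastByKey.put(lastKeyer.apply(tuple), tuple);
    }

    /** Joins a tuple with the latest matching one, or returns null if there is none yet. */
    J join(T tuple) {
        U last = lastByKey.get(keyer.apply(tuple));
        return last == null ? null : joiner.apply(tuple, last);
    }
}

final class JoinDemo {
    public static void main(String[] args) {
        // Both streams carry strings keyed by their first character (demo data only).
        JoinLastSketch<String, String, String, String> join = new JoinLastSketch<>(
                s -> s.substring(0, 1), s -> s.substring(0, 1), (t, u) -> t + "|" + u);
        join.acceptLast("a=1");
        join.acceptLast("a=2");
        System.out.println(join.join("a?"));
        System.out.println(join.join("c?"));
    }
}
```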
Code example source: org.apache.edgent/edgent-spi-topology
@Override
public <U> TStream<U> batch(BiFunction<List<T>, K, U> batcher) {
    batcher = Functions.synchronizedBiFunction(batcher);
    // Time-based batch: eviction is scheduled every `time` units
    Window<T, K, List<T>> window =
            Windows.window(
                    alwaysInsert(),
                    Policies.scheduleEvictOnFirstInsert(time, unit),
                    Policies.evictAllAndScheduleEvictWithProcess(time, unit),
                    (partition, tuple) -> {},
                    getKeyFunction(),
                    () -> new ArrayList<T>());
    Aggregate<T,U,K> op = new Aggregate<T,U,K>(window, batcher);
    return feeder().pipe(op);
}
Code example source: org.apache.edgent/edgent-spi-topology
@Override
public <U> TStream<U> batch(BiFunction<List<T>, K, U> batcher) {
    batcher = Functions.synchronizedBiFunction(batcher);
    // Count-based batch: process and evict once `size` tuples have arrived
    Window<T, K, List<T>> window =
            Windows.window(
                    alwaysInsert(),
                    Policies.doNothing(),
                    Policies.evictAll(),
                    Policies.processWhenFullAndEvict(size),
                    getKeyFunction(),
                    () -> new ArrayList<T>(size));
    Aggregate<T,U,K> op = new Aggregate<T,U,K>(window, batcher);
    return feeder().pipe(op);
}
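Going by the policy names `processWhenFullAndEvict` and `evictAll`, a count-based batch collects tuples until the partition holds `size` of them, hands the full list to the batcher, and then starts over empty. A plain-Java sketch of that reading follows. `CountBatchSketch` is a hypothetical class, models one partition only, and leaves a trailing partial batch unprocessed, which is an assumption about the behaviour rather than something the source states.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Hypothetical single-partition count-based batch. */
final class CountBatchSketch {
    /** Returns one result per full batch of `size` tuples. */
    static <T, U> List<U> batch(List<T> input, int size, Function<List<T>, U> batcher) {
        List<T> partition = new ArrayList<>(size);
        List<U> out = new ArrayList<>();
        for (T tuple : input) {
            partition.add(tuple);
            if (partition.size() == size) {
                // Process a copy, then evict everything.
                out.add(batcher.apply(new ArrayList<>(partition)));
                partition.clear();
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Integer>> batches = batch(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3, b -> b);
        System.out.println(batches);
    }
}
```

In contrast to the sliding window used by `aggregate`, batches here do not overlap, so each tuple contributes to at most one result.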
Code example source: apache/incubator-edgent
@Test
public void automaticMetricCleanup2() throws Exception {
    // Declare topology with custom metric oplet
    Topology t = newTopology();
    AtomicInteger n = new AtomicInteger(0);
    TStream<Integer> ints = t.poll(() -> n.incrementAndGet(), 10, TimeUnit.MILLISECONDS);
    TStream<Integer> ints2 = ints.pipe(new TestOplet<Integer>());
    ints2.pipe(new TestOplet<Integer>());
    // Submit job
    Future<? extends Job> fj = getSubmitter().submit(t);
    Job job = fj.get();
    Thread.sleep(TimeUnit.MILLISECONDS.toMillis(50));
    // Each test oplet registers two metrics
    Map<String, Metric> all = metricRegistry.getMetrics();
    assertEquals(4, all.size());
    // After close all metrics have been unregistered
    job.stateChange(Job.Action.CLOSE);
    assertEquals(0, all.size());
}
Code example source: apache/incubator-edgent
@Test
public void automaticMetricCleanup1() throws Exception {
    // Declare topology with custom metric oplet
    Topology t = newTopology();
    AtomicInteger n = new AtomicInteger(0);
    TStream<Integer> ints = t.poll(() -> n.incrementAndGet(), 10, TimeUnit.MILLISECONDS);
    ints.pipe(new TestOplet<Integer>());
    // Submit job
    Future<? extends Job> fj = getSubmitter().submit(t);
    Job job = fj.get();
    Thread.sleep(TimeUnit.MILLISECONDS.toMillis(50));
    // At least one tuple was processed
    int tupleCount = n.get();
    assertTrue("Expected more tuples than "+ tupleCount, tupleCount > 0);
    // Each test oplet registers two metrics
    Map<String, Metric> all = metricRegistry.getMetrics();
    assertEquals(2, all.size());
    // After close all metrics have been unregistered
    job.stateChange(Job.Action.CLOSE);
    assertEquals(0, all.size());
}
Code example source: apache/incubator-edgent
@Test(expected = ExecutionException.class)
public void jobProcessSourceError() throws Exception {
    Topology t = newTopology();
    AtomicInteger n = new AtomicInteger(0);
    TStream<Integer> ints = t.generate(() -> n.incrementAndGet());
    ints.pipe(new FailedOplet<Integer>(12, 100));
    Future<Job> fj = ((DirectProvider)getTopologyProvider()).submit(t);
    Job job = fj.get();
    assertEquals(Job.State.RUNNING, job.getCurrentState());
    try {
        job.complete(10, TimeUnit.SECONDS);
    } finally {
        // Still RUNNING despite the execution error
        assertEquals(Job.State.RUNNING, job.getCurrentState());
        assertEquals(Job.Health.UNHEALTHY, job.getHealth());
        assertEquals("java.lang.RuntimeException: Expected Test Exception", job.getLastError());
    }
}
Code example source: apache/incubator-edgent
@Test(expected = ExecutionException.class)
public void jobPeriodicSourceError() throws Exception {
    Topology t = newTopology();
    AtomicInteger n = new AtomicInteger(0);
    TStream<Integer> ints = t.poll(() -> n.incrementAndGet(), 100, TimeUnit.MILLISECONDS);
    ints.pipe(new FailedOplet<Integer>(5, 0));
    Future<Job> fj = ((DirectProvider)getTopologyProvider()).submit(t);
    Job job = fj.get();
    assertEquals(Job.State.RUNNING, job.getCurrentState());
    try {
        job.complete(10, TimeUnit.SECONDS);
    } finally {
        // Still RUNNING despite the execution error
        assertEquals(Job.State.RUNNING, job.getCurrentState());
        assertEquals(Job.Health.UNHEALTHY, job.getHealth());
        assertEquals("java.lang.RuntimeException: Expected Test Exception", job.getLastError());
    }
}
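Both tests above expect an `ExecutionException` when waiting on a job whose oplet throws. The same wrapping can be observed with a plain `java.util.concurrent` executor, which may help when reading the assertions: the original exception becomes the cause, and its `toString()` has the `class name: message` shape compared against in the tests. The sketch below uses only the JDK; `ExecutionErrorSketch` is a hypothetical class, and nothing here claims that Edgent builds its error string this way.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical JDK-only demo of how a task failure surfaces through a Future. */
final class ExecutionErrorSketch {
    /** Runs a failing task and returns a description of what the waiter observed. */
    static String run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<Void> failing = () -> {
                throw new RuntimeException("Expected Test Exception");
            };
            Future<Void> result = pool.submit(failing);
            try {
                result.get(10, TimeUnit.SECONDS);
                return "completed";
            } catch (ExecutionException e) {
                // The task's own exception is available as the cause.
                return String.valueOf(e.getCause());
            } catch (TimeoutException e) {
                return "timeout";
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "interrupted";
            }
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```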