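This article collects code examples for the Java method org.apache.edgent.topology.TStream.peek() and shows how TStream.peek() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, extracted from selected projects, and are intended as practical reference material. Details of the TStream.peek() method are as follows: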
Fully qualified name: org.apache.edgent.topology.TStream
Class name: TStream
Method name: peek
Declare a stream that contains the same contents as this stream while peeking at each element using peeker.
For each tuple t on this stream, peeker.accept(t) will be called.
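To make the contract concrete, here is a minimal sketch in plain Java. It does not use Edgent: `PeekContractSketch` and its `peek` helper are hypothetical stand-ins that only model the documented behavior, namely that the peeker's `accept(t)` is invoked for each tuple and the stream contents are left unchanged.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for illustration; this is not Edgent's TStream.
class PeekContractSketch {

    // Calls peeker.accept(t) for every tuple and returns the tuples
    // unchanged, in their original order.
    static <T> List<T> peek(List<T> tuples, Consumer<T> peeker) {
        List<T> contents = new ArrayList<>();
        for (T t : tuples) {
            peeker.accept(t);   // side effect only
            contents.add(t);    // the tuple itself is passed through untouched
        }
        return contents;
    }

    public static void main(String[] args) {
        List<String> peeked = new ArrayList<>();
        List<String> contents = peek(Arrays.asList("a", "b", "c"), peeked::add);
        System.out.println(contents); // [a, b, c]
        System.out.println(peeked);   // [a, b, c]
    }
}
```

In the sketch the peeker only records what it sees; the real API likewise expects the peeker to observe tuples rather than replace them.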
Code example source: origin: org.apache.edgent/edgent-api-topology
results.add(pipeline.apply(channels.get(ch), ch)
    .tag("parallel-ch"+ch)
    .peek(tuple -> splitter.channelDone(finalCh)));
Code example source: origin: apache/incubator-edgent
@Test
public void metricsEverywhereMultiplePeek() throws Exception {
    Topology t = newTopology();
    Graph g = t.graph();
    TStream<String> s = t.strings("a", "b", "c");
    List<String> peekedValues = new ArrayList<>();
    TStream<String> speek = s.peek(tuple -> peekedValues.add(tuple + "1st"));
    TStream<String> speek2 = speek.peek(tuple -> peekedValues.add(tuple + "2nd"));
    TStream<String> speek3 = speek2.peek(tuple -> peekedValues.add(tuple + "3rd"));
    speek3.sink(tuple -> System.out.print("."));
    Collection<Vertex<? extends Oplet<?, ?>, ?, ?>> vertices = g.getVertices();
    assertEquals(5, vertices.size());
    Collection<Edge> edges = g.getEdges();
    assertEquals(4, edges.size());
    Metrics.counter(t);
    printGraph(g);
    // One single counter inserted after the 3rd peek
    vertices = g.getVertices();
    assertEquals(6, vertices.size());
    edges = g.getEdges();
    assertEquals(5, edges.size());
}
Code example source: origin: apache/incubator-edgent
@Test
public void testMultiplePeek() throws Exception {
    Topology t = newTopology();
    TStream<String> s = t.strings("a", "b", "c");
    List<String> peekedValues = new ArrayList<>();
    TStream<String> speek = s.peek(tuple -> peekedValues.add(tuple + "1st"));
    assertSame(s, speek);
    TStream<String> speek2 = s.peek(tuple -> peekedValues.add(tuple + "2nd"));
    assertSame(s, speek2);
    TStream<String> speek3 = s.peek(tuple -> peekedValues.add(tuple + "3rd"));
    assertSame(s, speek3);
    Condition<Long> tc = t.getTester().tupleCount(s, 3);
    Condition<List<String>> contents = t.getTester().streamContents(s, "a", "b", "c");
    complete(t, tc);
    assertTrue(contents.valid());
    List<String> expected = Arrays.asList("a1st", "a2nd", "a3rd", "b1st", "b2nd", "b3rd", "c1st", "c2nd", "c3rd");
    assertEquals(expected, peekedValues);
}
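The expected list in this test implies a per-tuple ordering: all three peekers see "a" before any of them sees "b". The following plain-Java sketch reproduces only that interleaving. `MultiPeekOrderSketch` is a hypothetical stand-in and says nothing about how Edgent schedules peekers internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for illustration; this is not Edgent's implementation.
class MultiPeekOrderSketch {

    // Applies every registered peeker to a tuple before moving on to the
    // next tuple, which is the interleaving the test expects.
    static <T> void run(List<T> tuples, List<Consumer<T>> peekers) {
        for (T t : tuples) {
            for (Consumer<T> peeker : peekers) {
                peeker.accept(t);
            }
        }
    }

    static List<String> demo() {
        List<String> peekedValues = new ArrayList<>();
        List<Consumer<String>> peekers = new ArrayList<>();
        peekers.add(t -> peekedValues.add(t + "1st"));
        peekers.add(t -> peekedValues.add(t + "2nd"));
        peekers.add(t -> peekedValues.add(t + "3rd"));
        run(Arrays.asList("a", "b", "c"), peekers);
        return peekedValues;
    }

    public static void main(String[] args) {
        // Prints [a1st, a2nd, a3rd, b1st, b2nd, b3rd, c1st, c2nd, c3rd]
        System.out.println(demo());
    }
}
```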
Code example source: origin: apache/incubator-edgent
s.peek(tuple -> cnt.incrementAndGet());
Code example source: origin: apache/incubator-edgent
private void _testFanoutWithPeek(boolean after) throws Exception {
    Topology t = newTopology();
    Graph g = t.graph();
    /*                          -- Filter -- Sink(.)
     *                         /
     * Source -- Peek -- FanOut ---- Modify -- Sink(@)
     *
     */
    TStream<Integer> d = integers(t, 1, 2, 3);
    List<Integer> peekedValues = new ArrayList<>();
    if (!after)
        d.peek(tuple -> peekedValues.add(tuple));
    TStream<Integer> df = d.filter(tuple -> tuple.intValue() > 0);
    TStream<Integer> dm = d.modify(tuple -> new Integer(tuple.intValue() + 1));
    if (after)
        d.peek(tuple -> peekedValues.add(tuple));
    df.sink(tuple -> System.out.print("."));
    dm.sink(tuple -> System.out.print("@"));
    assertEquals(7, g.getVertices().size());
    assertEquals(6, g.getEdges().size());
    // Insert counter metrics into all the streams
    Metrics.counter(t);
    printGraph(g);
    assertEquals(10, g.getVertices().size());
    assertEquals(9, g.getEdges().size());
}
Code example source: origin: apache/incubator-edgent
gaussian = gaussian.peek(g -> System.out.println("R1:" + g));
TStream<Double> filter9 = gaussian.filter(tuple -> tuple.toString().charAt(0) == '9' ).tag("split9");
filter0 = filter0.peek(g -> System.out.println("filter0 : " + g));
filter1 = filter1.peek(g -> System.out.println("filter1 : " + g));
filter2 = filter2.peek(g -> System.out.println("filter2 : " + g));
filter3 = filter3.peek(g -> System.out.println("filter3 : " + g));
filter4 = filter4.peek(g -> System.out.println("filter4 : " + g));
filter5 = filter5.peek(g -> System.out.println("filter5 : " + g));
filter6 = filter6.peek(g -> System.out.println("filter6 : " + g));
filter7 = filter7.peek(g -> System.out.println("filter7 : " + g));
filter8 = filter8.peek(g -> System.out.println("filter8 : " + g));
filter9 = filter9.peek(g -> System.out.println("filter9 : " + g));
filter10 = filter10.peek(g -> System.out.println("filter10 : " + g));
mc1.peek(g -> System.out.print(g.toString()));
mc1.peek(tuple -> System.out.println("MyClass1: " + tuple.toString()));
mcs1.peek(tuple -> System.out.println(" mcs1_source2: " + tuple.toString()));
mc2 = mc2.peek(tuple -> System.out.println("MyClass2_source3:" + tuple.toString()));
mc4 = mc4.peek(tuple -> System.out.println("MyClass2_source4:" + tuple.toString()));
mc5 = mc5.peek(tuple -> System.out.println("MyClass2_source5:" + tuple.toString()));
Code example source: origin: apache/incubator-edgent
.peek(tuple -> System.out.println(new Date() + " watcher added "+tuple))
.peek(tuple -> { if (new File(tuple).getName().startsWith("."))
throw new RuntimeException("Not filtering active/hidden files "+tuple); });
TStream<String> readContents = FileStreams.textFileReader(pathnames);
Code example source: origin: apache/incubator-edgent
/**
 * Test Peek. This will only work with an embedded setup.
 *
 * @throws Exception on failure
 */
@Test
public void metricsEverywherePeek() throws Exception {
    Topology t = newTopology();
    Graph g = t.graph();
    TStream<String> s = t.strings("a", "b", "c");
    List<String> peekedValues = new ArrayList<>();
    TStream<String> speek = s.peek(tuple -> peekedValues.add(tuple));
    speek.sink(tuple -> System.out.print("."));
    Collection<Vertex<? extends Oplet<?, ?>, ?, ?>> vertices = g.getVertices();
    assertEquals(3, vertices.size());
    Collection<Edge> edges = g.getEdges();
    assertEquals(2, edges.size());
    Metrics.counter(t);
    printGraph(g);
    // One single counter inserted after the peek
    vertices = g.getVertices();
    assertEquals(4, vertices.size());
    edges = g.getEdges();
    assertEquals(3, edges.size());
}
Code example source: origin: apache/incubator-edgent
.peek(tuple -> latch.countDown())
.map(tup -> new String(tup, StandardCharsets.UTF_8));
Code example source: origin: apache/incubator-edgent
.peek(tuple -> latch.countDown());
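The fragments above count down a latch from inside peek. Assuming `latch` is a `java.util.concurrent.CountDownLatch` (the excerpts do not show its declaration), the pattern lets a test thread block until a known number of tuples has flowed through the stream. The sketch below shows the idea in plain Java; `PeekLatchSketch`, `startFlow`, and `awaitTuples` are hypothetical names, and a background thread stands in for the running topology.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical illustration of the latch pattern; no Edgent classes are used.
class PeekLatchSketch {

    // Feeds tuples to the peeker on a background thread, standing in for a
    // topology that runs asynchronously from the test thread.
    static <T> void startFlow(List<T> tuples, Consumer<T> peeker) {
        Thread worker = new Thread(() -> tuples.forEach(peeker));
        worker.start();
    }

    // Returns true if every expected tuple was peeked before the timeout.
    static boolean awaitTuples(List<String> tuples, long timeoutSeconds) {
        CountDownLatch latch = new CountDownLatch(tuples.size());
        startFlow(tuples, tuple -> latch.countDown());
        try {
            return latch.await(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }

    public static void main(String[] args) {
        // true when all three tuples were seen within five seconds
        System.out.println(awaitTuples(Arrays.asList("a", "b", "c"), 5));
    }
}
```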
Code example source: origin: apache/incubator-edgent
@Test
public void testValveInitiallyClosed() throws Exception {
    Topology top = newTopology("testValve");
    TStream<Integer> values = top.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    Valve<Integer> valve = new Valve<>(false);
    AtomicInteger cnt = new AtomicInteger();
    TStream<Integer> filtered = values
            .peek(tuple -> {
                // reject all but 4,5,6
                int curCnt = cnt.incrementAndGet();
                if (curCnt > 6)
                    valve.setOpen(false);
                else if (curCnt > 3)
                    valve.setOpen(true);
            })
            .filter(valve);
    Condition<Long> count = top.getTester().tupleCount(filtered, 3);
    Condition<List<Integer>> contents = top.getTester().streamContents(filtered, 4,5,6 );
    complete(top, count);
    assertTrue(contents.getResult().toString(), contents.valid());
}
Code example source: origin: apache/incubator-edgent
@Test
public void testValveInitiallyOpen() throws Exception {
    Topology top = newTopology("testValve");
    TStream<Integer> values = top.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    Valve<Integer> valve = new Valve<>();
    AtomicInteger cnt = new AtomicInteger();
    TStream<Integer> filtered = values
            .peek(tuple -> {
                // reject 4,5,6
                int curCnt = cnt.incrementAndGet();
                if (curCnt > 6)
                    valve.setOpen(true);
                else if (curCnt > 3)
                    valve.setOpen(false);
            })
            .filter(valve);
    Condition<Long> count = top.getTester().tupleCount(filtered, 7);
    Condition<List<Integer>> contents = top.getTester().streamContents(filtered, 1,2,3,7,8,9,10 );
    complete(top, count);
    assertTrue(contents.getResult().toString(), contents.valid());
}
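Both valve tests rely on the peeker running before the downstream filter for the same tuple, so a side effect in peek can change what the filter lets through. The plain-Java sketch below replays that logic with hypothetical stand-ins: `Gate` plays the role of the valve and `peekThenFilter` processes one tuple at a time. Neither is Edgent code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical stand-ins for illustration; Gate is not Edgent's Valve.
class ValveGateSketch {

    // A predicate whose answer can be flipped at runtime.
    static final class Gate<T> implements Predicate<T> {
        private volatile boolean open;
        Gate(boolean initiallyOpen) { this.open = initiallyOpen; }
        void setOpen(boolean open) { this.open = open; }
        @Override public boolean test(T t) { return open; }
    }

    // Runs the peeker and then the filter on each tuple, one tuple at a time.
    static <T> List<T> peekThenFilter(List<T> tuples, Consumer<T> peeker, Predicate<T> filter) {
        List<T> passed = new ArrayList<>();
        for (T t : tuples) {
            peeker.accept(t);
            if (filter.test(t)) passed.add(t);
        }
        return passed;
    }

    static List<Integer> demo(boolean initiallyOpen) {
        List<Integer> values = new ArrayList<>();
        for (int i = 1; i <= 10; i++) values.add(i);
        Gate<Integer> gate = new Gate<>(initiallyOpen);
        AtomicInteger cnt = new AtomicInteger();
        return peekThenFilter(values, tuple -> {
            int curCnt = cnt.incrementAndGet();
            if (curCnt > 6) gate.setOpen(initiallyOpen);        // back to the initial state
            else if (curCnt > 3) gate.setOpen(!initiallyOpen);  // flipped for tuples 4, 5, 6
        }, gate);
    }

    public static void main(String[] args) {
        System.out.println(demo(false)); // [4, 5, 6]
        System.out.println(demo(true));  // [1, 2, 3, 7, 8, 9, 10]
    }
}
```

The two outputs match the stream contents asserted by testValveInitiallyClosed and testValveInitiallyOpen respectively.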
Code example source: origin: apache/incubator-edgent
if (numBefore > 0) {
for (int i = 0; i < numBefore; i++)
s.peek(tuple -> tuple.peekedCnt++);
if (numMiddle > 0) {
for (int i = 0; i < numMiddle; i++)
s.peek(tuple -> tuple.peekedCnt++);
s.peek(tuple -> tuple.peekedCnt++);
Code example source: origin: apache/incubator-edgent
/**
 * Test Peek. This will only work with an embedded setup.
 *
 * @throws Exception on failure
 */
@Test
public void testPeek() throws Exception {
    Topology t = newTopology();
    TStream<String> s = t.strings("a", "b", "c");
    List<String> peekedValues = new ArrayList<>();
    TStream<String> speek = s.peek(tuple -> peekedValues.add(tuple));
    assertSame(s, speek);
    Condition<Long> tc = t.getTester().tupleCount(s, 3);
    Condition<List<String>> contents = t.getTester().streamContents(s, "a", "b", "c");
    complete(t, tc);
    assertTrue(contents.valid());
    assertEquals(contents.getResult(), peekedValues);
}
Code example source: origin: apache/incubator-edgent
@Test
public void testTag() throws Exception {
    Topology t = newTopology();
    List<String> tags = new ArrayList<>(Arrays.asList("tag1", "tag2"));
    TStream<String> s = t.strings("a", "b");
    assertEquals(0, s.getTags().size());
    TStream<String> s2 = s.tag("tag1", "tag2");
    assertSame(s, s2);
    assertTrue("s.tags="+s.getTags(), s.getTags().containsAll(tags));
    tags.add("tag3");
    s.tag("tag3");
    assertTrue("s.tags="+s.getTags(), s.getTags().containsAll(tags));
    s.tag("tag3", "tag2", "tag1"); // ok to redundantly add
    assertTrue("s.tags="+s.getTags(), s.getTags().containsAll(tags));
    // test access at runtime
    s2 = s.peek(tuple -> {
        assertTrue("s.tags="+s.getTags(), s.getTags().containsAll(tags));
    }).filter(tuple -> true);
    // just verify that tag presence doesn't otherwise muck up things
    Condition<Long> tc = t.getTester().tupleCount(s2, 2);
    Condition<List<String>> contents = t.getTester().streamContents(s2, "a", "b");
    complete(t, tc);
    assertTrue("contents "+contents.getResult(), contents.valid());
}
Code example source: origin: apache/incubator-edgent
@Test
public void testAlias() throws Exception {
    Topology t = newTopology();
    TStream<String> s = t.strings("a", "b");
    assertEquals(null, s.getAlias());
    TStream<String> s2 = s.alias("sAlias");
    assertSame(s, s2);
    assertEquals("sAlias", s.getAlias());
    try {
        s.alias("another"); // expect ISE - alias already set
        assertTrue(false);
    } catch (IllegalStateException e) {
        ; // expected
    }
    // test access at runtime
    s2 = s.peek(tuple -> {
        assertEquals("sAlias", s.getAlias());
    }).filter(tuple -> true);
    // just verify that alias presence doesn't otherwise muck up things
    Condition<Long> tc = t.getTester().tupleCount(s2, 2);
    Condition<List<String>> contents = t.getTester().streamContents(s2, "a", "b");
    complete(t, tc);
    assertTrue("contents "+contents.getResult(), contents.valid());
}