org.apache.edgent.topology.TStream.peek()方法的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(11.0k)|赞(0)|评价(0)|浏览(62)

本文整理了Java中org.apache.edgent.topology.TStream.peek()方法的一些代码示例,展示了TStream.peek()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。TStream.peek()方法的具体详情如下:
包路径:org.apache.edgent.topology.TStream
类名称:TStream
方法名:peek

TStream.peek介绍

[英]Declare a stream that contains the same contents as this stream while peeking at each element using peeker.
For each tuple t on this stream, peeker.accept(t) will be called.
[中]声明一个流,该流包含与此流相同的内容,同时使用Peek查看每个元素。
对于流中的每个元组t,Peek。接受(t)将被调用。

代码示例

代码示例来源:origin: org.apache.edgent/edgent-api-topology

results.add(pipeline.apply(channels.get(ch), ch)
  .tag("parallel-ch"+ch)
  .peek(tuple -> splitter.channelDone(finalCh)));

代码示例来源:origin: apache/incubator-edgent

results.add(pipeline.apply(channels.get(ch), ch)
  .tag("parallel-ch"+ch)
  .peek(tuple -> splitter.channelDone(finalCh)));

代码示例来源:origin: apache/incubator-edgent

@Test
public void metricsEverywhereMultiplePeek() throws Exception {
  Topology t = newTopology();
  Graph g = t.graph();
  TStream<String> s = t.strings("a", "b", "c");
  List<String> peekedValues = new ArrayList<>();
  TStream<String> speek = s.peek(tuple -> peekedValues.add(tuple + "1st"));
  TStream<String> speek2 = speek.peek(tuple -> peekedValues.add(tuple + "2nd"));
  TStream<String> speek3 = speek2.peek(tuple -> peekedValues.add(tuple + "3rd"));
  speek3.sink(tuple -> System.out.print("."));
  Collection<Vertex<? extends Oplet<?, ?>, ?, ?>> vertices = g.getVertices();
  assertEquals(5, vertices.size());
  Collection<Edge> edges = g.getEdges();
  assertEquals(4, edges.size());
  Metrics.counter(t);
  printGraph(g);
  // One single counter inserted after the 3rd peek 
  vertices = g.getVertices();
  assertEquals(6, vertices.size());
  edges = g.getEdges();
  assertEquals(5, edges.size());
}

代码示例来源:origin: apache/incubator-edgent

@Test
public void testMultiplePeek() throws Exception {
  Topology t = newTopology();
  TStream<String> s = t.strings("a", "b", "c");
  List<String> peekedValues = new ArrayList<>();
  TStream<String> speek = s.peek(tuple -> peekedValues.add(tuple + "1st"));
  assertSame(s, speek);
  TStream<String> speek2 = s.peek(tuple -> peekedValues.add(tuple + "2nd"));
  assertSame(s, speek2);
  TStream<String> speek3 = s.peek(tuple -> peekedValues.add(tuple + "3rd"));
  assertSame(s, speek3);
  Condition<Long> tc = t.getTester().tupleCount(s, 3);
  Condition<List<String>> contents = t.getTester().streamContents(s, "a", "b", "c");
  complete(t, tc);
  assertTrue(contents.valid());
  List<String> expected = Arrays.asList("a1st", "a2nd", "a3rd", "b1st", "b2nd", "b3rd", "c1st", "c2nd", "c3rd");
  assertEquals(expected, peekedValues);
}

代码示例来源:origin: apache/incubator-edgent

s.peek(tuple -> cnt.incrementAndGet());

代码示例来源:origin: apache/incubator-edgent

private void _testFanoutWithPeek(boolean after) throws Exception {
  Topology t = newTopology();
  Graph g = t.graph();
  /*                            -- Filter -- Sink(.)
   *                           / 
   * Source -- Peek -- FanOut ---- Modify -- Sink(@)
   * 
   */
  TStream<Integer> d = integers(t, 1, 2, 3);
  List<Integer> peekedValues = new ArrayList<>();
  
  if (!after)
    d.peek(tuple -> peekedValues.add(tuple));
  TStream<Integer> df = d.filter(tuple -> tuple.intValue() > 0);
  TStream<Integer> dm = d.modify(tuple -> new Integer(tuple.intValue() + 1));
  if (after)
    d.peek(tuple -> peekedValues.add(tuple));
  df.sink(tuple -> System.out.print("."));
  dm.sink(tuple -> System.out.print("@"));
  
  assertEquals(7, g.getVertices().size());
  assertEquals(6, g.getEdges().size());
  // Insert counter metrics into all the streams 
  Metrics.counter(t);
  printGraph(g);
  assertEquals(10, g.getVertices().size());
  assertEquals(9, g.getEdges().size());
}

代码示例来源:origin: apache/incubator-edgent

gaussian = gaussian.peek(g -> System.out.println("R1:" + g));
TStream<Double>  filter9 = gaussian.filter(tuple -> tuple.toString().charAt(0) == '9' ).tag("split9");
filter0 = filter0.peek(g -> System.out.println("filter0 : " + g));
filter1 = filter1.peek(g -> System.out.println("filter1 : " + g));
filter2 = filter2.peek(g -> System.out.println("filter2 : " + g));
filter3 = filter3.peek(g -> System.out.println("filter3 : " + g));
filter4 = filter4.peek(g -> System.out.println("filter4 : " + g));
filter5 = filter5.peek(g -> System.out.println("filter5 : " + g));
filter6 = filter6.peek(g -> System.out.println("filter6 : " + g));
filter7 = filter7.peek(g -> System.out.println("filter7 : " + g));
filter8 = filter8.peek(g -> System.out.println("filter8 : " + g));
filter9 = filter9.peek(g -> System.out.println("filter9 : " + g));
filter10 = filter10.peek(g -> System.out.println("filter10 : " + g));
mc1.peek(g -> System.out.print(g.toString()));
mc1.peek(tuple -> System.out.println("MyClass1: " + tuple.toString()));
mcs1.peek(tuple -> System.out.println(" mcs1_source2: " + tuple.toString()));
mc2 = mc2.peek(tuple -> System.out.println("MyClass2_source3:" + tuple.toString()));
mc4 = mc4.peek(tuple -> System.out.println("MyClass2_source4:" + tuple.toString()));
mc5 = mc5.peek(tuple -> System.out.println("MyClass2_source5:" + tuple.toString()));

代码示例来源:origin: apache/incubator-edgent

.peek(tuple -> System.out.println(new Date() + " watcher added "+tuple))
 .peek(tuple -> { if (new File(tuple).getName().startsWith("."))
  throw new RuntimeException("Not filtering active/hidden files "+tuple); });
TStream<String> readContents = FileStreams.textFileReader(pathnames);

代码示例来源:origin: apache/incubator-edgent

/**
 * Test Peek. This will only work with an embedded setup.
 * 
 * @throws Exception on failure
 */
@Test
public void metricsEverywherePeek() throws Exception {
  Topology t = newTopology();
  Graph g = t.graph();
  TStream<String> s = t.strings("a", "b", "c");
  List<String> peekedValues = new ArrayList<>();
  TStream<String> speek = s.peek(tuple -> peekedValues.add(tuple));
  speek.sink(tuple -> System.out.print("."));
  Collection<Vertex<? extends Oplet<?, ?>, ?, ?>> vertices = g.getVertices();
  assertEquals(3, vertices.size());
  Collection<Edge> edges = g.getEdges();
  assertEquals(2, edges.size());
  Metrics.counter(t);
  printGraph(g);
  // One single counter inserted after the peek 
  vertices = g.getVertices();
  assertEquals(4, vertices.size());
  edges = g.getEdges();
  assertEquals(3, edges.size());
}

代码示例来源:origin: apache/incubator-edgent

.peek(tuple -> latch.countDown())
.map(tup -> new String(tup, StandardCharsets.UTF_8));

代码示例来源:origin: apache/incubator-edgent

.peek(tuple -> latch.countDown());

代码示例来源:origin: apache/incubator-edgent

.peek(tuple -> latch.countDown());

代码示例来源:origin: apache/incubator-edgent

@Test
public void testValveInitiallyClosed() throws Exception {
  Topology top = newTopology("testValve");
  
  TStream<Integer> values = top.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
  
  Valve<Integer> valve = new Valve<>(false);
  
  AtomicInteger cnt = new AtomicInteger();
  TStream<Integer> filtered = values
                .peek(tuple -> {
                  // reject all but 4,5,6
                  int curCnt = cnt.incrementAndGet();
                  if (curCnt > 6)
                    valve.setOpen(false);
                  else if (curCnt > 3)
                    valve.setOpen(true);
                  })
                .filter(valve);
  Condition<Long> count = top.getTester().tupleCount(filtered, 3);
  Condition<List<Integer>> contents = top.getTester().streamContents(filtered, 4,5,6 );
  complete(top, count);
  assertTrue(contents.getResult().toString(), contents.valid());
}

代码示例来源:origin: apache/incubator-edgent

@Test
public void testValveInitiallyOpen() throws Exception {
  Topology top = newTopology("testValve");
  TStream<Integer> values = top.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
  
  Valve<Integer> valve = new Valve<>();
  AtomicInteger cnt = new AtomicInteger();
  TStream<Integer> filtered = values
                .peek(tuple -> {
                  // reject 4,5,6
                  int curCnt = cnt.incrementAndGet();
                  if (curCnt > 6)
                    valve.setOpen(true);
                  else if (curCnt > 3)
                    valve.setOpen(false);
                  })
                .filter(valve);
  Condition<Long> count = top.getTester().tupleCount(filtered, 7);
  Condition<List<Integer>> contents = top.getTester().streamContents(filtered, 1,2,3,7,8,9,10 );
  complete(top, count);
  assertTrue(contents.getResult().toString(), contents.valid());
}

代码示例来源:origin: apache/incubator-edgent

if (numBefore > 0) {
 for (int i = 0; i < numBefore; i++)
  s.peek(tuple -> tuple.peekedCnt++);
if (numMiddle > 0) {
 for (int i = 0; i < numMiddle; i++)
  s.peek(tuple -> tuple.peekedCnt++);
  s.peek(tuple -> tuple.peekedCnt++);

代码示例来源:origin: apache/incubator-edgent

/**
 * Test Peek. This will only work with an embedded setup.
 * 
 * @throws Exception on failure
 */
@Test
public void testPeek() throws Exception {
  Topology t = newTopology();
  TStream<String> s = t.strings("a", "b", "c");
  List<String> peekedValues = new ArrayList<>();
  TStream<String> speek = s.peek(tuple -> peekedValues.add(tuple));
  assertSame(s, speek);
  Condition<Long> tc = t.getTester().tupleCount(s, 3);
  Condition<List<String>> contents = t.getTester().streamContents(s, "a", "b", "c");
  complete(t, tc);
  assertTrue(contents.valid());
  assertEquals(contents.getResult(), peekedValues);
}

代码示例来源:origin: apache/incubator-edgent

@Test
public void testTag() throws Exception {
  Topology t = newTopology();
  List<String> tags = new ArrayList<>(Arrays.asList("tag1", "tag2"));
  
  TStream<String> s = t.strings("a", "b");
  assertEquals(0, s.getTags().size());
  
  TStream<String> s2 = s.tag("tag1", "tag2");
  assertSame(s, s2);
  assertTrue("s.tags="+s.getTags(), s.getTags().containsAll(tags));
  
  tags.add("tag3");
  s.tag("tag3");
  assertTrue("s.tags="+s.getTags(), s.getTags().containsAll(tags));
  
  s.tag("tag3", "tag2", "tag1");  // ok to redundantly add
  assertTrue("s.tags="+s.getTags(), s.getTags().containsAll(tags));
  // test access at runtime
  s2 = s.peek(tuple -> {
    assertTrue("s.tags="+s.getTags(), s.getTags().containsAll(tags));
  }).filter(tuple -> true);
  // just verify that tag presence doesn't otherwise muck up things
  Condition<Long> tc = t.getTester().tupleCount(s2, 2);
  Condition<List<String>> contents = t.getTester().streamContents(s2, "a", "b");
  complete(t, tc);
  assertTrue("contents "+contents.getResult(), contents.valid());
}

代码示例来源:origin: apache/incubator-edgent

@Test
public void testAlias() throws Exception {
  Topology t = newTopology();
  TStream<String> s = t.strings("a", "b");
  assertEquals(null, s.getAlias());
  
  TStream<String> s2 = s.alias("sAlias");
  assertSame(s, s2);
  assertEquals("sAlias", s.getAlias());
  
  try {
    s.alias("another");  // expect ISE - alias already set
    assertTrue(false);
  } catch (IllegalStateException e) {
    ; // expected
  }
  
  // test access at runtime
  s2 = s.peek(tuple -> {
    assertEquals("sAlias", s.getAlias());
  }).filter(tuple -> true);
  // just verify that alias presence doesn't otherwise muck up things
  Condition<Long> tc = t.getTester().tupleCount(s2, 2);
  Condition<List<String>> contents = t.getTester().streamContents(s2, "a", "b");
  complete(t, tc);
  assertTrue("contents "+contents.getResult(), contents.valid());
}

相关文章