Usage and code examples of the org.apache.edgent.topology.TStream.modify() method

Reposted by x33g5p2x on 2022-01-30, category: Other

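This article collects code examples for the Java method org.apache.edgent.topology.TStream.modify() and shows how TStream.modify() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, extracted from selected projects, and are meant as a practical reference. Details of the TStream.modify() method:
Package path: org.apache.edgent.topology.TStream
Class name: TStream
Method name: modify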

Introduction to TStream.modify

Declare a new stream that modifies each tuple from this stream into one (or zero) tuple of the same type T. For each tuple t on this stream, the returned stream will contain a tuple that is the result of modifier.apply(t) when the return is not null. The function may return the same reference as its input t or a different object of the same type. If modifier.apply(t) returns null then no tuple is submitted to the returned stream for t.

Example of modifying a stream of String values by adding the suffix "extra".

TStream<String> strings = ... 
TStream<String> modifiedStrings = strings.modify(t -> t.concat("extra"));

This method is equivalent to map(Function modifier).
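
The contract described above can be modelled in plain Java without Edgent on the classpath. The following is only a sketch of the semantics, not Edgent's implementation: the class `ModifySketch` and its `modify` helper are hypothetical names introduced for illustration, a `List` stands in for the stream, and `java.util.function.UnaryOperator` stands in for the modifier function.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

/** Plain-Java model of the modify() contract. Hypothetical helper, not Edgent code. */
final class ModifySketch {

    /**
     * Applies the modifier to each tuple in order and keeps only non-null results,
     * so every input yields one or zero outputs of the same type T.
     */
    static <T> List<T> modify(List<T> tuples, UnaryOperator<T> modifier) {
        List<T> out = new ArrayList<>();
        for (T t : tuples) {
            T result = modifier.apply(t);
            if (result != null) { // a null result means: submit nothing for t
                out.add(result);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> strings = Arrays.asList("a", "b", "c");

        // Every tuple is replaced by a new object of the same type.
        System.out.println(modify(strings, t -> t.concat("extra"))); // prints [aextra, bextra, cextra]

        // Returning the input reference is allowed; returning null drops the tuple.
        System.out.println(modify(strings, t -> t.equals("b") ? null : t)); // prints [a, c]
    }
}
```

In a real topology the tuples flow through one at a time rather than being collected into a list; the model only captures which outputs appear for which inputs.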

Code examples

Code example source: apache/incubator-edgent

private void _testFanoutWithPeek(boolean after) throws Exception {
  Topology t = newTopology();
  Graph g = t.graph();
  /*                            -- Filter -- Sink(.)
   *                           / 
   * Source -- Peek -- FanOut ---- Modify -- Sink(@)
   * 
   */
  TStream<Integer> d = integers(t, 1, 2, 3);
  List<Integer> peekedValues = new ArrayList<>();
  
  if (!after)
    d.peek(tuple -> peekedValues.add(tuple));
  TStream<Integer> df = d.filter(tuple -> tuple.intValue() > 0);
  TStream<Integer> dm = d.modify(tuple -> Integer.valueOf(tuple.intValue() + 1));
  if (after)
    d.peek(tuple -> peekedValues.add(tuple));
  df.sink(tuple -> System.out.print("."));
  dm.sink(tuple -> System.out.print("@"));
  
  assertEquals(7, g.getVertices().size());
  assertEquals(6, g.getEdges().size());
  // Insert counter metrics into all the streams 
  Metrics.counter(t);
  printGraph(g);
  assertEquals(10, g.getVertices().size());
  assertEquals(9, g.getEdges().size());
}

Code example source: apache/incubator-edgent

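// Excerpt: non-contiguous lines from a larger test. The multi-line modify(...) calls
// below are truncated in the source, so this fragment does not compile on its own.
gaussian = gaussian.modify(g -> g * 3 + 1);
mc1.modify(tuple -> new MyClass1(tuple.getS1() + "a1 b1 c1 d1 ", tuple.getS2() + " e1 f1 g1 h1 ", tuple.getD1() + 1));
mc1.peek(tuple -> System.out.println("MyClass1: " + tuple.toString()));
mc2 = mc2.modify(
    tuple -> new MyClass2(
        new MyClass1(tuple.getMc1().getS1() + " c3 d3 e3 f3 ",
        // ... (remaining arguments truncated in the source)
mc4 = mc4.modify(
    tuple -> new MyClass2(
        new MyClass1(tuple.getMc1().getS1() + " c41 d42 d43 f44 ",
        // ... (remaining arguments truncated in the source)
mc5 = mc5.modify(
    tuple -> new MyClass2(
        new MyClass1(tuple.getMc1().getS1() + " c51 d52 e53 f54 ",
        // ... (remaining arguments truncated in the source)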

Code example source: apache/incubator-edgent

@Test
public void testBlockingDelay() throws Exception {
  // Timing variances on shared machines can cause this test to fail
  assumeTrue(!Boolean.getBoolean("edgent.build.ci"));
  Topology topology = newTopology();
  
  TStream<String> strings = topology.strings("a", "b", "c", "d");
  
  TStream<Long> starts = strings.map(v -> System.currentTimeMillis());
  
  // delay stream
  starts = PlumbingStreams.blockingDelay(starts, 300, TimeUnit.MILLISECONDS);
  
  // calculate delay
  starts = starts.modify(v -> System.currentTimeMillis() - v);
  
  starts = starts.filter(v -> v >= 300);
  
  Condition<Long> tc = topology.getTester().tupleCount(starts, 4);
  complete(topology, tc);
  assertTrue("valid:" + tc.getResult(), tc.valid());
}

Code example source: apache/incubator-edgent

@Test
public void testFanout3() throws Exception {
  Topology t = newTopology();
  TStream<String> s = t.strings("a", "b", "cde");
  TStream<String> sf = s.filter(tuple -> "b".equals(tuple));
  TStream<String> sm = s.modify(tuple -> tuple.concat("fo2"));
  TStream<byte[]> st = s.map(tuple -> tuple.getBytes());
  Condition<Long> tsfc = t.getTester().tupleCount(sf, 1);
  Condition<Long> tsmc = t.getTester().tupleCount(sm, 3);
  Condition<Long> tstc = t.getTester().tupleCount(st, 3);
  Condition<List<String>> tsf = t.getTester().streamContents(sf, "b");
  Condition<List<String>> tsm = t.getTester().streamContents(sm, "afo2", "bfo2", "cdefo2");
  Condition<List<byte[]>> tst = t.getTester().streamContents(st, "a".getBytes(), "b".getBytes(), "cde".getBytes());
  complete(t, t.getTester().and(tsfc, tsmc, tstc));
  assertTrue(tsf.valid());
  assertTrue(tsm.valid());
  // Can't use equals on byte[]
  assertEquals(3, tst.getResult().size());
  assertEquals("a", new String(tst.getResult().get(0)));
  assertEquals("b", new String(tst.getResult().get(1)));
  assertEquals("cde", new String(tst.getResult().get(2)));
}
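
The comment "Can't use equals on byte[]" in the test above reflects a general Java rule: arrays inherit `Object.equals`, which compares references, not contents. That is presumably why the test decodes each `byte[]` to a `String` before asserting. A minimal standalone sketch of the difference follows; the class name `ByteArrayEqualitySketch` and the helper `sameContent` are hypothetical and not part of Edgent.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Shows why byte[] tuples need content-based comparison. Hypothetical helper class. */
final class ByteArrayEqualitySketch {

    /** Compares two byte arrays element by element. */
    static boolean sameContent(byte[] a, byte[] b) {
        return Arrays.equals(a, b);
    }

    public static void main(String[] args) {
        byte[] first = "cde".getBytes(StandardCharsets.UTF_8);
        byte[] second = "cde".getBytes(StandardCharsets.UTF_8);

        // Object.equals on arrays is reference equality: two distinct arrays never match.
        System.out.println(first.equals(second)); // prints false

        // Arrays.equals compares the contents.
        System.out.println(sameContent(first, second)); // prints true

        // Decoding to String, as the test does, also gives a content comparison.
        System.out.println("cde".equals(new String(first, StandardCharsets.UTF_8))); // prints true
    }
}
```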

Code example source: apache/incubator-edgent

@Test
public void testOneShotDelay() throws Exception {
  Topology topology = newTopology();
  
  TStream<String> strings = topology.strings("a", "b", "c", "d");
  
  TStream<Long> starts = strings.map(v -> System.currentTimeMillis());
  
  // delay stream
  starts = PlumbingStreams.blockingOneShotDelay(starts, 300, TimeUnit.MILLISECONDS);
  
  // calculate delay
  starts = starts.modify(v -> System.currentTimeMillis() - v);
  
  // the first tuple shouldn't satisfy the predicate
  starts = starts.filter(v -> v < 300);
  
  Condition<Long> tc = topology.getTester().tupleCount(starts, 3);
  complete(topology, tc);
  assertTrue("valid:" + tc.getResult(), tc.valid());
}

Code example source: apache/incubator-edgent

@Test
public void testFanout2() throws Exception {
  Topology t = newTopology();
  
  TStream<String> s = t.strings("a", "b", "c");
  TStream<String> sf = s.filter(tuple -> "b".equals(tuple));
  TStream<String> sm = s.modify(tuple -> tuple.concat("fo2"));
  Condition<Long> tsmc = t.getTester().tupleCount(sm, 3);
  Condition<List<String>> tsf = t.getTester().streamContents(sf, "b");
  Condition<List<String>> tsm = t.getTester().streamContents(sm, "afo2", "bfo2", "cfo2");
  complete(t, t.getTester().and(tsm, tsmc));
  assertTrue(tsf.getResult().toString(), tsf.valid());
  assertTrue(tsm.getResult().toString(), tsm.valid());
}

Code example source: apache/incubator-edgent

@Test
public void testModify() throws Exception {
  Topology t = newTopology();
  TStream<String> s = t.strings("a", "b", "c");
  TStream<String> i = s.modify(tuple -> tuple.concat("M"));
  assertStream(t, i);
  Condition<Long> tc = t.getTester().tupleCount(i, 3);
  Condition<List<String>> contents = t.getTester().streamContents(i, "aM", "bM", "cM");
  complete(t, tc);
  assertTrue(contents.valid());
}

Code example source: apache/incubator-edgent

@Test
public void testModifyWithDrops() throws Exception {
  Topology t = newTopology();
  TStream<String> s = t.strings("32", "423", "-746");
  TStream<Integer> i = s.map(Integer::valueOf);
  i = i.modify(tuple -> tuple < 0 ? null : tuple + 27);
  assertStream(t, i);
  Condition<Long> tc = t.getTester().tupleCount(i, 2);
  Condition<List<Integer>> contents = t.getTester().streamContents(i, 59, 450);
  complete(t, tc);
  assertTrue(contents.getResult().toString(), contents.valid());
}
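
To see why the test above expects two tuples with contents 59 and 450, the same pipeline can be traced with the standard `java.util.stream` API. This is a sketch under the assumption that dropping null results behaves like `map` followed by a non-null filter; `ModifyWithDropsSketch` and `run` are hypothetical names, and no Edgent classes are involved.

```java
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Traces the testModifyWithDrops inputs using java.util.stream. Hypothetical helper class. */
final class ModifyWithDropsSketch {

    static List<Integer> run(String... inputs) {
        // Same modifier as in the test: negative values are dropped, others get 27 added.
        Function<Integer, Integer> modifier = tuple -> tuple < 0 ? null : tuple + 27;

        return Stream.of(inputs)
                .map(Integer::valueOf)      // "32" -> 32, "423" -> 423, "-746" -> -746
                .map(modifier)              // 59, 450, null
                .filter(Objects::nonNull)   // the null result produces no output tuple
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(run("32", "423", "-746")); // prints [59, 450]
    }
}
```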

Code example source: apache/incubator-edgent

@Test
public void testUnion2() throws Exception {
  Topology t = newTopology();
  TStream<String> s1 = t.strings("a", "b", "c");
  TStream<String> s2 = t.strings("d", "e");
  TStream<String> su = s1.union(s2);
  assertNotSame(s1, su);
  assertNotSame(s2, su);
  TStream<String> r = su.modify(v -> v.concat("X"));
  Condition<Long> tc = t.getTester().tupleCount(r, 5);
  Condition<List<String>> contents = t.getTester().contentsUnordered(r,
      "aX", "bX", "cX", "dX", "eX");
  complete(t, tc);
  assertTrue(tc.getResult().toString(), tc.valid());
  assertTrue(contents.getResult().toString(), contents.valid());
}

Code example source: apache/incubator-edgent

@Test
public void testUnion4() throws Exception {
  Topology t = newTopology();
  TStream<String> s1 = t.strings("a", "b", "c");
  TStream<String> s2 = t.strings("d", "e");
  TStream<String> s3 = t.strings("f", "g", "h", "i");
  TStream<String> s4 = t.strings("j");
  TStream<String> su = s1.union(new HashSet<>(Arrays.asList(s2, s3, s4)));
  assertNotSame(s1, su);
  assertNotSame(s2, su);
  assertNotSame(s3, su);
  assertNotSame(s4, su);
  TStream<String> r = su.modify(v -> v.concat("Y"));
  Condition<Long> tc = t.getTester().tupleCount(r, 10);
  Condition<List<String>> contents = t.getTester().contentsUnordered(r,
      "aY", "bY", "cY", "dY", "eY", "fY", "gY", "hY", "iY", "jY");
  complete(t, tc);
  assertTrue(tc.getResult().toString(), tc.valid());
  assertTrue(contents.getResult().toString(), contents.valid());
}

Code example source: apache/incubator-edgent

@Test
public void testUnion4WithSelf() throws Exception {
  Topology t = newTopology();
  TStream<String> s1 = t.strings("a", "b", "c");
  TStream<String> s2 = t.strings("d", "e");
  TStream<String> s3 = t.strings("f", "g", "h", "i");
  TStream<String> s4 = t.strings("j");
  TStream<String> su = s1.union(new HashSet<>(Arrays.asList(s1, s2, s3, s4)));
  assertNotSame(s1, su);
  assertNotSame(s2, su);
  assertNotSame(s3, su);
  assertNotSame(s4, su);
  TStream<String> r = su.modify(v -> v.concat("Y"));
  Condition<Long> tc = t.getTester().tupleCount(r, 10);
  Condition<List<String>> contents = t.getTester().contentsUnordered(r,
      "aY", "bY", "cY", "dY", "eY", "fY", "gY", "hY", "iY", "jY");
  complete(t, tc);
  assertTrue(tc.getResult().toString(), tc.valid());
  assertTrue(contents.getResult().toString(), contents.valid());
}

Code example source: apache/incubator-edgent

@Test
public void testDeadbandMaxSuppression() throws Exception {
  Topology topology = newTopology("testDeadbandMaxSuppression");
  
  TStream<Double> values = topology.of(12.9, 3.4, 12.3, 15.6, 18.4, -3.7, -4.5, 15.0, 16.0, 30.0, 42.0 );
  
  // 18.4 will be included as it is delayed since the last inband value.
  values = values.modify(tuple -> {
    if (tuple == 18.4) {
      try {
        Thread.sleep(5000);
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }
    return tuple;
  });
  
  TStream<Double> filtered = Filters.deadband(values, identity(),
      v -> v >= 10.0 && v <= 30.0, 3, TimeUnit.SECONDS);
  
  Condition<Long> count = topology.getTester().tupleCount(filtered, 8);
  Condition<List<Double>> contents = topology.getTester().streamContents(filtered, 12.9, 3.4, 12.3, 18.4, -3.7, -4.5, 15.0, 42.0 );
  complete(topology, count);
  assertTrue(count.valid());
  assertTrue(contents.valid());
}

Code example source: apache/incubator-edgent

TStream<TimeAndId> slowM = slow.modify(v -> new TimeAndId(v));

Code example source: apache/incubator-edgent

s.peek(tuple -> tuple.peekedCnt++);
TStream<Peeked> sm = s.modify(tuple -> new Peeked(tuple.value + 37, tuple.peekedCnt));
