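This article collects code examples for the Java method org.apache.edgent.topology.TStream.modify() and shows how TStream.modify() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they serve as useful reference material. Details of the TStream.modify() method are as follows: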
Fully qualified class name: org.apache.edgent.topology.TStream
Class name: TStream
Method name: modify
Declare a new stream that modifies each tuple from this stream into one (or zero) tuple of the same type T. For each tuple t on this stream, the returned stream will contain a tuple that is the result of modifier.apply(t) when the return is not null. The function may return the same reference as its input t or a different object of the same type. If modifier.apply(t) returns null then no tuple is submitted to the returned stream for t.
Example of modifying a stream of String values by adding the suffix "extra".
TStream<String> strings = ...
TStream<String> modifiedStrings = strings.modify(t -> t.concat("extra"));
This method is equivalent to map(Function modifier).
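The contract described above can be modeled in plain Java without Edgent on the classpath. The following is a minimal sketch for illustration only: `ModifySketch` and its `modify` helper are hypothetical names, a `List` stands in for a stream, and `java.util.function.UnaryOperator` stands in for the modifier type. It does not show how Edgent itself implements `TStream.modify()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for the modify() contract; not part of the Edgent API.
final class ModifySketch {

    // Applies the modifier to each tuple and keeps only non-null results,
    // mirroring the "one (or zero) tuple of the same type T" rule.
    static <T> List<T> modify(List<T> tuples, UnaryOperator<T> modifier) {
        List<T> out = new ArrayList<>();
        for (T t : tuples) {
            T result = modifier.apply(t);
            if (result != null) {
                out.add(result);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Same transformation as the documentation example: append "extra".
        List<String> strings = Arrays.asList("a", "b", "c");
        System.out.println(modify(strings, t -> t.concat("extra")));

        // Returning null for a tuple drops it from the result.
        List<Integer> numbers = Arrays.asList(32, 423, -746);
        System.out.println(modify(numbers, n -> n < 0 ? null : n + 27));
    }
}
```

In this model, a modifier that returns its input unchanged passes the same reference through, which matches the statement that the function may return the same reference as its input.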
Code example source: apache/incubator-edgent
private void _testFanoutWithPeek(boolean after) throws Exception {
Topology t = newTopology();
Graph g = t.graph();
/*                            -- Filter -- Sink(.)
 *                          /
 * Source -- Peek -- FanOut ---- Modify -- Sink(@)
 *
 */
TStream<Integer> d = integers(t, 1, 2, 3);
List<Integer> peekedValues = new ArrayList<>();
if (!after)
d.peek(tuple -> peekedValues.add(tuple));
TStream<Integer> df = d.filter(tuple -> tuple.intValue() > 0);
TStream<Integer> dm = d.modify(tuple -> Integer.valueOf(tuple.intValue() + 1));
if (after)
d.peek(tuple -> peekedValues.add(tuple));
df.sink(tuple -> System.out.print("."));
dm.sink(tuple -> System.out.print("@"));
assertEquals(7, g.getVertices().size());
assertEquals(6, g.getEdges().size());
// Insert counter metrics into all the streams
Metrics.counter(t);
printGraph(g);
assertEquals(10, g.getVertices().size());
assertEquals(9, g.getEdges().size());
}
Code example source: apache/incubator-edgent
gaussian = gaussian.modify(g-> g*3 + 1);
mc1.modify(tuple -> new MyClass1(tuple.getS1() + "a1 b1 c1 d1 ", tuple.getS2() +" e1 f1 g1 h1 ", tuple.getD1() +1) );
mc1.peek(tuple -> System.out.println("MyClass1: " + tuple.toString()));
mc2 = mc2.modify(
tuple -> new MyClass2(
new MyClass1(tuple.getMc1().getS1() + " c3 d3 e3 f3 ",
mc4 = mc4.modify(
tuple -> new MyClass2(
new MyClass1(tuple.getMc1().getS1() + " c41 d42 d43 f44 ",
mc5 = mc5.modify(
tuple -> new MyClass2(
new MyClass1(tuple.getMc1().getS1() + " c51 d52 e53 f54 ",
Code example source: apache/incubator-edgent
@Test
public void testBlockingDelay() throws Exception {
// Timing variances on shared machines can cause this test to fail
assumeTrue(!Boolean.getBoolean("edgent.build.ci"));
Topology topology = newTopology();
TStream<String> strings = topology.strings("a", "b", "c", "d");
TStream<Long> starts = strings.map(v -> System.currentTimeMillis());
// delay stream
starts = PlumbingStreams.blockingDelay(starts, 300, TimeUnit.MILLISECONDS);
// calculate delay
starts = starts.modify(v -> System.currentTimeMillis() - v);
starts = starts.filter(v -> v >= 300);
Condition<Long> tc = topology.getTester().tupleCount(starts, 4);
complete(topology, tc);
assertTrue("valid:" + tc.getResult(), tc.valid());
}
Code example source: apache/incubator-edgent
@Test
public void testFanout3() throws Exception {
Topology t = newTopology();
TStream<String> s = t.strings("a", "b", "cde");
TStream<String> sf = s.filter(tuple -> "b".equals(tuple));
TStream<String> sm = s.modify(tuple -> tuple.concat("fo2"));
TStream<byte[]> st = s.map(tuple -> tuple.getBytes());
Condition<Long> tsfc = t.getTester().tupleCount(sf, 1);
Condition<Long> tsmc = t.getTester().tupleCount(sm, 3);
Condition<Long> tstc = t.getTester().tupleCount(st, 3);
Condition<List<String>> tsf = t.getTester().streamContents(sf, "b");
Condition<List<String>> tsm = t.getTester().streamContents(sm, "afo2", "bfo2", "cdefo2");
Condition<List<byte[]>> tst = t.getTester().streamContents(st, "a".getBytes(), "b".getBytes(), "cde".getBytes());
complete(t, t.getTester().and(tsfc, tsmc, tstc));
assertTrue(tsf.valid());
assertTrue(tsm.valid());
// Can't use equals on byte[]
assertEquals(3, tst.getResult().size());
assertEquals("a", new String(tst.getResult().get(0)));
assertEquals("b", new String(tst.getResult().get(1)));
assertEquals("cde", new String(tst.getResult().get(2)));
}
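In the fan-out test above, `modify` keeps the tuple type (String to String) while `map` changes it (String to byte[]). The sketch below contrasts the two shapes in plain Java. All names here are hypothetical, Lists stand in for streams, and the `java.util.function` types are stand-ins chosen for this illustration. Skipping null results in the map-like helper is an assumption of the sketch that mirrors the rule stated for `modify`.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.UnaryOperator;

// Hypothetical helpers contrasting the two shapes; not part of the Edgent API.
final class ModifyVsMapSketch {

    // map-like: the result type U may differ from the input type T.
    static <T, U> List<U> map(List<T> tuples, Function<T, U> mapper) {
        List<U> out = new ArrayList<>();
        for (T t : tuples) {
            U u = mapper.apply(t);
            if (u != null) { // assumption: null results are skipped
                out.add(u);
            }
        }
        return out;
    }

    // modify-like: the same logic with the result type fixed to T.
    static <T> List<T> modify(List<T> tuples, UnaryOperator<T> modifier) {
        return map(tuples, modifier);
    }

    public static void main(String[] args) {
        List<String> s = Arrays.asList("a", "b", "cde");
        // Type stays List<String>.
        List<String> sm = modify(s, t -> t.concat("fo2"));
        // Type changes to List<byte[]>.
        List<byte[]> st = map(s, t -> t.getBytes(StandardCharsets.UTF_8));
        System.out.println(sm);
        System.out.println(st.size());
    }
}
```

Delegating the modify-like helper to the map-like one reflects the note that `modify` is equivalent to `map` when the input and output types are the same.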
Code example source: apache/incubator-edgent
@Test
public void testOneShotDelay() throws Exception {
Topology topology = newTopology();
TStream<String> strings = topology.strings("a", "b", "c", "d");
TStream<Long> starts = strings.map(v -> System.currentTimeMillis());
// delay stream
starts = PlumbingStreams.blockingOneShotDelay(starts, 300, TimeUnit.MILLISECONDS);
// calculate delay
starts = starts.modify(v -> System.currentTimeMillis() - v);
// the first tuple shouldn't satisfy the predicate
starts = starts.filter(v -> v < 300);
Condition<Long> tc = topology.getTester().tupleCount(starts, 3);
complete(topology, tc);
assertTrue("valid:" + tc.getResult(), tc.valid());
}
Code example source: apache/incubator-edgent
@Test
public void testFanout2() throws Exception {
Topology t = newTopology();
TStream<String> s = t.strings("a", "b", "c");
TStream<String> sf = s.filter(tuple -> "b".equals(tuple));
TStream<String> sm = s.modify(tuple -> tuple.concat("fo2"));
Condition<Long> tsmc = t.getTester().tupleCount(sm, 3);
Condition<List<String>> tsf = t.getTester().streamContents(sf, "b");
Condition<List<String>> tsm = t.getTester().streamContents(sm, "afo2", "bfo2", "cfo2");
complete(t, t.getTester().and(tsm, tsmc));
assertTrue(tsf.getResult().toString(), tsf.valid());
assertTrue(tsm.getResult().toString(), tsm.valid());
}
Code example source: apache/incubator-edgent
@Test
public void testModify() throws Exception {
Topology t = newTopology();
TStream<String> s = t.strings("a", "b", "c");
TStream<String> i = s.modify(tuple -> tuple.concat("M"));
assertStream(t, i);
Condition<Long> tc = t.getTester().tupleCount(i, 3);
Condition<List<String>> contents = t.getTester().streamContents(i, "aM", "bM", "cM");
complete(t, tc);
assertTrue(contents.valid());
}
Code example source: apache/incubator-edgent
@Test
public void testModifyWithDrops() throws Exception {
Topology t = newTopology();
TStream<String> s = t.strings("32", "423", "-746");
TStream<Integer> i = s.map(Integer::valueOf);
i = i.modify(tuple -> tuple < 0 ? null : tuple + 27);
assertStream(t, i);
Condition<Long> tc = t.getTester().tupleCount(i, 2);
Condition<List<Integer>> contents = t.getTester().streamContents(i, 59, 450);
complete(t, tc);
assertTrue(contents.getResult().toString(), contents.valid());
}
Code example source: apache/incubator-edgent
@Test
public void testUnion2() throws Exception {
Topology t = newTopology();
TStream<String> s1 = t.strings("a", "b", "c");
TStream<String> s2 = t.strings("d", "e");
TStream<String> su = s1.union(s2);
assertNotSame(s1, su);
assertNotSame(s2, su);
TStream<String> r = su.modify(v -> v.concat("X"));
Condition<Long> tc = t.getTester().tupleCount(r, 5);
Condition<List<String>> contents = t.getTester().contentsUnordered(r,
"aX", "bX", "cX", "dX", "eX");
complete(t, tc);
assertTrue(tc.getResult().toString(), tc.valid());
assertTrue(contents.getResult().toString(), contents.valid());
}
Code example source: apache/incubator-edgent
@Test
public void testUnion4() throws Exception {
Topology t = newTopology();
TStream<String> s1 = t.strings("a", "b", "c");
TStream<String> s2 = t.strings("d", "e");
TStream<String> s3 = t.strings("f", "g", "h", "i");
TStream<String> s4 = t.strings("j");
TStream<String> su = s1.union(new HashSet<>(Arrays.asList(s2, s3, s4)));
assertNotSame(s1, su);
assertNotSame(s2, su);
assertNotSame(s3, su);
assertNotSame(s4, su);
TStream<String> r = su.modify(v -> v.concat("Y"));
Condition<Long> tc = t.getTester().tupleCount(r, 10);
Condition<List<String>> contents = t.getTester().contentsUnordered(r,
"aY", "bY", "cY", "dY", "eY", "fY", "gY", "hY", "iY", "jY");
complete(t, tc);
assertTrue(tc.getResult().toString(), tc.valid());
assertTrue(contents.getResult().toString(), contents.valid());
}
Code example source: apache/incubator-edgent
@Test
public void testUnion4WithSelf() throws Exception {
Topology t = newTopology();
TStream<String> s1 = t.strings("a", "b", "c");
TStream<String> s2 = t.strings("d", "e");
TStream<String> s3 = t.strings("f", "g", "h", "i");
TStream<String> s4 = t.strings("j");
TStream<String> su = s1.union(new HashSet<>(Arrays.asList(s1, s2, s3, s4)));
assertNotSame(s1, su);
assertNotSame(s2, su);
assertNotSame(s3, su);
assertNotSame(s4, su);
TStream<String> r = su.modify(v -> v.concat("Y"));
Condition<Long> tc = t.getTester().tupleCount(r, 10);
Condition<List<String>> contents = t.getTester().contentsUnordered(r,
"aY", "bY", "cY", "dY", "eY", "fY", "gY", "hY", "iY", "jY");
complete(t, tc);
assertTrue(tc.getResult().toString(), tc.valid());
assertTrue(contents.getResult().toString(), contents.valid());
}
Code example source: apache/incubator-edgent
@Test
public void testDeadbandMaxSuppression() throws Exception {
Topology topology = newTopology("testDeadbandMaxSuppression");
TStream<Double> values = topology.of(12.9, 3.4, 12.3, 15.6, 18.4, -3.7, -4.5, 15.0, 16.0, 30.0, 42.0 );
// 18.4 will be included: the 5-second sleep delays it past the
// 3-second maximum suppression period since the last in-band value.
values = values.modify(tuple -> {
    if (tuple == 18.4) {
        try {
            Thread.sleep(5000);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
    return tuple;
});
TStream<Double> filtered = Filters.deadband(values, identity(),
v -> v >= 10.0 && v <= 30.0, 3, TimeUnit.SECONDS);
Condition<Long> count = topology.getTester().tupleCount(filtered, 8);
Condition<List<Double>> contents = topology.getTester().streamContents(filtered, 12.9, 3.4, 12.3, 18.4, -3.7, -4.5, 15.0, 42.0 );
complete(topology, count);
assertTrue(count.valid());
assertTrue(contents.valid());
}
Code example source: apache/incubator-edgent
TStream<TimeAndId> slowM = slow.modify(v -> new TimeAndId(v));
Code example source: apache/incubator-edgent
s.peek(tuple -> tuple.peekedCnt++);
TStream<Peeked> sm = s.modify(tuple -> new Peeked(tuple.value + 37, tuple.peekedCnt));