本文整理了Java中org.apache.edgent.topology.TStream.tag()方法的一些代码示例,展示了TStream.tag()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。TStream.tag()方法的具体详情如下:
包路径:org.apache.edgent.topology.TStream
类名称:TStream
方法名:tag
[英]Adds the specified tags to the stream. Adding the same tag to a stream multiple times will not change the result beyond the initial application.
[中]将指定的标记添加到流中。多次向流中添加同一标记,其结果与首次添加后相同,不会产生额外变化。
代码示例来源:origin: apache/incubator-edgent
channels.set(ch, channels.get(ch).tag("parallel.split-ch"+ch));
channels.set(ch, isolate(channels.get(ch), chBufferSize).tag("parallel.isolated-ch"+ch));
results.add(pipeline.apply(channels.get(ch), ch).tag("parallel-ch"+ch));
TStream<R> result = results.get(0).union(new HashSet<>(results)).tag("parallel.union");
代码示例来源:origin: org.apache.edgent/edgent-api-topology
channels.set(ch, channels.get(ch).tag("parallel.split-ch"+ch));
channels.set(ch, isolate(channels.get(ch), chBufferSize).tag("parallel.isolated-ch"+ch));
results.add(pipeline.apply(channels.get(ch), ch).tag("parallel-ch"+ch));
TStream<R> result = results.get(0).union(new HashSet<>(results)).tag("parallel.union");
代码示例来源:origin: apache/incubator-edgent
/**
 * Returns the stream of all device commands, creating the underlying
 * subscription on first use and reusing it afterwards.
 *
 * <p>Each received message is turned into a {@code JsonObject} whose
 * properties are keyed by {@code CMD_DEVICE}, {@code CMD_ID},
 * {@code CMD_TS}, {@code CMD_FORMAT} and {@code CMD_PAYLOAD}. The payload
 * is stored as parsed JSON when the format extracted from the topic is
 * {@code "json"}, otherwise as a UTF-8 decoded string.
 *
 * @return the (cached) command stream, tagged {@code "allDeviceCmds"}
 */
private TStream<JsonObject> allCommands() {
    // Already subscribed: hand back the cached stream.
    if (commandStream != null) {
        return commandStream;
    }

    String filter = commandTopic(null);
    commandStream = connector.subscribe(filter, commandQoS,
            (cmdTopic, bytes) -> {
                JsonObject cmd = new JsonObject();
                cmd.addProperty(CMD_DEVICE, deviceId);
                cmd.addProperty(CMD_ID, extractCmd(cmdTopic));
                cmd.addProperty(CMD_TS, System.currentTimeMillis());

                String format = extractCmdFmt(cmdTopic);
                cmd.addProperty(CMD_FORMAT, format);

                // Only "json" payloads are parsed; anything else is kept as text.
                if (!"json".equals(format)) {
                    cmd.addProperty(CMD_PAYLOAD, new String(bytes, StandardCharsets.UTF_8));
                } else {
                    cmd.add(CMD_PAYLOAD, JsonFunctions.fromBytes().apply(bytes));
                }
                return cmd;
            })
        .tag("allDeviceCmds");
    return commandStream;
}
代码示例来源:origin: org.apache.edgent/edgent-api-topology
fanouts.add(isolate(stream, 1).tag("concurrent.isolated-ch"+i));
int ch = 0;
for (Function<TStream<T>,TStream<U>> pipeline : pipelines) {
results.add(pipeline.apply(fanouts.get(ch)).tag("concurrent-ch"+ch));
ch++;
TStream<List<U>> barrier = barrier(results, barrierQueueCapacity).tag("concurrent.barrier");
代码示例来源:origin: apache/incubator-edgent
fanouts.add(isolate(stream, 1).tag("concurrent.isolated-ch"+i));
int ch = 0;
for (Function<TStream<T>,TStream<U>> pipeline : pipelines) {
results.add(pipeline.apply(fanouts.get(ch)).tag("concurrent-ch"+ch));
ch++;
TStream<List<U>> barrier = barrier(results, barrierQueueCapacity).tag("concurrent.barrier");
代码示例来源:origin: org.apache.edgent/edgent-api-topology
channels.set(ch, channels.get(ch).tag("parallel.split-ch"+ch));
channels.set(ch, isolate(channels.get(ch), chBufferSize).tag("parallel.isolated-ch"+ch));
final int finalCh = ch;
results.add(pipeline.apply(channels.get(ch), ch)
.tag("parallel-ch"+ch)
.peek(tuple -> splitter.channelDone(finalCh)));
TStream<R> result = results.get(0).union(new HashSet<>(results)).tag("parallel.union");
代码示例来源:origin: apache/incubator-edgent
channels.set(ch, channels.get(ch).tag("parallel.split-ch"+ch));
channels.set(ch, isolate(channels.get(ch), chBufferSize).tag("parallel.isolated-ch"+ch));
final int finalCh = ch;
results.add(pipeline.apply(channels.get(ch), ch)
.tag("parallel-ch"+ch)
.peek(tuple -> splitter.channelDone(finalCh)));
TStream<R> result = results.get(0).union(new HashSet<>(results)).tag("parallel.union");
代码示例来源:origin: org.apache.edgent/edgent-connectors-iot
TStream<Date> hb = iotDevice.topology().poll(
() -> new Date(),
period, unit).tag("heartbeat");
j.addProperty("time", date.getTime());
return j;
}).tag("heartbeat");
Functions.unpartitioned(), 1).tag("pressureRelieved");
代码示例来源:origin: apache/incubator-edgent
TStream<Date> hb = iotDevice.topology().poll(
() -> new Date(),
period, unit).tag("heartbeat");
j.addProperty("time", date.getTime());
return j;
}).tag("heartbeat");
Functions.unpartitioned(), 1).tag("pressureRelieved");
代码示例来源:origin: apache/incubator-edgent
/**
 * Builds a per-channel pipeline function that records wall-clock
 * timestamps around the fake JSON analytic.
 *
 * <p>The returned function adds a {@code "startPipelineMsec"} property to
 * each tuple, applies {@code fakeJsonAnalytic(channel, period, unit)},
 * passes everything through an always-true filter, adds an
 * {@code "endPipelineMsec"} property, and tags the resulting stream
 * {@code "pipeline-ch" + channel}.
 *
 * @param period forwarded to {@code fakeJsonAnalytic}
 * @param unit   forwarded to {@code fakeJsonAnalytic}
 * @return a function from (stream, channel) to the timed pipeline output
 */
@SuppressWarnings("unused")
private BiFunction<TStream<JsonObject>,Integer,TStream<JsonObject>> fakeParallelPipelineTiming(long period, TimeUnit unit) {
    return (input, ch) -> {
        // Stamp the moment a tuple enters the pipeline.
        TStream<JsonObject> started = input.map(json -> {
            json.addProperty("startPipelineMsec", System.currentTimeMillis());
            return json;
        });

        return started
            .map(fakeJsonAnalytic(ch, period, unit))
            .filter(any -> true)
            // Stamp the moment the tuple leaves the pipeline.
            .map(json -> {
                json.addProperty("endPipelineMsec", System.currentTimeMillis());
                return json;
            })
            .tag("pipeline-ch" + ch);
    };
}
代码示例来源:origin: apache/incubator-edgent
/**
 * Builds a pipeline function for a single channel.
 *
 * <p>The returned function maps the input stream through
 * {@code fakeAnalytic(channel, period, unit)}, applies an always-true
 * filter, and tags the result {@code "pipeline-ch" + channel}.
 *
 * @param channel channel number; used for the analytic and the tag name
 * @param period  forwarded to {@code fakeAnalytic}
 * @param unit    forwarded to {@code fakeAnalytic}
 * @return the pipeline function
 */
private Function<TStream<Integer>,TStream<JsonObject>> fakePipeline(int channel, long period, TimeUnit unit) {
    String tagName = "pipeline-ch" + channel;
    return input -> {
        return input
            .map(fakeAnalytic(channel, period, unit))
            .filter(any -> true)
            .tag(tagName);
    };
}
代码示例来源:origin: apache/incubator-edgent
// Label the mapped gaussian stream with two tags and terminate it with a no-op sink.
TStream<String> s1 = gaussian.map(g -> "g1: " + g.toString()).tag("s1", "gaussian");
s1.sink(tuple -> {});

// Split 0: counted, rate-metered, then re-tagged and printed.
TStream<Double> sp0 = splits1.get(0).tag("split","sp0");
sp0 = Metrics.counter(sp0);
Metrics.rateMeter(sp0);
TStream<Double> sp0_1 = sp0.tag("split","sp0_1");
sp0_1.print();

// Splits 1..5: each is tagged, wrapped with its own counter, and printed.
TStream<Double> sp1 = splits1.get(1).tag("split","sp1");
sp1 = Metrics.counter(sp1);
sp1.print();

TStream<Double> sp2 = splits1.get(2).tag("split","sp2");
sp2 = Metrics.counter(sp2);
sp2.print();

TStream<Double> sp3 = splits1.get(3).tag("split","sp3");
sp3 = Metrics.counter(sp3);
sp3.print();

TStream<Double> sp4 = splits1.get(4).tag("split","sp4");
sp4 = Metrics.counter(sp4);
sp4.print();

TStream<Double> sp5 = splits1.get(5).tag("split","sp5");
// Count sp5 itself. The original passed sp1 here (copy-paste slip), which
// counted sp1 a second time and left the tagged split-5 stream unobserved.
sp5 = Metrics.counter(sp5);
sp5.print();
代码示例来源:origin: apache/incubator-edgent
/**
 * Builds a per-channel pipeline function for parallel processing.
 *
 * <p>For every tuple the returned function obtains an analytic from
 * {@code fakeParallelAnalytic(period, unit)} and applies it to the tuple
 * and the channel number. The mapped stream then passes through an
 * always-true filter and is tagged {@code "pipeline-ch" + channel}.
 *
 * @param period forwarded to {@code fakeParallelAnalytic}
 * @param unit   forwarded to {@code fakeParallelAnalytic}
 * @return a function from (stream, channel) to the pipeline output
 */
private BiFunction<TStream<Integer>,Integer,TStream<JsonObject>> fakeParallelPipeline(long period, TimeUnit unit) {
    return (input, ch) -> {
        return input
            .map(v -> fakeParallelAnalytic(period, unit).apply(v, ch))
            .filter(any -> true)
            .tag("pipeline-ch" + ch);
    };
}
代码示例来源:origin: apache/incubator-edgent
};
TStream<Integer> result = PlumbingStreams.concurrent(values, pipelines, combiner).tag("result");
代码示例来源:origin: apache/incubator-edgent
TStream<JsonObject> result = PlumbingStreams.parallelBalanced(values, width, pipeline).tag("result");
TStream<Integer> result2 = result.map(jo -> {
int r = jo.get("result").getAsInt();
代码示例来源:origin: apache/incubator-edgent
TStream<Integer> values = top.of(resultTuples);
TStream<JsonObject> result = PlumbingStreams.parallel(values, width, splitter, pipeline).tag("result");
TStream<Integer> result2 = result.map(jo -> {
int r = jo.get("result").getAsInt();
代码示例来源:origin: apache/incubator-edgent
TStream<Integer> values = top.of(resultTuples);
TStream<JsonObject> result = PlumbingStreams.parallelMap(values, width, splitter, mapper).tag("result");
TStream<Integer> result2 = result.map(jo -> {
int r = jo.get("result").getAsInt();
代码示例来源:origin: apache/incubator-edgent
/**
 * Checks that {@code tag()} returns the same stream instance, that tags
 * accumulate across calls, that re-adding existing tags is accepted, and
 * that a tagged stream still delivers its tuples.
 */
@Test
public void testTag() throws Exception {
    Topology topology = newTopology();
    List<String> expectedTags = new ArrayList<>(Arrays.asList("tag1", "tag2"));

    // A fresh stream starts without tags.
    TStream<String> source = topology.strings("a", "b");
    assertEquals(0, source.getTags().size());

    // tag() hands back the very same stream.
    TStream<String> tagged = source.tag("tag1", "tag2");
    assertSame(source, tagged);
    assertTrue("s.tags="+source.getTags(), source.getTags().containsAll(expectedTags));

    // A later call adds to the existing tags.
    expectedTags.add("tag3");
    source.tag("tag3");
    assertTrue("s.tags="+source.getTags(), source.getTags().containsAll(expectedTags));

    // Adding tags that are already present is allowed.
    source.tag("tag3", "tag2", "tag1");
    assertTrue("s.tags="+source.getTags(), source.getTags().containsAll(expectedTags));

    // Tags must also be readable while tuples are flowing.
    TStream<String> downstream = source
        .peek(tuple -> {
            assertTrue("s.tags="+source.getTags(), source.getTags().containsAll(expectedTags));
        })
        .filter(tuple -> true);

    // Tagging must not disturb normal tuple delivery.
    Condition<Long> count = topology.getTester().tupleCount(downstream, 2);
    Condition<List<String>> contents = topology.getTester().streamContents(downstream, "a", "b");
    complete(topology, count);
    assertTrue("contents "+contents.getResult(), contents.valid());
}
内容来源于网络,如有侵权,请联系作者删除!