org.apache.edgent.topology.TStream.tag()方法的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(8.4k)|赞(0)|评价(0)|浏览(62)

本文整理了Java中org.apache.edgent.topology.TStream.tag()方法的一些代码示例,展示了TStream.tag()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。TStream.tag()方法的具体详情如下:
包路径:org.apache.edgent.topology.TStream
类名称:TStream
方法名:tag

TStream.tag介绍

[英]Adds the specified tags to the stream. Adding the same tag to a stream multiple times will not change the result beyond the initial application.
[中]将指定的标签添加到流中。多次向同一个流添加相同的标签,不会在首次添加的效果之外再产生任何变化。

代码示例

代码示例来源:origin: apache/incubator-edgent

// NOTE(review): excerpt only; the per-channel statements presumably sit inside a loop over ch
// that is not shown here.
// Tag the stream stored for channel ch as "parallel.split-ch<ch>" and store the result back.
channels.set(ch, channels.get(ch).tag("parallel.split-ch"+ch));
 // Pass the channel's stream through isolate(...) with chBufferSize, tag it, and store it back.
 channels.set(ch, isolate(channels.get(ch), chBufferSize).tag("parallel.isolated-ch"+ch));
 // Apply the pipeline to the channel's stream (also passing the channel index), tag the
 // pipeline's output, and collect it in results.
 results.add(pipeline.apply(channels.get(ch), ch).tag("parallel-ch"+ch));
// Union the first result with the set of all results and tag the combined stream.
TStream<R> result =  results.get(0).union(new HashSet<>(results)).tag("parallel.union");

代码示例来源:origin: org.apache.edgent/edgent-api-topology

// NOTE(review): excerpt only; the per-channel statements presumably sit inside a loop over ch
// that is not shown here.
// Tag the stream stored for channel ch as "parallel.split-ch<ch>" and store the result back.
channels.set(ch, channels.get(ch).tag("parallel.split-ch"+ch));
 // Pass the channel's stream through isolate(...) with chBufferSize, tag it, and store it back.
 channels.set(ch, isolate(channels.get(ch), chBufferSize).tag("parallel.isolated-ch"+ch));
 // Apply the pipeline to the channel's stream (also passing the channel index), tag the
 // pipeline's output, and collect it in results.
 results.add(pipeline.apply(channels.get(ch), ch).tag("parallel-ch"+ch));
// Union the first result with the set of all results and tag the combined stream.
TStream<R> result =  results.get(0).union(new HashSet<>(results)).tag("parallel.union");

代码示例来源:origin: apache/incubator-edgent

/**
 * Returns the stream of all device commands, creating it on first use.
 * <p>
 * The first call subscribes through {@code connector} using the topic filter
 * returned by {@code commandTopic(null)} and {@code commandQoS}, tags the
 * resulting stream "allDeviceCmds" and caches it in {@code commandStream}.
 * Later calls return the cached stream.
 * <p>
 * Each received message is converted to a {@link JsonObject} holding the
 * device id, the command id and format extracted from the topic, the current
 * time in milliseconds, and the payload: parsed as JSON when the format is
 * "json", otherwise decoded as a UTF-8 string.
 *
 * @return the cached command stream
 */
private TStream<JsonObject> allCommands() {
  // Already subscribed: reuse the existing stream.
  if (commandStream != null) {
    return commandStream;
  }

  final String allCmdsFilter = commandTopic(null);
  commandStream = connector
      .subscribe(allCmdsFilter, commandQoS, (topic, payload) -> {
        final JsonObject cmd = new JsonObject();
        cmd.addProperty(CMD_DEVICE, deviceId);
        cmd.addProperty(CMD_ID, extractCmd(topic));
        cmd.addProperty(CMD_TS, System.currentTimeMillis());

        final String format = extractCmdFmt(topic);
        cmd.addProperty(CMD_FORMAT, format);

        // Non-JSON payloads are carried as plain UTF-8 text.
        if (!"json".equals(format)) {
          cmd.addProperty(CMD_PAYLOAD, new String(payload, StandardCharsets.UTF_8));
          return cmd;
        }
        cmd.add(CMD_PAYLOAD, JsonFunctions.fromBytes().apply(payload));
        return cmd;
      })
      .tag("allDeviceCmds");
  return commandStream;
}

代码示例来源:origin: org.apache.edgent/edgent-api-topology

fanouts.add(isolate(stream, 1).tag("concurrent.isolated-ch"+i));
int ch = 0;
for (Function<TStream<T>,TStream<U>> pipeline : pipelines) {
 results.add(pipeline.apply(fanouts.get(ch)).tag("concurrent-ch"+ch));
 ch++;
TStream<List<U>> barrier = barrier(results, barrierQueueCapacity).tag("concurrent.barrier");

代码示例来源:origin: apache/incubator-edgent

fanouts.add(isolate(stream, 1).tag("concurrent.isolated-ch"+i));
int ch = 0;
for (Function<TStream<T>,TStream<U>> pipeline : pipelines) {
 results.add(pipeline.apply(fanouts.get(ch)).tag("concurrent-ch"+ch));
 ch++;
TStream<List<U>> barrier = barrier(results, barrierQueueCapacity).tag("concurrent.barrier");

代码示例来源:origin: org.apache.edgent/edgent-api-topology

// NOTE(review): excerpt only; the per-channel statements presumably sit inside a loop over ch
// that is not shown here.
// Tag the stream stored for channel ch as "parallel.split-ch<ch>" and store the result back.
channels.set(ch, channels.get(ch).tag("parallel.split-ch"+ch));
 // Pass the channel's stream through isolate(...) with chBufferSize, tag it, and store it back.
 channels.set(ch, isolate(channels.get(ch), chBufferSize).tag("parallel.isolated-ch"+ch));
 // Copy ch into a final local so the lambda below can capture it.
 final int finalCh = ch;
 // Apply the pipeline to the channel's stream, tag its output, and for every tuple that
 // leaves the pipeline call splitter.channelDone(...) with this channel's index.
 results.add(pipeline.apply(channels.get(ch), ch)
   .tag("parallel-ch"+ch)
   .peek(tuple -> splitter.channelDone(finalCh)));
// Union the first result with the set of all results and tag the combined stream.
TStream<R> result =  results.get(0).union(new HashSet<>(results)).tag("parallel.union");

代码示例来源:origin: apache/incubator-edgent

// NOTE(review): excerpt only; the per-channel statements presumably sit inside a loop over ch
// that is not shown here.
// Tag the stream stored for channel ch as "parallel.split-ch<ch>" and store the result back.
channels.set(ch, channels.get(ch).tag("parallel.split-ch"+ch));
 // Pass the channel's stream through isolate(...) with chBufferSize, tag it, and store it back.
 channels.set(ch, isolate(channels.get(ch), chBufferSize).tag("parallel.isolated-ch"+ch));
 // Copy ch into a final local so the lambda below can capture it.
 final int finalCh = ch;
 // Apply the pipeline to the channel's stream, tag its output, and for every tuple that
 // leaves the pipeline call splitter.channelDone(...) with this channel's index.
 results.add(pipeline.apply(channels.get(ch), ch)
   .tag("parallel-ch"+ch)
   .peek(tuple -> splitter.channelDone(finalCh)));
// Union the first result with the set of all results and tag the combined stream.
TStream<R> result =  results.get(0).union(new HashSet<>(results)).tag("parallel.union");

代码示例来源:origin: org.apache.edgent/edgent-connectors-iot

TStream<Date> hb = iotDevice.topology().poll(
  () -> new Date(),
  period, unit).tag("heartbeat");
  j.addProperty("time", date.getTime());
  return j;
}).tag("heartbeat");
      Functions.unpartitioned(), 1).tag("pressureRelieved");

代码示例来源:origin: apache/incubator-edgent

TStream<Date> hb = iotDevice.topology().poll(
  () -> new Date(),
  period, unit).tag("heartbeat");
  j.addProperty("time", date.getTime());
  return j;
}).tag("heartbeat");
      Functions.unpartitioned(), 1).tag("pressureRelieved");

代码示例来源:origin: apache/incubator-edgent

/**
 * Builds a per-channel pipeline that records timestamps around a fake JSON analytic.
 * <p>
 * For each tuple the pipeline adds "startPipelineMsec" with the current time,
 * applies {@code fakeJsonAnalytic(channel, period, unit)}, passes the tuple through
 * an always-true filter, adds "endPipelineMsec" with the current time, and the
 * resulting stream is tagged "pipeline-ch" followed by the channel number.
 *
 * @param period passed through to {@code fakeJsonAnalytic}
 * @param unit passed through to {@code fakeJsonAnalytic}
 * @return function from (input stream, channel) to the tagged output stream
 */
@SuppressWarnings("unused")
private BiFunction<TStream<JsonObject>,Integer,TStream<JsonObject>> fakeParallelPipelineTiming(long period, TimeUnit unit) {
 return (stream, channel) -> {
   // Stamp the time the tuple enters the pipeline.
   TStream<JsonObject> started = stream.map(tuple -> {
     tuple.addProperty("startPipelineMsec", System.currentTimeMillis());
     return tuple;
   });

   TStream<JsonObject> analyzed = started.map(fakeJsonAnalytic(channel, period, unit));
   TStream<JsonObject> passed = analyzed.filter(tuple -> true);

   // Stamp the time the tuple leaves the pipeline.
   TStream<JsonObject> finished = passed.map(tuple -> {
     tuple.addProperty("endPipelineMsec", System.currentTimeMillis());
     return tuple;
   });

   return finished.tag("pipeline-ch"+channel);
 };
}

代码示例来源:origin: apache/incubator-edgent

/**
 * Builds a pipeline for one channel: maps each Integer tuple with
 * {@code fakeAnalytic(channel, period, unit)}, passes the result through an
 * always-true filter, and tags the stream "pipeline-ch" followed by the channel number.
 *
 * @param channel channel number, passed to {@code fakeAnalytic} and used in the tag
 * @param period passed through to {@code fakeAnalytic}
 * @param unit passed through to {@code fakeAnalytic}
 * @return function from the input stream to the tagged output stream
 */
private Function<TStream<Integer>,TStream<JsonObject>> fakePipeline(int channel, long period, TimeUnit unit) {
 return stream -> {
   TStream<JsonObject> analyzed = stream.map(fakeAnalytic(channel, period, unit));
   TStream<JsonObject> passed = analyzed.filter(tuple -> true);
   return passed.tag("pipeline-ch"+channel);
 };
}

代码示例来源:origin: apache/incubator-edgent

// Map the gaussian stream to labelled strings, tag it, and attach a no-op sink.
TStream<String> s1 = gaussian.map(g -> "g1: " + g.toString()).tag("s1", "gaussian");
s1.sink(tuple -> {});

// Split 0: tag, add a counter and a rate meter, then tag again under a second name and print.
TStream<Double> sp0 = splits1.get(0).tag("split","sp0");
sp0 = Metrics.counter(sp0);
Metrics.rateMeter(sp0);
TStream<Double> sp0_1 = sp0.tag("split","sp0_1");
sp0_1.print();

// Splits 1..5: tag each split, add a counter to it, and print it.
TStream<Double> sp1 =splits1.get(1).tag("split","sp1");
sp1 = Metrics.counter(sp1);
sp1.print();
TStream<Double> sp2 =splits1.get(2).tag("split","sp2");
sp2 = Metrics.counter(sp2);
sp2.print();
TStream<Double> sp3 =splits1.get(3).tag("split","sp3");
sp3 = Metrics.counter(sp3);
sp3.print();
TStream<Double> sp4 =splits1.get(4).tag("split","sp4");
sp4 = Metrics.counter(sp4);
sp4.print();
TStream<Double> sp5 =splits1.get(5).tag("split","sp5");
// Count split 5 itself. Passing sp1 here (a copy-paste slip) would attach a second
// counter and print to split 1 and leave split 5 with neither a counter nor a sink.
sp5 = Metrics.counter(sp5);
sp5.print();

代码示例来源:origin: apache/incubator-edgent

/**
 * Builds a per-channel pipeline for Integer tuples.
 * <p>
 * Each tuple is mapped by calling {@code fakeParallelAnalytic(period, unit)} and
 * applying the returned function to (tuple, channel). The mapped stream then passes
 * through an always-true filter and is tagged "pipeline-ch" followed by the channel number.
 *
 * @param period passed through to {@code fakeParallelAnalytic}
 * @param unit passed through to {@code fakeParallelAnalytic}
 * @return function from (input stream, channel) to the tagged output stream
 */
private BiFunction<TStream<Integer>,Integer,TStream<JsonObject>> fakeParallelPipeline(long period, TimeUnit unit) {
 return (stream, channel) -> {
   // The analytic function is obtained anew for every tuple, inside the map lambda.
   TStream<JsonObject> analyzed =
       stream.map(tuple -> fakeParallelAnalytic(period, unit).apply(tuple, channel));
   TStream<JsonObject> passed = analyzed.filter(tuple -> true);
   return passed.tag("pipeline-ch"+channel);
 };
}

代码示例来源:origin: apache/incubator-edgent

};
TStream<Integer> result = PlumbingStreams.concurrent(values, pipelines, combiner).tag("result");

代码示例来源:origin: apache/incubator-edgent

TStream<JsonObject> result = PlumbingStreams.parallelBalanced(values, width, pipeline).tag("result");
TStream<Integer> result2 = result.map(jo -> {
  int r = jo.get("result").getAsInt();

代码示例来源:origin: apache/incubator-edgent

TStream<Integer> values = top.of(resultTuples);
TStream<JsonObject> result = PlumbingStreams.parallel(values, width, splitter, pipeline).tag("result");
TStream<Integer> result2 = result.map(jo -> {
  int r = jo.get("result").getAsInt();

代码示例来源:origin: apache/incubator-edgent

TStream<Integer> values = top.of(resultTuples);
TStream<JsonObject> result = PlumbingStreams.parallelMap(values, width, splitter, mapper).tag("result");
TStream<Integer> result2 = result.map(jo -> {
  int r = jo.get("result").getAsInt();

代码示例来源:origin: apache/incubator-edgent

/**
 * Verifies {@code TStream.tag()}: a new stream has no tags, {@code tag()} returns
 * the same stream instance, tags accumulate across calls, re-adding existing tags
 * is accepted, tags are readable from within a running pipeline, and tagging does
 * not disturb the tuples flowing through the stream.
 */
@Test
public void testTag() throws Exception {
  Topology topology = newTopology();
  TStream<String> stream = topology.strings("a", "b");

  List<String> expectedTags = new ArrayList<>();
  expectedTags.add("tag1");
  expectedTags.add("tag2");

  // A freshly created stream carries no tags.
  assertEquals(0, stream.getTags().size());

  // tag() hands back the very same stream, now carrying both tags.
  TStream<String> downstream = stream.tag("tag1", "tag2");
  assertSame(stream, downstream);
  assertTrue("s.tags="+stream.getTags(), stream.getTags().containsAll(expectedTags));

  // A further tag is added on top of the existing ones.
  expectedTags.add("tag3");
  stream.tag("tag3");
  assertTrue("s.tags="+stream.getTags(), stream.getTags().containsAll(expectedTags));

  // ok to redundantly add
  stream.tag("tag3", "tag2", "tag1");
  assertTrue("s.tags="+stream.getTags(), stream.getTags().containsAll(expectedTags));

  // test access at runtime
  downstream = stream
      .peek(tuple -> assertTrue("s.tags="+stream.getTags(), stream.getTags().containsAll(expectedTags)))
      .filter(tuple -> true);

  // just verify that tag presence doesn't otherwise muck up things
  Condition<Long> tupleCount = topology.getTester().tupleCount(downstream, 2);
  Condition<List<String>> contents = topology.getTester().streamContents(downstream, "a", "b");
  complete(topology, tupleCount);
  assertTrue("contents "+contents.getResult(), contents.valid());
}

相关文章