org.apache.edgent.topology.TStream.flatMap()方法的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(5.4k)|赞(0)|评价(0)|浏览(74)

本文整理了Java中org.apache.edgent.topology.TStream.flatMap()方法的一些代码示例,展示了TStream.flatMap()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。TStream.flatMap()方法的具体详情如下:
包路径:org.apache.edgent.topology.TStream
类名称:TStream
方法名:flatMap

TStream.flatMap介绍

[英]Declare a new stream that maps tuples from this stream into one or more (or zero) tuples of a different type U. For each tuple t on this stream, the returned stream will contain all non-null tuples in the Iterator that is the result of mapper.apply(t). Tuples will be added to the returned stream in the order the iterator returns them.
If the return is null or an empty iterator then no tuples are added to the returned stream for input tuple t.

Examples of mapping a stream containing lines of text into a stream of words split out from each line. The order of the words in the stream will match the order of the words in the lines.

TStream<String> lines = ... 
TStream<String> words = lines.flatMap( 
line -> Arrays.asList(line.split(" ")));

[中]声明一个新流,该流将元组从该流映射到一个或多个(或零个)不同类型U的元组。对于该流上的每个元组t,返回的流将在迭代器中包含所有非空元组,这是mapper的结果。应用(t)。元组将按迭代器返回的顺序添加到返回的流中。
如果返回值为null或为空迭代器,则不会向返回流中添加输入元组t的元组。
将包含文本行的流映射为从每行中拆分出来的单词流的示例。流中单词的顺序将与行中单词的顺序匹配。

TStream<String> lines = ... 
TStream<String> words = lines.flatMap( 
line -> Arrays.asList(line.split(" ")));

代码示例

代码示例来源:origin: apache/incubator-edgent

ResultsHandler<T,R> resultsHandler
  ) {
return stream.flatMap(new JdbcStatement<T,R>(connector,
    stmtSupplier, paramSetter, resultsHandler));

代码示例来源:origin: apache/incubator-edgent

/**
 * For each tuple on {@code stream} execute an SQL statement and
 * add 0 or more resulting tuples to a result stream.
 * <p>
 * Same as using {@link #executeStatement(TStream, StatementSupplier, ParameterSetter, ResultsHandler)}
 * specifying {@code dataSource -> dataSource.prepareStatement(stmtSupplier.get()}}
 * for the {@code StatementSupplier}.
 * 
 * @param <T> Tuple type for input stream
 * @param <R> Tuple type of result stream
 * @param stream tuples to execute a SQL statement on behalf of
 * @param stmtSupplier an SQL statement
 * @param paramSetter function to set SQL statement parameters
 * @param resultsHandler SQL ResultSet handler
 * @return result Stream
 */
public <T,R> TStream<R> executeStatement(TStream<T> stream,
    Supplier<String> stmtSupplier,
    ParameterSetter<T> paramSetter,
    ResultsHandler<T,R> resultsHandler
    ) {
  return stream.flatMap(new JdbcStatement<T,R>(connector,
      cn -> cn.prepareStatement(stmtSupplier.get()),
      paramSetter, resultsHandler));
}

代码示例来源:origin: apache/incubator-edgent

@Test
public void testPeriodicSource() throws Exception {
 Topology t = newTopology("testPeriodicSource");
 
 Path tempFile1 = FileUtil.createTempFile("test1", ".txt", getLines());
 System.out.println("Test: "+t.getName()+" "+tempFile1);
 
 ProcessBuilder cmd = new ProcessBuilder(mkCatFileCmd(tempFile1.toString()));
 
 int NUM_POLLS = 3;
 List<String> expLines = new ArrayList<>();
 for (int i = 0; i < NUM_POLLS; i++) {
  expLines.addAll(Arrays.asList(getLines()));
 }
 
 TStream<List<String>> ls = CommandStreams.periodicSource(t, cmd, 1, TimeUnit.SECONDS);
 TStream<String> s = ls.flatMap(list -> list); 
 
 try {
  completeAndValidate("", t, s, 10, expLines.toArray(new String[0]));
 }
 finally {
   tempFile1.toFile().delete();
 }
}

代码示例来源:origin: apache/incubator-edgent

@Test
public void tesFlattMapWithNullValues() throws Exception {
  Topology t = newTopology();
  TStream<String> s = t.strings("mary had a little lamb",
      "its fleece was white as snow");
  TStream<String> w = s.flatMap(tuple-> {List<String> values = Arrays.asList(tuple.split(" "));
   values.set(2, null); values.set(4, null); return values;});
  assertStream(t, w);
  Condition<List<String>> contents = t.getTester().streamContents(w, "mary", "had",
      "little", "its", "fleece",  "white",
      "snow");
  complete(t, contents);
  assertTrue(contents.getResult().toString(), contents.valid());
}

代码示例来源:origin: apache/incubator-edgent

@Test
public void tesFlattMapWithNullIterator() throws Exception {
  Topology t = newTopology();
  TStream<String> s = t.strings("mary had a little lamb", "NOTUPLES",
      "its fleece was white as snow");
  TStream<String> w = s.flatMap(tuple->tuple.equals("NOTUPLES") ? null : Arrays.asList(tuple.split(" ")));
  assertStream(t, w);
  Condition<List<String>> contents = t.getTester().streamContents(w, "mary", "had",
      "a", "little", "lamb", "its", "fleece", "was", "white", "as",
      "snow");
  complete(t, contents);
  assertTrue(contents.getResult().toString(), contents.valid());
}

代码示例来源:origin: apache/incubator-edgent

@Test
public void tesFlattMap() throws Exception {
  Topology t = newTopology();
  TStream<String> s = t.strings("mary had a little lamb",
      "its fleece was white as snow");
  TStream<String> w = s.flatMap(tuple->Arrays.asList(tuple.split(" ")));
  assertStream(t, w);
  Condition<List<String>> contents = t.getTester().streamContents(w, "mary", "had",
      "a", "little", "lamb", "its", "fleece", "was", "white", "as",
      "snow");
  complete(t, contents);
  assertTrue(contents.getResult().toString(), contents.valid());
}

代码示例来源:origin: apache/incubator-edgent

mc1.peek(tuple -> System.out.println("MyClass1: " + tuple.toString()));
mc1.flatMap(tuple -> Arrays.asList(tuple.toString().split(" ")));

相关文章