
x33g5p2x  于2022-01-30 转载在 其他  



[英]Declare a new stream that maps tuples from this stream into one or more (or zero) tuples of a different type U. For each tuple t on this stream, the returned stream will contain all non-null tuples in the Iterator that is the result of mapper.apply(t). Tuples will be added to the returned stream in the order the iterator returns them.
If the return is null or an empty iterator then no tuples are added to the returned stream for input tuple t.

Examples of mapping a stream containing lines of text into a stream of words split out from each line. The order of the words in the stream will match the order of the words in the lines.

TStream<String> lines = ... 
TStream<String> words = lines.flatMap( 
line -> Arrays.asList(line.split(" ")));


TStream<String> lines = ... 
TStream<String> words = lines.flatMap( 
line -> Arrays.asList(line.split(" ")));


代码示例来源:origin: apache/incubator-edgent

ResultsHandler<T,R> resultsHandler
  ) {
return stream.flatMap(new JdbcStatement<T,R>(connector,
    stmtSupplier, paramSetter, resultsHandler));

代码示例来源:origin: apache/incubator-edgent

 * For each tuple on {@code stream} execute an SQL statement and
 * add 0 or more resulting tuples to a result stream.
 * <p>
 * Same as using {@link #executeStatement(TStream, StatementSupplier, ParameterSetter, ResultsHandler)}
 * specifying {@code dataSource -> dataSource.prepareStatement(stmtSupplier.get()}}
 * for the {@code StatementSupplier}.
 * @param <T> Tuple type for input stream
 * @param <R> Tuple type of result stream
 * @param stream tuples to execute a SQL statement on behalf of
 * @param stmtSupplier an SQL statement
 * @param paramSetter function to set SQL statement parameters
 * @param resultsHandler SQL ResultSet handler
 * @return result Stream
public <T,R> TStream<R> executeStatement(TStream<T> stream,
    Supplier<String> stmtSupplier,
    ParameterSetter<T> paramSetter,
    ResultsHandler<T,R> resultsHandler
    ) {
  return stream.flatMap(new JdbcStatement<T,R>(connector,
      cn -> cn.prepareStatement(stmtSupplier.get()),
      paramSetter, resultsHandler));

代码示例来源:origin: apache/incubator-edgent

public void testPeriodicSource() throws Exception {
 Topology t = newTopology("testPeriodicSource");
 Path tempFile1 = FileUtil.createTempFile("test1", ".txt", getLines());
 System.out.println("Test: "+t.getName()+" "+tempFile1);
 ProcessBuilder cmd = new ProcessBuilder(mkCatFileCmd(tempFile1.toString()));
 int NUM_POLLS = 3;
 List<String> expLines = new ArrayList<>();
 for (int i = 0; i < NUM_POLLS; i++) {
 TStream<List<String>> ls = CommandStreams.periodicSource(t, cmd, 1, TimeUnit.SECONDS);
 TStream<String> s = ls.flatMap(list -> list); 
 try {
  completeAndValidate("", t, s, 10, expLines.toArray(new String[0]));
 finally {

代码示例来源:origin: apache/incubator-edgent

public void tesFlattMapWithNullValues() throws Exception {
  Topology t = newTopology();
  TStream<String> s = t.strings("mary had a little lamb",
      "its fleece was white as snow");
  TStream<String> w = s.flatMap(tuple-> {List<String> values = Arrays.asList(tuple.split(" "));
   values.set(2, null); values.set(4, null); return values;});
  assertStream(t, w);
  Condition<List<String>> contents = t.getTester().streamContents(w, "mary", "had",
      "little", "its", "fleece",  "white",
  complete(t, contents);
  assertTrue(contents.getResult().toString(), contents.valid());

代码示例来源:origin: apache/incubator-edgent

public void tesFlattMapWithNullIterator() throws Exception {
  Topology t = newTopology();
  TStream<String> s = t.strings("mary had a little lamb", "NOTUPLES",
      "its fleece was white as snow");
  TStream<String> w = s.flatMap(tuple->tuple.equals("NOTUPLES") ? null : Arrays.asList(tuple.split(" ")));
  assertStream(t, w);
  Condition<List<String>> contents = t.getTester().streamContents(w, "mary", "had",
      "a", "little", "lamb", "its", "fleece", "was", "white", "as",
  complete(t, contents);
  assertTrue(contents.getResult().toString(), contents.valid());

代码示例来源:origin: apache/incubator-edgent

public void tesFlattMap() throws Exception {
  Topology t = newTopology();
  TStream<String> s = t.strings("mary had a little lamb",
      "its fleece was white as snow");
  TStream<String> w = s.flatMap(tuple->Arrays.asList(tuple.split(" ")));
  assertStream(t, w);
  Condition<List<String>> contents = t.getTester().streamContents(w, "mary", "had",
      "a", "little", "lamb", "its", "fleece", "was", "white", "as",
  complete(t, contents);
  assertTrue(contents.getResult().toString(), contents.valid());

代码示例来源:origin: apache/incubator-edgent

mc1.peek(tuple -> System.out.println("MyClass1: " + tuple.toString()));
mc1.flatMap(tuple -> Arrays.asList(tuple.toString().split(" ")));
