
x33g5p2x  于2022-01-30 转载在 其他  



[英]Split a stream's tuples among n streams as specified by splitter.

For each tuple on the stream, splitter.applyAsInt(tuple) is called. The return value r determines the destination stream:

- if r < 0, the tuple is discarded;
- otherwise, it is sent to the stream at position (r % n) in the returned list.

Each split TStream is exposed by the API. The user has full control over each stream's processing pipeline. Each stream's pipeline must be declared explicitly, and each stream can have a different processing pipeline.

An N-way split() is logically equivalent to a collection of N filter() invocations, each with a predicate to select the tuples for its stream. split() is more efficient. Each tuple is analyzed only once by a single splitter instance to identify the destination stream. For example, these are logically equivalent:

List<TStream<String>> streams = stream.split(2, tuple -> tuple.length()); 
TStream<String> stream0 = stream.filter(tuple -> (tuple.length() % 2) == 0); 
TStream<String> stream1 = stream.filter(tuple -> (tuple.length() % 2) == 1);

Example of splitting a stream of log records by their level attribute:

TStream<LogRecord> lrs = ... 
List<TStream<LogRecord>> splits = lrs.split(3, lr -> {
    if (SEVERE.equals(lr.getLevel()))
        return 0;
    else if (WARNING.equals(lr.getLevel()))
        return 1;
    else
        return 2;
});
splits.get(0). ... // SEVERE log record processing pipeline
splits.get(1). ... // WARNING log record processing pipeline
splits.get(2). ... // "other" log record processing pipeline


- if r < 0, the tuple is discarded;
- otherwise, it is sent to the stream at position (r % n) in the returned list.

N-way split()在逻辑上相当于N个filter()调用的集合,每个调用都有一个谓词来为其流选择元组。split()更有效。每个元组仅由单个拆分器实例分析一次,以标识目标流。例如,它们在逻辑上是等价的:

List<TStream<String>> streams = stream.split(2, tuple -> tuple.length()); 
TStream<String> stream0 = stream.filter(tuple -> (tuple.length() % 2) == 0); 
TStream<String> stream1 = stream.filter(tuple -> (tuple.length() % 2) == 1);


TStream<LogRecord> lrs = ... 
List<TStream<LogRecord>> splits = lrs.split(3, lr -> {
    if (SEVERE.equals(lr.getLevel()))
        return 0;
    else if (WARNING.equals(lr.getLevel()))
        return 1;
    else
        return 2;
});
splits.get(0). ... // SEVERE log record processing pipeline
splits.get(1). ... // WARNING log record processing pipeline
splits.get(2). ... // "other" log record processing pipeline


代码示例来源:origin: apache/incubator-edgent

// Fan the stream out into `width` channels, then replace each channel with a
// copy tagged "parallel.split-ch<index>" so it can be identified later.
List<TStream<T>> channels = stream.split(width, splitter);
for (int index = 0; index < width; index++) {
  TStream<T> tagged = channels.get(index).tag("parallel.split-ch" + index);
  channels.set(index, tagged);
}

代码示例来源:origin: org.apache.edgent/edgent-api-topology

// Fan the stream out into `width` channels, then replace each channel with a
// copy tagged "parallel.split-ch<index>" so it can be identified later.
List<TStream<T>> channels = stream.split(width, splitter);
for (int index = 0; index < width; index++) {
  TStream<T> tagged = channels.get(index).tag("parallel.split-ch" + index);
  channels.set(index, tagged);
}

代码示例来源:origin: org.apache.edgent/edgent-api-topology

// Fan the stream out into `width` channels, then replace each channel with a
// copy tagged "parallel.split-ch<index>" so it can be identified later.
List<TStream<T>> channels = stream.split(width, splitter);
for (int index = 0; index < width; index++) {
  TStream<T> tagged = channels.get(index).tag("parallel.split-ch" + index);
  channels.set(index, tagged);
}

代码示例来源:origin: apache/incubator-edgent

// Fan the stream out into `width` channels, then replace each channel with a
// copy tagged "parallel.split-ch<index>" so it can be identified later.
List<TStream<T>> channels = stream.split(width, splitter);
for (int index = 0; index < width; index++) {
  TStream<T> tagged = channels.get(index).tag("parallel.split-ch" + index);
  channels.set(index, tagged);
}

代码示例来源:origin: apache/incubator-edgent

 * Test split() zero outputs
 * @throws Exception on failure
@Test(expected = IllegalArgumentException.class)
public void testSplitWithZeroOutputs() throws Exception {
  newTopology().strings("a1").split(0, tuple -> 0);

代码示例来源:origin: apache/incubator-edgent

 * Test split() negative outputs
 * @throws Exception on failure
@Test(expected = IllegalArgumentException.class)
public void testSplitWithNegativeOutputs() throws Exception {
  newTopology().strings("a1").split(-28, tuple -> 0);

代码示例来源:origin: apache/incubator-edgent

 * Test split(enum) with integer type enum.
 * @throws Exception on failure
@Test(expected = IllegalArgumentException.class)
public void testSplitWithEnumForZeroSizeClass() throws Exception {
  Topology t = newTopology();
  TStream<String> s = t.strings("Test");
  s.split(EnumClassWithZerosize.class, e -> EnumClassWithZerosize.valueOf("Test"));

代码示例来源:origin: apache/incubator-edgent

List<TStream<Integer>> splits = d.split(3, tuple -> {
  switch (tuple.intValue()) {
  case 0:

代码示例来源:origin: apache/incubator-edgent

List<TStream<Double>> splits1 = gaussian.split(11, tuple -> {
  switch (tuple.toString().charAt(0)) {
  case '-':              //negative numbers
mcs1.peek(tuple -> System.out.println(" mcs1_source2: " + tuple.toString()));
List<TStream<String>> splits2 = mcs1.split(2, tuple -> {
  switch (tuple.toString().charAt(0)) {
  case '-':              //negative numbers

代码示例来源:origin: apache/incubator-edgent

 * Test split() with no drops.
 * @throws Exception on failure
public void testSplit() throws Exception {
  Topology t = newTopology();
  TStream<String> s = t.strings("a1", "b1", "a2", "c1", "e1", "c2", "c3", "b2", "a3", "b3", "d1", "e2");
  List<TStream<String>> splits = s.split(3, tuple -> tuple.charAt(0) - 'a');
  Condition<Long> tc0 = t.getTester().tupleCount(splits.get(0), 4);
  Condition<Long> tc1 = t.getTester().tupleCount(splits.get(1), 5);
  Condition<Long> tc2 = t.getTester().tupleCount(splits.get(2), 3);
  Condition<List<String>> contents0 = t.getTester().streamContents(splits.get(0), "a1", "a2", "a3", "d1");
  Condition<List<String>> contents1 = t.getTester().streamContents(splits.get(1), "b1", "e1", "b2", "b3", "e2");
  Condition<List<String>> contents2 = t.getTester().streamContents(splits.get(2), "c1", "c2", "c3");
  complete(t, t.getTester().and(tc0, tc1, tc2));
  assertTrue(contents0.toString(), contents0.valid());
  assertTrue(contents1.toString(), contents1.valid());
  assertTrue(contents2.toString(), contents2.valid());

代码示例来源:origin: apache/incubator-edgent

List<TStream<String>> splits = s.split(3, tuple -> {
  switch (tuple.charAt(0)) {
  case 'a':

代码示例来源:origin: apache/incubator-edgent

 * Test split(enum) with integer type enum.
 * @throws Exception on failure
public void testSplitWithEnum() throws Exception {
  Topology t = newTopology();
  TStream<String> s = t.strings("Log1_ALERT", "Log2_INFO", "Log3_INFO", "Log4_INFO", "Log5_ERROR", "Log6_ERROR", "Log7_CRITICAL");
  TStream<String> i =;
  EnumMap<LogSeverityEnum,TStream<String>> splits = i.split(LogSeverityEnum.class, e -> LogSeverityEnum.valueOf(e.split("_")[1]));
  assertStream(t, i);
  Condition<Long> tc0 = t.getTester().tupleCount(splits.get(LogSeverityEnum.ALERT), 1);
  Condition<Long> tc1 = t.getTester().tupleCount(splits.get(LogSeverityEnum.INFO), 3);
  Condition<Long> tc2 = t.getTester().tupleCount(splits.get(LogSeverityEnum.ERROR), 2);
  Condition<Long> tc3 = t.getTester().tupleCount(splits.get(LogSeverityEnum.CRITICAL), 1);
  Condition<Long> tc4 = t.getTester().tupleCount(splits.get(LogSeverityEnum.WARNING), 0);
  Condition<List<String>> contents0 = t.getTester().streamContents(splits.get(LogSeverityEnum.ALERT), "Log1_ALERT");
  Condition<List<String>> contents1 = t.getTester().streamContents(splits.get(LogSeverityEnum.INFO), "Log2_INFO",
    "Log3_INFO", "Log4_INFO");
  Condition<List<String>> contents2 = t.getTester().streamContents(splits.get(LogSeverityEnum.ERROR), "Log5_ERROR",
  Condition<List<String>> contents3 = t.getTester().streamContents(splits.get(LogSeverityEnum.CRITICAL), "Log7_CRITICAL");
  Condition<List<String>> contents4 = t.getTester().streamContents(splits.get(LogSeverityEnum.WARNING));
  complete(t, t.getTester().and(tc0, tc1, tc2, tc3, tc4));
  assertTrue(contents0.toString(), contents0.valid());
  assertTrue(contents1.toString(), contents1.valid());
  assertTrue(contents2.toString(), contents2.valid());
  assertTrue(contents3.toString(), contents3.valid());
  assertTrue(contents4.toString(), contents4.valid());
