Usage and code examples of the org.apache.edgent.topology.TStream.split() method

x33g5p2x, reposted on 2022-01-30 under Other

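This article collects code examples for the Java method org.apache.edgent.topology.TStream.split() and shows how TStream.split() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven and were extracted from selected projects, so they should serve as a useful reference. The details of TStream.split() are as follows:
Package path: org.apache.edgent.topology.TStream
Class name: TStream
Method name: split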

About TStream.split

Split a stream's tuples among n streams as specified by splitter.

For each tuple on the stream, splitter.applyAsInt(tuple) is called. The return value r determines the destination stream:

if r < 0 the tuple is discarded 
else it is sent to the stream at position (r % n) in the returned array.
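
The routing rule above can be modelled in plain Java. This is only a sketch of the rule as documented, not Edgent's implementation: the class `SplitRoutingSketch` and its `route` helper are hypothetical, lists stand in for streams, and `java.util.function.ToIntFunction` stands in for the splitter type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.ToIntFunction;

// Plain-Java model of the documented routing rule; NOT Edgent code.
final class SplitRoutingSketch {

    // Hypothetical helper: distributes tuples over n lists.
    static <T> List<List<T>> route(List<T> tuples, int n, ToIntFunction<T> splitter) {
        List<List<T>> outputs = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            outputs.add(new ArrayList<>());
        }
        for (T tuple : tuples) {
            int r = splitter.applyAsInt(tuple);
            if (r < 0) {
                continue;                    // r < 0: the tuple is discarded
            }
            outputs.get(r % n).add(tuple);   // otherwise it goes to position r % n
        }
        return outputs;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a1", "b1", "d1", "e1", "!x");
        // 'a' -> 0, 'b' -> 1, 'd' -> 3 % 3 = 0, 'e' -> 4 % 3 = 1, '!' -> negative, dropped
        List<List<String>> out = route(input, 3, t -> t.charAt(0) - 'a');
        System.out.println(out);             // prints [[a1, d1], [b1, e1], []]
    }
}
```

Note how a return value larger than n - 1 wraps around instead of failing, while any negative value drops the tuple.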

Each split TStream is exposed by the API. The user has full control over each stream's processing pipeline. Each stream's pipeline must be declared explicitly, and each stream can have a different processing pipeline.

An N-way split() is logically equivalent to a collection of N filter() invocations, each with a predicate to select the tuples for its stream. split() is more efficient. Each tuple is analyzed only once by a single splitter instance to identify the destination stream. For example, these are logically equivalent:

List<TStream<String>> streams = stream.split(2, tuple -> tuple.length()); 
TStream<String> stream0 = stream.filter(tuple -> (tuple.length() % 2) == 0); 
TStream<String> stream1 = stream.filter(tuple -> (tuple.length() % 2) == 1);
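
The equivalence can be checked with a small plain-Java sketch, again using lists in place of streams. The class `SplitVersusFilterSketch` and its helpers are hypothetical names chosen for illustration; the point is only that one pass keyed on `length() % 2` yields the same contents as two separate filter passes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java comparison of a single-pass split with two filter passes; NOT Edgent code.
final class SplitVersusFilterSketch {

    // One pass: each tuple is inspected once and placed by length % 2.
    static List<List<String>> splitByLength(List<String> tuples) {
        List<List<String>> out = new ArrayList<>();
        out.add(new ArrayList<>());
        out.add(new ArrayList<>());
        for (String t : tuples) {
            out.get(t.length() % 2).add(t);  // length() is never negative, so nothing is dropped
        }
        return out;
    }

    // One pass per output: every tuple is tested against the predicate for that output.
    static List<String> filterByParity(List<String> tuples, int parity) {
        return tuples.stream()
                .filter(t -> t.length() % 2 == parity)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "bb", "ccc", "dddd", "");
        List<List<String>> split = splitByLength(input);
        System.out.println(split.get(0).equals(filterByParity(input, 0))); // prints true
        System.out.println(split.get(1).equals(filterByParity(input, 1))); // prints true
    }
}
```

With N outputs the filter variant evaluates N predicates per tuple, whereas the split variant evaluates the splitter once, which is the efficiency argument made above.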

Example of splitting a stream of log records by their level attribute:

TStream<LogRecord> lrs = ... 
List<TStream<LogRecord>> splits = lrs.split(3, lr -> { 
if (SEVERE.equals(lr.getLevel())) 
return 0; 
else if (WARNING.equals(lr.getLevel())) 
return 1; 
else 
return 2; 
}); 
splits.get(0). ... // SEVERE log record processing pipeline 
splits.get(1). ... // WARNING log record  processing pipeline 
splits.get(2). ... // "other" log record processing pipeline

Code examples

Code example source: apache/incubator-edgent (the same snippet is also listed under org.apache.edgent/edgent-api-topology)

List<TStream<T>> channels = stream.split(width, splitter);
for (int ch = 0; ch < width; ch++)
 channels.set(ch, channels.get(ch).tag("parallel.split-ch"+ch));

Code example source: apache/incubator-edgent

/**
 * Test split() zero outputs
 * @throws Exception on failure
 */
@Test(expected = IllegalArgumentException.class)
public void testSplitWithZeroOutputs() throws Exception {
  newTopology().strings("a1").split(0, tuple -> 0);
}

Code example source: apache/incubator-edgent

/**
 * Test split() negative outputs
 * @throws Exception on failure
 */
@Test(expected = IllegalArgumentException.class)
public void testSplitWithNegativeOutputs() throws Exception {
  newTopology().strings("a1").split(-28, tuple -> 0);
}
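
The two tests above expect an IllegalArgumentException when the number of outputs is zero or negative. A minimal sketch of such a guard is shown below; `SplitWidthCheckSketch`, `requirePositive`, and `rejects` are hypothetical names, and the real check inside Edgent may be written differently.

```java
// Plain-Java model of the argument check the tests imply; NOT Edgent code.
final class SplitWidthCheckSketch {

    // Hypothetical guard: rejects a non-positive number of output streams.
    static int requirePositive(int n) {
        if (n <= 0) {
            throw new IllegalArgumentException("number of outputs must be > 0, got " + n);
        }
        return n;
    }

    // Convenience wrapper that reports whether the guard rejected the value.
    static boolean rejects(int n) {
        try {
            requirePositive(n);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(rejects(0));    // prints true
        System.out.println(rejects(-28));  // prints true
        System.out.println(rejects(3));    // prints false
    }
}
```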

Code example source: apache/incubator-edgent

/**
 * Test split(enum) with a zero-size enum class.
 * @throws Exception on failure
 */
@Test(expected = IllegalArgumentException.class)
public void testSplitWithEnumForZeroSizeClass() throws Exception {
  Topology t = newTopology();
  TStream<String> s = t.strings("Test");
  s.split(EnumClassWithZerosize.class, e -> EnumClassWithZerosize.valueOf("Test"));
}

Code example source: apache/incubator-edgent

List<TStream<Integer>> splits = d.split(3, tuple -> {
  switch (tuple.intValue()) {
  case 0:
    // ... (the rest of this snippet is truncated in the source)

Code example source: apache/incubator-edgent

List<TStream<Double>> splits1 = gaussian.split(11, tuple -> {
  switch (tuple.toString().charAt(0)) {
  case '-':              //negative numbers
    // ... (the rest of this fragment is truncated in the source)

mcs1.peek(tuple -> System.out.println(" mcs1_source2: " + tuple.toString()));
List<TStream<String>> splits2 = mcs1.split(2, tuple -> {
  switch (tuple.toString().charAt(0)) {
  case '-':              //negative numbers
    // ... (the rest of this fragment is truncated in the source)

Code example source: apache/incubator-edgent

/**
 * Test split() with no drops.
 * @throws Exception on failure
 */
@Test
public void testSplit() throws Exception {
  Topology t = newTopology();
  TStream<String> s = t.strings("a1", "b1", "a2", "c1", "e1", "c2", "c3", "b2", "a3", "b3", "d1", "e2");
  List<TStream<String>> splits = s.split(3, tuple -> tuple.charAt(0) - 'a');
  Condition<Long> tc0 = t.getTester().tupleCount(splits.get(0), 4);
  Condition<Long> tc1 = t.getTester().tupleCount(splits.get(1), 5);
  Condition<Long> tc2 = t.getTester().tupleCount(splits.get(2), 3);
  Condition<List<String>> contents0 = t.getTester().streamContents(splits.get(0), "a1", "a2", "a3", "d1");
  Condition<List<String>> contents1 = t.getTester().streamContents(splits.get(1), "b1", "e1", "b2", "b3", "e2");
  Condition<List<String>> contents2 = t.getTester().streamContents(splits.get(2), "c1", "c2", "c3");
  complete(t, t.getTester().and(tc0, tc1, tc2));
  assertTrue(contents0.toString(), contents0.valid());
  assertTrue(contents1.toString(), contents1.valid());
  assertTrue(contents2.toString(), contents2.valid());
}

Code example source: apache/incubator-edgent

List<TStream<String>> splits = s.split(3, tuple -> {
  switch (tuple.charAt(0)) {
  case 'a':
    // ... (the rest of this snippet is truncated in the source)

Code example source: apache/incubator-edgent

/**
 * Test split(enum) with integer type enum.
 * @throws Exception on failure
 */
@Test
public void testSplitWithEnum() throws Exception {
  Topology t = newTopology();
  TStream<String> s = t.strings("Log1_ALERT", "Log2_INFO", "Log3_INFO", "Log4_INFO", "Log5_ERROR", "Log6_ERROR", "Log7_CRITICAL");
  TStream<String> i = s.map(String::toString);
  EnumMap<LogSeverityEnum,TStream<String>> splits = i.split(LogSeverityEnum.class, e -> LogSeverityEnum.valueOf(e.split("_")[1]));
  assertStream(t, i);
  Condition<Long> tc0 = t.getTester().tupleCount(splits.get(LogSeverityEnum.ALERT), 1);
  Condition<Long> tc1 = t.getTester().tupleCount(splits.get(LogSeverityEnum.INFO), 3);
  Condition<Long> tc2 = t.getTester().tupleCount(splits.get(LogSeverityEnum.ERROR), 2);
  Condition<Long> tc3 = t.getTester().tupleCount(splits.get(LogSeverityEnum.CRITICAL), 1);
  Condition<Long> tc4 = t.getTester().tupleCount(splits.get(LogSeverityEnum.WARNING), 0);
  Condition<List<String>> contents0 = t.getTester().streamContents(splits.get(LogSeverityEnum.ALERT), "Log1_ALERT");
  Condition<List<String>> contents1 = t.getTester().streamContents(splits.get(LogSeverityEnum.INFO), "Log2_INFO",
    "Log3_INFO", "Log4_INFO");
  Condition<List<String>> contents2 = t.getTester().streamContents(splits.get(LogSeverityEnum.ERROR), "Log5_ERROR",
    "Log6_ERROR");
  Condition<List<String>> contents3 = t.getTester().streamContents(splits.get(LogSeverityEnum.CRITICAL), "Log7_CRITICAL");
  Condition<List<String>> contents4 = t.getTester().streamContents(splits.get(LogSeverityEnum.WARNING));
  complete(t, t.getTester().and(tc0, tc1, tc2, tc3, tc4));
  assertTrue(contents0.toString(), contents0.valid());
  assertTrue(contents1.toString(), contents1.valid());
  assertTrue(contents2.toString(), contents2.valid());
  assertTrue(contents3.toString(), contents3.valid());
  assertTrue(contents4.toString(), contents4.valid());
}
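
The enum-based variant exercised by this test can also be modelled in plain Java. The sketch below is an assumption-laden illustration, not Edgent code: `EnumSplitSketch`, `Severity`, and `route` are hypothetical (the test's LogSeverityEnum is not shown in the source), and lists stand in for streams. It mirrors one thing the test suggests: every enum constant gets an output, including constants that receive no tuples.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of an enum-keyed split; NOT Edgent code.
final class EnumSplitSketch {

    // Hypothetical enum for illustration only.
    enum Severity { ALERT, CRITICAL, ERROR, WARNING, INFO }

    // Hypothetical helper: one output list per enum constant, keyed by the splitter's result.
    static <T, E extends Enum<E>> EnumMap<E, List<T>> route(
            List<T> tuples, Class<E> enumClass, Function<T, E> splitter) {
        EnumMap<E, List<T>> outputs = new EnumMap<>(enumClass);
        for (E key : enumClass.getEnumConstants()) {
            outputs.put(key, new ArrayList<>());   // constants with no tuples keep an empty output
        }
        for (T tuple : tuples) {
            outputs.get(splitter.apply(tuple)).add(tuple);
        }
        return outputs;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("Log1_ALERT", "Log2_INFO", "Log3_INFO", "Log5_ERROR");
        // The text after the underscore names the enum constant, as in the test above.
        EnumMap<Severity, List<String>> out =
                route(input, Severity.class, s -> Severity.valueOf(s.split("_")[1]));
        System.out.println(out.get(Severity.INFO));     // prints [Log2_INFO, Log3_INFO]
        System.out.println(out.get(Severity.WARNING));  // prints []
    }
}
```

Keying the outputs by enum constant avoids the index arithmetic of the integer variant, at the cost of requiring the splitter to return a valid constant for every tuple.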
