org.apache.edgent.topology.TStream.pipe()方法的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(11.4k)|赞(0)|评价(0)|浏览(81)

本文整理了Java中org.apache.edgent.topology.TStream.pipe()方法的一些代码示例,展示了TStream.pipe()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。TStream.pipe()方法的具体详情如下:
包路径:org.apache.edgent.topology.TStream
类名称:TStream
方法名:pipe

TStream.pipe介绍

[英]Declare a stream that contains the output of the specified Pipeoplet applied to this stream.
[中]声明一个流,其中包含应用于该流的指定Pipeoplet的输出。

代码示例

代码示例来源:origin: apache/incubator-edgent

/**
 * Increment a counter metric when peeking at each tuple.
 * 
 * @param <T>
 *            TStream tuple type
 * @param stream to stream to instrument
 * @return a {@link TStream} containing the input tuples
 */
public static <T> TStream<T> counter(TStream<T> stream) {
  return stream.pipe(new CounterOp<T>());
}

代码示例来源:origin: apache/incubator-edgent

/**
 * Measure current tuple throughput and calculate one-, five-, and
 * fifteen-minute exponentially-weighted moving averages.
 * 
 * @param <T>
 *            TStream tuple type
 * @param stream to stream to instrument
 * @return a {@link TStream} containing the input tuples
 */
public static <T> TStream<T> rateMeter(TStream<T> stream) {
  return stream.pipe(new RateMeter<T>());
}

代码示例来源:origin: apache/incubator-edgent

/**
 * Isolate upstream processing from downstream processing.
 * <P>
 * If the processing against the returned stream cannot keep up
 * with the arrival rate of tuples on {@code stream}, upstream
 * processing will block until there is space in the queue between
 * the streams.
 * </P><P>
 * Processing of tuples occurs in the order they were received.
 * </P>
 * 
 * @param <T> Tuple type
 * @param stream Stream to be isolated from downstream processing.
 * @param queueCapacity size of the queue between {@code stream} and
 *        the returned stream.
 * @return Stream that is isolated from {@code stream}.
 * @see #pressureReliever(TStream, Function, int) pressureReliever
 */
public static <T> TStream<T> isolate(TStream<T> stream, int queueCapacity) {
 return stream.pipe(new Isolate<T>(queueCapacity));
}

代码示例来源:origin: org.apache.edgent/edgent-api-topology

/**
 * Isolate upstream processing from downstream processing.
 * <P>
 * If the processing against the returned stream cannot keep up
 * with the arrival rate of tuples on {@code stream}, upstream
 * processing will block until there is space in the queue between
 * the streams.
 * </P><P>
 * Processing of tuples occurs in the order they were received.
 * </P>
 * 
 * @param <T> Tuple type
 * @param stream Stream to be isolated from downstream processing.
 * @param queueCapacity size of the queue between {@code stream} and
 *        the returned stream.
 * @return Stream that is isolated from {@code stream}.
 * @see #pressureReliever(TStream, Function, int) pressureReliever
 */
public static <T> TStream<T> isolate(TStream<T> stream, int queueCapacity) {
 return stream.pipe(new Isolate<T>(queueCapacity));
}

代码示例来源:origin: org.apache.edgent/edgent-api-topology

return stream.pipe(new PressureReliever<>(count, keyFunction));

代码示例来源:origin: org.apache.edgent/edgent-api-topology

/**
 * Isolate upstream processing from downstream processing.
 * <BR>
 * Implementations may throw {@code OutOfMemoryExceptions} 
 * if the processing against returned stream cannot keep up
 * with the arrival rate of tuples on {@code stream}.
 *
 * @param <T> Tuple type
 * @param stream Stream to be isolated from downstream processing.
 * @param ordered {@code true} to maintain arrival order on the returned stream,
 * {@code false} to not guaranteed arrival order.
 * @return Stream that is isolated from {@code stream}.
 */
public static <T> TStream<T> isolate(TStream<T> stream, boolean ordered) {
  return stream.pipe(
      ordered ? new Isolate<T>() : new UnorderedIsolate<T>());
}

代码示例来源:origin: apache/incubator-edgent

/**
 * Isolate upstream processing from downstream processing.
 * <BR>
 * Implementations may throw {@code OutOfMemoryExceptions} 
 * if the processing against returned stream cannot keep up
 * with the arrival rate of tuples on {@code stream}.
 *
 * @param <T> Tuple type
 * @param stream Stream to be isolated from downstream processing.
 * @param ordered {@code true} to maintain arrival order on the returned stream,
 * {@code false} to not guaranteed arrival order.
 * @return Stream that is isolated from {@code stream}.
 */
public static <T> TStream<T> isolate(TStream<T> stream, boolean ordered) {
  return stream.pipe(
      ordered ? new Isolate<T>() : new UnorderedIsolate<T>());
}

代码示例来源:origin: apache/incubator-edgent

@Override
public <U> TStream<U> aggregate(BiFunction<List<T>,K, U> processor) { 
  processor = Functions.synchronizedBiFunction(processor);
  Window<T, K, LinkedList<T>> window = Windows.lastNProcessOnInsert(size, getKeyFunction());
  Aggregate<T,U,K> op = new Aggregate<T,U,K>(window, processor);
  return feeder().pipe(op); 
}

代码示例来源:origin: org.apache.edgent/edgent-spi-topology

@Override
public <U> TStream<U> aggregate(BiFunction<List<T>,K, U> processor) { 
  processor = Functions.synchronizedBiFunction(processor);
  Window<T, K, LinkedList<T>> window = Windows.lastNProcessOnInsert(size, getKeyFunction());
  Aggregate<T,U,K> op = new Aggregate<T,U,K>(window, processor);
  return feeder().pipe(op); 
}

代码示例来源:origin: org.apache.edgent/edgent-spi-topology

@Override
public <J, U, K> TStream<J> joinLast(Function<T, K> keyer,
    TStream<U> lastStream, Function<U, K> lastStreamKeyer, BiFunction<T, U, J> joiner) {
  BiFunction<List<U>,K, Object> processor = Functions.synchronizedBiFunction((list, key) -> null);
  Window<U, K, LinkedList<U>> window = Windows.lastNProcessOnInsert(1, lastStreamKeyer);
  Aggregate<U,Object,K> op = new Aggregate<U,Object,K>(window, processor);
  lastStream.pipe(op);
  return this.map((tuple) -> {
    Partition<U, K, ? extends List<U>> part = window.getPartitions().get(keyer.apply(tuple));
    if(part == null)
      return null;
    J ret;
    synchronized (part) {
      U last = part.getContents().get(0);
      ret = joiner.apply(tuple, last);
    }
    return ret;
  });
}

代码示例来源:origin: org.apache.edgent/edgent-spi-topology

@Override
public <U> TStream<U> batch(BiFunction<List<T>, K, U> batcher) {
  batcher = Functions.synchronizedBiFunction(batcher);
  Window<T, K, List<T>> window =
      Windows.window(
          alwaysInsert(),
          Policies.scheduleEvictOnFirstInsert(time, unit),
          Policies.evictAllAndScheduleEvictWithProcess(time, unit),
          (partition, tuple) -> {},
          getKeyFunction(),
          () -> new ArrayList<T>());
  
  Aggregate<T,U,K> op = new Aggregate<T,U,K>(window, batcher);
  return feeder().pipe(op); 
}

代码示例来源:origin: apache/incubator-edgent

@Override
public <J, U, K> TStream<J> joinLast(Function<T, K> keyer,
    TStream<U> lastStream, Function<U, K> lastStreamKeyer, BiFunction<T, U, J> joiner) {
  BiFunction<List<U>,K, Object> processor = Functions.synchronizedBiFunction((list, key) -> null);
  Window<U, K, LinkedList<U>> window = Windows.lastNProcessOnInsert(1, lastStreamKeyer);
  Aggregate<U,Object,K> op = new Aggregate<U,Object,K>(window, processor);
  lastStream.pipe(op);
  return this.map((tuple) -> {
    Partition<U, K, ? extends List<U>> part = window.getPartitions().get(keyer.apply(tuple));
    if(part == null)
      return null;
    J ret;
    synchronized (part) {
      U last = part.getContents().get(0);
      ret = joiner.apply(tuple, last);
    }
    return ret;
  });
}

代码示例来源:origin: apache/incubator-edgent

@Override
public <U> TStream<U> batch(BiFunction<List<T>, K, U> batcher) {
  batcher = Functions.synchronizedBiFunction(batcher);
  Window<T, K, List<T>> window =
      Windows.window(
          alwaysInsert(),
          Policies.scheduleEvictOnFirstInsert(time, unit),
          Policies.evictAllAndScheduleEvictWithProcess(time, unit),
          (partition, tuple) -> {},
          getKeyFunction(),
          () -> new ArrayList<T>());
  
  Aggregate<T,U,K> op = new Aggregate<T,U,K>(window, batcher);
  return feeder().pipe(op); 
}

代码示例来源:origin: org.apache.edgent/edgent-spi-topology

@Override
public <U> TStream<U> batch(BiFunction<List<T>, K, U> batcher) {
  batcher = Functions.synchronizedBiFunction(batcher);
  Window<T, K, List<T>> window =
      Windows.window(
          alwaysInsert(),
          Policies.doNothing(),
          Policies.evictAll(),
          Policies.processWhenFullAndEvict(size),
          getKeyFunction(),
          () -> new ArrayList<T>(size));
  
  Aggregate<T,U,K> op = new Aggregate<T,U,K>(window, batcher);
  return feeder().pipe(op); 
}

代码示例来源:origin: apache/incubator-edgent

@Override
public <U> TStream<U> batch(BiFunction<List<T>, K, U> batcher) {
  batcher = Functions.synchronizedBiFunction(batcher);
  Window<T, K, List<T>> window =
      Windows.window(
          alwaysInsert(),
          Policies.doNothing(),
          Policies.evictAll(),
          Policies.processWhenFullAndEvict(size),
          getKeyFunction(),
          () -> new ArrayList<T>(size));
  
  Aggregate<T,U,K> op = new Aggregate<T,U,K>(window, batcher);
  return feeder().pipe(op); 
}

代码示例来源:origin: apache/incubator-edgent

@Test
public void automaticMetricCleanup2() throws Exception {
  // Declare topology with custom metric oplet
  Topology t = newTopology();
  AtomicInteger n = new AtomicInteger(0);
  TStream<Integer> ints = t.poll(() -> n.incrementAndGet(), 10, TimeUnit.MILLISECONDS);
  TStream<Integer> ints2 = ints.pipe(new TestOplet<Integer>());
  ints2.pipe(new TestOplet<Integer>());
  // Submit job
  Future<? extends Job> fj = getSubmitter().submit(t);
  Job job = fj.get();
  Thread.sleep(TimeUnit.MILLISECONDS.toMillis(50));
  // Each test oplet registers two metrics 
  Map<String, Metric> all = metricRegistry.getMetrics();
  assertEquals(4, all.size());
  
  // After close all metrics have been unregistered 
  job.stateChange(Job.Action.CLOSE);
  assertEquals(0, all.size());
}

代码示例来源:origin: apache/incubator-edgent

@Test
public void automaticMetricCleanup1() throws Exception {
  // Declare topology with custom metric oplet
  Topology t = newTopology();
  AtomicInteger n = new AtomicInteger(0);
  TStream<Integer> ints = t.poll(() -> n.incrementAndGet(), 10, TimeUnit.MILLISECONDS);
  ints.pipe(new TestOplet<Integer>());
  // Submit job
  Future<? extends Job> fj = getSubmitter().submit(t);
  Job job = fj.get();
  Thread.sleep(TimeUnit.MILLISECONDS.toMillis(50));
  // At least one tuple was processed
  int tupleCount = n.get(); 
  assertTrue("Expected more tuples than "+ tupleCount, tupleCount > 0);
  // Each test oplet registers two metrics 
  Map<String, Metric> all = metricRegistry.getMetrics();
  assertEquals(2, all.size());
  
  // After close all metrics have been unregistered 
  job.stateChange(Job.Action.CLOSE);
  assertEquals(0, all.size());
}

代码示例来源:origin: apache/incubator-edgent

return feeder().pipe(op);

代码示例来源:origin: apache/incubator-edgent

@Test(expected = ExecutionException.class)
public void jobProcessSourceError() throws Exception {
  Topology t = newTopology();
  AtomicInteger n = new AtomicInteger(0);
  TStream<Integer> ints = t.generate(() -> n.incrementAndGet());
  ints.pipe(new FailedOplet<Integer>(12, 100));
  Future<Job> fj = ((DirectProvider)getTopologyProvider()).submit(t);
  Job job = fj.get();
  assertEquals(Job.State.RUNNING, job.getCurrentState());
  try {
    job.complete(10, TimeUnit.SECONDS); 
  } finally {
    // RUNNING even though execution error 
    assertEquals(Job.State.RUNNING, job.getCurrentState());
    assertEquals(Job.Health.UNHEALTHY, job.getHealth());
    assertEquals("java.lang.RuntimeException: Expected Test Exception", job.getLastError());
  }
}

代码示例来源:origin: apache/incubator-edgent

@Test(expected = ExecutionException.class)
public void jobPeriodicSourceError() throws Exception {
  Topology t = newTopology();
  AtomicInteger n = new AtomicInteger(0);
  TStream<Integer> ints = t.poll(() -> n.incrementAndGet(), 100, TimeUnit.MILLISECONDS);
  ints.pipe(new FailedOplet<Integer>(5, 0));
  
  Future<Job> fj = ((DirectProvider)getTopologyProvider()).submit(t);
  Job job = fj.get();
  assertEquals(Job.State.RUNNING, job.getCurrentState());
  try {
    job.complete(10, TimeUnit.SECONDS); 
  } finally {
    // RUNNING even though execution error 
    assertEquals(Job.State.RUNNING, job.getCurrentState());
    assertEquals(Job.Health.UNHEALTHY, job.getHealth());
    assertEquals("java.lang.RuntimeException: Expected Test Exception", job.getLastError());
  }
}

相关文章