
x33g5p2x  于2022-01-30 转载在 其他  



[英]Declare a new stream that maps (or transforms) each tuple from this stream into one (or zero) tuple of a different type U. For each tuple ton this stream, the returned stream will contain a tuple that is the result of mapper.apply(t) when the return is not null. If mapper.apply(t) returns null then no tuple is submitted to the returned stream for t.

Examples of transforming a stream containing numeric values as String objects into a stream of Double values.

// Using lambda expression 
TStream<String> strings = ... 
TStream<Double> doubles = -> Double.valueOf(v)); 
// Using method reference 
TStream<String> strings = ... 
TStream<Double> doubles =;


// Using lambda expression 
TStream<String> strings = ... 
TStream<Double> doubles = -> Double.valueOf(v)); 
// Using method reference 
TStream<String> strings = ... 
TStream<Double> doubles =;


代码示例来源:origin: org.apache.edgent/edgent-apps-iot

 * Publishes events derived from {@code stream} using the topic
 * {@link IotDevicePubSub#EVENTS} as a JsonObject containing eventId, event,
 * and qos keys.
public TSink<JsonObject> events(TStream<JsonObject> stream, String eventId, int qos) {
  stream = -> {
    JsonObject publishedEvent = new JsonObject();
    publishedEvent.addProperty("eventId", eventId);
    publishedEvent.add("event", event);
    publishedEvent.addProperty("qos", qos);
    return publishedEvent;
  return PublishSubscribe.publish(stream, EVENTS_TOPIC, JsonObject.class);

代码示例来源:origin: apache/incubator-edgent

 * Publishes events derived from {@code stream} using the topic
 * {@link IotDevicePubSub#EVENTS} as a JsonObject containing eventId, event,
 * and qos keys.
public TSink<JsonObject> events(TStream<JsonObject> stream, String eventId, int qos) {
  stream = -> {
    JsonObject publishedEvent = new JsonObject();
    publishedEvent.addProperty("eventId", eventId);
    publishedEvent.add("event", event);
    publishedEvent.addProperty("qos", qos);
    return publishedEvent;
  return PublishSubscribe.publish(stream, EVENTS_TOPIC, JsonObject.class);

代码示例来源:origin: apache/incubator-edgent

 * Insert a blocking delay before forwarding the first tuple and
 * no delay for subsequent tuples.
 * <p>
 * Delays less than 1msec are translated to a 0 delay.
 * <p>
 * Sample use:
 * <pre>{@code
 * TStream<String> stream = topology.strings("a", "b, "c");
 * // create a stream where the first tuple is delayed by 5 seconds. 
 * TStream<String> oneShotDelayedStream =
 * blockingOneShotDelay(5, TimeUnit.SECONDS) );
 * }</pre>
 * @param <T> tuple type
 * @param stream input stream
 * @param delay Amount of time to delay a tuple.
 * @param unit Time unit for {@code delay}.
 * @return the delayed stream
public static <T> TStream<T> blockingOneShotDelay(TStream<T> stream, long delay, TimeUnit unit) {
  return blockingOneShotDelay(delay, unit) );

代码示例来源:origin: org.apache.edgent/edgent-apps-iot

 * Publishes events derived from {@code stream} using the topic
 * {@link EVENTS} as a JsonObject containing eventId, event,
 * and qos keys.
public TSink<JsonObject> events(TStream<JsonObject> stream, Function<JsonObject, String> eventId,
    UnaryOperator<JsonObject> payload, Function<JsonObject, Integer> qos) {
  stream = -> {
    JsonObject publishedEvent = new JsonObject();
    publishedEvent.addProperty("eventId", eventId.apply(event));
    publishedEvent.add("event", payload.apply(event));
    publishedEvent.addProperty("qos", qos.apply(event));
    return publishedEvent;
  return PublishSubscribe.publish(stream, IotDevicePubSub.EVENTS_TOPIC, JsonObject.class);

代码示例来源:origin: apache/incubator-edgent

 * Publishes events derived from {@code stream} using the topic
 * {@link EVENTS} as a JsonObject containing eventId, event,
 * and qos keys.
public TSink<JsonObject> events(TStream<JsonObject> stream, Function<JsonObject, String> eventId,
    UnaryOperator<JsonObject> payload, Function<JsonObject, Integer> qos) {
  stream = -> {
    JsonObject publishedEvent = new JsonObject();
    publishedEvent.addProperty("eventId", eventId.apply(event));
    publishedEvent.add("event", payload.apply(event));
    publishedEvent.addProperty("qos", qos.apply(event));
    return publishedEvent;
  return PublishSubscribe.publish(stream, IotDevicePubSub.EVENTS_TOPIC, JsonObject.class);

代码示例来源:origin: org.apache.edgent/edgent-api-topology

 * Perform an analytic function on tuples in parallel.
 * <P>
 * Same as {@code parallel(stream, width, splitter, (s,ch) -> -> mapper.apply(t, ch))}
 * </P>
 * @param <T> Input stream tuple type
 * @param <U> Result stream tuple type
 * @param stream input stream
 * @param splitter the tuple channel allocation function
 * @param mapper analytic function
 * @param width number of channels
 * @return the unordered result stream
 * @see #roundRobinSplitter(int) roundRobinSplitter
 * @see #concurrentMap(TStream, List, Function) concurrentMap
public static <T,U> TStream<U> parallelMap(TStream<T> stream, int width, ToIntFunction<T> splitter, BiFunction<T,Integer,U> mapper) {
 BiFunction<TStream<T>,Integer,TStream<U>> pipeline = (s,ch) -> -> mapper.apply(t, ch));
 return parallel(stream, width, splitter, pipeline);

代码示例来源:origin: apache/incubator-edgent

public static TStream<JsonObject> sourceData(Topology topology)
    TStream<String> seed = topology.strings("A1", "B7", "C4", "A4", "B3", "C99", "A102", "B43", "B13", "A0", "C700");
    return -> {
      JsonObject j = new JsonObject();
      j.addProperty("id", s.substring(0, 1));
      j.addProperty("value", Integer.valueOf(s.substring(1)));
      return j;

代码示例来源:origin: apache/incubator-edgent

public static TStream<JsonObject> sourceMvData(Topology topology)
 return sourceData(topology)
   .map(jo -> {
    jo.addProperty("value2", jo.get("value").getAsLong() + 1000);
    return jo;

代码示例来源:origin: apache/incubator-edgent

private BiFunction<TStream<JsonObject>,Integer,TStream<JsonObject>> fakeParallelPipelineTiming(long period, TimeUnit unit) {
 return (stream,channel) -> stream
   .map(jo -> { jo.addProperty("startPipelineMsec", System.currentTimeMillis());
          return jo; })
   .map(fakeJsonAnalytic(channel, period, unit))
   .map(jo -> { jo.addProperty("endPipelineMsec", System.currentTimeMillis());
         return jo; })

代码示例来源:origin: apache/incubator-edgent

public TSink<JsonObject> events(TStream<JsonObject> stream, String eventId, int qos) {
  stream = -> {
    JsonObject c = new JsonObject();
    c.addProperty(CMD_ID, getCommandIdFromEvent(eventId, e));
    c.add(CMD_PAYLOAD, e);
    c.addProperty(CMD_FORMAT, "json");
    c.addProperty(CMD_TS, System.currentTimeMillis());
    return c;
  return handleEvents(stream);

代码示例来源:origin: apache/incubator-edgent

public TSink<JsonObject> events(TStream<JsonObject> stream, Function<JsonObject, String> eventId,
    UnaryOperator<JsonObject> payload, Function<JsonObject, Integer> qos) {
  stream = -> {
    JsonObject c = new JsonObject();
    JsonObject evPayload = payload.apply(e);
    c.addProperty(CMD_DEVICE, EVENT_CMD_DEVICE);
    c.addProperty(CMD_ID, getCommandIdFromEvent(eventId.apply(e), evPayload));
    c.add(CMD_PAYLOAD, evPayload);
    c.addProperty(CMD_FORMAT, "json");
    c.addProperty(CMD_TS, System.currentTimeMillis());
    return c;
  return handleEvents(stream);

代码示例来源:origin: apache/incubator-edgent

private Function<TStream<Integer>,TStream<JsonObject>> fakePipeline(int channel, long period, TimeUnit unit) {
 return stream ->, period, unit)).filter(t->true).tag("pipeline-ch"+channel);

代码示例来源:origin: apache/incubator-edgent

private BiFunction<TStream<Integer>,Integer,TStream<JsonObject>> fakeParallelPipeline(long period, TimeUnit unit) {
 return (stream,channel) -> stream
   .map(value -> fakeParallelAnalytic(period, unit).apply(value,channel))

代码示例来源:origin: apache/incubator-edgent

public void testBytes() throws Exception {
  Topology t = newTopology("testBytes");
  System.out.println("===== "+t.getName());
  startEchoer();  // before getConfig() so it gets the port
  Properties config = getConfig();
  WebSocketClient wsClient = new Jsr356WebSocketClient(t, config);
  String[] expected = new String[] { getStr1(), getStr2() };
  TStream<byte[]> s = t.strings(expected)
              .map(tup -> tup.getBytes(StandardCharsets.UTF_8));
  s = PlumbingStreams.blockingOneShotDelay(s, 2, TimeUnit.SECONDS);
  TStream<String> rcvd = wsClient.receiveBytes()
              .map(tup -> new String(tup, StandardCharsets.UTF_8));
  completeAndValidate("", t, rcvd, SEC_TMO, expected);

代码示例来源:origin: apache/incubator-edgent

public void testBlockingDelay() throws Exception {
  // Timing variances on shared machines can cause this test to fail
  Topology topology = newTopology();
  TStream<String> strings = topology.strings("a", "b", "c", "d");
  TStream<Long> starts = -> System.currentTimeMillis());
  // delay stream
  starts = PlumbingStreams.blockingDelay(starts, 300, TimeUnit.MILLISECONDS);
  // calculate delay
  starts = starts.modify(v -> System.currentTimeMillis() - v);
  starts = starts.filter(v -> v >= 300);
  Condition<Long> tc = topology.getTester().tupleCount(starts, 4);
  complete(topology, tc);
  assertTrue("valid:" + tc.getResult(), tc.valid());

代码示例来源:origin: apache/incubator-edgent

public void testOneShotDelay() throws Exception {
  Topology topology = newTopology();
  TStream<String> strings = topology.strings("a", "b", "c", "d");
  TStream<Long> starts = -> System.currentTimeMillis());
  // delay stream
  starts = PlumbingStreams.blockingOneShotDelay(starts, 300, TimeUnit.MILLISECONDS);
  // calculate display
  starts = starts.modify(v -> System.currentTimeMillis() - v);
  // the first tuple shouldn't satisfy the predicate
  starts = starts.filter(v -> v < 300);
  Condition<Long> tc = topology.getTester().tupleCount(starts, 3);
  complete(topology, tc);
  assertTrue("valid:" + tc.getResult(), tc.valid());

代码示例来源:origin: apache/incubator-edgent

public void testBasicRead() throws Exception {
  Topology t = this.newTopology("testBasicRead");
  List<String> expected = expectedPersons(person->true, getPersonList());
  JdbcStreams db = new JdbcStreams(t,
      () -> getDataSource(DB_NAME),
      dataSource -> connect(dataSource));
  // Create a stream of Person from a stream of ids
  TStream<Person> rcvdPerson = readPersonsTable(t, db, getPersonIdList(), 0/*msec*/);
  TStream<String> rcvd = -> person.toString());
  rcvd.sink(tuple -> System.out.println(
      String.format("%s rcvd: %s", t.getName(), tuple)));
  completeAndValidate("", t, rcvd, SEC_TIMEOUT, expected.toArray(new String[0]));

代码示例来源:origin: apache/incubator-edgent

public void testMap() throws Exception {
  Topology t = newTopology();
  TStream<String> s = t.strings("32", "423", "-746");
  TStream<Integer> i =;
  assertStream(t, i);
  Condition<Long> tc = t.getTester().tupleCount(i, 3);
  Condition<List<Integer>> contents = t.getTester().streamContents(i, 32, 423, -746);
  complete(t, tc);
  assertTrue(contents.getResult().toString(), contents.valid());

代码示例来源:origin: apache/incubator-edgent

public void testModifyWithDrops() throws Exception {
  Topology t = newTopology();
  TStream<String> s = t.strings("32", "423", "-746");
  TStream<Integer> i =;
  i = i.modify(tuple -> tuple < 0 ? null : tuple + 27);
  assertStream(t, i);
  Condition<Long> tc = t.getTester().tupleCount(i, 2);
  Condition<List<Integer>> contents = t.getTester().streamContents(i, 59, 450);
  complete(t, tc);
  assertTrue(contents.getResult().toString(), contents.valid());

代码示例来源:origin: apache/incubator-edgent

public void testRuntimeServices() throws Exception {
  Topology t = newTopology();
  TStream<String> s = t.strings("A");
  Supplier<RuntimeServices> serviceGetter =
  TStream<Boolean> b = -> 
    serviceGetter.get().getService(ThreadFactory.class) != null
    && serviceGetter.get().getService(ScheduledExecutorService.class) != null
  Condition<List<Boolean>> tc = t.getTester().streamContents(b, Boolean.TRUE);
  complete(t, tc);
