Usage and code examples for the org.apache.edgent.topology.TStream.map() method

x33g5p2x, reposted on 2022-01-30 under Other

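This article collects code examples for the Java method org.apache.edgent.topology.TStream.map() and shows how TStream.map() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the TStream.map() method:
Package path: org.apache.edgent.topology.TStream
Class name: TStream
Method name: map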

About TStream.map

Declare a new stream that maps (or transforms) each tuple from this stream into one (or zero) tuple of a different type U. For each tuple t on this stream, the returned stream will contain a tuple that is the result of mapper.apply(t) when the return is not null. If mapper.apply(t) returns null, then no tuple is submitted to the returned stream for t.

The following examples transform a stream containing numeric values as String objects into a stream of Double values.

// Using lambda expression 
TStream<String> strings = ... 
TStream<Double> doubles = strings.map(v -> Double.valueOf(v)); 
// Using method reference 
TStream<String> strings = ... 
TStream<Double> doubles = strings.map(Double::valueOf);
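
To make the "null drops the tuple" rule concrete, here is a minimal plain-JDK sketch. It does not use Edgent: `mapDroppingNulls` and `parseOrNull` are hypothetical helpers that model the rule over a `List` instead of a live stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class MapDropSketch {

    // Hypothetical stand-in for TStream.map(): applies the mapper to each tuple
    // and forwards the result only when it is not null.
    static <T, U> List<U> mapDroppingNulls(List<T> tuples, Function<T, U> mapper) {
        List<U> out = new ArrayList<>();
        for (T t : tuples) {
            U u = mapper.apply(t);
            if (u != null) {
                out.add(u);
            }
        }
        return out;
    }

    // Hypothetical mapper: returns null for text that is not a number.
    static Double parseOrNull(String s) {
        try {
            return Double.valueOf(s);
        } catch (NumberFormatException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        List<String> strings = Arrays.asList("1.5", "oops", "42");
        // "oops" maps to null, so it produces no output tuple.
        System.out.println(mapDroppingNulls(strings, MapDropSketch::parseOrNull));
    }
}
```

Returning null from the mapper is therefore a way to combine a transformation and a filter in a single step.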

Code examples

Code example source: org.apache.edgent/edgent-apps-iot (also apache/incubator-edgent)

/**
 * Publishes events derived from {@code stream} using the topic
 * {@link IotDevicePubSub#EVENTS} as a JsonObject containing eventId, event,
 * and qos keys.
 */
@Override
public TSink<JsonObject> events(TStream<JsonObject> stream, String eventId, int qos) {
  stream = stream.map(event -> {
    JsonObject publishedEvent = new JsonObject();
    publishedEvent.addProperty("eventId", eventId);
    publishedEvent.add("event", event);
    publishedEvent.addProperty("qos", qos);
    return publishedEvent;
  });
  return PublishSubscribe.publish(stream, EVENTS_TOPIC, JsonObject.class);
}

Code example source: apache/incubator-edgent

/**
 * Insert a blocking delay before forwarding the first tuple and
 * no delay for subsequent tuples.
 * <p>
 * Delays less than 1msec are translated to a 0 delay.
 * <p>
 * Sample use:
 * <pre>{@code
 * TStream<String> stream = topology.strings("a", "b", "c");
 * // create a stream where the first tuple is delayed by 5 seconds. 
 * TStream<String> oneShotDelayedStream =
 *      stream.map( blockingOneShotDelay(5, TimeUnit.SECONDS) );
 * }</pre>
 * 
 * @param <T> tuple type
 * @param stream input stream
 * @param delay Amount of time to delay a tuple.
 * @param unit Time unit for {@code delay}.
 * @return the delayed stream
 */
public static <T> TStream<T> blockingOneShotDelay(TStream<T> stream, long delay, TimeUnit unit) {
  return stream.map( blockingOneShotDelay(delay, unit) );
}
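
The delay-the-first-tuple-only behaviour described in the Javadoc above can be sketched without Edgent. This is a rough plain-JDK illustration, not the library's implementation: `oneShotDelay` is a hypothetical helper that returns a pass-through operator which sleeps only on its first invocation.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.UnaryOperator;

public class OneShotDelaySketch {

    // Hypothetical stand-in: the returned operator sleeps before passing through
    // the first tuple it sees and passes later tuples through immediately.
    static <T> UnaryOperator<T> oneShotDelay(long delay, TimeUnit unit) {
        AtomicBoolean first = new AtomicBoolean(true);
        return tuple -> {
            if (first.compareAndSet(true, false)) {
                try {
                    unit.sleep(delay);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the interrupt flag set
                }
            }
            return tuple;
        };
    }

    public static void main(String[] args) {
        UnaryOperator<String> op = oneShotDelay(200, TimeUnit.MILLISECONDS);
        for (String s : new String[] {"a", "b", "c"}) {
            long start = System.nanoTime();
            String out = op.apply(s);
            long ms = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            System.out.println(out + " after ~" + ms + " ms");
        }
    }
}
```

Because the operator returns its input unchanged, it fits `map` as an identity transformation whose only effect is the blocking pause.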

Code example source: org.apache.edgent/edgent-apps-iot (also apache/incubator-edgent)

/**
 * Publishes events derived from {@code stream} using the topic
 * {@link EVENTS} as a JsonObject containing eventId, event,
 * and qos keys.
 */
@Override
public TSink<JsonObject> events(TStream<JsonObject> stream, Function<JsonObject, String> eventId,
    UnaryOperator<JsonObject> payload, Function<JsonObject, Integer> qos) {
  stream = stream.map(event -> {
    JsonObject publishedEvent = new JsonObject();
    publishedEvent.addProperty("eventId", eventId.apply(event));
    publishedEvent.add("event", payload.apply(event));
    publishedEvent.addProperty("qos", qos.apply(event));
    return publishedEvent;
  });
  return PublishSubscribe.publish(stream, IotDevicePubSub.EVENTS_TOPIC, JsonObject.class);
}

Code example source: org.apache.edgent/edgent-api-topology

/**
 * Perform an analytic function on tuples in parallel.
 * <P>
 * Same as {@code parallel(stream, width, splitter, (s,ch) -> s.map(t -> mapper.apply(t, ch)))}
 * </P>
 * @param <T> Input stream tuple type
 * @param <U> Result stream tuple type
 * @param stream input stream
 * @param splitter the tuple channel allocation function
 * @param mapper analytic function
 * @param width number of channels
 * @return the unordered result stream
 * @see #roundRobinSplitter(int) roundRobinSplitter
 * @see #concurrentMap(TStream, List, Function) concurrentMap
 */
public static <T,U> TStream<U> parallelMap(TStream<T> stream, int width, ToIntFunction<T> splitter, BiFunction<T,Integer,U> mapper) {
 BiFunction<TStream<T>,Integer,TStream<U>> pipeline = (s,ch) -> s.map(t -> mapper.apply(t, ch));
 return parallel(stream, width, splitter, pipeline);
}
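
The split, map per channel, and merge idea behind `parallelMap` can be illustrated with a small plain-JDK sketch. Everything here is an assumption made for illustration rather than Edgent's implementation: the list-based `parallelMap` helper, the thread pool, and the use of `Math.floorMod` to turn the splitter result into a channel index.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiFunction;
import java.util.function.ToIntFunction;

public class ParallelMapSketch {

    // Hypothetical stand-in: splits tuples across `width` channels, maps each
    // channel on its own thread, and merges the per-channel results.
    static <T, U> List<U> parallelMap(List<T> tuples, int width,
            ToIntFunction<T> splitter, BiFunction<T, Integer, U> mapper) {
        // Partition the input into one list per channel.
        List<List<T>> channels = new ArrayList<>();
        for (int ch = 0; ch < width; ch++) {
            channels.add(new ArrayList<>());
        }
        for (T t : tuples) {
            channels.get(Math.floorMod(splitter.applyAsInt(t), width)).add(t);
        }

        ExecutorService pool = Executors.newFixedThreadPool(width);
        try {
            List<Future<List<U>>> futures = new ArrayList<>();
            for (int ch = 0; ch < width; ch++) {
                final int channel = ch;
                final List<T> input = channels.get(ch);
                // Each task passes its channel index to the mapper.
                futures.add(pool.submit(() -> {
                    List<U> out = new ArrayList<>();
                    for (T t : input) {
                        out.add(mapper.apply(t, channel));
                    }
                    return out;
                }));
            }
            // Merge channel by channel; the result does not follow input order.
            List<U> merged = new ArrayList<>();
            for (Future<List<U>> f : futures) {
                merged.addAll(f.get());
            }
            return merged;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6);
        // The tuple value picks the channel; the mapper tags each result with it.
        List<String> out = parallelMap(input, 2, v -> v, (v, ch) -> "ch" + ch + ":" + (v * 10));
        System.out.println(out);
    }
}
```

As in the Javadoc above, the merged result is not in input order, so callers that need ordering have to restore it themselves.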

Code example source: apache/incubator-edgent

public static TStream<JsonObject> sourceData(Topology topology)
{
    TStream<String> seed = topology.strings("A1", "B7", "C4", "A4", "B3", "C99", "A102", "B43", "B13", "A0", "C700");
    
    return seed.map(s -> {
      JsonObject j = new JsonObject();
      j.addProperty("id", s.substring(0, 1));
      j.addProperty("value", Integer.valueOf(s.substring(1)));
      return j;
    });
}
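
To see what the mapper above produces for a single seed string, the parsing step can be tried in isolation. In this sketch a `LinkedHashMap` stands in for Gson's `JsonObject`, and `parse` is a hypothetical helper name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SeedParseSketch {

    // Hypothetical stand-in for the lambda passed to map(): the first character
    // becomes the id and the remaining characters become the integer value.
    static Map<String, Object> parse(String s) {
        Map<String, Object> j = new LinkedHashMap<>();
        j.put("id", s.substring(0, 1));
        j.put("value", Integer.valueOf(s.substring(1)));
        return j;
    }

    public static void main(String[] args) {
        System.out.println(parse("A1"));
        System.out.println(parse("C700"));
    }
}
```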

Code example source: apache/incubator-edgent

public static TStream<JsonObject> sourceMvData(Topology topology)
{
    return sourceData(topology)
      .map(jo -> {
        jo.addProperty("value2", jo.get("value").getAsLong() + 1000);
        return jo;
      });
}

Code example source: apache/incubator-edgent

@SuppressWarnings("unused")
private BiFunction<TStream<JsonObject>,Integer,TStream<JsonObject>> fakeParallelPipelineTiming(long period, TimeUnit unit) {
  return (stream,channel) -> stream
    .map(jo -> {
      jo.addProperty("startPipelineMsec", System.currentTimeMillis());
      return jo;
    })
    .map(fakeJsonAnalytic(channel, period, unit))
    .filter(t->true)
    .map(jo -> {
      jo.addProperty("endPipelineMsec", System.currentTimeMillis());
      return jo;
    })
    .tag("pipeline-ch"+channel);
}
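
The three chained `map` calls above stamp a tuple, run the analytic, and stamp it again. When no mapper returns null, that chain behaves like ordinary function composition, which the following plain-JDK sketch illustrates. A `Map` stands in for `JsonObject`, `timed` is a hypothetical helper, and the `filter` and `tag` steps are not modelled.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

public class PipelineTimingSketch {

    // Hypothetical helper: composes "stamp start", the analytic, and "stamp end"
    // into one function, mirroring the three chained map() calls.
    static Function<Map<String, Object>, Map<String, Object>> timed(
            Function<Map<String, Object>, Map<String, Object>> analytic) {
        Function<Map<String, Object>, Map<String, Object>> stampStart = jo -> {
            jo.put("startPipelineMsec", System.currentTimeMillis());
            return jo;
        };
        Function<Map<String, Object>, Map<String, Object>> stampEnd = jo -> {
            jo.put("endPipelineMsec", System.currentTimeMillis());
            return jo;
        };
        return stampStart.andThen(analytic).andThen(stampEnd);
    }

    public static void main(String[] args) {
        Map<String, Object> tuple = new LinkedHashMap<>();
        tuple.put("value", 21);
        // Hypothetical analytic: doubles the "value" field.
        Map<String, Object> out = timed(jo -> {
            jo.put("result", (Integer) jo.get("value") * 2);
            return jo;
        }).apply(tuple);
        System.out.println(out.keySet());
    }
}
```

Subtracting `startPipelineMsec` from `endPipelineMsec` then gives the time a tuple spent in the analytic stage.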

Code example source: apache/incubator-edgent

@Override
public TSink<JsonObject> events(TStream<JsonObject> stream, String eventId, int qos) {
  
  stream = stream.map(e -> {
    JsonObject c = new JsonObject();
    c.addProperty(CMD_ID, getCommandIdFromEvent(eventId, e));
    c.add(CMD_PAYLOAD, e);
    c.addProperty(CMD_FORMAT, "json");
    c.addProperty(CMD_TS, System.currentTimeMillis());
    return c;
  });
  
  return handleEvents(stream);
}

Code example source: apache/incubator-edgent

@Override
public TSink<JsonObject> events(TStream<JsonObject> stream, Function<JsonObject, String> eventId,
    UnaryOperator<JsonObject> payload, Function<JsonObject, Integer> qos) {
  
  stream = stream.map(e -> {
    JsonObject c = new JsonObject();
    JsonObject evPayload = payload.apply(e);
    c.addProperty(CMD_DEVICE, EVENT_CMD_DEVICE);
    c.addProperty(CMD_ID, getCommandIdFromEvent(eventId.apply(e), evPayload));
    c.add(CMD_PAYLOAD, evPayload);
    c.addProperty(CMD_FORMAT, "json");
    c.addProperty(CMD_TS, System.currentTimeMillis());
    return c;
  });
  
  return handleEvents(stream);
}

Code example source: apache/incubator-edgent

private Function<TStream<Integer>,TStream<JsonObject>> fakePipeline(int channel, long period, TimeUnit unit) {
 return stream -> stream.map(fakeAnalytic(channel, period, unit)).filter(t->true).tag("pipeline-ch"+channel);
}

Code example source: apache/incubator-edgent

private BiFunction<TStream<Integer>,Integer,TStream<JsonObject>> fakeParallelPipeline(long period, TimeUnit unit) {
 return (stream,channel) -> stream
   .map(value -> fakeParallelAnalytic(period, unit).apply(value,channel))
   .filter(t->true)
   .tag("pipeline-ch"+channel);
}

Code example source: apache/incubator-edgent

@Test
public void testBytes() throws Exception {
  Topology t = newTopology("testBytes");
  System.out.println("===== "+t.getName());
  startEchoer();  // before getConfig() so it gets the port
  
  Properties config = getConfig();
  WebSocketClient wsClient = new Jsr356WebSocketClient(t, config);
  
  String[] expected = new String[] { getStr1(), getStr2() };
  
  TStream<byte[]> s = t.strings(expected)
              .map(tup -> tup.getBytes(StandardCharsets.UTF_8));
  s = PlumbingStreams.blockingOneShotDelay(s, 2, TimeUnit.SECONDS);
  wsClient.sendBytes(s);
  
  TStream<String> rcvd = wsClient.receiveBytes()
              .map(tup -> new String(tup, StandardCharsets.UTF_8));
  
  completeAndValidate("", t, rcvd, SEC_TMO, expected);
}
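
The two `map` calls in this test form an encode/decode pair: `String` to UTF-8 bytes on the way out, and bytes back to `String` on the way in. A plain-JDK sketch of that round trip, using `java.util.stream` in place of Edgent streams and a hypothetical `roundTrip` helper, might look like this:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class Utf8RoundTripSketch {

    // Hypothetical helper: encodes each string to UTF-8 bytes and decodes it
    // again, the same pair of transformations the test applies around the socket.
    static List<String> roundTrip(List<String> input) {
        return input.stream()
            .map(s -> s.getBytes(StandardCharsets.UTF_8))
            .map(b -> new String(b, StandardCharsets.UTF_8))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("plain", "h\u00e9llo");
        System.out.println(roundTrip(input).equals(input));
    }
}
```

Naming the charset on both sides keeps the round trip independent of the platform default encoding.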

Code example source: apache/incubator-edgent

@Test
public void testBlockingDelay() throws Exception {
  // Timing variances on shared machines can cause this test to fail
  assumeTrue(!Boolean.getBoolean("edgent.build.ci"));
  Topology topology = newTopology();
  
  TStream<String> strings = topology.strings("a", "b", "c", "d");
  
  TStream<Long> starts = strings.map(v -> System.currentTimeMillis());
  
  // delay stream
  starts = PlumbingStreams.blockingDelay(starts, 300, TimeUnit.MILLISECONDS);
  
  // calculate delay
  starts = starts.modify(v -> System.currentTimeMillis() - v);
  
  starts = starts.filter(v -> v >= 300);
  
  Condition<Long> tc = topology.getTester().tupleCount(starts, 4);
  complete(topology, tc);
  assertTrue("valid:" + tc.getResult(), tc.valid());
}

Code example source: apache/incubator-edgent

@Test
public void testOneShotDelay() throws Exception {
  Topology topology = newTopology();
  
  TStream<String> strings = topology.strings("a", "b", "c", "d");
  
  TStream<Long> starts = strings.map(v -> System.currentTimeMillis());
  
  // delay stream
  starts = PlumbingStreams.blockingOneShotDelay(starts, 300, TimeUnit.MILLISECONDS);
  
  // calculate delay
  starts = starts.modify(v -> System.currentTimeMillis() - v);
  
  // the first tuple shouldn't satisfy the predicate
  starts = starts.filter(v -> v < 300);
  
  Condition<Long> tc = topology.getTester().tupleCount(starts, 3);
  complete(topology, tc);
  assertTrue("valid:" + tc.getResult(), tc.valid());
}

Code example source: apache/incubator-edgent

@Test
public void testBasicRead() throws Exception {
  Topology t = this.newTopology("testBasicRead");
  
  populatePersonsTable(getPersonList());
  List<String> expected = expectedPersons(person->true, getPersonList());
  JdbcStreams db = new JdbcStreams(t,
      () -> getDataSource(DB_NAME),
      dataSource -> connect(dataSource));
  // Create a stream of Person from a stream of ids
  TStream<Person> rcvdPerson = readPersonsTable(t, db, getPersonIdList(), 0/*msec*/);
  TStream<String> rcvd = rcvdPerson.map(person -> person.toString());
  
  rcvd.sink(tuple -> System.out.println(
      String.format("%s rcvd: %s", t.getName(), tuple)));
  completeAndValidate("", t, rcvd, SEC_TIMEOUT, expected.toArray(new String[0]));
}

Code example source: apache/incubator-edgent

@Test
public void testMap() throws Exception {
  Topology t = newTopology();
  TStream<String> s = t.strings("32", "423", "-746");
  TStream<Integer> i = s.map(Integer::valueOf);
  assertStream(t, i);
  Condition<Long> tc = t.getTester().tupleCount(i, 3);
  Condition<List<Integer>> contents = t.getTester().streamContents(i, 32, 423, -746);
  complete(t, tc);
  assertTrue(contents.getResult().toString(), contents.valid());
}

Code example source: apache/incubator-edgent

@Test
public void testModifyWithDrops() throws Exception {
  Topology t = newTopology();
  TStream<String> s = t.strings("32", "423", "-746");
  TStream<Integer> i = s.map(Integer::valueOf);
  i = i.modify(tuple -> tuple < 0 ? null : tuple + 27);
  assertStream(t, i);
  Condition<Long> tc = t.getTester().tupleCount(i, 2);
  Condition<List<Integer>> contents = t.getTester().streamContents(i, 59, 450);
  complete(t, tc);
  assertTrue(contents.getResult().toString(), contents.valid());
}
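
Readers used to `java.util.stream` should note one difference: `Stream.map` keeps null results, so a JDK-only analogue of the test above needs an explicit filter to reproduce the drop. The sketch below assumes nothing from Edgent, and `run` is a hypothetical helper name.

```java
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ModifyWithDropsSketch {

    // Hypothetical analogue of the test pipeline using java.util.stream.
    static List<Integer> run(String... values) {
        return Stream.of(values)
            .map(Integer::valueOf)
            // Negative values map to null; others are shifted by 27.
            .map(v -> v < 0 ? null : Integer.valueOf(v + 27))
            // Stream.map keeps nulls, so they must be removed explicitly.
            .filter(Objects::nonNull)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(run("32", "423", "-746")); // [59, 450]
    }
}
```

The two surviving values match the 59 and 450 the test expects, and the negative input yields no tuple.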

Code example source: apache/incubator-edgent

@Test
public void testRuntimeServices() throws Exception {
  Topology t = newTopology();
  TStream<String> s = t.strings("A");
  
  Supplier<RuntimeServices> serviceGetter =
      t.getRuntimeServiceSupplier();
  
  TStream<Boolean> b = s.map(tuple -> 
    serviceGetter.get().getService(ThreadFactory.class) != null
    && serviceGetter.get().getService(ScheduledExecutorService.class) != null
  );
  
  Condition<List<Boolean>> tc = t.getTester().streamContents(b, Boolean.TRUE);
  complete(t, tc);
  
  assertTrue(tc.valid());
}
