Usage and code examples for the org.apache.edgent.topology.TStream.sink() method

Reposted by x33g5p2x on 2022-01-30 in Other

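This article collects code examples of the Java method org.apache.edgent.topology.TStream.sink() and shows how TStream.sink() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, were extracted from selected projects, and are intended as a practical reference. Details of TStream.sink() are as follows:
Package path: org.apache.edgent.topology.TStream
Class name: TStream
Method name: sink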

About TStream.sink

Sink (terminate) this stream using a function. For each tuple t on this stream, Consumer#accept(Object) is called on the sinker, that is, sinker.accept(t). This is typically used to send information to external systems, such as databases or dashboards.

If sinker implements AutoCloseable, its close() method will be called when the topology's execution is terminated.

Example of terminating a stream of String tuples by printing them to System.out:

TStream<String> values = ... 
values.sink(t -> System.out.println(t));
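The lifecycle described above (accept once per tuple, then close at termination when the sinker is AutoCloseable) can be modelled without the Edgent jars. The following is only a sketch of that documented contract, not Edgent's runtime: TupleConsumer is a hypothetical local stand-in for Edgent's Consumer interface, and runToCompletion is a hypothetical driver that plays the role of a topology running to completion.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for Edgent's Consumer functional interface,
// declared locally so the sketch compiles with the JDK alone.
interface TupleConsumer<T> {
    void accept(T tuple);
}

// A sinker that also implements AutoCloseable, e.g. because it would
// hold a connection to an external system. Here it only records calls.
class RecordingSinker implements TupleConsumer<String>, AutoCloseable {
    final List<String> events = new ArrayList<>();

    @Override
    public void accept(String tuple) {
        events.add("accept:" + tuple);
    }

    @Override
    public void close() {
        events.add("close");
    }
}

class SinkLifecycleSketch {
    // Hypothetical driver: models only the documented contract, namely
    // accept() once per tuple, then close() if the sinker is AutoCloseable.
    static <T> void runToCompletion(List<T> tuples, TupleConsumer<T> sinker) {
        try {
            for (T t : tuples) {
                sinker.accept(t);
            }
        } finally {
            closeIfCloseable(sinker);
        }
    }

    static void closeIfCloseable(Object candidate) {
        if (candidate instanceof AutoCloseable) {
            try {
                ((AutoCloseable) candidate).close();
            } catch (Exception e) {
                throw new IllegalStateException("close failed", e);
            }
        }
    }

    public static void main(String[] args) {
        RecordingSinker sinker = new RecordingSinker();
        runToCompletion(Arrays.asList("a", "b", "c"), sinker);
        System.out.println(sinker.events); // [accept:a, accept:b, accept:c, close]
    }
}
```

A plain lambda passed to sink() is not AutoCloseable, so in this model it would simply never receive a close() call; a class like RecordingSinker is the shape to use when the sink owns a resource that must be released.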

Code examples

Code example source: apache/incubator-edgent

@Override
public Condition<Long> tupleCount(TStream<?> stream, final long expectedCount) {
  AtomicLong count = new AtomicLong();
  stream.sink(t -> {
    count.incrementAndGet();
  });
  return new Condition<Long>() {
    @Override
    public boolean valid() {
      return count.get() == expectedCount;
    }
    @Override
    public Long getResult() {
      return count.get();
    }
  };
}
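
The counting pattern in the example above can be tried in isolation. This is a minimal sketch under stated assumptions: java.util.function.Consumer stands in for Edgent's Consumer, the nested Condition interface is a hypothetical local copy of the two methods used above, and deliverFromOtherThread is a hypothetical helper that imitates tuples arriving on a thread other than the one checking the condition (which is presumably why the original uses an AtomicLong).

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

class TupleCountSketch {
    // Hypothetical local copy of the two Condition methods used above.
    interface Condition<T> {
        boolean valid();
        T getResult();
    }

    // Same logic as the tupleCount example: valid only on an exact match.
    static Condition<Long> countCondition(AtomicLong count, long expectedCount) {
        return new Condition<Long>() {
            @Override
            public boolean valid() {
                return count.get() == expectedCount;
            }

            @Override
            public Long getResult() {
                return count.get();
            }
        };
    }

    // Hypothetical helper: delivers n tuples to the sinker from another
    // thread and waits for that thread to finish.
    static void deliverFromOtherThread(Consumer<Object> sinker, int n) {
        Thread feeder = new Thread(() -> {
            for (int i = 0; i < n; i++) {
                sinker.accept(i);
            }
        });
        feeder.start();
        try {
            feeder.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        AtomicLong count = new AtomicLong();
        // This lambda is what would be passed to stream.sink(...).
        Consumer<Object> sinker = t -> count.incrementAndGet();
        Condition<Long> condition = countCondition(count, 3);

        System.out.println(condition.valid()); // false
        deliverFromOtherThread(sinker, 3);
        System.out.println(condition.valid() + " " + condition.getResult()); // true 3
    }
}
```

Because valid() compares with ==, the condition becomes false again if more tuples than expected arrive, so it checks for an exact count rather than a minimum.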

Code example source: apache/incubator-edgent

@Override
public <T> Condition<List<T>> streamContents(TStream<T> stream, @SuppressWarnings("unchecked") T... values) {
  List<T> contents = Collections.synchronizedList(new ArrayList<>());
  stream.sink(t -> contents.add(t));
  return new Condition<List<T>>() {
    @Override
    public boolean valid() {
      synchronized (contents) {
        return Arrays.asList(values).equals(contents);
      }
    }
    @Override
    public List<T> getResult() {
      return contents;
    }
  };
}

Code example source: apache/incubator-edgent

/**
 * Publish the stream of tuples to the specified queue.
 * @param stream The stream to publish
 * @param queue The specified queue of RabbitMQ
 * @param msgFn A function that yields the byte[] records from the tuple
 * @param <T> Tuple type
 * @return {@link TSink}
 */
public <T> TSink<T> publish(TStream<T> stream, String queue, Function<T, byte[]> msgFn) {
  return stream.sink(new RabbitmqPublisher<>(connector, queue, msgFn));
}

Code example source: apache/incubator-edgent

@Override
public TSink<JsonObject> events(TStream<JsonObject> stream, Function<JsonObject, String> eventId,
  UnaryOperator<JsonObject> payload, Function<JsonObject, Integer> qos) {
 return stream.sink(new IotpGWEventsFunction(connector, eventId, payload, qos));
}

Code example source: apache/incubator-edgent

@Override
public TSink<JsonObject> eventsForDevice(Function<JsonObject, String> fqDeviceId,
  TStream<JsonObject> stream, Function<JsonObject, String> eventId,
  UnaryOperator<JsonObject> payload, Function<JsonObject, Integer> qos) {
 return stream.sink(new IotpGWDeviceEventsFunction(connector, fqDeviceId, eventId, payload, qos));
}

Code example source: apache/incubator-edgent

@Override
public TSink<JsonObject> events(TStream<JsonObject> stream, String eventId, int qos) {
 return stream.sink(new IotpGWEventsFixed(connector, eventId, qos));
}

Code example source: apache/incubator-edgent

@Override
public TSink<JsonObject> eventsForDevice(String fqDeviceId, TStream<JsonObject> stream,
  String eventId, int qos) {
 return stream.sink(new IotpGWDeviceEventsFixed(connector, fqDeviceId, eventId, qos));
}

Code example source: apache/incubator-edgent

@Override
public TSink<JsonObject> events(TStream<JsonObject> stream, Function<JsonObject, String> eventId,
  UnaryOperator<JsonObject> payload, Function<JsonObject, Integer> qos) {
 return stream.sink(
   new IotpGWDeviceEventsFunction(connector, jo -> fqDeviceId, eventId, payload, qos));
}

Code example source: apache/incubator-edgent

@Override
public TSink<JsonObject> events(TStream<JsonObject> stream, String eventId, int qos) {
 return stream.sink(new IotpGWDeviceEventsFixed(connector, fqDeviceId, eventId, qos));
}

Code example source: apache/incubator-edgent

/**
 * Publish a stream's tuples as device events using the WIoTP HTTP protocol.
 * <p>
 * Each tuple is published as a device event with the fixed event identifier.
 * The event is published with the equivalent of {@link QoS#AT_MOST_ONCE}.
 * 
 * @param stream
 *            Stream to be published.
 * @param eventId
 *            Event identifier.
 * @return TSink sink element representing termination of this stream.
 */
public TSink<JsonObject> httpEvents(TStream<JsonObject> stream, String eventId) {
 return stream.sink(new IotpDeviceHttpEventsFixed(connector, eventId));
}

Code example source: org.apache.edgent/edgent-connectors-pubsub

/**
 * Publish this stream to a topic.
 * This is a model that allows jobs to subscribe to 
 * streams published by other jobs.
 * 
 * @param <T> Tuple type
 * @param stream stream to publish
 * @param topic Topic to publish to.
 * @param streamType Type of objects on the stream.
 * @return sink element representing termination of this stream.
 * 
 * @see #subscribe(TopologyElement, String, Class)
 */
public static <T> TSink<T> publish(TStream<T> stream, String topic, Class<? super T> streamType) {
  return stream.sink(new Publish<>(topic, streamType));
}

Code example source: apache/incubator-edgent

private <T> TSink<T> sendBinary(TStream<T> stream, Function<T,byte[]> toPayload) {
  Objects.requireNonNull(stream, "stream");
  Objects.requireNonNull(toPayload, "toPayload");
  checkAddSender();
  return stream.sink(new WebSocketClientBinarySender<T>(connector, toPayload));
}

Code example source: apache/incubator-edgent

private <T> TSink<T> sendText(TStream<T> stream, Function<T,String> toPayload) {
  Objects.requireNonNull(stream, "stream");
  Objects.requireNonNull(toPayload, "toPayload");
  checkAddSender();
  return stream.sink(new WebSocketClientSender<T>(connector, toPayload));
}

Code example source: apache/incubator-edgent

public void completeAndValidate(boolean ordered, String msg, Topology t,
    TStream<String> s, MsgGenerator mgen, int secTimeout, String... expected)
    throws Exception {
  
  s = s.filter(tuple -> tuple.matches(mgen.pattern()));
  s.sink(tuple -> System.out.println(
      String.format("[%s][%s] rcvd: %s", t.getName(), simpleTS(), tuple)));
  super.completeAndValidate(ordered, msg, t, s, secTimeout, expected);
}

Code example source: apache/incubator-edgent

private TSink<JsonObject> handleEvents(TStream<JsonObject> stream) {
  
  if (echoCmds == null)
    echoCmds = PlumbingStreams.isolate(stream, true);
  else
    echoCmds = PlumbingStreams.isolate(stream.union(echoCmds), true);
  
  return stream.sink(discard());
}

Code example source: apache/incubator-edgent

@Test
public void metricsEverywhereSimple() throws Exception {
  
  Topology t = newTopology();
  Graph g = t.graph();
  // Source
  TStream<Integer> d = integers(t, 1, 2, 3);
  d.sink(tuple -> System.out.print("."));
  
  // Insert counter metrics into all the topology streams 
  Metrics.counter(t);
  printGraph(g);
  
  Collection<Vertex<? extends Oplet<?, ?>, ?, ?>> vertices = g.getVertices();
  assertEquals(3, vertices.size());
  
  Collection<Edge> edges = g.getEdges();
  assertEquals(2, edges.size());
}

Code example source: apache/incubator-edgent

@Test
public void testSink() throws Exception {
  Topology t = newTopology();
  TStream<String> s = t.strings("a", "b", "c");
  
  List<String> sinked = new ArrayList<>();
  TSink<String> terminal = s.sink(tuple -> sinked.add(tuple));
  assertNotNull(terminal);
  assertSame(t, terminal.topology());
  assertSame(s, terminal.getFeed());
  TStream<String> s1 = s.filter(tuple -> true);
  Condition<Long> tc = t.getTester().tupleCount(s1, 3);
  complete(t, tc);
  
  assertEquals("a", sinked.get(0));
  assertEquals("b", sinked.get(1));
  assertEquals("c", sinked.get(2));
}

Code example source: apache/incubator-edgent

@Test
public void testBasicRead() throws Exception {
  Topology t = this.newTopology("testBasicRead");
  
  populatePersonsTable(getPersonList());
  List<String> expected = expectedPersons(person->true, getPersonList());
  JdbcStreams db = new JdbcStreams(t,
      () -> getDataSource(DB_NAME),
      dataSource -> connect(dataSource));
  // Create a stream of Person from a stream of ids
  TStream<Person> rcvdPerson = readPersonsTable(t, db, getPersonIdList(), 0/*msec*/);
  TStream<String> rcvd = rcvdPerson.map(person -> person.toString());
  
  rcvd.sink(tuple -> System.out.println(
      String.format("%s rcvd: %s", t.getName(), tuple)));
  completeAndValidate("", t, rcvd, SEC_TIMEOUT, expected.toArray(new String[0]));
}
