
x33g5p2x  于2022-01-30 转载在 其他  



[英]Sink (terminate) this stream using a function. For each tuple t on this stream Consumer#accept(Object) will be called. This is typically used to send information to external systems, such as databases or dashboards.

If sinker implements AutoCloseable, its close()method will be called when the topology's execution is terminated.

Example of terminating a stream of String tuples by printing them to System.out.

TStream<String> values = ... 
values.sink(t -> System.out.println(tuple));


TStream<String> values = ... 
values.sink(t -> System.out.println(tuple));


代码示例来源:origin: apache/incubator-edgent

public Condition<Long> tupleCount(TStream<?> stream, final long expectedCount) {
  AtomicLong count = new AtomicLong();
  stream.sink(t -> {
  return new Condition<Long>() {
    public boolean valid() {
      return count.get() == expectedCount;
    public Long getResult() {
      return count.get();

代码示例来源:origin: org.apache.edgent/edgent-providers-direct

public Condition<Long> tupleCount(TStream<?> stream, final long expectedCount) {
  AtomicLong count = new AtomicLong();
  stream.sink(t -> {
  return new Condition<Long>() {
    public boolean valid() {
      return count.get() == expectedCount;
    public Long getResult() {
      return count.get();

代码示例来源:origin: apache/incubator-edgent

public <T> Condition<List<T>> streamContents(TStream<T> stream, @SuppressWarnings("unchecked") T... values) {
  List<T> contents = Collections.synchronizedList(new ArrayList<>());
  stream.sink(t -> contents.add(t));
  return new Condition<List<T>>() {
    public boolean valid() {
      synchronized (contents) {
        return Arrays.asList(values).equals(contents);
    public List<T> getResult() {
      return contents;

代码示例来源:origin: apache/incubator-edgent

 * Publish the stream of tuples to the specified queue.
 * @param stream The stream to publish
 * @param queue The specified queue of RabbitMQ
 * @param msgFn A function that yields the byte[] records from the tuple
 * @param <T> Tuple type
 * @return {@link TSink}
public <T> TSink<T> publish(TStream<T> stream, String queue, Function<T, byte[]> msgFn) {
  return stream.sink(new RabbitmqPublisher<>(connector, queue, msgFn));

代码示例来源:origin: apache/incubator-edgent

public TSink<JsonObject> events(TStream<JsonObject> stream, Function<JsonObject, String> eventId,
  UnaryOperator<JsonObject> payload, Function<JsonObject, Integer> qos) {
 return stream.sink(new IotpGWEventsFunction(connector, eventId, payload, qos));

代码示例来源:origin: apache/incubator-edgent

public TSink<JsonObject> eventsForDevice(Function<JsonObject, String> fqDeviceId,
  TStream<JsonObject> stream, Function<JsonObject, String> eventId,
  UnaryOperator<JsonObject> payload, Function<JsonObject, Integer> qos) {
 return stream.sink(new IotpGWDeviceEventsFunction(connector, fqDeviceId, eventId, payload, qos));

代码示例来源:origin: apache/incubator-edgent

public TSink<JsonObject> events(TStream<JsonObject> stream, String eventId, int qos) {
 return stream.sink(new IotpGWEventsFixed(connector, eventId, qos));

代码示例来源:origin: apache/incubator-edgent

public TSink<JsonObject> eventsForDevice(String fqDeviceId, TStream<JsonObject> stream,
  String eventId, int qos) {
 return stream.sink(new IotpGWDeviceEventsFixed(connector, fqDeviceId, eventId, qos));

代码示例来源:origin: apache/incubator-edgent

public TSink<JsonObject> events(TStream<JsonObject> stream, Function<JsonObject, String> eventId,
  UnaryOperator<JsonObject> payload, Function<JsonObject, Integer> qos) {
 return stream.sink(
   new IotpGWDeviceEventsFunction(connector, jo -> fqDeviceId, eventId, payload, qos));

代码示例来源:origin: apache/incubator-edgent

public TSink<JsonObject> events(TStream<JsonObject> stream, String eventId, int qos) {
 return stream.sink(new IotpGWDeviceEventsFixed(connector, fqDeviceId, eventId, qos));

代码示例来源:origin: apache/incubator-edgent

 * Publish a stream's tuples as device events using the WIoTP HTTP protocol.
 * <p>
 * Each tuple is published as a device event with the fixed event identifier.
 * The event is published with the equivalent of {@link QoS#AT_MOST_ONCE}.
 * @param stream
 *            Stream to be published.
 * @param eventId
 *            Event identifier.
 * @return TSink sink element representing termination of this stream.
public TSink<JsonObject> httpEvents(TStream<JsonObject> stream, String eventId) {
 return stream.sink(new IotpDeviceHttpEventsFixed(connector, eventId));

代码示例来源:origin: org.apache.edgent/edgent-connectors-pubsub

 * Publish this stream to a topic.
 * This is a model that allows jobs to subscribe to 
 * streams published by other jobs.
 * @param <T> Tuple type
 * @param stream stream to publish
 * @param topic Topic to publish to.
 * @param streamType Type of objects on the stream.
 * @return sink element representing termination of this stream.
 * @see #subscribe(TopologyElement, String, Class)
public static <T> TSink<T> publish(TStream<T> stream, String topic, Class<? super T> streamType) {
  return stream.sink(new Publish<>(topic, streamType));

代码示例来源:origin: apache/incubator-edgent

 * Publish this stream to a topic.
 * This is a model that allows jobs to subscribe to 
 * streams published by other jobs.
 * @param <T> Tuple type
 * @param stream stream to publish
 * @param topic Topic to publish to.
 * @param streamType Type of objects on the stream.
 * @return sink element representing termination of this stream.
 * @see #subscribe(TopologyElement, String, Class)
public static <T> TSink<T> publish(TStream<T> stream, String topic, Class<? super T> streamType) {
  return stream.sink(new Publish<>(topic, streamType));

代码示例来源:origin: apache/incubator-edgent

private <T> TSink<T> sendBinary(TStream<T> stream, Function<T,byte[]> toPayload) {
  Objects.requireNonNull(stream, "stream");
  Objects.requireNonNull(toPayload, "toPayload");
  return stream.sink(new WebSocketClientBinarySender<T>(connector, toPayload));

代码示例来源:origin: apache/incubator-edgent

private <T> TSink<T> sendText(TStream<T> stream, Function<T,String> toPayload) {
  Objects.requireNonNull(stream, "stream");
  Objects.requireNonNull(toPayload, "toPayload");
  return stream.sink(new WebSocketClientSender<T>(connector, toPayload));

代码示例来源:origin: apache/incubator-edgent

public void completeAndValidate(boolean ordered, String msg, Topology t,
    TStream<String> s, MsgGenerator mgen, int secTimeout, String... expected)
    throws Exception {
  s = s.filter(tuple -> tuple.matches(mgen.pattern()));
  s.sink(tuple -> System.out.println(
      String.format("[%s][%s] rcvd: %s", t.getName(), simpleTS(), tuple)));
  super.completeAndValidate(ordered, msg, t, s, secTimeout, expected);

代码示例来源:origin: apache/incubator-edgent

private TSink<JsonObject> handleEvents(TStream<JsonObject> stream) {
  if (echoCmds == null)
    echoCmds = PlumbingStreams.isolate(stream, true);
    echoCmds = PlumbingStreams.isolate(stream.union(echoCmds), true);
  return stream.sink(discard());

代码示例来源:origin: apache/incubator-edgent

public void metricsEverywhereSimple() throws Exception {
  Topology t = newTopology();
  Graph g = t.graph();
  // Source
  TStream<Integer> d = integers(t, 1, 2, 3);
  d.sink(tuple -> System.out.print("."));
  // Insert counter metrics into all the topology streams 
  Collection<Vertex<? extends Oplet<?, ?>, ?, ?>> vertices = g.getVertices();
  assertEquals(3, vertices.size());
  Collection<Edge> edges = g.getEdges();
  assertEquals(2, edges.size());

代码示例来源:origin: apache/incubator-edgent

public void testSink() throws Exception {
  Topology t = newTopology();
  TStream<String> s = t.strings("a", "b", "c");
  List<String> sinked = new ArrayList<>();
  TSink<String> terminal = s.sink(tuple -> sinked.add(tuple));
  assertSame(t, terminal.topology());
  assertSame(s, terminal.getFeed());
  TStream<String> s1 = s.filter(tuple -> true);
  Condition<Long> tc = t.getTester().tupleCount(s1, 3);
  complete(t, tc);
  assertEquals("a", sinked.get(0));
  assertEquals("b", sinked.get(1));
  assertEquals("c", sinked.get(2));

代码示例来源:origin: apache/incubator-edgent

public void testBasicRead() throws Exception {
  Topology t = this.newTopology("testBasicRead");
  List<String> expected = expectedPersons(person->true, getPersonList());
  JdbcStreams db = new JdbcStreams(t,
      () -> getDataSource(DB_NAME),
      dataSource -> connect(dataSource));
  // Create a stream of Person from a stream of ids
  TStream<Person> rcvdPerson = readPersonsTable(t, db, getPersonIdList(), 0/*msec*/);
  TStream<String> rcvd = -> person.toString());
  rcvd.sink(tuple -> System.out.println(
      String.format("%s rcvd: %s", t.getName(), tuple)));
  completeAndValidate("", t, rcvd, SEC_TIMEOUT, expected.toArray(new String[0]));
