Usage and code examples of the com.hazelcast.jet.pipeline.Sinks class

x33g5p2x, reposted on 2022-01-30 under Other

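This article collects Java code examples for the com.hazelcast.jet.pipeline.Sinks class and shows how the class is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven and similar platforms; they were extracted from selected projects and should be a useful reference. Details of the Sinks class:
Package path: com.hazelcast.jet.pipeline.Sinks
Class name: Sinks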

Introduction to Sinks

Contains factory methods for various types of pipeline sinks. Formally, a sink transform is one which has no output. A pipeline stage with a sink transform has the type SinkStage and accepts no downstream stages.

The default local parallelism for the sinks in this class is typically 1; check the documentation of individual methods.

Code examples

Code example source: hazelcast/hazelcast-jet

/**
 * Returns a sink that puts {@code Map.Entry}s it receives into the given
 * Hazelcast {@code IMap}.
 * <p>
 * <strong>NOTE:</strong> Jet only remembers the name of the map you supply
 * and acquires a map with that name on the local cluster. If you supply a
 * map instance from another cluster, no error will be thrown to indicate
 * this.
 * <p>
 * This sink provides the exactly-once guarantee thanks to <i>idempotent
 * updates</i>. It means that the value with the same key is not appended,
 * but overwritten. After the job is restarted from snapshot, duplicate
 * items will not change the state in the target map.
 * <p>
 * The default local parallelism for this sink is 1.
 */
@Nonnull
public static <K, V> Sink<Entry<K, V>> map(@Nonnull IMap<? super K, ? super V> map) {
  return map(map.getName());
}

Code example source: hazelcast/hazelcast-jet

/**
 * Convenience for {@link #logger(DistributedFunction)} with {@code
 * Object.toString()} as the {@code toStringFn}.
 */
@Nonnull
public static <T> Sink<T> logger() {
  return logger(Object::toString);
}

Code example source: hazelcast/hazelcast-jet

/**
 * Returns a sink that adds the items it receives to a Hazelcast {@code
 * IList} with the specified name.
 * <p>
 * <strong>NOTE:</strong> Jet only remembers the name of the list you
 * supply and acquires a list with that name on the local cluster. If you
 * supply a list instance from another cluster, no error will be thrown to
 * indicate this.
 * <p>
 * No state is saved to snapshot for this sink. After the job is restarted,
 * the items will likely be duplicated, providing an <i>at-least-once</i>
 * guarantee.
 * <p>
 * The default local parallelism for this sink is 1.
 */
@Nonnull
public static <T> Sink<T> list(@Nonnull IList<? super T> list) {
  return list(list.getName());
}
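
The Javadoc above contrasts two delivery guarantees: the map sink is exactly-once because writes overwrite by key, while the list sink is at-least-once because replayed items are appended again. The following is a minimal plain-Java sketch of that difference, not Jet code: `LinkedHashMap` and `ArrayList` stand in for `IMap` and `IList`, and the class `SinkReplaySketch` and its helper `withReplay` are hypothetical names that model a restart which redelivers the most recent items. It assumes Java 9 or later for `List.of` and `Map.entry`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only: LinkedHashMap/ArrayList stand in for IMap/IList.
// All names here are hypothetical and not part of the Jet API.
final class SinkReplaySketch {

    /** Writes each entry with put(), so a redelivered entry overwrites itself. */
    static Map<String, Integer> writeToMap(List<Map.Entry<String, Integer>> items) {
        Map<String, Integer> target = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : items) {
            target.put(e.getKey(), e.getValue());
        }
        return target;
    }

    /** Appends each item, so a redelivered item appears twice. */
    static List<Map.Entry<String, Integer>> writeToList(List<Map.Entry<String, Integer>> items) {
        return new ArrayList<>(items);
    }

    /** Models a restart: the last `replayed` items are delivered a second time. */
    static <T> List<T> withReplay(List<T> items, int replayed) {
        List<T> delivered = new ArrayList<>(items);
        delivered.addAll(items.subList(items.size() - replayed, items.size()));
        return delivered;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> items =
                List.of(Map.entry("a", 1), Map.entry("b", 2), Map.entry("c", 3));
        List<Map.Entry<String, Integer>> delivered = withReplay(items, 2);
        System.out.println("map sink state:  " + writeToMap(delivered));
        System.out.println("list sink state: " + writeToList(delivered));
    }
}
```

With three input items and two of them redelivered, the map stand-in still holds three entries, while the list stand-in holds five.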

Code example source: hazelcast/hazelcast-jet

/**
 * Creates and returns the file {@link Sink} with the supplied components.
 */
public Sink<T> build() {
  return Sinks.fromProcessor("filesSink(" + directoryName + ')',
      writeFileP(directoryName, toStringFn, charset, append));
}

Code example source: hazelcast/hazelcast-jet-demos

// Excerpt: two separate sink stages; the code between them is omitted in the source.
.map((TimestampedEntry<String, Double> e) ->
        entry(new TrendKey(e.getKey(), e.getTimestamp()), e.getValue()))
    .drainTo(Sinks.map("trends"));
// ...
    .drainTo(Sinks.files(targetDirectory));
return pipeline;

Code example source: hazelcast/hazelcast-jet-code-samples

public static void main(String[] args) throws Exception {
  System.setProperty("hazelcast.logging.type", "log4j");
  JetInstance localJet = Jet.newJetInstance();
  try {
    HazelcastInstance externalHz = startExternalHazelcast();
    IMap<Integer, Integer> sourceMap = externalHz.getMap(MAP_1);
    for (int i = 0; i < ITEM_COUNT; i++) {
      sourceMap.put(i, i);
    }
    ClientConfig clientConfig = clientConfigForExternalHazelcast();
    // pipeline that copies the remote map to a local with the same name
    Pipeline p1 = Pipeline.create();
    p1.drawFrom(Sources.remoteMap(MAP_1, clientConfig))
     .drainTo(Sinks.map(MAP_1));
    // pipeline that copies the local map to a remote with different name
    Pipeline p2 = Pipeline.create();
    p2.drawFrom(Sources.map(MAP_1))
     .drainTo(Sinks.remoteMap(MAP_2, clientConfig));
    localJet.newJob(p1).join();
    System.out.println("Local map-1 contents: " + localJet.getMap(MAP_1).entrySet());
    localJet.newJob(p2).join();
    System.out.println("Remote map-2 contents: " + externalHz.getMap(MAP_2).entrySet());
  } finally {
    Jet.shutdownAll();
    Hazelcast.shutdownAll();
  }
}

Code example source: hazelcast/hazelcast-jet-code-samples

public static void main(String[] args) throws Exception {
  System.setProperty("hazelcast.logging.type", "log4j");
  JetInstance localJet = Jet.newJetInstance();
  try {
    HazelcastInstance externalHz = startExternalHazelcast();
    IList<Integer> sourceList = externalHz.getList(LIST_1);
    for (int i = 0; i < ITEM_COUNT; i++) {
      sourceList.add(i);
    }
    ClientConfig clientConfig = clientConfigForExternalHazelcast();
    // pipeline that copies the remote list to a local with the same name
    Pipeline p1 = Pipeline.create();
    p1.drawFrom(Sources.remoteList(LIST_1, clientConfig))
     .drainTo(Sinks.list(LIST_1));
    // pipeline that copies the local list to a remote with a different name
    Pipeline p2 = Pipeline.create();
    p2.drawFrom(Sources.list(LIST_1))
     .drainTo(Sinks.remoteList(LIST_2, clientConfig));
    localJet.newJob(p1).join();
    System.out.println("Local list-1 contents: " + new ArrayList<>(localJet.getList(LIST_1)));
    localJet.newJob(p2).join();
    System.out.println("Remote list-2 contents: " + new ArrayList<>(externalHz.getList(LIST_2)));
  } finally {
    Jet.shutdownAll();
    Hazelcast.shutdownAll();
  }
}

Code example source: hazelcast/hazelcast-jet

return mapWithEntryProcessor(map.getName(), toKeyFn, toEntryProcessorFn);

Code example source: hazelcast/hazelcast-jet

/**
 * Convenience for {@link #mapWithUpdating(String, DistributedFunction,
 * DistributedBiFunction)} with {@link Entry} as the input item.
 */
@Nonnull
public static <K, V, E extends Entry<K, V>> Sink<E> mapWithUpdating(
    @Nonnull String mapName,
    @Nonnull DistributedBiFunction<? super V, ? super E, ? extends V> updateFn
) {
  //noinspection Convert2MethodRef (provokes a javac 9 bug)
  return fromProcessor("mapWithUpdatingSink(" + mapName + ')',
      updateMapP(mapName, (Entry<K, V> e) -> e.getKey(), updateFn));
}
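
As far as the Jet Javadoc describes it, `mapWithUpdating` applies `updateFn` to the value currently stored under the item's key (or `null` if there is none) and to the incoming item, stores the result, and removes the key when the result is `null`. The sketch below imitates that behaviour in plain Java so the contract can be tried without a cluster. It is an illustration under those assumptions, not the Jet implementation: `HashMap` stands in for `IMap`, and `MapWithUpdatingSketch` and `drain` are hypothetical names. `Map.compute` is used because it follows the same null conventions.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java illustration only; names are hypothetical, not part of the Jet API.
final class MapWithUpdatingSketch {

    /** Applies updateFn(oldValue, item) for each item, keyed by the item's own key. */
    static <K, V, E extends Map.Entry<K, V>> void drain(
            Map<K, V> target,
            List<E> items,
            BiFunction<? super V, ? super E, ? extends V> updateFn) {
        for (E item : items) {
            // compute() passes null for a missing key and removes the key
            // when the remapping function returns null.
            target.compute(item.getKey(), (key, oldValue) -> updateFn.apply(oldValue, item));
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> totals = new HashMap<>();
        List<Map.Entry<String, Integer>> items =
                List.of(Map.entry("gold", 2), Map.entry("silver", 5), Map.entry("gold", 3));
        // Sum the values that arrive under the same key.
        drain(totals, items, (old, e) -> old == null ? e.getValue() : old + e.getValue());
        System.out.println(totals);
    }
}
```

Unlike the plain `map` sink, an update function such as the summing one above is not idempotent, so redelivered items would be counted again.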

Code example source: hazelcast/hazelcast-jet-code-samples

/**
 * This will take the contents of source map and apply entry processor to
 * increment the values by 5.
 */
private static Pipeline mapWithEntryProcessor(String sourceMapName, String sinkMapName) {
  Pipeline pipeline = Pipeline.create();
  pipeline.drawFrom(Sources.<Integer, Integer>map(sourceMapName))
      .drainTo(
          Sinks.mapWithEntryProcessor(
              sinkMapName,
              entryKey(),
              item -> new IncrementEntryProcessor(5)
          )
      );
  return pipeline;
}

Code example source: hazelcast/hazelcast-jet-demos

public static Pipeline build(String bootstrapServers) {
  Properties properties = new Properties();
  properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
  properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
  properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
  properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  Pipeline pipeline = Pipeline.create();
  pipeline
      .drawFrom(KafkaSources.kafka(properties, Constants.TOPIC_NAME_PRECIOUS))
      .drainTo(Sinks.map(Constants.IMAP_NAME_PRECIOUS));
  return pipeline;
}

Code example source: hazelcast/hazelcast-jet-demos

public static Pipeline build() {
  Pipeline p = Pipeline.create();
  // Palladium and Platinum only
  p.drawFrom(Sources.<String, Object>mapJournal(
      Constants.IMAP_NAME_PRECIOUS, JournalInitialPosition.START_FROM_OLDEST)
  ).map(e -> e.getKey() + "==" + e.getValue())
   .filter(str -> str.toLowerCase().startsWith("p"))
   .drainTo(Sinks.logger())
  ;
  return p;
}

Code example source: hazelcast/hazelcast-jet

/**
 * Convenience for {@link #remoteMapWithUpdating} with {@link Entry} as
 * input item.
 */
@Nonnull
public static <K, V, E extends Entry<K, V>> Sink<E> remoteMapWithUpdating(
    @Nonnull String mapName,
    @Nonnull ClientConfig clientConfig,
    @Nonnull DistributedBiFunction<? super V, ? super E, ? extends V> updateFn
) {
  //noinspection Convert2MethodRef (provokes a javac 9 bug)
  return fromProcessor("remoteMapWithUpdatingSink(" + mapName + ')',
      updateRemoteMapP(mapName, clientConfig, (Entry<K, V> e) -> e.getKey(), updateFn));
}

Code example source: hazelcast/hazelcast-jet-code-samples

public static void main(String[] args) throws Exception {
  System.setProperty("hazelcast.logging.type", "log4j");

  NettyServer nettyServer = new NettyServer(PORT, channel -> {
    for (int i; (i = COUNTER.getAndDecrement()) > 0; ) {
      channel.writeAndFlush(i + "\n");
    }
    channel.close();
  }, DistributedConsumer.noop());
  nettyServer.start();

  JetInstance jet = Jet.newJetInstance();
  Jet.newJetInstance();

  try {
    Pipeline p = Pipeline.create();
    p.drawFrom(Sources.socket(HOST, PORT, UTF_8))
     .drainTo(Sinks.list(SINK_NAME));

    jet.newJob(p).join();

    System.out.println("Jet received " + jet.getList(SINK_NAME).size() + " items from the socket");
  } finally {
    nettyServer.stop();
    Jet.shutdownAll();
  }
}

Code example source: hazelcast/hazelcast-code-samples

Sinks.mapWithEntryProcessor(
  Constants.IMAP_NAME_SEQUENCE,
  DistributedFunctions.wholeItem(),
  // ... (remaining arguments are cut off in the source)

Code example source: hazelcast/hazelcast-jet-code-samples

public static Pipeline buildPipeline(String sourceName, String sinkName) {
  Pattern pattern = Pattern.compile("\\W+");
  Pipeline pipeline = Pipeline.create();
  pipeline.drawFrom(Sources.<Integer, String>map(sourceName))
      .flatMap(e -> Traversers.traverseArray(pattern.split(e.getValue().toLowerCase()))
                  .filter(w -> !w.isEmpty()))
      .groupingKey(wholeItem())
      .aggregate(counting())
      .drainTo(Sinks.map(sinkName));
  return pipeline;
}
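
To make it easier to see what this pipeline writes to the sink map, here is a rough single-JVM equivalent using only `java.util.stream`. It is a sketch for comparison, not Jet code: the class name `WordCountSketch` and the method `countWords` are hypothetical, the input is a plain list of lines rather than map entries, and the result is collected into a `TreeMap` only so that it prints in a stable order.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Plain-Java illustration only; names are hypothetical, not part of the Jet API.
final class WordCountSketch {

    private static final Pattern DELIMITER = Pattern.compile("\\W+");

    /** Lower-cases each line, splits it on non-word characters, and counts each word. */
    static Map<String, Long> countWords(List<String> lines) {
        return lines.stream()
                // corresponds to flatMap(traverseArray(pattern.split(...)))
                .flatMap(line -> Arrays.stream(DELIMITER.split(line.toLowerCase())))
                // split() yields an empty first token when a line starts with a delimiter
                .filter(word -> !word.isEmpty())
                // corresponds to groupingKey(wholeItem()).aggregate(counting())
                .collect(Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        System.out.println(countWords(List.of("To be, or not to be", "...that is the question")));
    }
}
```

Each resulting word-to-count pair corresponds to one `Map.Entry` that the Jet pipeline would hand to `Sinks.map(sinkName)`.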

Code example source: hazelcast/hazelcast-jet-code-samples

private Pipeline buildPipeline() {
  Pipeline p = Pipeline.create();
  p.drawFrom(KafkaSources.kafka(brokerProperties(), TOPIC))
   .drainTo(Sinks.logger());
  return p;
}

Code example source: hazelcast/hazelcast-jet

/**
 * Returns a sink that adds the items it receives to a Hazelcast {@code
 * IList} with the specified name.
 * <p>
 * No state is saved to snapshot for this sink. After the job is restarted,
 * the items will likely be duplicated, providing an <i>at-least-once</i>
 * guarantee.
 * <p>
 * The default local parallelism for this sink is 1.
 */
@Nonnull
public static <T> Sink<T> list(@Nonnull String listName) {
  return fromProcessor("listSink(" + listName + ')', writeListP(listName));
}

Code example source: hazelcast/hazelcast-jet-code-samples

public static void main(String[] args) throws Exception {
  System.setProperty("hazelcast.logging.type", "log4j");
  JetInstance jet = Jet.newJetInstance();

  try {
    IList<Integer> inputList = jet.getList(INPUT_LIST);
    for (int i = 0; i < ITEM_COUNT; i++) {
      inputList.add(i);
    }

    Pipeline p = Pipeline.create();
    p.drawFrom(Sources.<Integer>list(INPUT_LIST))
     .map(i -> "item" + i)
     .drainTo(Sinks.list(RESULT_LIST));

    jet.newJob(p).join();

    IList<String> outputList = jet.getList(RESULT_LIST);
    System.out.println("Result list items: " + new ArrayList<>(outputList));
  } finally {
    Jet.shutdownAll();
  }
}

Code example source: hazelcast/hazelcast-jet-code-samples

private static Pipeline buildPipeline() {
  Pattern delimiter = Pattern.compile("\\W+");
  Pipeline p = Pipeline.create();
  p.drawFrom(Sources.<Long, String>map(BOOK_LINES))
   .flatMap(e -> traverseArray(delimiter.split(e.getValue().toLowerCase())))
   .filter(word -> !word.isEmpty())
   .groupingKey(wholeItem())
   .aggregate(counting())
   .drainTo(Sinks.map(COUNTS));
  return p;
}
