[英]Contains factory methods for various types of pipeline sinks. Formally, a sink transform is one which has no output. A pipeline stage with a sink transform has the type SinkStage and accepts no downstream stages.

The default local parallelism for the sinks in this class is typically 1, check the documentation of individual methods.


代码示例来源:origin: hazelcast/hazelcast-jet

 * Returns a sink that puts {@code Map.Entry}s it receives into the given
 * Hazelcast {@code IMap}.
 * <p>
 * <strong>NOTE:</strong> Jet only remembers the name of the map you supply
 * and acquires a map with that name on the local cluster. If you supply a
 * map instance from another cluster, no error will be thrown to indicate
 * this.
 * <p>
 * This sink provides the exactly-once guarantee thanks to <i>idempotent
 * updates</i>. It means that the value with the same key is not appended,
 * but overwritten. After the job is restarted from snapshot, duplicate
 * items will not change the state in the target map.
 * <p>
 * The default local parallelism for this sink is 1.
public static <K, V> Sink<Entry<K, V>> map(@Nonnull IMap<? super K, ? super V> map) {
  return map(map.getName());

代码示例来源:origin: hazelcast/hazelcast-jet

 * Convenience for {@link #logger(DistributedFunction)} with {@code
 * Object.toString()} as the {@code toStringFn}.
public static <T> Sink<T> logger() {
  return logger(Object::toString);

代码示例来源:origin: hazelcast/hazelcast-jet

 * Returns a sink that adds the items it receives to a Hazelcast {@code
 * IList} with the specified name.
 * <p>
 * <strong>NOTE:</strong> Jet only remembers the name of the list you
 * supply and acquires a list with that name on the local cluster. If you
 * supply a list instance from another cluster, no error will be thrown to
 * indicate this.
 * <p>
 * No state is saved to snapshot for this sink. After the job is restarted,
 * the items will likely be duplicated, providing an <i>at-least-once</i>
 * guarantee.
 * <p>
 * The default local parallelism for this sink is 1.
public static <T> Sink<T> list(@Nonnull IList<? super T> list) {
  return list(list.getName());

代码示例来源:origin: hazelcast/hazelcast-jet

   * Creates and returns the file {@link Sink} with the supplied components.
  public Sink<T> build() {
    return Sinks.fromProcessor("filesSink(" + directoryName + ')',
        writeFileP(directoryName, toStringFn, charset, append));

代码示例来源:origin: hazelcast/hazelcast-jet-demos

.map((TimestampedEntry<String, Double> e) ->
        entry(new TrendKey(e.getKey(), e.getTimestamp()), e.getValue()))
return pipeline;

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

public static void main(String[] args) throws Exception {
  System.setProperty("hazelcast.logging.type", "log4j");
  JetInstance localJet = Jet.newJetInstance();
  try {
    HazelcastInstance externalHz = startExternalHazelcast();
    IMap<Integer, Integer> sourceMap = externalHz.getMap(MAP_1);
    for (int i = 0; i < ITEM_COUNT; i++) {
      sourceMap.put(i, i);
    ClientConfig clientConfig = clientConfigForExternalHazelcast();
    // pipeline that copies the remote map to a local with the same name
    Pipeline p1 = Pipeline.create();
    p1.drawFrom(Sources.remoteMap(MAP_1, clientConfig))
    // pipeline that copies the local map to a remote with different name
    Pipeline p2 = Pipeline.create();
     .drainTo(Sinks.remoteMap(MAP_2, clientConfig));
    System.out.println("Local map-1 contents: " + localJet.getMap(MAP_1).entrySet());
    System.out.println("Remote map-2 contents: " + externalHz.getMap(MAP_2).entrySet());
  } finally {

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

public static void main(String[] args) throws Exception {
  System.setProperty("hazelcast.logging.type", "log4j");
  JetInstance localJet = Jet.newJetInstance();
  try {
    HazelcastInstance externalHz = startExternalHazelcast();
    IList<Integer> sourceList = externalHz.getList(LIST_1);
    for (int i = 0; i < ITEM_COUNT; i++) {
    ClientConfig clientConfig = clientConfigForExternalHazelcast();
    // pipeline that copies the remote list to a local with the same name
    Pipeline p1 = Pipeline.create();
    p1.drawFrom(Sources.remoteList(LIST_1, clientConfig))
    // pipeline that copies the local list to a remote with a different name
    Pipeline p2 = Pipeline.create();
     .drainTo(Sinks.remoteList(LIST_2, clientConfig));
    System.out.println("Local list-1 contents: " + new ArrayList<>(localJet.getList(LIST_1)));
    System.out.println("Remote list-2 contents: " + new ArrayList<>(externalHz.getList(LIST_2)));
  } finally {

代码示例来源:origin: hazelcast/hazelcast-jet

return mapWithEntryProcessor(map.getName(), toKeyFn, toEntryProcessorFn);

代码示例来源:origin: hazelcast/hazelcast-jet

 * Convenience for {@link #mapWithUpdating(String, DistributedFunction,
 * DistributedBiFunction)} with {@link Entry} as the input item.
public static <K, V, E extends Entry<K, V>> Sink<E> mapWithUpdating(
    @Nonnull String mapName,
    @Nonnull DistributedBiFunction<? super V, ? super E, ? extends V> updateFn
) {
  //noinspection Convert2MethodRef (provokes a javac 9 bug)
  return fromProcessor("mapWithUpdatingSink(" + mapName + ')',
      updateMapP(mapName, (Entry<K, V> e) -> e.getKey(), updateFn));

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

 * This will take the contents of source map and apply entry processor to
 * increment the values by 5.
private static Pipeline mapWithEntryProcessor(String sourceMapName, String sinkMapName) {
  Pipeline pipeline = Pipeline.create();
  pipeline.drawFrom(Sources.<Integer, Integer>map(sourceMapName))
              item -> new IncrementEntryProcessor(5)
  return pipeline;

代码示例来源:origin: hazelcast/hazelcast-jet-demos

public static Pipeline build(String bootstrapServers) {
  Properties properties = new Properties();
  properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
  properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
  properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
  properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  Pipeline pipeline = Pipeline.create();
      .drawFrom(KafkaSources.kafka(properties, Constants.TOPIC_NAME_PRECIOUS))
  return pipeline;

代码示例来源:origin: hazelcast/hazelcast-jet-demos

public static Pipeline build() {
  Pipeline p = Pipeline.create();
  // Palladium and Platinum only
  p.drawFrom(Sources.<String, Object>mapJournal(
      Constants.IMAP_NAME_PRECIOUS, JournalInitialPosition.START_FROM_OLDEST)
  ).map(e -> e.getKey() + "==" + e.getValue())
   .filter(str -> str.toLowerCase().startsWith("p"))
  return p;

代码示例来源:origin: hazelcast/hazelcast-jet

 * Convenience for {@link #remoteMapWithUpdating} with {@link Entry} as
 * input item.
public static <K, V, E extends Entry<K, V>> Sink<E> remoteMapWithUpdating(
    @Nonnull String mapName,
    @Nonnull ClientConfig clientConfig,
    @Nonnull DistributedBiFunction<? super V, ? super E, ? extends V> updateFn
) {
  //noinspection Convert2MethodRef (provokes a javac 9 bug)
  return fromProcessor("remoteMapWithUpdatingSink(" + mapName + ')',
      updateRemoteMapP(mapName, clientConfig, (Entry<K, V> e) -> e.getKey(), updateFn));

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

public static void main(String[] args) throws Exception {
    System.setProperty("hazelcast.logging.type", "log4j");

    NettyServer nettyServer = new NettyServer(PORT, channel -> {
      for (int i; (i = COUNTER.getAndDecrement()) > 0; ) {
        channel.writeAndFlush(i + "\n");
    }, DistributedConsumer.noop());

    JetInstance jet = Jet.newJetInstance();

    try {
      Pipeline p = Pipeline.create();
      p.drawFrom(Sources.socket(HOST, PORT, UTF_8))


      System.out.println("Jet received " + jet.getList(SINK_NAME).size() + " items from the socket");
    } finally {


代码示例来源:origin: hazelcast/hazelcast-code-samples


代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

public static Pipeline buildPipeline(String sourceName, String sinkName) {
  Pattern pattern = Pattern.compile("\\W+");
  Pipeline pipeline = Pipeline.create();
  pipeline.drawFrom(Sources.<Integer, String>map(sourceName))
      .flatMap(e -> Traversers.traverseArray(pattern.split(e.getValue().toLowerCase()))
                  .filter(w -> !w.isEmpty()))
  return pipeline;

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

private Pipeline buildPipeline() {
  Pipeline p = Pipeline.create();
  p.drawFrom(KafkaSources.kafka(brokerProperties(), TOPIC))
  return p;

代码示例来源:origin: hazelcast/hazelcast-jet

 * Returns a sink that adds the items it receives to a Hazelcast {@code
 * IList} with the specified name.
 * <p>
 * No state is saved to snapshot for this sink. After the job is restarted,
 * the items will likely be duplicated, providing an <i>at-least-once</i>
 * guarantee.
 * <p>
 * The default local parallelism for this sink is 1.
public static <T> Sink<T> list(@Nonnull String listName) {
  return fromProcessor("listSink(" + listName + ')', writeListP(listName));

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

public static void main(String[] args) throws Exception {
    System.setProperty("hazelcast.logging.type", "log4j");
    JetInstance jet = Jet.newJetInstance();

    try {
      IList<Integer> inputList = jet.getList(INPUT_LIST);
      for (int i = 0; i < ITEM_COUNT; i++) {

      Pipeline p = Pipeline.create();
       .map(i -> "item" + i)


      IList<String> outputList = jet.getList(RESULT_LIST);
      System.out.println("Result list items: " + new ArrayList<>(outputList));
    } finally {

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

private static Pipeline buildPipeline() {
  Pattern delimiter = Pattern.compile("\\W+");
  Pipeline p = Pipeline.create();
  p.drawFrom(Sources.<Long, String>map(BOOK_LINES))
   .flatMap(e -> traverseArray(delimiter.split(e.getValue().toLowerCase())))
   .filter(word -> !word.isEmpty())
  return p;
