com.hazelcast.jet.pipeline.Sinks.fromProcessor()方法的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(12.3k)|赞(0)|评价(0)|浏览(122)

本文整理了Java中com.hazelcast.jet.pipeline.Sinks.fromProcessor()方法的一些代码示例,展示了Sinks.fromProcessor()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Sinks.fromProcessor()方法的具体详情如下:
包路径:com.hazelcast.jet.pipeline.Sinks
类名称:Sinks
方法名:fromProcessor

Sinks.fromProcessor介绍

[英]Returns a sink constructed directly from the given Core API processor meta-supplier.

The default local parallelism for this source is specified inside the ProcessorMetaSupplier#preferredLocalParallelism().
[中]返回直接从给定核心API处理器元供应商构造的接收器。
此源的默认本地并行性在ProcessorMetaSupplier#preferredLocalParallelism()中指定。

代码示例

代码示例来源:origin: hazelcast/hazelcast-jet

/**
   * Creates and returns the file {@link Sink} with the supplied components.
   */
  public Sink<T> build() {
    return Sinks.fromProcessor("filesSink(" + directoryName + ')',
        writeFileP(directoryName, toStringFn, charset, append));
  }
}

代码示例来源:origin: hazelcast/hazelcast-jet

/**
 * Convenience for {@link #mapWithUpdating(String, DistributedFunction,
 * DistributedBiFunction)} with {@link Entry} as the input item.
 */
@Nonnull
public static <K, V, E extends Entry<K, V>> Sink<E> mapWithUpdating(
    @Nonnull String mapName,
    @Nonnull DistributedBiFunction<? super V, ? super E, ? extends V> updateFn
) {
  //noinspection Convert2MethodRef (provokes a javac 9 bug)
  return fromProcessor("mapWithUpdatingSink(" + mapName + ')',
      updateMapP(mapName, (Entry<K, V> e) -> e.getKey(), updateFn));
}

代码示例来源:origin: hazelcast/hazelcast-jet

/**
 * Convenience for {@link #remoteMapWithUpdating} with {@link Entry} as
 * input item.
 */
@Nonnull
public static <K, V, E extends Entry<K, V>> Sink<E> remoteMapWithUpdating(
    @Nonnull String mapName,
    @Nonnull ClientConfig clientConfig,
    @Nonnull DistributedBiFunction<? super V, ? super E, ? extends V> updateFn
) {
  //noinspection Convert2MethodRef (provokes a javac 9 bug)
  return fromProcessor("remoteMapWithUpdatingSink(" + mapName + ')',
      updateRemoteMapP(mapName, clientConfig, (Entry<K, V> e) -> e.getKey(), updateFn));
}

代码示例来源:origin: hazelcast/hazelcast-jet

/**
 * Returns a sink that adds the items it receives to a Hazelcast {@code
 * IList} with the specified name.
 * <p>
 * No state is saved to snapshot for this sink. After the job is restarted,
 * the items will likely be duplicated, providing an <i>at-least-once</i>
 * guarantee.
 * <p>
 * The default local parallelism for this sink is 1.
 */
@Nonnull
public static <T> Sink<T> list(@Nonnull String listName) {
  return fromProcessor("listSink(" + listName + ')', writeListP(listName));
}

代码示例来源:origin: hazelcast/hazelcast-jet

/**
 * Returns a sink that puts {@code Map.Entry}s it receives into a Hazelcast
 * {@code IMap} with the specified name.
 * <p>
 * This sink provides the exactly-once guarantee thanks to <i>idempotent
 * updates</i>. It means that the value with the same key is not appended,
 * but overwritten. After the job is restarted from snapshot, duplicate
 * items will not change the state in the target map.
 * <p>
 * The default local parallelism for this sink is 1.
 */
@Nonnull
public static <K, V> Sink<Entry<K, V>> map(@Nonnull String mapName) {
  return fromProcessor("mapSink(" + mapName + ')', writeMapP(mapName));
}

代码示例来源:origin: hazelcast/hazelcast-jet

/**
 * Returns a sink that adds the items it receives to a Hazelcast {@code
 * IList} with the specified name in a remote cluster identified by the
 * supplied {@code ClientConfig}.
 * <p>
 * No state is saved to snapshot for this sink. After the job is restarted,
 * the items will likely be duplicated, providing an <i>at-least-once</i>
 * guarantee.
 * <p>
 * The default local parallelism for this sink is 1.
 */
@Nonnull
public static <T> Sink<T> remoteList(@Nonnull String listName, @Nonnull ClientConfig clientConfig) {
  return fromProcessor("remoteListSink(" + listName + ')', writeRemoteListP(listName, clientConfig));
}

代码示例来源:origin: hazelcast/hazelcast-jet

/**
 * Returns a sink equivalent to {@link #mapWithEntryProcessor}, but for a map
 * in a remote Hazelcast cluster identified by the supplied {@code
 * ClientConfig}.
 */
@Nonnull
public static <E, K, V> Sink<E> remoteMapWithEntryProcessor(
    @Nonnull String mapName,
    @Nonnull ClientConfig clientConfig,
    @Nonnull DistributedFunction<? super E, ? extends K> toKeyFn,
    @Nonnull DistributedFunction<? super E, ? extends EntryProcessor<K, V>> toEntryProcessorFn
) {
  return fromProcessor("remoteMapWithEntryProcessorSink(" + mapName + ')',
      updateRemoteMapP(mapName, clientConfig, toKeyFn, toEntryProcessorFn));
}

代码示例来源:origin: hazelcast/hazelcast-jet

/**
 * Returns a sink that puts {@code Map.Entry}s it receives into a Hazelcast
 * {@code ICache} with the specified name.
 * <p>
 * This sink provides the exactly-once guarantee thanks to <i>idempotent
 * updates</i>. It means that the value with the same key is not appended,
 * but overwritten. After the job is restarted from snapshot, duplicate
 * items will not change the state in the target map.
 * <p>
 * The default local parallelism for this sink is 1.
 */
@Nonnull
public static <T extends Entry> Sink<T> cache(@Nonnull String cacheName) {
  return fromProcessor("cacheSink(" + cacheName + ')', writeCacheP(cacheName));
}

代码示例来源:origin: hazelcast/hazelcast-jet

/**
 * Returns a sink that puts {@code Map.Entry}s it receives into a Hazelcast
 * {@code IMap} with the specified name in a remote cluster identified by
 * the supplied {@code ClientConfig}.
 * <p>
 * This sink provides the exactly-once guarantee thanks to <i>idempotent
 * updates</i>. It means that the value with the same key is not appended,
 * but overwritten. After the job is restarted from snapshot, duplicate
 * items will not change the state in the target map.
 * <p>
 * The default local parallelism for this sink is 1.
 */
@Nonnull
public static <K, V> Sink<Entry<K, V>> remoteMap(@Nonnull String mapName, @Nonnull ClientConfig clientConfig) {
  return fromProcessor("remoteMapSink(" + mapName + ')', writeRemoteMapP(mapName, clientConfig));
}

代码示例来源:origin: hazelcast/hazelcast-jet

/**
 * Returns a sink equivalent to {@link #mapWithUpdating}, but for a map
 * in a remote Hazelcast cluster identified by the supplied {@code
 * ClientConfig}.
 * <p>
 * Due to the used API, the remote cluster must be at least 3.11.
 */
@Nonnull
public static <T, K, V> Sink<T> remoteMapWithUpdating(
    @Nonnull String mapName,
    @Nonnull ClientConfig clientConfig,
    @Nonnull DistributedFunction<? super T, ? extends K> toKeyFn,
    @Nonnull DistributedBiFunction<? super V, ? super T, ? extends V> updateFn
) {
  return fromProcessor("remoteMapWithUpdatingSink(" + mapName + ')',
      updateRemoteMapP(mapName, clientConfig, toKeyFn, updateFn));
}

代码示例来源:origin: hazelcast/hazelcast-jet

/**
 * Convenience for {@link #mapWithMerging(String, DistributedFunction, DistributedFunction,
 * DistributedBinaryOperator)} with {@link Entry} as input item.
 */
@Nonnull
public static <K, V> Sink<Entry<K, V>> mapWithMerging(
    @Nonnull String mapName,
    @Nonnull DistributedBinaryOperator<? super V> mergeFn
) {
  return fromProcessor("mapWithMergingSink(" + mapName + ')',
      mergeMapP(mapName, Entry::getKey, entryValue(), mergeFn));
}

代码示例来源:origin: hazelcast/hazelcast-jet

/**
 * Returns a sink equivalent to {@link #mapWithMerging(String, DistributedBinaryOperator)},
 * but for a map in a remote Hazelcast cluster identified by the supplied
 * {@code ClientConfig}.
 * <p>
 * Due to the used API, the remote cluster must be at least 3.11.
 */
@Nonnull
public static <T, K, V> Sink<T> remoteMapWithMerging(
    @Nonnull String mapName,
    @Nonnull ClientConfig clientConfig,
    @Nonnull DistributedFunction<? super T, ? extends K> toKeyFn,
    @Nonnull DistributedFunction<? super T, ? extends V> toValueFn,
    @Nonnull DistributedBinaryOperator<V> mergeFn
) {
  return fromProcessor("remoteMapWithMergingSink(" + mapName + ')',
      mergeRemoteMapP(mapName, clientConfig, toKeyFn, toValueFn, mergeFn));
}

代码示例来源:origin: hazelcast/hazelcast-jet

/**
 * Returns a sink that puts {@code Map.Entry}s it receives into a Hazelcast
 * {@code ICache} with the specified name in a remote cluster identified by
 * the supplied {@code ClientConfig}.
 * <p>
 * This sink provides the exactly-once guarantee thanks to <i>idempotent
 * updates</i>. It means that the value with the same key is not appended,
 * but overwritten. After the job is restarted from snapshot, duplicate
 * items will not change the state in the target map.
 * <p>
 * The default local parallelism for this sink is 1.
 */
@Nonnull
public static <T extends Entry> Sink<T> remoteCache(
    @Nonnull String cacheName,
    @Nonnull ClientConfig clientConfig
) {
  return fromProcessor("remoteCacheSink(" + cacheName + ')', writeRemoteCacheP(cacheName, clientConfig));
}

代码示例来源:origin: hazelcast/hazelcast-jet

/**
 * Returns a sink that logs all the data items it receives, at the INFO
 * level to the log category {@link
 * com.hazelcast.jet.impl.connector.WriteLoggerP}. It also logs {@link
 * com.hazelcast.jet.core.Watermark watermark} items, but at FINE level.
 * <p>
 * The sink logs each item on whichever cluster member it happens to
 * receive it. Its primary purpose is for development, when running Jet on
 * a local machine.
 * <p>
 * The default local parallelism for this sink is 1.
 *
 * @param toStringFn a function that returns a string representation of a stream item
 * @param <T> stream item type
 */
@Nonnull
public static <T> Sink<T> logger(@Nonnull DistributedFunction<? super T, String> toStringFn) {
  return fromProcessor("loggerSink", writeLoggerP(toStringFn));
}

代码示例来源:origin: hazelcast/hazelcast-jet

/**
 * Convenience for {@link #remoteMapWithMerging} with {@link Entry} as
 * input item.
 */
@Nonnull
public static <K, V> Sink<Entry<K, V>> remoteMapWithMerging(
    @Nonnull String mapName,
    @Nonnull ClientConfig clientConfig,
    @Nonnull DistributedBinaryOperator<V> mergeFn
) {
  return fromProcessor("remoteMapWithMergingSink(" + mapName + ')',
      mergeRemoteMapP(mapName, clientConfig, Entry::getKey, entryValue(), mergeFn));
}

代码示例来源:origin: hazelcast/hazelcast-jet

/**
 * Returns a sink that connects to the specified TCP socket and writes to
 * it a string representation of the items it receives. It converts an
 * item to its string representation using the supplied {@code toStringFn}
 * function and encodes the string using the supplied {@code Charset}. It
 * follows each item with a newline character.
 * <p>
 * No state is saved to snapshot for this sink. After the job is restarted,
 * the items will likely be duplicated, providing an <i>at-least-once</i>
 * guarantee.
 * <p>
 * The default local parallelism for this sink is 1.
 */
@Nonnull
public static <T> Sink<T> socket(
    @Nonnull String host,
    int port,
    @Nonnull DistributedFunction<? super T, ? extends String> toStringFn,
    @Nonnull Charset charset
) {
  return fromProcessor("socketSink(" + host + ':' + port + ')', writeSocketP(host, port, toStringFn, charset));
}

代码示例来源:origin: hazelcast/hazelcast-jet

/**
 * Returns a sink which discards all received items.
 */
@Nonnull
public static <T> Sink<T> noop() {
  return fromProcessor("noop", preferLocalParallelismOne(noopP()));
}

代码示例来源:origin: hazelcast/hazelcast-jet

/**
   * Convenience for {@link #kafka(Properties, DistributedFunction)} which creates
   * a {@code ProducerRecord} using the given topic and the given key and value
   * mapping functions
   *
   * @param <E> type of stream item
   * @param <K> type of the key published to Kafka
   * @param <V> type of the value published to Kafka
   * @param properties     producer properties which should contain broker
*                       address and key/value serializers
   * @param topic          name of the Kafka topic to publish to
   * @param extractKeyFn   function that extracts the key from the stream item
   * @param extractValueFn function that extracts the value from the stream item
*
   */
  @Nonnull
  public static <E, K, V> Sink<E> kafka(
      @Nonnull Properties properties,
      @Nonnull String topic,
      @Nonnull DistributedFunction<? super E, K> extractKeyFn,
      @Nonnull DistributedFunction<? super E, V> extractValueFn
  ) {
    return Sinks.fromProcessor("writeKafka", writeKafkaP(properties, topic, extractKeyFn, extractValueFn));
  }

代码示例来源:origin: com.hazelcast.jet/hazelcast-jet-kafka

/**
   * Convenience for {@link #kafka(Properties, DistributedFunction)} which creates
   * a {@code ProducerRecord} using the given topic and the given key and value
   * mapping functions
   *
   * @param <E> type of stream item
   * @param <K> type of the key published to Kafka
   * @param <V> type of the value published to Kafka
   * @param properties     producer properties which should contain broker
*                       address and key/value serializers
   * @param topic          name of the Kafka topic to publish to
   * @param extractKeyFn   function that extracts the key from the stream item
   * @param extractValueFn function that extracts the value from the stream item
*
   */
  @Nonnull
  public static <E, K, V> Sink<E> kafka(
      @Nonnull Properties properties,
      @Nonnull String topic,
      @Nonnull DistributedFunction<? super E, K> extractKeyFn,
      @Nonnull DistributedFunction<? super E, V> extractValueFn
  ) {
    return Sinks.fromProcessor("writeKafka", writeKafkaP(properties, topic, extractKeyFn, extractValueFn));
  }

代码示例来源:origin: hazelcast/hazelcast-jet

@Test(timeout = 20000)
public void test() {
  Pipeline p = Pipeline.create();
  p.drawFrom(Sources.batchFromProcessor("source", preferLocalParallelismOne(CustomSourceP::new)))
   .drainTo(Sinks.fromProcessor("sink", preferLocalParallelismOne(CustomSinkP::new)));
  jetInstance.newJob(p).join();
}

相关文章