本文整理了Java中com.hazelcast.jet.pipeline.Sinks.fromProcessor()
方法的一些代码示例,展示了Sinks.fromProcessor()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Sinks.fromProcessor()
方法的具体详情如下:
包路径:com.hazelcast.jet.pipeline.Sinks
类名称:Sinks
方法名:fromProcessor
[英]Returns a sink constructed directly from the given Core API processor meta-supplier.
The default local parallelism for this source is specified inside the ProcessorMetaSupplier#preferredLocalParallelism().
[中]返回直接从给定核心API处理器元供应商构造的接收器。
此源的默认本地并行性在ProcessorMetaSupplier#preferredLocalParallelism()中指定。
代码示例来源:origin: hazelcast/hazelcast-jet
/**
* Creates and returns the file {@link Sink} with the supplied components.
*/
public Sink<T> build() {
return Sinks.fromProcessor("filesSink(" + directoryName + ')',
writeFileP(directoryName, toStringFn, charset, append));
}
}
代码示例来源:origin: hazelcast/hazelcast-jet
/**
* Convenience for {@link #mapWithUpdating(String, DistributedFunction,
* DistributedBiFunction)} with {@link Entry} as the input item.
*/
@Nonnull
public static <K, V, E extends Entry<K, V>> Sink<E> mapWithUpdating(
@Nonnull String mapName,
@Nonnull DistributedBiFunction<? super V, ? super E, ? extends V> updateFn
) {
//noinspection Convert2MethodRef (provokes a javac 9 bug)
return fromProcessor("mapWithUpdatingSink(" + mapName + ')',
updateMapP(mapName, (Entry<K, V> e) -> e.getKey(), updateFn));
}
代码示例来源:origin: hazelcast/hazelcast-jet
/**
* Convenience for {@link #remoteMapWithUpdating} with {@link Entry} as
* input item.
*/
@Nonnull
public static <K, V, E extends Entry<K, V>> Sink<E> remoteMapWithUpdating(
@Nonnull String mapName,
@Nonnull ClientConfig clientConfig,
@Nonnull DistributedBiFunction<? super V, ? super E, ? extends V> updateFn
) {
//noinspection Convert2MethodRef (provokes a javac 9 bug)
return fromProcessor("remoteMapWithUpdatingSink(" + mapName + ')',
updateRemoteMapP(mapName, clientConfig, (Entry<K, V> e) -> e.getKey(), updateFn));
}
代码示例来源:origin: hazelcast/hazelcast-jet
/**
* Returns a sink that adds the items it receives to a Hazelcast {@code
* IList} with the specified name.
* <p>
* No state is saved to snapshot for this sink. After the job is restarted,
* the items will likely be duplicated, providing an <i>at-least-once</i>
* guarantee.
* <p>
* The default local parallelism for this sink is 1.
*/
@Nonnull
public static <T> Sink<T> list(@Nonnull String listName) {
return fromProcessor("listSink(" + listName + ')', writeListP(listName));
}
代码示例来源:origin: hazelcast/hazelcast-jet
/**
* Returns a sink that puts {@code Map.Entry}s it receives into a Hazelcast
* {@code IMap} with the specified name.
* <p>
* This sink provides the exactly-once guarantee thanks to <i>idempotent
* updates</i>. It means that the value with the same key is not appended,
* but overwritten. After the job is restarted from snapshot, duplicate
* items will not change the state in the target map.
* <p>
* The default local parallelism for this sink is 1.
*/
@Nonnull
public static <K, V> Sink<Entry<K, V>> map(@Nonnull String mapName) {
return fromProcessor("mapSink(" + mapName + ')', writeMapP(mapName));
}
代码示例来源:origin: hazelcast/hazelcast-jet
/**
* Returns a sink that adds the items it receives to a Hazelcast {@code
* IList} with the specified name in a remote cluster identified by the
* supplied {@code ClientConfig}.
* <p>
* No state is saved to snapshot for this sink. After the job is restarted,
* the items will likely be duplicated, providing an <i>at-least-once</i>
* guarantee.
* <p>
* The default local parallelism for this sink is 1.
*/
@Nonnull
public static <T> Sink<T> remoteList(@Nonnull String listName, @Nonnull ClientConfig clientConfig) {
return fromProcessor("remoteListSink(" + listName + ')', writeRemoteListP(listName, clientConfig));
}
代码示例来源:origin: hazelcast/hazelcast-jet
/**
* Returns a sink equivalent to {@link #mapWithEntryProcessor}, but for a map
* in a remote Hazelcast cluster identified by the supplied {@code
* ClientConfig}.
*/
@Nonnull
public static <E, K, V> Sink<E> remoteMapWithEntryProcessor(
@Nonnull String mapName,
@Nonnull ClientConfig clientConfig,
@Nonnull DistributedFunction<? super E, ? extends K> toKeyFn,
@Nonnull DistributedFunction<? super E, ? extends EntryProcessor<K, V>> toEntryProcessorFn
) {
return fromProcessor("remoteMapWithEntryProcessorSink(" + mapName + ')',
updateRemoteMapP(mapName, clientConfig, toKeyFn, toEntryProcessorFn));
}
代码示例来源:origin: hazelcast/hazelcast-jet
/**
* Returns a sink that puts {@code Map.Entry}s it receives into a Hazelcast
* {@code ICache} with the specified name.
* <p>
* This sink provides the exactly-once guarantee thanks to <i>idempotent
* updates</i>. It means that the value with the same key is not appended,
* but overwritten. After the job is restarted from snapshot, duplicate
* items will not change the state in the target map.
* <p>
* The default local parallelism for this sink is 1.
*/
@Nonnull
public static <T extends Entry> Sink<T> cache(@Nonnull String cacheName) {
return fromProcessor("cacheSink(" + cacheName + ')', writeCacheP(cacheName));
}
代码示例来源:origin: hazelcast/hazelcast-jet
/**
* Returns a sink that puts {@code Map.Entry}s it receives into a Hazelcast
* {@code IMap} with the specified name in a remote cluster identified by
* the supplied {@code ClientConfig}.
* <p>
* This sink provides the exactly-once guarantee thanks to <i>idempotent
* updates</i>. It means that the value with the same key is not appended,
* but overwritten. After the job is restarted from snapshot, duplicate
* items will not change the state in the target map.
* <p>
* The default local parallelism for this sink is 1.
*/
@Nonnull
public static <K, V> Sink<Entry<K, V>> remoteMap(@Nonnull String mapName, @Nonnull ClientConfig clientConfig) {
return fromProcessor("remoteMapSink(" + mapName + ')', writeRemoteMapP(mapName, clientConfig));
}
代码示例来源:origin: hazelcast/hazelcast-jet
/**
* Returns a sink equivalent to {@link #mapWithUpdating}, but for a map
* in a remote Hazelcast cluster identified by the supplied {@code
* ClientConfig}.
* <p>
* Due to the used API, the remote cluster must be at least 3.11.
*/
@Nonnull
public static <T, K, V> Sink<T> remoteMapWithUpdating(
@Nonnull String mapName,
@Nonnull ClientConfig clientConfig,
@Nonnull DistributedFunction<? super T, ? extends K> toKeyFn,
@Nonnull DistributedBiFunction<? super V, ? super T, ? extends V> updateFn
) {
return fromProcessor("remoteMapWithUpdatingSink(" + mapName + ')',
updateRemoteMapP(mapName, clientConfig, toKeyFn, updateFn));
}
代码示例来源:origin: hazelcast/hazelcast-jet
/**
* Convenience for {@link #mapWithMerging(String, DistributedFunction, DistributedFunction,
* DistributedBinaryOperator)} with {@link Entry} as input item.
*/
@Nonnull
public static <K, V> Sink<Entry<K, V>> mapWithMerging(
@Nonnull String mapName,
@Nonnull DistributedBinaryOperator<? super V> mergeFn
) {
return fromProcessor("mapWithMergingSink(" + mapName + ')',
mergeMapP(mapName, Entry::getKey, entryValue(), mergeFn));
}
代码示例来源:origin: hazelcast/hazelcast-jet
/**
* Returns a sink equivalent to {@link #mapWithMerging(String, DistributedBinaryOperator)},
* but for a map in a remote Hazelcast cluster identified by the supplied
* {@code ClientConfig}.
* <p>
* Due to the used API, the remote cluster must be at least 3.11.
*/
@Nonnull
public static <T, K, V> Sink<T> remoteMapWithMerging(
@Nonnull String mapName,
@Nonnull ClientConfig clientConfig,
@Nonnull DistributedFunction<? super T, ? extends K> toKeyFn,
@Nonnull DistributedFunction<? super T, ? extends V> toValueFn,
@Nonnull DistributedBinaryOperator<V> mergeFn
) {
return fromProcessor("remoteMapWithMergingSink(" + mapName + ')',
mergeRemoteMapP(mapName, clientConfig, toKeyFn, toValueFn, mergeFn));
}
代码示例来源:origin: hazelcast/hazelcast-jet
/**
* Returns a sink that puts {@code Map.Entry}s it receives into a Hazelcast
* {@code ICache} with the specified name in a remote cluster identified by
* the supplied {@code ClientConfig}.
* <p>
* This sink provides the exactly-once guarantee thanks to <i>idempotent
* updates</i>. It means that the value with the same key is not appended,
* but overwritten. After the job is restarted from snapshot, duplicate
* items will not change the state in the target map.
* <p>
* The default local parallelism for this sink is 1.
*/
@Nonnull
public static <T extends Entry> Sink<T> remoteCache(
@Nonnull String cacheName,
@Nonnull ClientConfig clientConfig
) {
return fromProcessor("remoteCacheSink(" + cacheName + ')', writeRemoteCacheP(cacheName, clientConfig));
}
代码示例来源:origin: hazelcast/hazelcast-jet
/**
* Returns a sink that logs all the data items it receives, at the INFO
* level to the log category {@link
* com.hazelcast.jet.impl.connector.WriteLoggerP}. It also logs {@link
* com.hazelcast.jet.core.Watermark watermark} items, but at FINE level.
* <p>
* The sink logs each item on whichever cluster member it happens to
* receive it. Its primary purpose is for development, when running Jet on
* a local machine.
* <p>
* The default local parallelism for this sink is 1.
*
* @param toStringFn a function that returns a string representation of a stream item
* @param <T> stream item type
*/
@Nonnull
public static <T> Sink<T> logger(@Nonnull DistributedFunction<? super T, String> toStringFn) {
return fromProcessor("loggerSink", writeLoggerP(toStringFn));
}
代码示例来源:origin: hazelcast/hazelcast-jet
/**
* Convenience for {@link #remoteMapWithMerging} with {@link Entry} as
* input item.
*/
@Nonnull
public static <K, V> Sink<Entry<K, V>> remoteMapWithMerging(
@Nonnull String mapName,
@Nonnull ClientConfig clientConfig,
@Nonnull DistributedBinaryOperator<V> mergeFn
) {
return fromProcessor("remoteMapWithMergingSink(" + mapName + ')',
mergeRemoteMapP(mapName, clientConfig, Entry::getKey, entryValue(), mergeFn));
}
代码示例来源:origin: hazelcast/hazelcast-jet
/**
* Returns a sink that connects to the specified TCP socket and writes to
* it a string representation of the items it receives. It converts an
* item to its string representation using the supplied {@code toStringFn}
* function and encodes the string using the supplied {@code Charset}. It
* follows each item with a newline character.
* <p>
* No state is saved to snapshot for this sink. After the job is restarted,
* the items will likely be duplicated, providing an <i>at-least-once</i>
* guarantee.
* <p>
* The default local parallelism for this sink is 1.
*/
@Nonnull
public static <T> Sink<T> socket(
@Nonnull String host,
int port,
@Nonnull DistributedFunction<? super T, ? extends String> toStringFn,
@Nonnull Charset charset
) {
return fromProcessor("socketSink(" + host + ':' + port + ')', writeSocketP(host, port, toStringFn, charset));
}
代码示例来源:origin: hazelcast/hazelcast-jet
/**
* Returns a sink which discards all received items.
*/
@Nonnull
public static <T> Sink<T> noop() {
return fromProcessor("noop", preferLocalParallelismOne(noopP()));
}
代码示例来源:origin: hazelcast/hazelcast-jet
/**
* Convenience for {@link #kafka(Properties, DistributedFunction)} which creates
* a {@code ProducerRecord} using the given topic and the given key and value
* mapping functions
*
* @param <E> type of stream item
* @param <K> type of the key published to Kafka
* @param <V> type of the value published to Kafka
* @param properties producer properties which should contain broker
* address and key/value serializers
* @param topic name of the Kafka topic to publish to
* @param extractKeyFn function that extracts the key from the stream item
* @param extractValueFn function that extracts the value from the stream item
*
*/
@Nonnull
public static <E, K, V> Sink<E> kafka(
@Nonnull Properties properties,
@Nonnull String topic,
@Nonnull DistributedFunction<? super E, K> extractKeyFn,
@Nonnull DistributedFunction<? super E, V> extractValueFn
) {
return Sinks.fromProcessor("writeKafka", writeKafkaP(properties, topic, extractKeyFn, extractValueFn));
}
代码示例来源:origin: com.hazelcast.jet/hazelcast-jet-kafka
/**
* Convenience for {@link #kafka(Properties, DistributedFunction)} which creates
* a {@code ProducerRecord} using the given topic and the given key and value
* mapping functions
*
* @param <E> type of stream item
* @param <K> type of the key published to Kafka
* @param <V> type of the value published to Kafka
* @param properties producer properties which should contain broker
* address and key/value serializers
* @param topic name of the Kafka topic to publish to
* @param extractKeyFn function that extracts the key from the stream item
* @param extractValueFn function that extracts the value from the stream item
*
*/
@Nonnull
public static <E, K, V> Sink<E> kafka(
@Nonnull Properties properties,
@Nonnull String topic,
@Nonnull DistributedFunction<? super E, K> extractKeyFn,
@Nonnull DistributedFunction<? super E, V> extractValueFn
) {
return Sinks.fromProcessor("writeKafka", writeKafkaP(properties, topic, extractKeyFn, extractValueFn));
}
代码示例来源:origin: hazelcast/hazelcast-jet
@Test(timeout = 20000)
public void test() {
Pipeline p = Pipeline.create();
p.drawFrom(Sources.batchFromProcessor("source", preferLocalParallelismOne(CustomSourceP::new)))
.drainTo(Sinks.fromProcessor("sink", preferLocalParallelismOne(CustomSinkP::new)));
jetInstance.newJob(p).join();
}
内容来源于网络,如有侵权,请联系作者删除!