This article collects code examples of the Java class com.hazelcast.jet.pipeline.Sinks and shows how the class is used. The examples come mainly from GitHub, Stack Overflow, Maven, and similar platforms, extracted from selected projects, so they are reasonably representative and should be useful as references. Details of the Sinks class:
Package path: com.hazelcast.jet.pipeline.Sinks
Class name: Sinks
Contains factory methods for various types of pipeline sinks. Formally, a sink transform is one which has no output. A pipeline stage with a sink transform has the type SinkStage and accepts no downstream stages.
The default local parallelism for the sinks in this class is typically 1; check the documentation of the individual methods.
Code example source: hazelcast/hazelcast-jet
/**
 * Returns a sink that puts {@code Map.Entry}s it receives into the given
 * Hazelcast {@code IMap}.
 * <p>
 * <strong>NOTE:</strong> Jet only remembers the name of the map you supply
 * and acquires a map with that name on the local cluster. If you supply a
 * map instance from another cluster, no error will be thrown to indicate
 * this.
 * <p>
 * This sink provides the exactly-once guarantee thanks to <i>idempotent
 * updates</i>. It means that the value with the same key is not appended,
 * but overwritten. After the job is restarted from snapshot, duplicate
 * items will not change the state in the target map.
 * <p>
 * The default local parallelism for this sink is 1.
 */
@Nonnull
public static <K, V> Sink<Entry<K, V>> map(@Nonnull IMap<? super K, ? super V> map) {
    return map(map.getName());
}
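The exactly-once behavior described above rests on idempotent updates: writing the same key/value pair again does not change the map. A minimal plain-Java sketch of that idea (no Jet required; `target` is an illustrative stand-in for the sink's target IMap):

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for the sink's target IMap; the names here are illustrative only.
Map<String, Integer> target = new HashMap<>();

// First delivery of a batch of entries.
target.put("a", 1);
target.put("b", 2);

// Replay of the same batch after a restart from snapshot:
// identical puts overwrite identical values, so the state is unchanged.
target.put("a", 1);
target.put("b", 2);
```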
Code example source: hazelcast/hazelcast-jet
/**
 * Convenience for {@link #logger(DistributedFunction)} with {@code
 * Object.toString()} as the {@code toStringFn}.
 */
@Nonnull
public static <T> Sink<T> logger() {
    return logger(Object::toString);
}
Code example source: hazelcast/hazelcast-jet
/**
 * Returns a sink that adds the items it receives to a Hazelcast {@code
 * IList} with the specified name.
 * <p>
 * <strong>NOTE:</strong> Jet only remembers the name of the list you
 * supply and acquires a list with that name on the local cluster. If you
 * supply a list instance from another cluster, no error will be thrown to
 * indicate this.
 * <p>
 * No state is saved to snapshot for this sink. After the job is restarted,
 * the items will likely be duplicated, providing an <i>at-least-once</i>
 * guarantee.
 * <p>
 * The default local parallelism for this sink is 1.
 */
@Nonnull
public static <T> Sink<T> list(@Nonnull IList<? super T> list) {
    return list(list.getName());
}
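By contrast, appending to a list is not idempotent, which is why this sink only offers an at-least-once guarantee. A plain-Java sketch of what a replay does to the target (illustrative names, no Jet involved):

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for the sink's target IList.
List<Integer> target = new ArrayList<>();

// First delivery of a batch.
target.addAll(List.of(1, 2, 3));

// Replay of the same batch after a restart: items are appended again,
// so the replayed items show up as duplicates.
target.addAll(List.of(1, 2, 3));
```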
Code example source: hazelcast/hazelcast-jet
/**
 * Creates and returns the file {@link Sink} with the supplied components.
 */
public Sink<T> build() {
    return Sinks.fromProcessor("filesSink(" + directoryName + ')',
            writeFileP(directoryName, toStringFn, charset, append));
}
}
Code example source: hazelcast/hazelcast-jet-demos
.map((TimestampedEntry<String, Double> e) ->
        entry(new TrendKey(e.getKey(), e.getTimestamp()), e.getValue()))
.drainTo(Sinks.map("trends"));
// a second sink from the same demo (drains another stage to files):
// .drainTo(Sinks.files(targetDirectory));
return pipeline;
Code example source: hazelcast/hazelcast-jet-code-samples
public static void main(String[] args) throws Exception {
    System.setProperty("hazelcast.logging.type", "log4j");
    JetInstance localJet = Jet.newJetInstance();
    try {
        HazelcastInstance externalHz = startExternalHazelcast();
        IMap<Integer, Integer> sourceMap = externalHz.getMap(MAP_1);
        for (int i = 0; i < ITEM_COUNT; i++) {
            sourceMap.put(i, i);
        }
        ClientConfig clientConfig = clientConfigForExternalHazelcast();
        // pipeline that copies the remote map to a local one with the same name
        Pipeline p1 = Pipeline.create();
        p1.drawFrom(Sources.remoteMap(MAP_1, clientConfig))
          .drainTo(Sinks.map(MAP_1));
        // pipeline that copies the local map to a remote one with a different name
        Pipeline p2 = Pipeline.create();
        p2.drawFrom(Sources.map(MAP_1))
          .drainTo(Sinks.remoteMap(MAP_2, clientConfig));
        localJet.newJob(p1).join();
        System.out.println("Local map-1 contents: " + localJet.getMap(MAP_1).entrySet());
        localJet.newJob(p2).join();
        System.out.println("Remote map-2 contents: " + externalHz.getMap(MAP_2).entrySet());
    } finally {
        Jet.shutdownAll();
        Hazelcast.shutdownAll();
    }
}
Code example source: hazelcast/hazelcast-jet-code-samples
public static void main(String[] args) throws Exception {
    System.setProperty("hazelcast.logging.type", "log4j");
    JetInstance localJet = Jet.newJetInstance();
    try {
        HazelcastInstance externalHz = startExternalHazelcast();
        IList<Integer> sourceList = externalHz.getList(LIST_1);
        for (int i = 0; i < ITEM_COUNT; i++) {
            sourceList.add(i);
        }
        ClientConfig clientConfig = clientConfigForExternalHazelcast();
        // pipeline that copies the remote list to a local one with the same name
        Pipeline p1 = Pipeline.create();
        p1.drawFrom(Sources.remoteList(LIST_1, clientConfig))
          .drainTo(Sinks.list(LIST_1));
        // pipeline that copies the local list to a remote one with a different name
        Pipeline p2 = Pipeline.create();
        p2.drawFrom(Sources.list(LIST_1))
          .drainTo(Sinks.remoteList(LIST_2, clientConfig));
        localJet.newJob(p1).join();
        System.out.println("Local list-1 contents: " + new ArrayList<>(localJet.getList(LIST_1)));
        localJet.newJob(p2).join();
        System.out.println("Remote list-2 contents: " + new ArrayList<>(externalHz.getList(LIST_2)));
    } finally {
        Jet.shutdownAll();
        Hazelcast.shutdownAll();
    }
}
Code example source: hazelcast/hazelcast-jet
return mapWithEntryProcessor(map.getName(), toKeyFn, toEntryProcessorFn);
Code example source: hazelcast/hazelcast-jet
/**
 * Convenience for {@link #mapWithUpdating(String, DistributedFunction,
 * DistributedBiFunction)} with {@link Entry} as the input item.
 */
@Nonnull
public static <K, V, E extends Entry<K, V>> Sink<E> mapWithUpdating(
        @Nonnull String mapName,
        @Nonnull DistributedBiFunction<? super V, ? super E, ? extends V> updateFn
) {
    //noinspection Convert2MethodRef (provokes a javac 9 bug)
    return fromProcessor("mapWithUpdatingSink(" + mapName + ')',
            updateMapP(mapName, (Entry<K, V> e) -> e.getKey(), updateFn));
}
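What updateFn means for each incoming entry can be sketched in plain Java: the sink looks up the current value for the entry's key and replaces it with updateFn's result (a null current value signals that the key is absent). This is an illustrative model of the semantics, not the actual processor:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Illustrative model of a mapWithUpdating sink keeping a running sum per key.
Map<String, Integer> targetMap = new HashMap<>();
BiFunction<Integer, Map.Entry<String, Integer>, Integer> updateFn =
        (current, e) -> current == null ? e.getValue() : current + e.getValue();

List<Map.Entry<String, Integer>> incoming = List.of(
        Map.entry("x", 1), Map.entry("x", 2), Map.entry("y", 5));
for (Map.Entry<String, Integer> e : incoming) {
    // For each item: read the current value, apply updateFn, write it back.
    targetMap.put(e.getKey(), updateFn.apply(targetMap.get(e.getKey()), e));
}
```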
Code example source: hazelcast/hazelcast-jet-code-samples
/**
 * This will take the contents of the source map and apply an entry
 * processor to increment the values by 5.
 */
private static Pipeline mapWithEntryProcessor(String sourceMapName, String sinkMapName) {
    Pipeline pipeline = Pipeline.create();
    pipeline.drawFrom(Sources.<Integer, Integer>map(sourceMapName))
            .drainTo(
                    Sinks.mapWithEntryProcessor(
                            sinkMapName,
                            entryKey(),
                            item -> new IncrementEntryProcessor(5)
                    )
            );
    return pipeline;
}
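The sample references an IncrementEntryProcessor that is not shown. Its effect, adding a fixed increment to each targeted entry's value in place, can be sketched in plain Java (hypothetical names; the real class would implement Hazelcast's EntryProcessor interface):

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for the sink map after the source entries have been written to it.
Map<Integer, Integer> sinkMap = new HashMap<>(Map.of(0, 0, 1, 1, 2, 2));
int increment = 5;

// What applying IncrementEntryProcessor(5) to every routed entry amounts to:
// each entry's value is replaced by value + increment, in place.
sinkMap.replaceAll((k, v) -> v + increment);
```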
Code example source: hazelcast/hazelcast-jet-demos
public static Pipeline build(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    Pipeline pipeline = Pipeline.create();
    pipeline
            .drawFrom(KafkaSources.kafka(properties, Constants.TOPIC_NAME_PRECIOUS))
            .drainTo(Sinks.map(Constants.IMAP_NAME_PRECIOUS));
    return pipeline;
}
Code example source: hazelcast/hazelcast-jet-demos
public static Pipeline build() {
    Pipeline p = Pipeline.create();
    // Palladium and Platinum only
    p.drawFrom(Sources.<String, Object>mapJournal(
            Constants.IMAP_NAME_PRECIOUS, JournalInitialPosition.START_FROM_OLDEST))
     .map(e -> e.getKey() + "==" + e.getValue())
     .filter(str -> str.toLowerCase().startsWith("p"))
     .drainTo(Sinks.logger());
    return p;
}
Code example source: hazelcast/hazelcast-jet
/**
 * Convenience for {@link #remoteMapWithUpdating} with {@link Entry} as
 * the input item.
 */
@Nonnull
public static <K, V, E extends Entry<K, V>> Sink<E> remoteMapWithUpdating(
        @Nonnull String mapName,
        @Nonnull ClientConfig clientConfig,
        @Nonnull DistributedBiFunction<? super V, ? super E, ? extends V> updateFn
) {
    //noinspection Convert2MethodRef (provokes a javac 9 bug)
    return fromProcessor("remoteMapWithUpdatingSink(" + mapName + ')',
            updateRemoteMapP(mapName, clientConfig, (Entry<K, V> e) -> e.getKey(), updateFn));
}
Code example source: hazelcast/hazelcast-jet-code-samples
public static void main(String[] args) throws Exception {
    System.setProperty("hazelcast.logging.type", "log4j");
    NettyServer nettyServer = new NettyServer(PORT, channel -> {
        for (int i; (i = COUNTER.getAndDecrement()) > 0; ) {
            channel.writeAndFlush(i + "\n");
        }
        channel.close();
    }, DistributedConsumer.noop());
    nettyServer.start();
    JetInstance jet = Jet.newJetInstance();
    Jet.newJetInstance();
    try {
        Pipeline p = Pipeline.create();
        p.drawFrom(Sources.socket(HOST, PORT, UTF_8))
         .drainTo(Sinks.list(SINK_NAME));
        jet.newJob(p).join();
        System.out.println("Jet received " + jet.getList(SINK_NAME).size() + " items from the socket");
    } finally {
        nettyServer.stop();
        Jet.shutdownAll();
    }
}
}
Code example source: hazelcast/hazelcast-code-samples
Sinks.mapWithEntryProcessor(
Constants.IMAP_NAME_SEQUENCE,
DistributedFunctions.wholeItem(),
Code example source: hazelcast/hazelcast-jet-code-samples
public static Pipeline buildPipeline(String sourceName, String sinkName) {
    Pattern pattern = Pattern.compile("\\W+");
    Pipeline pipeline = Pipeline.create();
    pipeline.drawFrom(Sources.<Integer, String>map(sourceName))
            .flatMap(e -> Traversers.traverseArray(pattern.split(e.getValue().toLowerCase()))
                    .filter(w -> !w.isEmpty()))
            .groupingKey(wholeItem())
            .aggregate(counting())
            .drainTo(Sinks.map(sinkName));
    return pipeline;
}
Code example source: hazelcast/hazelcast-jet-code-samples
private Pipeline buildPipeline() {
    Pipeline p = Pipeline.create();
    p.drawFrom(KafkaSources.kafka(brokerProperties(), TOPIC))
     .drainTo(Sinks.logger());
    return p;
}
Code example source: hazelcast/hazelcast-jet
/**
 * Returns a sink that adds the items it receives to a Hazelcast {@code
 * IList} with the specified name.
 * <p>
 * No state is saved to snapshot for this sink. After the job is restarted,
 * the items will likely be duplicated, providing an <i>at-least-once</i>
 * guarantee.
 * <p>
 * The default local parallelism for this sink is 1.
 */
@Nonnull
public static <T> Sink<T> list(@Nonnull String listName) {
    return fromProcessor("listSink(" + listName + ')', writeListP(listName));
}
Code example source: hazelcast/hazelcast-jet-code-samples
public static void main(String[] args) throws Exception {
    System.setProperty("hazelcast.logging.type", "log4j");
    JetInstance jet = Jet.newJetInstance();
    try {
        IList<Integer> inputList = jet.getList(INPUT_LIST);
        for (int i = 0; i < ITEM_COUNT; i++) {
            inputList.add(i);
        }
        Pipeline p = Pipeline.create();
        p.drawFrom(Sources.<Integer>list(INPUT_LIST))
         .map(i -> "item" + i)
         .drainTo(Sinks.list(RESULT_LIST));
        jet.newJob(p).join();
        IList<String> outputList = jet.getList(RESULT_LIST);
        System.out.println("Result list items: " + new ArrayList<>(outputList));
    } finally {
        Jet.shutdownAll();
    }
}
}
Code example source: hazelcast/hazelcast-jet-code-samples
private static Pipeline buildPipeline() {
    Pattern delimiter = Pattern.compile("\\W+");
    Pipeline p = Pipeline.create();
    p.drawFrom(Sources.<Long, String>map(BOOK_LINES))
     .flatMap(e -> traverseArray(delimiter.split(e.getValue().toLowerCase())))
     .filter(word -> !word.isEmpty())
     .groupingKey(wholeItem())
     .aggregate(counting())
     .drainTo(Sinks.map(COUNTS));
    return p;
}