com.hazelcast.jet.pipeline.Sinks.mapWithEntryProcessor()方法的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(2.8k)|赞(0)|评价(0)|浏览(121)

本文整理了Java中com.hazelcast.jet.pipeline.Sinks.mapWithEntryProcessor()方法的一些代码示例,展示了Sinks.mapWithEntryProcessor()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Sinks.mapWithEntryProcessor()方法的具体详情如下:
包路径:com.hazelcast.jet.pipeline.Sinks
类名称:Sinks
方法名:mapWithEntryProcessor

Sinks.mapWithEntryProcessor介绍

[英]Returns a sink that uses the items it receives to create EntryProcessors it submits to a Hazelcast IMap with the specified name. For each received item it applies toKeyFn to get the key and toEntryProcessorFn to get the entry processor, and then submits the key and the entry processor to the Hazelcast cluster, which will internally apply the entry processor to the key.

NOTE: Jet only remembers the name of the map you supply and acquires a map with that name on the local cluster. If you supply a map instance from another cluster, no error will be thrown to indicate this.

As opposed to #mapWithUpdating and #mapWithMerging, this sink does not use batching and submits a separate entry processor for each received item. For use cases that are efficiently solvable using those sinks, this one will perform worse. It should be used only when they are not applicable.

If your entry processors take a long time to update a value, consider using entry processors that implement Offloadable. This will avoid blocking the Hazelcast partition thread during large update operations.

This sink supports exactly-once processing only if the supplied entry processor performs idempotent updates, i.e., the resulting value would be the same if an entry processor was run on the same entry more than once.

Note: Unlike #mapWithUpdating and #mapWithMerging, this operation is lock-aware. If the key is locked, the EntryProcessor will wait until it acquires the lock.

The default local parallelism for this sink is 1.
[中]返回一个接收器,该接收器使用它接收到的项创建它提交给具有指定名称的Hazelcast IMap的EntryProcessors。对于每个收到的项目,它应用toKeyFn获取密钥,应用toEntryProcessorFn获取条目处理器,然后将密钥和条目处理器提交给Hazelcast集群,该集群将在内部将条目处理器应用于密钥。
注意:Jet只记住您提供的地图的名称,并在本地集群上获取具有该名称的地图。如果您从另一个集群提供一个映射实例,则不会抛出任何错误来指示这一点。
与#MapWithUpdate和#mapWithMerging不同,该接收器不使用批处理,并为每个接收到的项目提交一个单独的条目处理器。对于使用这些接收器可以有效解决的用例,这个用例的性能会更差。只有在不适用时才应使用。
如果您的条目处理器花了很长时间来更新值,请考虑使用实现可卸载的条目处理器。这将避免在大型更新操作期间阻塞Hazelcast分区线程。
仅当提供的条目处理器执行幂等更新时,此接收器才支持一次处理,即,如果条目处理器在同一条目上多次运行,则结果值将相同。
注意:与#MapWithUpdate和#mapWithMerging不同,此操作*具有锁感知功能。如果钥匙被锁定,EntryProcessor将等待获得锁。
此接收器的默认本地并行度为1。

代码示例

代码示例来源:origin: hazelcast/hazelcast-jet

return mapWithEntryProcessor(map.getName(), toKeyFn, toEntryProcessorFn);

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

/**
 * This will take the contents of source map and apply entry processor to
 * increment the values by 5.
 */
private static Pipeline mapWithEntryProcessor(String sourceMapName, String sinkMapName) {
  Pipeline pipeline = Pipeline.create();
  pipeline.drawFrom(Sources.<Integer, Integer>map(sourceMapName))
      .drainTo(
          Sinks.mapWithEntryProcessor(
              sinkMapName,
              entryKey(),
              item -> new IncrementEntryProcessor(5)
          )
      );
  return pipeline;
}

代码示例来源:origin: hazelcast/hazelcast-code-samples

Sinks.mapWithEntryProcessor(
Constants.IMAP_NAME_SEQUENCE,
  DistributedFunctions.wholeItem(),

相关文章