Usage and code examples of the org.apache.spark.api.java.JavaPairRDD.foreachPartition() method

x33g5p2x, reposted on 2022-01-21 under Other

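This article collects some code examples of the Java method org.apache.spark.api.java.JavaPairRDD.foreachPartition() and shows how JavaPairRDD.foreachPartition() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the JavaPairRDD.foreachPartition() method are as follows:
Package path: org.apache.spark.api.java.JavaPairRDD
Class name: JavaPairRDD
Method name: foreachPartition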

Introduction to JavaPairRDD.foreachPartition

None available.
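
As the examples on this page suggest, foreachPartition is handed a function that receives an Iterator over the key/value pairs of one partition, so the function runs once per partition rather than once per record. The following is a minimal plain-Java sketch of that calling contract, written so it runs without Spark on the classpath. The names ForeachPartitionSketch, PartitionFn, and the local foreachPartition helper are assumptions made for illustration: PartitionFn stands in for Spark's VoidFunction<Iterator<T>>, and Map.Entry stands in for scala.Tuple2.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

class ForeachPartitionSketch {

  /** Hypothetical stand-in for Spark's VoidFunction<Iterator<T>>. */
  interface PartitionFn<T> {
    void call(Iterator<T> partition) throws Exception;
  }

  /** Hypothetical local driver: invokes fn once per partition, in order. */
  static <T> void foreachPartition(List<List<T>> partitions, PartitionFn<T> fn) {
    for (List<T> partition : partitions) {
      try {
        fn.call(partition.iterator());
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }
  }

  /** Runs the sketch and returns one summary string per partition. */
  static List<String> run() {
    // Two partitions of (key, value) pairs; Map.Entry stands in for Tuple2.
    List<List<Map.Entry<String, Integer>>> partitions = List.of(
        List.of(Map.entry("a", 1), Map.entry("b", 2)),
        List.of(Map.entry("c", 3)));

    List<String> summaries = new ArrayList<>();
    foreachPartition(partitions, records -> {
      // Anything placed here (for example opening a connection) would
      // happen once per partition, not once per record.
      int count = 0;
      int sum = 0;
      while (records.hasNext()) {
        Map.Entry<String, Integer> pair = records.next();
        count++;
        sum += pair.getValue();
      }
      summaries.add("records=" + count + " sum=" + sum);
    });
    return summaries;
  }

  public static void main(String[] args) {
    run().forEach(System.out::println);
  }
}
```

Here the callback is invoked twice, once for each partition, and produces the summaries "records=2 sum=3" and "records=1 sum=3". With Spark, a lambda of the same shape would be passed to pairRdd.foreachPartition(...) and would run on the executors, so collecting results into a driver-side list as done here would not carry over.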

Code examples

Code example source: origin: OryxProject/oryx

@Override
public void publishAdditionalModelData(JavaSparkContext sparkContext,
                    PMML pmml,
                    JavaRDD<String> newData,
                    JavaRDD<String> pastData,
                    Path modelParentPath,
                    TopicProducer<String, String> modelUpdateTopic) {
 // Send item updates first, before users. That way, user-based endpoints like /recommend
 // may take longer to not return 404, but when they do, the result will be more complete.
 log.info("Sending item / Y data as model updates");
 String yPathString = AppPMMLUtils.getExtensionValue(pmml, "Y");
 JavaPairRDD<String,float[]> productRDD = readFeaturesRDD(sparkContext, new Path(modelParentPath, yPathString));
 String updateBroker = modelUpdateTopic.getUpdateBroker();
 String topic = modelUpdateTopic.getTopic();
 // For now, there is no use in sending known users for each item
 productRDD.foreachPartition(new EnqueueFeatureVecsFn("Y", updateBroker, topic));
 log.info("Sending user / X data as model updates");
 String xPathString = AppPMMLUtils.getExtensionValue(pmml, "X");
 JavaPairRDD<String,float[]> userRDD = readFeaturesRDD(sparkContext, new Path(modelParentPath, xPathString));
 if (noKnownItems) {
  userRDD.foreachPartition(new EnqueueFeatureVecsFn("X", updateBroker, topic));
 } else {
  log.info("Sending known item data with model updates");
  JavaRDD<String[]> allData =
    (pastData == null ? newData : newData.union(pastData)).map(MLFunctions.PARSE_FN);
  JavaPairRDD<String,Collection<String>> knownItems = knownsRDD(allData, true);
  userRDD.join(knownItems).foreachPartition(
    new EnqueueFeatureVecsAndKnownItemsFn("X", updateBroker, topic));
 }
}
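
The Oryx excerpt above passes named function objects such as EnqueueFeatureVecsFn to foreachPartition, but their bodies are not shown. The sketch below illustrates one way such a per-partition function could be shaped: open one producer per partition, send one message per (id, vector) pair, and close the producer when the partition is exhausted. It is not Oryx's implementation. EnqueueSketch, InMemoryProducer, EnqueueVecsFn, the "update" key, and the message layout are all hypothetical, and Map.Entry again stands in for scala.Tuple2 so the block runs without Spark.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

class EnqueueSketch {

  /** Hypothetical stand-in for a message producer; it only records sends. */
  static final class InMemoryProducer implements AutoCloseable {
    final List<String> sent = new ArrayList<>();
    boolean closed;

    void send(String key, String message) {
      sent.add(key + ":" + message);
    }

    @Override
    public void close() {
      closed = true;
    }
  }

  /** Hypothetical per-partition function: one producer per partition. */
  static final class EnqueueVecsFn {
    private final String whichMatrix;
    // Kept only so this local sketch can be inspected afterwards; in Spark
    // the function runs on executors and such state would not be visible
    // to the driver.
    final List<InMemoryProducer> producers = new ArrayList<>();

    EnqueueVecsFn(String whichMatrix) {
      this.whichMatrix = whichMatrix;
    }

    void call(Iterator<Map.Entry<String, float[]>> partition) {
      if (!partition.hasNext()) {
        return; // nothing to send, so do not open a producer at all
      }
      try (InMemoryProducer producer = new InMemoryProducer()) {
        producers.add(producer);
        while (partition.hasNext()) {
          Map.Entry<String, float[]> idAndVector = partition.next();
          String message = whichMatrix + "," + idAndVector.getKey() + ","
              + Arrays.toString(idAndVector.getValue());
          producer.send("update", message);
        }
      }
    }
  }

  /** Feeds one small partition through the function and returns what was sent. */
  static List<String> run() {
    EnqueueVecsFn fn = new EnqueueVecsFn("Y");
    List<Map.Entry<String, float[]>> partition = List.of(
        Map.entry("item1", new float[] {1.0f, 2.0f}),
        Map.entry("item2", new float[] {0.5f}));
    fn.call(partition.iterator());
    return fn.producers.get(0).sent;
  }

  public static void main(String[] args) {
    run().forEach(System.out::println);
  }
}
```

Creating the producer inside call, rather than in the constructor, reflects a common reason for choosing foreachPartition over a per-record foreach: expensive resources are set up once per partition and on the side that actually processes the records.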

Code example source: origin: apache/tinkerpop

assert graphRDD.partitioner().get().equals(newViewIncomingRDD.partitioner().get());
newViewIncomingRDD
    .foreachPartition(partitionIterator -> {
      KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
      // ... remainder of the lambda body is truncated in the source excerpt
    });

Code example source: origin: oeljeklaus-you/UserActionAnalyzePlatform

sessionDetailRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Tuple2<String, Row>>>>() {
  @Override
  public void call(Iterator<Tuple2<String, Tuple2<String, Row>>> tuple2Iterator) throws Exception {
    // ... method body is truncated in the source excerpt
  }
});

Code example source: origin: ai.grakn/grakn-kb

assert graphRDD.partitioner().get().equals(newViewIncomingRDD.partitioner().get());
newViewIncomingRDD
    .foreachPartition(partitionIterator -> {
      KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
      // ... remainder of the lambda body is truncated in the source excerpt
    });

Code example source: origin: org.apache.tinkerpop/spark-gremlin

assert graphRDD.partitioner().get().equals(newViewIncomingRDD.partitioner().get());
newViewIncomingRDD
    .foreachPartition(partitionIterator -> {
      KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
      // ... remainder of the lambda body is truncated in the source excerpt
    });
