Usage of the org.apache.spark.api.java.JavaRDD.foreachPartition() method, with code examples

x33g5p2x, reposted 2022-01-21, category: Other

This article collects a number of code examples of the Java method org.apache.spark.api.java.JavaRDD.foreachPartition(), showing how JavaRDD.foreachPartition() is used in practice. The examples come from selected open-source projects on platforms such as GitHub, Stack Overflow and Maven, and are intended as references. Details of the JavaRDD.foreachPartition() method are as follows:
Package path: org.apache.spark.api.java.JavaRDD
Class name: JavaRDD
Method name: foreachPartition

Introduction to JavaRDD.foreachPartition

foreachPartition is an action: it applies the supplied function once to each partition of the RDD and returns nothing. The function receives a java.util.Iterator over that partition's elements and runs on the executors, which makes it the natural place for per-partition setup such as opening a connection or assembling a batch before writing data out. The Java signature is void foreachPartition(VoidFunction&lt;java.util.Iterator&lt;T&gt;&gt; f).
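
As a quick illustration of the signature above, here is a minimal, self-contained sketch; it is not taken from any of the projects quoted below, and the app name, local master and the println "sink" are placeholders. The point of foreachPartition over foreach is that per-partition setup (a connection, a client, a batch buffer) happens once per partition rather than once per element.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class ForeachPartitionSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("foreachPartition-sketch").setMaster("local[2]");
    try (JavaSparkContext sc = new JavaSparkContext(conf)) {
      JavaRDD<String> lines = sc.parallelize(Arrays.asList("a", "b", "c", "d"), 2);

      // The function receives an Iterator over one partition's elements and runs
      // once per partition on the executors; foreachPartition is an action and returns nothing.
      lines.foreachPartition(iter -> {
        List<String> batch = new ArrayList<>();
        while (iter.hasNext()) {
          batch.add(iter.next());
        }
        // Placeholder sink: real code would open a connection or producer here,
        // write the whole batch, and close it, all once per partition.
        System.out.println("partition batch: " + batch);
      });
    }
  }
}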

Code examples

Code example source: OryxProject/oryx (the same snippet also appears in the com.cloudera.oryx:oryx-ml artifact)

// Empty partition functions: no-op actions whose only effect is to force evaluation of the RDDs.
newData.foreachPartition(p -> {});
pastData.foreachPartition(p -> {});

Code example source: org.apache.spark/spark-core (the identical test also appears in spark-core_2.10 and spark-core_2.11)

@Test
public void foreachPartition() {
 LongAccumulator accum = sc.sc().longAccumulator();
 JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
 rdd.foreachPartition(iter -> {
  while (iter.hasNext()) {
   iter.next();
   accum.add(1);
  }
 });
 assertEquals(2, accum.value().intValue());
}
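
Note that the test counts elements with a LongAccumulator rather than a plain local variable: the partition function runs on the executors, so increments to an ordinary variable captured by the closure would only affect serialized copies of it and would never be seen by the driver, whereas accumulator updates are merged back from each task.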

Code example source: dibbhatt/kafka-spark-consumer

@Override
public void call(JavaRDD<MessageAndMetadata<byte[]>> rdd) throws Exception {
  rdd.foreachPartition(new VoidFunction<Iterator<MessageAndMetadata<byte[]>>>() {
    // ... the call(Iterator<MessageAndMetadata<byte[]>>) body is elided in the source snippet ...
  });
}

Code example source: org.springframework.xd/spring-xd-spark-streaming

@Override
public Void call(JavaRDD<String> rdd) {
  rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
    @Override
    public void call(Iterator<String> items) throws Exception {
      FileWriter fw;
      BufferedWriter bw = null;
      try {
        fw = new FileWriter(file.getAbsoluteFile());
        bw = new BufferedWriter(fw);
        while (items.hasNext()) {
          bw.append(items.next() + System.lineSeparator());
        }
      }
      catch (IOException ioe) {
        throw new RuntimeException(ioe);
      }
      finally {
        if (bw != null) {
          bw.close();
        }
      }
    }
  });
  return null;
}

Code example source: cloudera-labs/envelope

private void applyMutations(JavaRDD<Row> planned, Config outputConfig) {
 planned.foreachPartition(new ApplyMutationsForPartitionFunction(outputConfig, accumulators));
}
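
The envelope examples (ApplyMutationsForPartitionFunction above, SendRowToKafkaFunction further down) and the kafka-spark-consumer example pass a named VoidFunction implementation to foreachPartition instead of a lambda. As a hedged sketch, with an invented class name and field that do not come from those projects, such a partition function generally has this shape:

import java.util.Iterator;

import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Row;

// Hypothetical skeleton of a reusable partition function; VoidFunction extends
// Serializable, so the instance and its fields are shipped from the driver to the executors.
public class WriteRowsPartitionFunction implements VoidFunction<Iterator<Row>> {

  private final String targetTable;  // illustrative configuration carried to the executors

  public WriteRowsPartitionFunction(String targetTable) {
    this.targetTable = targetTable;
  }

  @Override
  public void call(Iterator<Row> rows) throws Exception {
    // Open per-partition resources here (connection, producer, writer),
    // drain the iterator, then close them in a finally block.
    while (rows.hasNext()) {
      Row row = rows.next();
      // placeholder: write 'row' to 'targetTable'
    }
  }
}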

Code example source: org.qcri.rheem/rheem-spark

@Override
public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate(
    ChannelInstance[] inputs,
    ChannelInstance[] outputs,
    SparkExecutor sparkExecutor,
    OptimizationContext.OperatorContext operatorContext) {
  RddChannel.Instance input = (RddChannel.Instance) inputs[0];
  final JavaRDD<Object> rdd = input.provideRdd();
  final JavaRDD<Object> cachedRdd = rdd.cache();
  // Empty partition function: a no-op action that forces evaluation, so the cached RDD
  // is actually materialized before being handed to the output channel below.
  cachedRdd.foreachPartition(iterator -> {
  });
  RddChannel.Instance output = (RddChannel.Instance) outputs[0];
  output.accept(cachedRdd, sparkExecutor);
  return ExecutionOperator.modelQuasiEagerExecution(inputs, outputs, operatorContext);
}

Code example source: io.zipkin.sparkstreaming/zipkin-sparkstreaming

static void streamSpansToStorage(
  JavaDStream<byte[]> stream,
  ReadSpans readSpans,
  AdjustAndConsumeSpansSharingTraceId adjustAndConsumeSpansSharingTraceId
) {
 JavaDStream<Span> spans = stream.flatMap(readSpans);
 // TODO: plug in some filter to drop spans regardless of trace ID
 // spans = spans.filter(spanFilter);
 JavaPairDStream<String, Iterable<Span>> tracesById = spans
   .mapToPair(s -> new Tuple2<>(Util.toLowerHex(s.traceIdHigh, s.traceId), s))
   .groupByKey();
 tracesById.foreachRDD(rdd -> {
  rdd.values().foreachPartition(adjustAndConsumeSpansSharingTraceId);
 });
}

Code example source: cloudera-labs/envelope

@Override
public void applyBulkMutations(List<Tuple2<MutationType, Dataset<Row>>> planned) {
 for (Tuple2<MutationType, Dataset<Row>> mutation : planned) {
  MutationType mutationType = mutation._1();
  Dataset<Row> mutationDF = mutation._2();
  if (mutationType.equals(MutationType.INSERT)) {
   mutationDF.javaRDD().foreachPartition(new SendRowToKafkaFunction(config));
  }
 }
}

Code example source: locationtech/geowave

// The excerpt is truncated in the source; the bodies of both foreachPartition calls are elided.
if (isS3) {
 final String s3FinalEndpointUrl = s3EndpointUrl;
 fileRDD.foreachPartition(uri -> {
  S3FileSystem fs = initializeS3FS(s3FinalEndpointUrl);
  List<URI> inputFiles = new ArrayList<URI>();
  // ...

 fileRDD.foreachPartition(uri -> {
  processInput(
    configFile,
    // ...
Code example source: uk.gov.gchq.gaffer/parquet-store

// The snippet is truncated in the source; presumably the function is assigned before use:
final WriteUnsortedDataFunction writeUnsortedDataFunction =
    new WriteUnsortedDataFunction(store.getTempFilesDir(), store.getSchemaUtils(), groupToSplitPoints);
input.foreachPartition(writeUnsortedDataFunction);
LOGGER.debug("Finished writing the unsorted Parquet data to {}", tempDataDirString);

Code example source: deepspark/deepspark

data.foreachPartition(new VoidFunction<Iterator<Record>>() {
  private static final long serialVersionUID = -4641037124928675165L;
  // ... the call(Iterator<Record>) body is elided in the source snippet ...
});

Code example source: deepspark/deepspark

printLogTime("Training start...");
dummydata.foreachPartition(new VoidFunction<Iterator<Integer>>() {
  private static final long serialVersionUID = -4641037124928675165L;
  // ... the call(Iterator<Integer>) body is elided in the source snippet ...
});
