Usage of the org.apache.spark.api.java.JavaRDD.foreach() method, with code examples


This article collects Java code examples for the org.apache.spark.api.java.JavaRDD.foreach() method and shows how JavaRDD.foreach() is used in practice. The examples were extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven, and should serve as useful references. Details of JavaRDD.foreach():
Package: org.apache.spark.api.java
Class: JavaRDD
Method: foreach

JavaRDD.foreach overview

foreach(VoidFunction<T> f) is an action that applies the given function to every element of the RDD. The function executes on the executors, not on the driver, so it is used for its side effects (logging, writing to external systems, updating accumulators) and returns no value to the caller.
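
A minimal, self-contained sketch of typical usage (the class and app names are illustrative). Because the function runs on the executors, the example funnels its side effect back to the driver through an accumulator:

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.util.LongAccumulator;

public class ForeachExample {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("ForeachExample").setMaster("local[*]");
    try (JavaSparkContext sc = new JavaSparkContext(conf)) {
      JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
      LongAccumulator count = sc.sc().longAccumulator();
      rdd.foreach(s -> count.add(1));  // runs once per element, on the executors
      System.out.println("Elements visited: " + count.value());  // prints 2
    }
  }
}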

Code examples

Code example source: deeplearning4j/dl4j-examples

rdd.foreach(new DownloadToAzureFn(baseDirZips, true));
long end = System.currentTimeMillis();
log.info("*** Completed download of patent data in {} sec ***", (end-start)/1000);

Code example source: org.apache.spark/spark-core_2.11 (identical tests appear in spark-core and spark-core_2.10)

@Test
public void foreach() {
  foreachCalls = 0;
  JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
  rdd.foreach(x -> foreachCalls++);
  Assert.assertEquals(2, foreachCalls);
}

@Test
public void foreachWithAnonymousClass() {
  foreachCalls = 0;
  JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
  rdd.foreach(s -> foreachCalls++);
  Assert.assertEquals(2, foreachCalls);
}

These tests mutate a shared counter field (foreachCalls) from inside the lambda; the assertion only holds because the suite runs Spark in local mode, where the function executes in the same JVM as the driver.

Code example source: org.apache.spark/spark-core_2.10 (the same test appears in spark-core and spark-core_2.11)

@Test
public void foreach() {
 LongAccumulator accum = sc.sc().longAccumulator();
 JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
 rdd.foreach(s -> accum.add(1));
 assertEquals(2, accum.value().intValue());
}
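
Compared with the foreachCalls tests above, this accumulator variant is the pattern to copy in real jobs: a plain variable captured by the closure is serialized to the executors and mutated there, so the driver's copy never changes on a cluster. A minimal sketch of the pitfall (names are illustrative):

// Broken on a cluster: the lambda captures a serialized copy of the array,
// so the increments happen in executor JVMs and never reach the driver.
int[] counter = {0};
rdd.foreach(s -> counter[0]++);
// counter[0] may still be 0 here (it "works" only in local mode).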

Code example source: databricks/learning-spark

rdd.foreach(new VoidFunction<String>() {
  public void call(String line) {
    if (line.contains("KK6JKQ")) {
      count.add(1);  // count is an accumulator created on the driver
    }
  }
});

Code example source: org.apache.spark/spark-core (the same fragment appears in spark-core_2.10 and spark-core_2.11)

// Test-suite fragment: intAccum, doubleAccum and floatAccum are accumulators
// created earlier in the test, and rdd is a JavaRDD<Integer>.
rdd.foreach(intAccum::add);
assertEquals((Integer) 25, intAccum.value());
rdd.foreach(x -> doubleAccum.add((double) x));
assertEquals((Double) 25.0, doubleAccum.value());
rdd.foreach(x -> floatAccum.add((float) x));
assertEquals((Float) 25.0f, floatAccum.value());
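
These typed int/double/float accumulators come from Spark's legacy Accumulator API, which was deprecated in Spark 2.0 in favor of AccumulatorV2 (the LongAccumulator used in the earlier test is part of the replacement). A sketch of the modern equivalent for the integer case, assuming the same rdd:

LongAccumulator sum = sc.sc().longAccumulator();
rdd.foreach(sum::add);  // Integer elements unbox and widen to long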

Code example source: streampipes/streampipes-ce

@Override
public void call(JavaRDD<Map<String, Object>> javaRDD) throws Exception {
  //System.out.println("Sending Kafka output");
  javaRDD.foreach(new VoidFunction<Map<String, Object>>() {
    private static final long serialVersionUID = 1L;
    private final ObjectMapper objectMapper = new ObjectMapper();
    @Override
    public void call(Map<String, Object> map) throws Exception {
      KafkaProducer<String, String> producer = new KafkaProducer<String, String>(kafkaParams);
      producer.send(new ProducerRecord<>(topic, objectMapper.writeValueAsString(map)));
    }
  });
}
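
The snippet above constructs a new KafkaProducer for every record and never closes it, which is expensive. A common alternative (a sketch reusing the snippet's kafkaParams and topic variables, not code from the original project) is foreachPartition, which amortizes one producer over each partition:

javaRDD.foreachPartition(records -> {
  // One producer per partition instead of one per record.
  try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaParams)) {
    ObjectMapper objectMapper = new ObjectMapper();
    while (records.hasNext()) {
      producer.send(new ProducerRecord<>(topic, objectMapper.writeValueAsString(records.next())));
    }
  }
});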

Code example source: org.apache.beam/beam-runners-spark

@Override
public void action() {
 // Empty function to force computation of RDD.
 rdd.foreach(TranslationUtils.emptyVoidFunction());
}

Code example source: org.apache.beam/beam-runners-spark

@Override
public void action() {
 // Force computation of DStream.
 dStream.foreachRDD(rdd -> rdd.foreach(TranslationUtils.<WindowedValue<T>>emptyVoidFunction()));
}
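
Both Beam snippets use foreach purely as an action: Spark transformations are lazy, so applying an empty function is a cheap way to force the pipeline to actually run. Without Beam's emptyVoidFunction() helper, the generic form is simply:

// Force evaluation of a lazy RDD while discarding the elements.
rdd.foreach(x -> { });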

Code example source: com.davidbracewell/mango

@Override
public void forEach(@NonNull SerializableConsumer<? super T> consumer) {
 rdd.foreach(t -> {
   Configurator.INSTANCE.configure(configBroadcast.value());
   consumer.accept(t);
 });
}

Code example source: uber/marmaray

private void logWriteMetrics(final Optional<JavaRDD<WriteStatus>> writesStatuses) {
  if (writesStatuses.isPresent() && this.dataFeedMetrics.isPresent()) {
    final LongAccumulator totalCount = writesStatuses.get().rdd().sparkContext().longAccumulator();
    final LongAccumulator errorCount = writesStatuses.get().rdd().sparkContext().longAccumulator();
    writesStatuses.get().foreach(writeStatus -> {
        errorCount.add(writeStatus.getFailedRecords().size());
        totalCount.add(writeStatus.getTotalRecords());
      });
    this.dataFeedMetrics.get().createLongMetric(DataFeedMetricNames.ERROR_ROWCOUNT, errorCount.value(),
        this.dataFeedMetricsTags);
    this.dataFeedMetrics.get().createLongMetric(DataFeedMetricNames.OUTPUT_ROWCOUNT,
        totalCount.value() - errorCount.value(), this.dataFeedMetricsTags);
  }
}

Code example source: cloudera-labs/envelope

@Override
public void applyBulkMutations(List<Tuple2<MutationType, Dataset<Row>>> planned) {
 for (Tuple2<MutationType, Dataset<Row>> mutation : planned) {
  MutationType mutationType = mutation._1();
  Dataset<Row> mutationDF = mutation._2();
  if (mutationType.equals(MutationType.INSERT)) {
   mutationDF.javaRDD().foreach(new SendRowToLogFunction(delimiter, logLevel));
  }
 }
}
