Usage of the org.apache.spark.api.java.JavaPairRDD.foreach() method, with code examples


This article collects code examples of the Java method org.apache.spark.api.java.JavaPairRDD.foreach() and shows how JavaPairRDD.foreach() is used in practice. The examples come from selected projects on GitHub, Stack Overflow, Maven, and similar platforms, so they should serve as useful references. Details of the JavaPairRDD.foreach() method:
Package: org.apache.spark.api.java
Class: JavaPairRDD
Method: foreach

About JavaPairRDD.foreach

No description is provided on the original page. In short, foreach(VoidFunction<Tuple2<K, V>> f) is an action: it applies the given function to every key-value pair of the RDD for its side effects, and the function runs on the executors rather than on the driver.
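A minimal sketch follows, assuming a local Spark setup; the class name ForeachSketch, the app name, and the sample data are made up for illustration and are not from the original page:

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class ForeachSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("foreach-sketch").setMaster("local[*]");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(
      Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("b", 2)));
    // On a cluster the lambda runs on the executors, so output goes to executor logs;
    // in local mode it simply prints to the console.
    pairs.foreach(pair -> System.out.println(pair._1() + " -> " + pair._2()));
    sc.close();
  }
}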

Code examples

Code example source: org.apache.spark/spark-core_2.11 (the identical test is also published in the spark-core_2.10 and spark-core artifacts)

@Test
public void binaryFilesCaching() throws Exception {
 // Reusing the wholeText files example
 byte[] content1 = "spark is easy to use.\n".getBytes(StandardCharsets.UTF_8);
 String tempDirName = tempDir.getAbsolutePath();
 File file1 = new File(tempDirName + "/part-00000");
 FileOutputStream fos1 = new FileOutputStream(file1);
 FileChannel channel1 = fos1.getChannel();
 ByteBuffer bbuf = ByteBuffer.wrap(content1);
 channel1.write(bbuf);
 channel1.close();
 JavaPairRDD<String, PortableDataStream> readRDD = sc.binaryFiles(tempDirName).cache();
 readRDD.foreach(pair -> pair._2().toArray()); // force the files to be read, materializing the cache
 List<Tuple2<String, PortableDataStream>> result = readRDD.collect();
 for (Tuple2<String, PortableDataStream> res : result) {
  assertArrayEquals(content1, res._2().toArray());
 }
}

Code example source: mvalleavila/Kafka-Spark-Hbase-Example

@Override
public Void call(JavaPairRDD<String, Integer> values, Time time) throws Exception {

  values.foreach(new VoidFunction<Tuple2<String, Integer>>() {
    @Override
    public void call(Tuple2<String, Integer> tuple) throws Exception {
      HBaseCounterIncrementor incrementor = HBaseCounterIncrementor.getInstance(
          broadcastTableName.value(), broadcastColumnFamily.value());
      incrementor.increment("Counter", tuple._1(), tuple._2());
      System.out.println("------------------------------- Counter:" + tuple._1() + "," + tuple._2());
    }
  });

  return null;
}});

Code example source: DataSystemsLab/GeoSpark

deleteLocalFile(outputPath + "-" + RasterizationUtils.getImageTileName(zoomLevel, partitionOnX, partitionOnY, i), imageType);
distributedImage.foreach(new VoidFunction<Tuple2<Integer, ImageSerializableWrapper>>()

Code example source: DataSystemsLab/GeoSpark

deleteHadoopFile(outputPath + "-" + RasterizationUtils.getImageTileName(zoomLevel, partitionOnX, partitionOnY, i) + ".", imageType);
distributedImage.foreach(new VoidFunction<Tuple2<Integer, ImageSerializableWrapper>>()

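The GeoSpark snippets above are truncated by the aggregator and only show the start of the foreach call. As a rough sketch of the same anonymous VoidFunction style (simple types stand in for GeoSpark's ImageSerializableWrapper, the loop body is invented for illustration, and sc is assumed to be an existing JavaSparkContext with the usual Spark imports):

JavaPairRDD<Integer, String> tiles = sc.parallelizePairs(
  Arrays.asList(new Tuple2<>(0, "tile-0"), new Tuple2<>(1, "tile-1")));
tiles.foreach(new VoidFunction<Tuple2<Integer, String>>() {
  @Override
  public void call(Tuple2<Integer, String> tile) throws Exception {
    // Placeholder side effect; the real GeoSpark body (writing out each rendered tile) is omitted above.
    System.out.println("tile " + tile._1() + ": " + tile._2());
  }
});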

Code example source: com.davidbracewell/mango

@Override
public void forEach(@NonNull SerializableBiConsumer<? super T, ? super U> consumer) {
 rdd.foreach(tuple -> {
   Configurator.INSTANCE.configure(configBroadcast.value());
   consumer.accept(tuple._1(), tuple._2());
 });
}

Code example source: deepspark/deepspark

public static void main(String[] args) {
  // TODO Auto-generated method stub
  SparkConf conf = new SparkConf().setAppName("ImagenetSampler")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
  
  JavaSparkContext sc = new JavaSparkContext(conf);
  
  int numExecutors = conf.getInt("spark.executor.instances", -1);
  System.out.println("number of executors = " + numExecutors);
  System.out.println("Data Loading...");
  JavaPairRDD<FloatWritable, ArrayPrimitiveWritable> train_seq = 
      sc.sequenceFile("imagenet_sampled.hsf", FloatWritable.class, ArrayPrimitiveWritable.class);
  
  train_seq.foreach(new VoidFunction<Tuple2<FloatWritable,ArrayPrimitiveWritable>>() {
    
    @Override
    public void call(Tuple2<FloatWritable, ArrayPrimitiveWritable> arg0) throws Exception {
      System.out.println(arg0._1.get() + " " + ((float[]) arg0._2.get()).length);
    }
  });
  sc.close();
}

Code example source: melphi/spark-examples

/**
  * The task body
  */
 public void run(String inputFilePath) {
  /*
   * This is the address of the Spark cluster. We will call the task from WordCountTest and we
   * use a local standalone cluster. [*] means use all the cores available.
   * See {@see http://spark.apache.org/docs/latest/submitting-applications.html#master-urls}.
   */
  String master = "local[*]";

  /*
   * Initialises a Spark context.
   */
  SparkConf conf = new SparkConf()
    .setAppName(WordCountTask.class.getName())
    .setMaster(master);
  JavaSparkContext context = new JavaSparkContext(conf);

  /*
   * Performs a word count sequence of tasks and prints the output with a logger.
   */
  context.textFile(inputFilePath)
    .flatMap(text -> Arrays.asList(text.split(" ")).iterator())
    .mapToPair(word -> new Tuple2<>(word, 1))
    .reduceByKey((a, b) -> a + b)
    .foreach(result -> LOGGER.info(
      String.format("Word [%s] count [%d].", result._1(), result._2)));
 }
}

Code example source: Erik-ly/SprakProject

extractSessionDetailRDD.foreach(new VoidFunction<Tuple2<String, Tuple2<String, Row>>>() {

Code example source: DataSystemsLab/GeoSpark

s3Operator.deleteImage(bucketName, path + "-" + RasterizationUtils.getImageTileName(zoomLevel, partitionOnX, partitionOnY, i) + "." + imageType.getTypeName());
distributedImage.foreach(new VoidFunction<Tuple2<Integer, ImageSerializableWrapper>>()

Code example source: aseigneurin/kafka-sandbox

public static void main(String[] args) {
  SparkConf conf = new SparkConf()
      .setAppName("kafka-sandbox")
      .setMaster("local[*]");
  JavaSparkContext sc = new JavaSparkContext(conf);
  JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));
  Set<String> topics = Collections.singleton("mytopic");
  Map<String, String> kafkaParams = new HashMap<>();
  kafkaParams.put("metadata.broker.list", "localhost:9092");
  JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc,
      String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);
  directKafkaStream.foreachRDD(rdd -> {
    System.out.println("--- New RDD with " + rdd.partitions().size()
        + " partitions and " + rdd.count() + " records");
    rdd.foreach(record -> System.out.println(record._2));
  });
  ssc.start();
  ssc.awaitTermination();
}

Code example source: spirom/learning-spark-with-java

pairsRDD.foreach(i -> System.out.println(i));
reducedRDD.foreach(i -> System.out.println(i));
foldedRDD.foreach(i -> System.out.println(i));
averageRDD.foreach(i -> System.out.println(i));
averageRDDtheHardWay.foreach(i -> System.out.println(i));

Code example source: Erik-ly/SprakProject

sessionDetailRDD.foreach(new VoidFunction<Tuple2<String,Tuple2<String,Row>>>() {

Code example source: osmlab/atlas-checks

resultRDD.foreach(countryPathPair ->

Code example source: spirom/learning-spark-with-java

rdd.foreach(e -> System.out.println(e));
});

Code example source: scipr-lab/dizk

qapWitness.coefficientsH().filter(e -> e._1 >= qapWitness.degree() - 2).foreach(coeff -> {
  if (coeff._1 == qapWitness.degree() - 2) {
    assert (!coeff._2.equals(zero));
