This article collects Java code examples for the org.apache.spark.api.java.JavaPairRDD.foreach() method, showing how JavaPairRDD.foreach() is used in practice. The examples are extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven, so they should serve as useful references. Details of the JavaPairRDD.foreach() method:
Package path: org.apache.spark.api.java.JavaPairRDD
Class name: JavaPairRDD
Method name: foreach
Method description: none available
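Since no method description is available, here is a minimal sketch of what foreach does: it is an action that applies a side-effecting function to every key/value pair of the RDD. This sketch is not taken from any of the projects quoted below; the app name, master URL, and sample data are placeholder assumptions.

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class ForeachExample {
  public static void main(String[] args) {
    // Local mode is assumed here purely for demonstration.
    SparkConf conf = new SparkConf().setAppName("ForeachExample").setMaster("local[*]");
    try (JavaSparkContext sc = new JavaSparkContext(conf)) {
      JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(
          Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("b", 2)));
      // foreach is an action: the lambda runs on the executors, so on a real
      // cluster this output appears in the executor logs, not on the driver.
      pairs.foreach(pair -> System.out.println(pair._1() + " -> " + pair._2()));
    }
  }
}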
Code example source: org.apache.spark/spark-core_2.11
@Test
public void binaryFilesCaching() throws Exception {
  // Reusing the wholeText files example
  byte[] content1 = "spark is easy to use.\n".getBytes(StandardCharsets.UTF_8);
  String tempDirName = tempDir.getAbsolutePath();
  File file1 = new File(tempDirName + "/part-00000");
  FileOutputStream fos1 = new FileOutputStream(file1);
  FileChannel channel1 = fos1.getChannel();
  ByteBuffer bbuf = ByteBuffer.wrap(content1);
  channel1.write(bbuf);
  channel1.close();
  JavaPairRDD<String, PortableDataStream> readRDD = sc.binaryFiles(tempDirName).cache();
  readRDD.foreach(pair -> pair._2().toArray()); // force the file to be read
  List<Tuple2<String, PortableDataStream>> result = readRDD.collect();
  for (Tuple2<String, PortableDataStream> res : result) {
    assertArrayEquals(content1, res._2().toArray());
  }
}
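Note the pattern in this test: cache() is lazy, so foreach is used as a minimal action whose only purpose is its side effect of computing the RDD and thereby populating the cache; the subsequent collect() then reads the already-materialized PortableDataStream contents.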
Code example source: org.apache.spark/spark-core_2.10
Code example source: org.apache.spark/spark-core
(These two artifacts contain the same binaryFilesCaching test as the spark-core_2.11 example above, verbatim, so the listing is not repeated here.)
Code example source: mvalleavila/Kafka-Spark-Hbase-Example
@Override
public Void call(JavaPairRDD<String, Integer> values, Time time) throws Exception {
  values.foreach(new VoidFunction<Tuple2<String, Integer>>() {
    @Override
    public void call(Tuple2<String, Integer> tuple) throws Exception {
      HBaseCounterIncrementor incrementor =
          HBaseCounterIncrementor.getInstance(broadcastTableName.value(), broadcastColumnFamily.value());
      incrementor.increment("Counter", tuple._1(), tuple._2());
      System.out.println("------------------------------- Counter:" + tuple._1() + "," + tuple._2());
    }
  });
  return null;
}});
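One design note on the example above: the VoidFunction body runs once per record, so any per-record setup cost is multiplied by the data size. A common Spark idiom is to switch to foreachPartition so that setup happens once per partition. The fragment below is a sketch of that idiom only; Incrementor is a hypothetical stand-in for the project's HBaseCounterIncrementor, not part of its API.

// Sketch of the foreachPartition idiom (Incrementor is hypothetical).
values.foreachPartition(iterator -> {
  Incrementor incrementor = Incrementor.getInstance(); // once per partition
  while (iterator.hasNext()) {
    Tuple2<String, Integer> tuple = iterator.next();
    incrementor.increment("Counter", tuple._1(), tuple._2());
  }
});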
Code example source: DataSystemsLab/GeoSpark
deleteLocalFile(outputPath + "-" + RasterizationUtils.getImageTileName(zoomLevel, partitionOnX, partitionOnY, i), imageType);
distributedImage.foreach(new VoidFunction<Tuple2<Integer, ImageSerializableWrapper>>()
Code example source: DataSystemsLab/GeoSpark
deleteHadoopFile(outputPath + "-" + RasterizationUtils.getImageTileName(zoomLevel, partitionOnX, partitionOnY, i) + ".", imageType);
distributedImage.foreach(new VoidFunction<Tuple2<Integer, ImageSerializableWrapper>>()
Code example source: com.davidbracewell/mango
@Override
public void forEach(@NonNull SerializableBiConsumer<? super T, ? super U> consumer) {
  rdd.foreach(tuple -> {
    Configurator.INSTANCE.configure(configBroadcast.value());
    consumer.accept(tuple._1(), tuple._2());
  });
}
Code example source: deepspark/deepspark
public static void main(String[] args) {
  SparkConf conf = new SparkConf().setAppName("ImagenetSampler")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
  JavaSparkContext sc = new JavaSparkContext(conf);
  int numExecutors = conf.getInt("spark.executor.instances", -1);
  System.out.println("number of executors = " + numExecutors);
  System.out.println("Data Loading...");
  JavaPairRDD<FloatWritable, ArrayPrimitiveWritable> train_seq =
      sc.sequenceFile("imagenet_sampled.hsf", FloatWritable.class, ArrayPrimitiveWritable.class);
  train_seq.foreach(new VoidFunction<Tuple2<FloatWritable, ArrayPrimitiveWritable>>() {
    @Override
    public void call(Tuple2<FloatWritable, ArrayPrimitiveWritable> arg0) throws Exception {
      System.out.println(arg0._1.get() + " " + ((float[]) arg0._2.get()).length);
    }
  });
  sc.close();
}
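A caveat worth knowing when reading Hadoop sequence files this way: Spark reuses the same Writable object across records, so Writable values should be copied or converted before being cached or collected. A pure foreach that reads each value immediately, as here, is safe.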
Code example source: melphi/spark-examples
/**
 * The task body
 */
public void run(String inputFilePath) {
  /*
   * This is the address of the Spark cluster. We will call the task from WordCountTest and we
   * use a local standalone cluster. [*] means use all the cores available.
   * See {@see http://spark.apache.org/docs/latest/submitting-applications.html#master-urls}.
   */
  String master = "local[*]";
  /*
   * Initialises a Spark context.
   */
  SparkConf conf = new SparkConf()
      .setAppName(WordCountTask.class.getName())
      .setMaster(master);
  JavaSparkContext context = new JavaSparkContext(conf);
  /*
   * Performs a word count sequence of tasks and prints the output with a logger.
   */
  context.textFile(inputFilePath)
      .flatMap(text -> Arrays.asList(text.split(" ")).iterator())
      .mapToPair(word -> new Tuple2<>(word, 1))
      .reduceByKey((a, b) -> a + b)
      .foreach(result -> LOGGER.info(
          String.format("Word [%s] count [%d].", result._1(), result._2)));
}
}
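A quick way to exercise this task (the enclosing class is WordCountTask, as the setAppName call indicates; the input path below is a placeholder, not from the project):

new WordCountTask().run("/path/to/input.txt");

Because foreach logs on the executors, the log lines conveniently land in the same console only while the master is local[*].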
Code example source: Erik-ly/SprakProject
extractSessionDetailRDD.foreach(new VoidFunction<Tuple2<String, Tuple2<String, Row>>>() {
Code example source: DataSystemsLab/GeoSpark
s3Operator.deleteImage(bucketName, path + "-" + RasterizationUtils.getImageTileName(zoomLevel, partitionOnX, partitionOnY, i) + "." + imageType.getTypeName());
distributedImage.foreach(new VoidFunction<Tuple2<Integer, ImageSerializableWrapper>>()
Code example source: aseigneurin/kafka-sandbox
public static void main(String[] args) {
  SparkConf conf = new SparkConf()
      .setAppName("kafka-sandbox")
      .setMaster("local[*]");
  JavaSparkContext sc = new JavaSparkContext(conf);
  JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));
  Set<String> topics = Collections.singleton("mytopic");
  Map<String, String> kafkaParams = new HashMap<>();
  kafkaParams.put("metadata.broker.list", "localhost:9092");
  JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc,
      String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);
  directKafkaStream.foreachRDD(rdd -> {
    System.out.println("--- New RDD with " + rdd.partitions().size()
        + " partitions and " + rdd.count() + " records");
    rdd.foreach(record -> System.out.println(record._2));
  });
  ssc.start();
  ssc.awaitTermination();
}
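Worth noting in this example: the body of foreachRDD (the partition-count println) runs on the driver once per batch, while the lambda passed to rdd.foreach runs on the executors for each record; the two only share a console here because the master is local[*].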
Code example source: spirom/learning-spark-with-java
pairsRDD.foreach(i -> System.out.println(i));
reducedRDD.foreach(i -> System.out.println(i));
foldedRDD.foreach(i -> System.out.println(i));
averageRDD.foreach(i -> System.out.println(i));
averageRDDtheHardWay.foreach(i -> System.out.println(i));
Code example source: Erik-ly/SprakProject
sessionDetailRDD.foreach(new VoidFunction<Tuple2<String,Tuple2<String,Row>>>() {
Code example source: osmlab/atlas-checks
resultRDD.foreach(countryPathPair ->
Code example source: spirom/learning-spark-with-java
rdd.foreach(e -> System.out.println(e));
});
Code example source: scipr-lab/dizk
qapWitness.coefficientsH().filter(e -> e._1 >= qapWitness.degree() - 2).foreach(coeff -> {
  if (coeff._1 == qapWitness.degree() - 2) {
    assert (!coeff._2.equals(zero));
This content was sourced from the internet. If it infringes your rights, please contact the author to have it removed!