This article collects code examples of the Java method org.apache.spark.api.java.JavaRDD.foreachPartition(), showing how JavaRDD.foreachPartition() is used in practice. The snippets were extracted from selected open-source projects hosted on platforms such as GitHub, Stack Overflow, and Maven, so they should serve as useful references. Details of JavaRDD.foreachPartition():
Package: org.apache.spark.api.java
Class: JavaRDD
Method: foreachPartition
Description: none available
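Since the listing gives no description of the method, here is a minimal self-contained sketch of a typical call before the collected examples (the class name, master URL, and sample data are illustrative only): foreachPartition runs the supplied function once per partition on the executors, passing it an Iterator over that partition's elements.

import java.util.Arrays;
import java.util.Iterator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class ForeachPartitionExample {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("foreachPartition-demo").setMaster("local[2]");
    try (JavaSparkContext sc = new JavaSparkContext(conf)) {
      // Two partitions, so the function below runs exactly twice.
      JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);
      rdd.foreachPartition((Iterator<Integer> it) -> {
        int count = 0;
        while (it.hasNext()) {
          it.next();
          count++;
        }
        // Executes on the executor; in local mode the output appears on the console.
        System.out.println("partition size: " + count);
      });
    }
  }
}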
Code example source: OryxProject/oryx (the identical snippet also appears under the com.cloudera.oryx/oryx-ml Maven artifact)
newData.foreachPartition(p -> {});
pastData.foreachPartition(p -> {});
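A foreachPartition call with an empty body, as here, is a common idiom for forcing a lazy RDD to actually be computed (surfacing errors and populating any caches) without collecting anything back to the driver.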
Code example source: org.apache.spark/spark-core (the identical test also appears in the spark-core_2.10 and spark-core_2.11 artifacts)
@Test
public void foreachPartition() {
  LongAccumulator accum = sc.sc().longAccumulator();
  JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
  rdd.foreachPartition(iter -> {
    while (iter.hasNext()) {
      iter.next();
      accum.add(1);
    }
  });
  assertEquals(2, accum.value().intValue());
}
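The element count is carried through a LongAccumulator rather than a local variable because the lambda runs on the executors: updates to an ordinary driver-side variable would be lost when the closure is serialized, while accumulator updates are merged back to the driver.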
Code example source: dibbhatt/kafka-spark-consumer

@Override
public void call(JavaRDD<MessageAndMetadata<byte[]>> rdd) throws Exception {
  rdd.foreachPartition(new VoidFunction<Iterator<MessageAndMetadata<byte[]>>>() {
    // ... (the function body is truncated at the source)
Code example source: org.springframework.xd/spring-xd-spark-streaming

@Override
public Void call(JavaRDD<String> rdd) {
  rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
    @Override
    public void call(Iterator<String> items) throws Exception {
      FileWriter fw;
      BufferedWriter bw = null;
      try {
        fw = new FileWriter(file.getAbsoluteFile());
        bw = new BufferedWriter(fw);
        while (items.hasNext()) {
          bw.append(items.next() + System.lineSeparator());
        }
      }
      catch (IOException ioe) {
        throw new RuntimeException(ioe);
      }
      finally {
        if (bw != null) {
          bw.close();
        }
      }
    }
  });
  return null;
}
});
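Note that every partition opens the same file through new FileWriter(file.getAbsoluteFile()) without the append flag, so with more than one partition the writes can clobber each other on the executor's local filesystem; the pattern is only safe when the RDD has a single partition or the file name is made partition-specific.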
Code example source: cloudera-labs/envelope

private void applyMutations(JavaRDD<Row> planned, Config outputConfig) {
  planned.foreachPartition(new ApplyMutationsForPartitionFunction(outputConfig, accumulators));
}
Code example source: org.qcri.rheem/rheem-spark

@Override
public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate(
    ChannelInstance[] inputs,
    ChannelInstance[] outputs,
    SparkExecutor sparkExecutor,
    OptimizationContext.OperatorContext operatorContext) {
  RddChannel.Instance input = (RddChannel.Instance) inputs[0];
  final JavaRDD<Object> rdd = input.provideRdd();
  final JavaRDD<Object> cachedRdd = rdd.cache();
  // Iterating over every partition with a no-op body forces the cached RDD to be materialized.
  cachedRdd.foreachPartition(iterator -> {
  });
  RddChannel.Instance output = (RddChannel.Instance) outputs[0];
  output.accept(cachedRdd, sparkExecutor);
  return ExecutionOperator.modelQuasiEagerExecution(inputs, outputs, operatorContext);
}
Code example source: io.zipkin.sparkstreaming/zipkin-sparkstreaming

static void streamSpansToStorage(
    JavaDStream<byte[]> stream,
    ReadSpans readSpans,
    AdjustAndConsumeSpansSharingTraceId adjustAndConsumeSpansSharingTraceId
) {
  JavaDStream<Span> spans = stream.flatMap(readSpans);
  // TODO: plug in some filter to drop spans regardless of trace ID
  // spans = spans.filter(spanFilter);
  JavaPairDStream<String, Iterable<Span>> tracesById = spans
      .mapToPair(s -> new Tuple2<>(Util.toLowerHex(s.traceIdHigh, s.traceId), s))
      .groupByKey();
  tracesById.foreachRDD(rdd -> {
    rdd.values().foreachPartition(adjustAndConsumeSpansSharingTraceId);
  });
}
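Because the spans are first grouped by trace ID, each element of rdd.values() is a complete trace, so the per-partition consumer can adjust and store whole traces in one pass rather than reassembling them downstream.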
Code example source: cloudera-labs/envelope

@Override
public void applyBulkMutations(List<Tuple2<MutationType, Dataset<Row>>> planned) {
  for (Tuple2<MutationType, Dataset<Row>> mutation : planned) {
    MutationType mutationType = mutation._1();
    Dataset<Row> mutationDF = mutation._2();
    if (mutationType.equals(MutationType.INSERT)) {
      mutationDF.javaRDD().foreachPartition(new SendRowToKafkaFunction(config));
    }
  }
}
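SendRowToKafkaFunction's implementation is not shown, but the usual reason for producing to Kafka from foreachPartition rather than foreach is to open one producer per partition instead of one per row. A minimal sketch of that pattern, assuming plain string values (the class name, constructor, and topic handling below are hypothetical, not Envelope's actual code):

import java.util.Iterator;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Row;

// Hypothetical sketch: one KafkaProducer per partition, not per row.
public class SendRowToKafkaSketch implements VoidFunction<Iterator<Row>> {
  private final Properties producerProps; // serializable config built on the driver
  private final String topic;

  public SendRowToKafkaSketch(Properties producerProps, String topic) {
    this.producerProps = producerProps;
    this.topic = topic;
  }

  @Override
  public void call(Iterator<Row> rows) throws Exception {
    // Runs once per partition on an executor, so the (expensive) producer
    // is created once per partition rather than once per record.
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
      while (rows.hasNext()) {
        producer.send(new ProducerRecord<>(topic, rows.next().mkString(",")));
      }
    }
  }
}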
Code example source: locationtech/geowave

// First call site (inside the S3 branch; truncated at the source):
if (isS3) {
  final String s3FinalEndpointUrl = s3EndpointUrl;
  fileRDD.foreachPartition(uri -> {
    S3FileSystem fs = initializeS3FS(s3FinalEndpointUrl);
    List<URI> inputFiles = new ArrayList<URI>();
    // ...

// Second call site (truncated at the source):
fileRDD.foreachPartition(uri -> {
  processInput(
      configFile,
      // ...
Code example source: uk.gov.gchq.gaffer/parquet-store

final WriteUnsortedDataFunction writeUnsortedDataFunction =
    new WriteUnsortedDataFunction(store.getTempFilesDir(), store.getSchemaUtils(), groupToSplitPoints);
input.foreachPartition(writeUnsortedDataFunction);
LOGGER.debug("Finished writing the unsorted Parquet data to {}", tempDataDirString);
Code example source: deepspark/deepspark

data.foreachPartition(new VoidFunction<Iterator<Record>>() {
  private static final long serialVersionUID = -4641037124928675165L;
  // ... (truncated at the source)
Code example source: deepspark/deepspark

printLogTime("Training start...");
dummydata.foreachPartition(new VoidFunction<Iterator<Integer>>() {
  private static final long serialVersionUID = -4641037124928675165L;
  // ... (truncated at the source)
This content was collected from the Internet; if it infringes your rights, please contact the author to have it removed.