This article collects code examples of the Java method org.apache.spark.api.java.JavaRDD.coalesce(), showing how JavaRDD.coalesce() is used in practice. The examples are extracted from selected open-source projects found on platforms such as GitHub, Stack Overflow, and Maven, and should serve as useful references. Details of the JavaRDD.coalesce() method:

Package: org.apache.spark.api.java
Class: JavaRDD
Method: coalesce
Description: returns a new RDD reduced to the given number of partitions; the two-argument overload coalesce(int numPartitions, boolean shuffle) optionally performs a full shuffle.
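
Before the collected examples, here is a minimal, self-contained sketch of the method (a hypothetical demo written for this article, not taken from any of the projects below):

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class CoalesceDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("CoalesceDemo").setMaster("local[4]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Start with 8 partitions.
        JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 8);
        // Merge down to 2 partitions without a shuffle (narrow dependency).
        JavaRDD<Integer> merged = numbers.coalesce(2);
        System.out.println(merged.getNumPartitions()); // prints 2
        sc.stop();
    }
}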
Code example source: mahmoudparsian/data-algorithms-book

static JavaRDD<String> readBiosetFiles(JavaSparkContext ctx,
                                       String biosetFiles)
        throws Exception {
    // Build a comma-separated list of paths from the file that lists the bioset files.
    StringBuilder unionPath = new StringBuilder();
    BufferedReader in = null;
    try {
        in = new BufferedReader(new FileReader(biosetFiles));
        String singleBiosetFile = null;
        while ((singleBiosetFile = in.readLine()) != null) {
            singleBiosetFile = singleBiosetFile.trim();
            unionPath.append(singleBiosetFile);
            unionPath.append(",");
        }
        //System.out.println("readBiosetFiles() unionPath="+unionPath);
    }
    finally {
        if (in != null) {
            in.close();
        }
    }
    // remove the last comma ","
    String unionPathAsString = unionPath.toString();
    unionPathAsString = unionPathAsString.substring(0, unionPathAsString.length()-1);
    // create RDD: textFile() accepts a comma-separated list of paths
    JavaRDD<String> allBiosets = ctx.textFile(unionPathAsString);
    JavaRDD<String> partitioned = allBiosets.coalesce(14);
    return partitioned;
}
Code example source: mahmoudparsian/data-algorithms-book

private static JavaRDD<String> readInputFiles(JavaSparkContext ctx,
                                              String filename)
        throws Exception {
    List<String> biosetFiles = toList(filename);
    int counter = 0;
    JavaRDD[] rdds = new JavaRDD[biosetFiles.size()];
    for (String biosetFileName : biosetFiles) {
        System.out.println("debug1 biosetFileName=" + biosetFileName);
        JavaRDD<String> record = ctx.textFile(biosetFileName);
        rdds[counter] = record;
        counter++;
    }
    JavaRDD<String> allBiosets = ctx.union(rdds);
    // Merge the union of all inputs down to 9 partitions without a shuffle.
    return allBiosets.coalesce(9, false);
}
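
A note on the two-argument form used above: coalesce(n) is equivalent to coalesce(n, false). With shuffle=false, existing partitions are merged through a narrow dependency, avoiding a shuffle; with shuffle=true, a full shuffle redistributes records evenly across partitions, and it is also the only way coalesce() can increase the partition count. A minimal contrast (variable name illustrative):

JavaRDD<String> merged = lines.coalesce(9);           // same as coalesce(9, false): no shuffle
JavaRDD<String> rebalanced = lines.coalesce(9, true); // full shuffle, evenly sized partitions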
Code example source: mahmoudparsian/data-algorithms-book (this one-liner recurs in several files of the repository)

JavaRDD<String> rdd = lines.coalesce(9);
Code example source: org.qcri.rheem/rheem-spark
@Override
public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate(
        ChannelInstance[] inputs,
        ChannelInstance[] outputs,
        SparkExecutor sparkExecutor,
        OptimizationContext.OperatorContext operatorContext) {
    assert inputs.length == this.getNumInputs();
    assert outputs.length <= 1;
    final FileChannel.Instance output = (FileChannel.Instance) outputs[0];
    final String targetPath = output.addGivenOrTempPath(this.targetPath, sparkExecutor.getConfiguration());
    RddChannel.Instance input = (RddChannel.Instance) inputs[0];
    input.provideRdd()
            .coalesce(1) // TODO: Remove. This only hotfixes the issue that JavaObjectFileSource reads only a single file.
            .saveAsObjectFile(targetPath);
    LoggerFactory.getLogger(this.getClass()).info("Writing dataset to {}.", targetPath);
    return ExecutionOperator.modelEagerExecution(inputs, outputs, operatorContext);
}
Code example source: org.datavec/datavec-spark (an identical copy exists in org.datavec/datavec-spark_2.11)
public static void exportCSVSpark(String directory, String delimiter, String quote, int outputSplits,
                JavaRDD<List<Writable>> data) {
    //NOTE: Order is probably not random here...
    JavaRDD<String> lines = data.map(new WritablesToStringFunction(delimiter, quote));
    // coalesce() is a transformation that returns a new RDD; it does not modify
    // 'lines' in place, so the result must be reassigned for it to take effect.
    lines = lines.coalesce(outputSplits);
    lines.saveAsTextFile(directory);
}
Code example source: org.qcri.rheem/rheem-spark
@Override
public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate(
        ChannelInstance[] inputs,
        ChannelInstance[] outputs,
        SparkExecutor sparkExecutor,
        OptimizationContext.OperatorContext operatorContext) {
    assert inputs.length == this.getNumInputs();
    assert outputs.length == this.getNumOutputs();
    final RddChannel.Instance input = (RddChannel.Instance) inputs[0];
    final RddChannel.Instance output = (RddChannel.Instance) outputs[0];
    final JavaRDD<Type> inputRdd = input.provideRdd();
    final JavaRDD<Type> coalescedRdd = inputRdd.coalesce(1);
    this.name(coalescedRdd);
    final JavaRDD<Iterable<Type>> outputRdd = coalescedRdd
            .mapPartitions(partitionIterator -> {
                Collection<Type> dataUnitGroup = new ArrayList<>();
                while (partitionIterator.hasNext()) {
                    dataUnitGroup.add(partitionIterator.next());
                }
                return Collections.singleton(dataUnitGroup);
            });
    this.name(outputRdd);
    output.accept(outputRdd, sparkExecutor);
    return ExecutionOperator.modelLazyExecution(inputs, outputs, operatorContext);
}
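
A design note on the pattern above: coalescing to a single partition first guarantees that the subsequent mapPartitions() call sees the entire dataset through one iterator, which is what allows it to materialize all elements into a single collection. The price is that this step runs as one task with no parallelism, so it is only sensible when the data fits comfortably in one executor's memory.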
Code example source: org.datavec/datavec-spark_2.11 (an identical copy exists in org.datavec/datavec-spark)
/**
 * Save a {@code JavaRDD<List<Writable>>} to a Hadoop {@link org.apache.hadoop.io.SequenceFile}. Each record is given
 * a unique (but noncontiguous) {@link LongWritable} key, and values are stored as {@link RecordWritable} instances.
 * <p>
 * Use {@link #restoreSequenceFile(String, JavaSparkContext)} to restore values saved with this method.
 *
 * @param path           Path to save the sequence file
 * @param rdd            RDD to save
 * @param maxOutputFiles Nullable. If non-null: first coalesce the RDD to the specified size (number of partitions)
 *                       to limit the maximum number of output sequence files
 * @see #saveSequenceFileSequences(String, JavaRDD)
 * @see #saveMapFile(String, JavaRDD)
 */
public static void saveSequenceFile(String path, JavaRDD<List<Writable>> rdd, @Nullable Integer maxOutputFiles) {
    path = FilenameUtils.normalize(path, true);
    if (maxOutputFiles != null) {
        rdd = rdd.coalesce(maxOutputFiles);
    }
    JavaPairRDD<List<Writable>, Long> dataIndexPairs = rdd.zipWithUniqueId(); //Note: Long values are unique + NOT contiguous; more efficient than zipWithIndex
    JavaPairRDD<LongWritable, RecordWritable> keyedByIndex =
            dataIndexPairs.mapToPair(new RecordSavePrepPairFunction());
    keyedByIndex.saveAsNewAPIHadoopFile(path, LongWritable.class, RecordWritable.class,
            SequenceFileOutputFormat.class);
}
Code example source: org.datavec/datavec-spark (an identical copy exists in org.datavec/datavec-spark_2.11)
/**
 * Save a {@code JavaRDD<List<List<Writable>>>} to a Hadoop {@link org.apache.hadoop.io.SequenceFile}. Each record
 * is given a unique (but noncontiguous) {@link LongWritable} key, and values are stored as {@link SequenceRecordWritable} instances.
 * <p>
 * Use {@link #restoreSequenceFileSequences(String, JavaSparkContext)} to restore values saved with this method.
 *
 * @param path           Path to save the sequence file
 * @param rdd            RDD to save
 * @param maxOutputFiles Nullable. If non-null: first coalesce the RDD to the specified size (number of partitions)
 *                       to limit the maximum number of output sequence files
 * @see #saveSequenceFile(String, JavaRDD)
 * @see #saveMapFileSequences(String, JavaRDD)
 */
public static void saveSequenceFileSequences(String path, JavaRDD<List<List<Writable>>> rdd,
                @Nullable Integer maxOutputFiles) {
    path = FilenameUtils.normalize(path, true);
    if (maxOutputFiles != null) {
        rdd = rdd.coalesce(maxOutputFiles);
    }
    JavaPairRDD<List<List<Writable>>, Long> dataIndexPairs = rdd.zipWithUniqueId(); //Note: Long values are unique + NOT contiguous; more efficient than zipWithIndex
    JavaPairRDD<LongWritable, SequenceRecordWritable> keyedByIndex =
            dataIndexPairs.mapToPair(new SequenceRecordSavePrepPairFunction());
    keyedByIndex.saveAsNewAPIHadoopFile(path, LongWritable.class, SequenceRecordWritable.class,
            SequenceFileOutputFormat.class);
}
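
Note how keys are generated in these savers: the sequence-file methods use zipWithUniqueId(), which assigns unique but noncontiguous Long keys without triggering an extra Spark job, while the map-file methods below use zipWithIndex(), which produces the contiguous keys a MapFile requires at the cost of an additional pass over the data.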
Code example source: org.datavec/datavec-spark_2.11 (an identical copy exists in org.datavec/datavec-spark)
/**
 * Save a {@code JavaRDD<List<List<Writable>>>} to a Hadoop {@link org.apache.hadoop.io.MapFile}. Each record is
 * given a <i>unique and contiguous</i> {@link LongWritable} key, and values are stored as
 * {@link SequenceRecordWritable} instances.<br>
 * <b>Note</b>: If contiguous keys are not required, using a sequence file instead is preferable from a performance
 * point of view. Contiguous keys are often only required for non-Spark use cases, such as with
 * {@link org.datavec.hadoop.records.reader.mapfile.MapFileSequenceRecordReader}<br>
 * <p>
 * Use {@link #restoreMapFileSequences(String, JavaSparkContext)} to restore values saved with this method.
 *
 * @param path           Path to save the MapFile
 * @param rdd            RDD to save
 * @param c              Configuration object, used to customise options for the map file
 * @param maxOutputFiles Nullable. If non-null: first coalesce the RDD to the specified size (number of partitions)
 *                       to limit the maximum number of output map files
 * @see #saveMapFileSequences(String, JavaRDD)
 * @see #saveSequenceFile(String, JavaRDD)
 */
public static void saveMapFileSequences(String path, JavaRDD<List<List<Writable>>> rdd, Configuration c,
                @Nullable Integer maxOutputFiles) {
    path = FilenameUtils.normalize(path, true);
    if (maxOutputFiles != null) {
        rdd = rdd.coalesce(maxOutputFiles);
    }
    JavaPairRDD<List<List<Writable>>, Long> dataIndexPairs = rdd.zipWithIndex();
    JavaPairRDD<LongWritable, SequenceRecordWritable> keyedByIndex =
            dataIndexPairs.mapToPair(new SequenceRecordSavePrepPairFunction());
    keyedByIndex.saveAsNewAPIHadoopFile(path, LongWritable.class, SequenceRecordWritable.class,
            MapFileOutputFormat.class, c);
}
Code example source: org.qcri.rheem/rheem-spark
@Override
public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate(
        ChannelInstance[] inputs,
        ChannelInstance[] outputs,
        SparkExecutor sparkExecutor,
        OptimizationContext.OperatorContext operatorContext) {
    assert inputs.length == this.getNumInputs();
    final FileChannel.Instance output = (FileChannel.Instance) outputs[0];
    final String targetPath = output.addGivenOrTempPath(this.targetPath, sparkExecutor.getConfiguration());
    final RddChannel.Instance input = (RddChannel.Instance) inputs[0];
    final JavaRDD<Object> rdd = input.provideRdd();
    final JavaRDD<String> serializedRdd = rdd
            .map(dataQuantum -> {
                // TODO: Once there are more tuple types, make this generic.
                @SuppressWarnings("unchecked")
                Tuple2<Object, Object> tuple2 = (Tuple2<Object, Object>) dataQuantum;
                return String.valueOf(tuple2.field0) + '\t' + String.valueOf(tuple2.field1);
            });
    this.name(serializedRdd);
    serializedRdd
            .coalesce(1) // TODO: Allow more than one TSV file?
            .saveAsTextFile(targetPath);
    return ExecutionOperator.modelEagerExecution(inputs, outputs, operatorContext);
}
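
As the TODO hints, coalesce(1) is used here only to force a single output file: saveAsTextFile() writes one part file per partition, so collapsing to one partition yields exactly one TSV file, at the cost of funnelling the entire write through a single task.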
Code example source: KeithSSmith/spark-compaction
public void compact(String inputPath, String outputPath) throws IOException {
    this.setCompressionAndSerializationOptions(inputPath, outputPath);
    this.outputCompressionProperties(this.outputCompression);
    // Defining Spark Context with a generic Spark Configuration.
    SparkConf sparkConf = new SparkConf().setAppName("Spark Compaction");
    JavaSparkContext sc = new JavaSparkContext(sparkConf);
    if (this.outputSerialization.equals(TEXT)) {
        JavaRDD<String> textFile = sc.textFile(this.concatInputPath(inputPath));
        textFile.coalesce(this.splitSize).saveAsTextFile(outputPath);
    } else if (this.outputSerialization.equals(PARQUET)) {
        SQLContext sqlContext = new SQLContext(sc);
        DataFrame parquetFile = sqlContext.read().parquet(this.concatInputPath(inputPath));
        parquetFile.coalesce(this.splitSize).write().parquet(outputPath);
    } else if (this.outputSerialization.equals(AVRO)) {
        // For this to work the files must end in .avro
        // Another issue is that when using compression the compression codec extension is not being added to the file name.
        SQLContext sqlContext = new SQLContext(sc);
        DataFrame avroFile = sqlContext.read().format("com.databricks.spark.avro").load(this.concatInputPath(inputPath));
        avroFile.coalesce(this.splitSize).write().format("com.databricks.spark.avro").save(outputPath);
    } else {
        System.out.println("Did not match any serialization type: text, parquet, or avro. Received: " +
                this.outputSerialization);
    }
}
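
In every branch, coalesce(this.splitSize) is what performs the compaction: the number of output files equals the number of partitions at write time. splitSize is computed elsewhere in the class (not shown in this excerpt); presumably it is derived from the total input size and a target file or block size, so that many small input files are merged into a few appropriately sized output files.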
Code example source: KeithSSmith/spark-compaction
public void compact(String[] args) throws IOException {
    this.setCompressionAndSerializationOptions(this.parseCli(args));
    this.outputCompressionProperties(this.outputCompression);
    // Defining Spark Context with a generic Spark Configuration.
    SparkConf sparkConf = new SparkConf().setAppName("Spark Compaction");
    JavaSparkContext sc = new JavaSparkContext(sparkConf);
    if (this.outputSerialization.equals(TEXT)) {
        JavaRDD<String> textFile = sc.textFile(this.concatInputPath(inputPath));
        textFile.coalesce(this.splitSize).saveAsTextFile(outputPath);
    } else if (this.outputSerialization.equals(PARQUET)) {
        SQLContext sqlContext = new SQLContext(sc);
        DataFrame parquetFile = sqlContext.read().parquet(this.concatInputPath(inputPath));
        parquetFile.coalesce(this.splitSize).write().parquet(outputPath);
    } else if (this.outputSerialization.equals(AVRO)) {
        // For this to work the files must end in .avro
        SQLContext sqlContext = new SQLContext(sc);
        DataFrame avroFile = sqlContext.read().format("com.databricks.spark.avro").load(this.concatInputPath(inputPath));
        avroFile.coalesce(this.splitSize).write().format("com.databricks.spark.avro").save(outputPath);
    } else {
        System.out.println("Did not match any serialization type: text, parquet, or avro. Received: " +
                this.outputSerialization);
    }
}