Usage of org.apache.spark.api.java.JavaRDD.coalesce() with code examples

This article collects a number of code examples for the org.apache.spark.api.java.JavaRDD.coalesce() method in Java and shows how JavaRDD.coalesce() is used in practice. The examples are drawn mainly from GitHub, Stack Overflow, Maven and similar sources, extracted from selected projects, and should serve as useful references. Details of the JavaRDD.coalesce() method:
Package: org.apache.spark.api.java
Class: JavaRDD
Method: coalesce

About JavaRDD.coalesce

coalesce(int numPartitions) is a transformation that returns a new JavaRDD reduced to at most numPartitions partitions. The overload coalesce(int numPartitions, boolean shuffle) controls whether a shuffle is performed: without a shuffle (the default), coalesce can only decrease the partition count, which avoids moving all of the data but may leave partitions unevenly sized; with shuffle = true it can also increase the partition count, at the cost of a full shuffle. Because coalesce is a transformation, it does not modify the RDD it is called on; the returned RDD must be used.
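
A minimal sketch of both overloads, assuming a local Spark setup; the application name, master setting, input path, and partition counts below are illustrative placeholders rather than values taken from the examples on this page:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class CoalesceSketch {
  public static void main(String[] args) {
    // Illustrative local configuration; adjust the master and app name as needed.
    SparkConf conf = new SparkConf().setAppName("coalesce-sketch").setMaster("local[*]");
    JavaSparkContext sc = new JavaSparkContext(conf);

    // Placeholder input path.
    JavaRDD<String> lines = sc.textFile("/tmp/input/*.txt");

    // Shrink to at most 4 partitions without a shuffle (narrow dependency).
    JavaRDD<String> fewer = lines.coalesce(4);

    // With shuffle = true the partition count can also grow, and the data is
    // redistributed across partitions.
    JavaRDD<String> redistributed = lines.coalesce(16, true);

    System.out.println("input partitions:   " + lines.getNumPartitions());
    System.out.println("coalesce(4):        " + fewer.getNumPartitions());
    System.out.println("coalesce(16, true): " + redistributed.getNumPartitions());

    sc.close();
  }
}

Note that repartition(n) is implemented as coalesce(n, shuffle = true), so the last call above is interchangeable with repartition(16).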

Code examples

Code example source: mahmoudparsian/data-algorithms-book

static JavaRDD<String> readBiosetFiles(JavaSparkContext ctx,
                   String biosetFiles)
 throws Exception {
 StringBuilder  unionPath = new StringBuilder();
 BufferedReader in = null;
 try {
   in = new BufferedReader(new FileReader(biosetFiles));
   String singleBiosetFile = null;
   while ((singleBiosetFile = in.readLine()) != null) {
    singleBiosetFile = singleBiosetFile.trim();
    unionPath.append(singleBiosetFile);
    unionPath.append(",");
   }
   //System.out.println("readBiosetFiles() unionPath="+unionPath);
 }
 finally {
   if (in != null) {
    in.close();
   }
 }
 // remove the last comma ","
 String unionPathAsString = unionPath.toString();
 unionPathAsString = unionPathAsString.substring(0, unionPathAsString.length()-1);
 // create RDD   
 JavaRDD<String> allBiosets = ctx.textFile(unionPathAsString);
 JavaRDD<String> partitioned = allBiosets.coalesce(14);
 return partitioned;
}

Code example source: mahmoudparsian/data-algorithms-book

private static JavaRDD<String> readInputFiles(JavaSparkContext ctx,
                       String filename)
 throws Exception {
   List<String> biosetFiles = toList(filename);
   int counter = 0;
   JavaRDD[] rdds = new JavaRDD[biosetFiles.size()];
   for (String biosetFileName : biosetFiles) {
    System.out.println("debug1 biosetFileName=" + biosetFileName);
    JavaRDD<String> record = ctx.textFile(biosetFileName);
    rdds[counter] = record;
    counter++;
   }
   JavaRDD<String> allBiosets = ctx.union(rdds);
   return allBiosets.coalesce(9, false);
}

Code example source: mahmoudparsian/data-algorithms-book

JavaRDD<String> rdd = lines.coalesce(9);
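
For context around this one-line fragment: lines is a JavaRDD<String>, typically read with textFile(), and the result of coalesce() must be used because the original RDD is left unchanged. A hedged sketch, assuming a JavaSparkContext ctx is in scope and using placeholder paths:

// Assumption: 'ctx' is an existing JavaSparkContext; both paths are placeholders.
JavaRDD<String> lines = ctx.textFile("/tmp/logs/part-*"); // input may have many small partitions
JavaRDD<String> rdd = lines.coalesce(9);                  // 'lines' itself keeps its partitioning
rdd.saveAsTextFile("/tmp/logs-coalesced");                // writes at most 9 output part files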

Code example source: org.qcri.rheem/rheem-spark

@Override
public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate(
    ChannelInstance[] inputs,
    ChannelInstance[] outputs,
    SparkExecutor sparkExecutor,
    OptimizationContext.OperatorContext operatorContext) {
  assert inputs.length == this.getNumInputs();
  assert outputs.length <= 1;
  final FileChannel.Instance output = (FileChannel.Instance) outputs[0];
  final String targetPath = output.addGivenOrTempPath(this.targetPath, sparkExecutor.getConfiguration());
  RddChannel.Instance input = (RddChannel.Instance) inputs[0];
  input.provideRdd()
      .coalesce(1) // TODO: Remove. This only hotfixes the issue that JavaObjectFileSource reads only a single file.
      .saveAsObjectFile(targetPath);
  LoggerFactory.getLogger(this.getClass()).info("Writing dataset to {}.", targetPath);
  return ExecutionOperator.modelEagerExecution(inputs, outputs, operatorContext);
}

Code example source: org.datavec/datavec-spark (the same method also appears in org.datavec/datavec-spark_2.11)

public static void exportCSVSpark(String directory, String delimiter, String quote, int outputSplits,
        JavaRDD<List<Writable>> data) {
  //NOTE: Order is probably not random here...
  JavaRDD<String> lines = data.map(new WritablesToStringFunction(delimiter, quote));
  lines = lines.coalesce(outputSplits); // coalesce returns a new RDD; reassign so saveAsTextFile below writes the coalesced data
  lines.saveAsTextFile(directory);
}

Code example source: org.qcri.rheem/rheem-spark

@Override
public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate(
    ChannelInstance[] inputs,
    ChannelInstance[] outputs,
    SparkExecutor sparkExecutor,
    OptimizationContext.OperatorContext operatorContext) {
  assert inputs.length == this.getNumInputs();
  assert outputs.length == this.getNumOutputs();
  final RddChannel.Instance input = (RddChannel.Instance) inputs[0];
  final RddChannel.Instance output = (RddChannel.Instance) outputs[0];
  final JavaRDD<Type> inputRdd = input.provideRdd();
  final JavaRDD<Type> coalescedRdd = inputRdd.coalesce(1);
  this.name(coalescedRdd);
  final JavaRDD<Iterable<Type>> outputRdd = coalescedRdd
      .mapPartitions(partitionIterator -> {
        Collection<Type> dataUnitGroup = new ArrayList<>();
        while (partitionIterator.hasNext()) {
          dataUnitGroup.add(partitionIterator.next());
        }
        return Collections.singleton(dataUnitGroup);
      });
  this.name(outputRdd);
  output.accept(outputRdd, sparkExecutor);
  return ExecutionOperator.modelLazyExecution(inputs, outputs, operatorContext);
}

Code example source: org.datavec/datavec-spark_2.11 (the same method also appears in org.datavec/datavec-spark)

/**
 * Save a {@code JavaRDD<List<Writable>>} to a Hadoop {@link org.apache.hadoop.io.SequenceFile}. Each record is given
 * a unique (but noncontiguous) {@link LongWritable} key, and values are stored as {@link RecordWritable} instances.
 * <p>
 * Use {@link #restoreSequenceFile(String, JavaSparkContext)} to restore values saved with this method.
 *
 * @param path           Path to save the sequence file
 * @param rdd            RDD to save
 * @param maxOutputFiles Nullable. If non-null: first coalesce the RDD to the specified size (number of partitions)
 *                       to limit the maximum number of output sequence files
 * @see #saveSequenceFileSequences(String, JavaRDD)
 * @see #saveMapFile(String, JavaRDD)
 */
public static void saveSequenceFile(String path, JavaRDD<List<Writable>> rdd, @Nullable Integer maxOutputFiles) {
  path = FilenameUtils.normalize(path, true);
  if (maxOutputFiles != null) {
    rdd = rdd.coalesce(maxOutputFiles);
  }
  JavaPairRDD<List<Writable>, Long> dataIndexPairs = rdd.zipWithUniqueId(); //Note: Long values are unique + NOT contiguous; more efficient than zipWithIndex
  JavaPairRDD<LongWritable, RecordWritable> keyedByIndex =
          dataIndexPairs.mapToPair(new RecordSavePrepPairFunction());
  keyedByIndex.saveAsNewAPIHadoopFile(path, LongWritable.class, RecordWritable.class,
          SequenceFileOutputFormat.class);
}
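
A hedged call-site sketch for saveSequenceFile above. The host class SparkStorageUtils (imported here as org.datavec.spark.storage.SparkStorageUtils), the output paths, and the toy records are assumptions for illustration; check the DataVec version in use for the actual class hosting these static methods:

import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.datavec.api.writable.IntWritable;
import org.datavec.api.writable.Writable;
import org.datavec.spark.storage.SparkStorageUtils; // assumed fully qualified name

public class SaveSequenceFileSketch {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext(
        new SparkConf().setAppName("save-sequence-file-sketch").setMaster("local[*]"));

    // Toy records; a real pipeline would produce these from a DataVec transform process.
    JavaRDD<List<Writable>> records = sc.parallelize(Arrays.asList(
        Arrays.<Writable>asList(new IntWritable(1), new IntWritable(2)),
        Arrays.<Writable>asList(new IntWritable(3), new IntWritable(4))));

    // maxOutputFiles = 2: coalesce to 2 partitions, so at most 2 sequence files are written.
    SparkStorageUtils.saveSequenceFile("/tmp/datavec-seq", records, 2);

    // maxOutputFiles = null: no coalesce, one output file per partition.
    SparkStorageUtils.saveSequenceFile("/tmp/datavec-seq-all", records, null);

    sc.close();
  }
}

The saveSequenceFileSequences and saveMapFileSequences variants further down are called analogously; the map-file variants additionally take a Hadoop Configuration and use zipWithIndex() because MapFile keys must be unique and contiguous.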

Code example source: org.datavec/datavec-spark (the same method also appears in org.datavec/datavec-spark_2.11)

/**
 * Save a {@code JavaRDD<List<List<Writable>>>} to a Hadoop {@link org.apache.hadoop.io.SequenceFile}. Each record
 * is given a unique (but noncontiguous) {@link LongWritable} key, and values are stored as {@link SequenceRecordWritable} instances.
 * <p>
 * Use {@link #restoreSequenceFileSequences(String, JavaSparkContext)} to restore values saved with this method.
 *
 * @param path           Path to save the sequence file
 * @param rdd            RDD to save
 * @param maxOutputFiles Nullable. If non-null: first coalesce the RDD to the specified size (number of partitions)
 *                       to limit the maximum number of output sequence files
 * @see #saveSequenceFile(String, JavaRDD)
 * @see #saveMapFileSequences(String, JavaRDD)
 */
public static void saveSequenceFileSequences(String path, JavaRDD<List<List<Writable>>> rdd,
        @Nullable Integer maxOutputFiles) {
  path = FilenameUtils.normalize(path, true);
  if (maxOutputFiles != null) {
    rdd = rdd.coalesce(maxOutputFiles);
  }
  JavaPairRDD<List<List<Writable>>, Long> dataIndexPairs = rdd.zipWithUniqueId(); //Note: Long values are unique + NOT contiguous; more efficient than zipWithIndex
  JavaPairRDD<LongWritable, SequenceRecordWritable> keyedByIndex =
          dataIndexPairs.mapToPair(new SequenceRecordSavePrepPairFunction());
  keyedByIndex.saveAsNewAPIHadoopFile(path, LongWritable.class, SequenceRecordWritable.class,
          SequenceFileOutputFormat.class);
}

Code example source: org.datavec/datavec-spark_2.11 (the same method also appears in org.datavec/datavec-spark)

/**
 * Save a {@code JavaRDD<List<List<Writable>>>} to a Hadoop {@link org.apache.hadoop.io.MapFile}. Each record is
 * given a <i>unique and contiguous</i> {@link LongWritable} key, and values are stored as
 * {@link SequenceRecordWritable} instances.<br>
 * <b>Note</b>: If contiguous keys are not required, using a sequence file instead is preferable from a performance
 * point of view. Contiguous keys are often only required for non-Spark use cases, such as with
 * {@link org.datavec.hadoop.records.reader.mapfile.MapFileSequenceRecordReader}<br>
 * <p>
 * Use {@link #restoreMapFileSequences(String, JavaSparkContext)} to restore values saved with this method.
 *
 * @param path Path to save the MapFile
 * @param rdd  RDD to save
 * @param c    Configuration object, used to customise options for the map file
 * @param maxOutputFiles Nullable. If non-null: first coalesce the RDD to the specified size (number of partitions)
 *                       to limit the maximum number of output map files
 * @see #saveMapFileSequences(String, JavaRDD)
 * @see #saveSequenceFile(String, JavaRDD)
 */
public static void saveMapFileSequences(String path, JavaRDD<List<List<Writable>>> rdd, Configuration c,
        @Nullable Integer maxOutputFiles) {
  path = FilenameUtils.normalize(path, true);
  if (maxOutputFiles != null) {
    rdd = rdd.coalesce(maxOutputFiles);
  }
  JavaPairRDD<List<List<Writable>>, Long> dataIndexPairs = rdd.zipWithIndex();
  JavaPairRDD<LongWritable, SequenceRecordWritable> keyedByIndex =
          dataIndexPairs.mapToPair(new SequenceRecordSavePrepPairFunction());
  keyedByIndex.saveAsNewAPIHadoopFile(path, LongWritable.class, SequenceRecordWritable.class,
          MapFileOutputFormat.class, c);
}

Code example source: org.qcri.rheem/rheem-spark

@Override
public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate(
    ChannelInstance[] inputs,
    ChannelInstance[] outputs,
    SparkExecutor sparkExecutor,
    OptimizationContext.OperatorContext operatorContext) {
  assert inputs.length == this.getNumInputs();
  final FileChannel.Instance output = (FileChannel.Instance) outputs[0];
  final String targetPath = output.addGivenOrTempPath(this.targetPath, sparkExecutor.getConfiguration());
  final RddChannel.Instance input = (RddChannel.Instance) inputs[0];
  final JavaRDD<Object> rdd = input.provideRdd();
  final JavaRDD<String> serializedRdd = rdd
      .map(dataQuantum -> {
        // TODO: Once there are more tuple types, make this generic.
        @SuppressWarnings("unchecked")
        Tuple2<Object, Object> tuple2 = (Tuple2<Object, Object>) dataQuantum;
        return String.valueOf(tuple2.field0) + '\t' + String.valueOf(tuple2.field1);
      });
  this.name(serializedRdd);
  serializedRdd
      .coalesce(1) // TODO: Allow more than one TSV file?
      .saveAsTextFile(targetPath);
  return ExecutionOperator.modelEagerExecution(inputs, outputs, operatorContext);
}

Code example source: KeithSSmith/spark-compaction

public void compact(String inputPath, String outputPath) throws IOException {
  this.setCompressionAndSerializationOptions(inputPath, outputPath);
  this.outputCompressionProperties(this.outputCompression);
  
  // Defining Spark Context with a generic Spark Configuration.
  SparkConf sparkConf = new SparkConf().setAppName("Spark Compaction");
  JavaSparkContext sc = new JavaSparkContext(sparkConf);
  
  if (this.outputSerialization.equals(TEXT)) {
    JavaRDD<String> textFile = sc.textFile(this.concatInputPath(inputPath));
    textFile.coalesce(this.splitSize).saveAsTextFile(outputPath);
  } else if (this.outputSerialization.equals(PARQUET)) {
    SQLContext sqlContext = new SQLContext(sc);
    DataFrame parquetFile = sqlContext.read().parquet(this.concatInputPath(inputPath));
    parquetFile.coalesce(this.splitSize).write().parquet(outputPath);
  } else if (this.outputSerialization.equals(AVRO)) {
    // For this to work the files must end in .avro
    // Another issue is that when using compression the compression codec extension is not being added to the file name.
    SQLContext sqlContext = new SQLContext(sc);
    DataFrame avroFile = sqlContext.read().format("com.databricks.spark.avro").load(this.concatInputPath(inputPath));
    avroFile.coalesce(this.splitSize).write().format("com.databricks.spark.avro").save(outputPath);
  } else {
    System.out.println("Did not match any serialization type: text, parquet, or avro.  Recieved: " +
        this.outputSerialization);
  }
}

Code example source: KeithSSmith/spark-compaction

public void compact(String[] args) throws IOException {
  this.setCompressionAndSerializationOptions(this.parseCli(args));
  this.outputCompressionProperties(this.outputCompression);
  
  // Defining Spark Context with a generic Spark Configuration.
  SparkConf sparkConf = new SparkConf().setAppName("Spark Compaction");
  JavaSparkContext sc = new JavaSparkContext(sparkConf);
      
  if (this.outputSerialization.equals(TEXT)) {
    JavaRDD<String> textFile = sc.textFile(this.concatInputPath(inputPath));
    textFile.coalesce(this.splitSize).saveAsTextFile(outputPath);
  } else if (this.outputSerialization.equals(PARQUET)) {
    SQLContext sqlContext = new SQLContext(sc);
    DataFrame parquetFile = sqlContext.read().parquet(this.concatInputPath(inputPath));
    parquetFile.coalesce(this.splitSize).write().parquet(outputPath);
  } else if (this.outputSerialization.equals(AVRO)) {
    // For this to work the files must end in .avro
    SQLContext sqlContext = new SQLContext(sc);
    DataFrame avroFile = sqlContext.read().format("com.databricks.spark.avro").load(this.concatInputPath(inputPath));
    avroFile.coalesce(this.splitSize).write().format("com.databricks.spark.avro").save(outputPath);
  } else {
    System.out.println("Did not match any serialization type: text, parquet, or avro.  Recieved: " +
        this.outputSerialization);
  }
}
