Usage of org.apache.spark.api.java.JavaSparkContext.newAPIHadoopFile() with code examples

This article collects Java code examples for the org.apache.spark.api.java.JavaSparkContext.newAPIHadoopFile() method and shows how it is used in practice. The examples are drawn from selected projects on GitHub, Stack Overflow, Maven and similar platforms, and should serve as useful references. Method details:
Package: org.apache.spark.api.java
Class: JavaSparkContext
Method: newAPIHadoopFile

About JavaSparkContext.newAPIHadoopFile

newAPIHadoopFile reads a file (or a comma-separated list of paths) with an InputFormat from the new Hadoop MapReduce API (org.apache.hadoop.mapreduce) and returns the records as a JavaPairRDD of the given key and value classes; a Hadoop Configuration is passed to control how the input is read.
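
For orientation, below is a minimal sketch of reading a plain text file through this method. The master URL and input path are placeholders, and the sketch assumes the new-API TextInputFormat from org.apache.hadoop.mapreduce.lib.input:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class NewAPIHadoopFileSketch {
 public static void main(String[] args) {
  SparkConf conf = new SparkConf().setAppName("newAPIHadoopFile-sketch").setMaster("local[*]");
  JavaSparkContext sc = new JavaSparkContext(conf);

  // Each record is (byte offset of the line, line text).
  JavaPairRDD<LongWritable, Text> lines = sc.newAPIHadoopFile(
    "hdfs:///path/to/input.txt",   // input path (placeholder)
    TextInputFormat.class,         // new-API input format class
    LongWritable.class,            // key class
    Text.class,                    // value class
    new Configuration());          // Hadoop configuration

  // Convert the Writable pairs to plain strings before collecting.
  lines.map(t -> t._1().get() + "\t" + t._2().toString())
    .take(10)
    .forEach(System.out::println);

  sc.stop();
 }
}

Passing a fully qualified new-API input format class is what distinguishes this call from the old hadoopFile API, as the spark-core test below illustrates.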

Code examples

Code example source: org.apache.spark/spark-core_2.11 (the same test appears verbatim in spark-core_2.10 and spark-core)

@SuppressWarnings("unchecked")
@Test
public void readWithNewAPIHadoopFile() throws IOException {
 String outputDir = new File(tempDir, "output").getAbsolutePath();
 List<Tuple2<Integer, String>> pairs = Arrays.asList(
  new Tuple2<>(1, "a"),
  new Tuple2<>(2, "aa"),
  new Tuple2<>(3, "aaa")
 );
 JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
 rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
  .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
 JavaPairRDD<IntWritable, Text> output = sc.newAPIHadoopFile(outputDir,
  org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class,
  IntWritable.class, Text.class, Job.getInstance().getConfiguration());
 assertEquals(pairs.toString(), output.map(Tuple2::toString).collect().toString());
}

Code example source: mahmoudparsian/data-algorithms-book

JavaPairRDD<LongWritable, Text> fastqRDD = ctx.newAPIHadoopFile(
       inputPath,
       FastqInputFormat.class,
       LongWritable.class,          // key class, inferred from the declared pair type
       Text.class,                  // value class, inferred from the declared pair type
       ctx.hadoopConfiguration());  // configuration argument assumed; the original snippet is truncated here

Code example source: stackoverflow.com

SparkConf conf = new SparkConf().setMaster("");
JavaSparkContext jsc = new JavaSparkContext(conf);

// read the content of the file using Hadoop format
JavaPairRDD<LongWritable, Text> data = jsc.newAPIHadoopFile(
    "file_path", // input path
    TextInputFormat.class, // used input format class
    LongWritable.class, // class of the key
    Text.class, // class of the value
    new Configuration());    

JavaRDD<String> mapped = data.map(new Function<Tuple2<LongWritable, Text>, String>() {
  @Override
  public String call(Tuple2<LongWritable, Text> tuple) throws Exception {
    // each line arrives as a tuple of (byte offset, line text)
    long pos = tuple._1().get(); // extract offset
    String line = tuple._2().toString(); // extract text

    return pos + " " + line;
  }
});

Code example source: stackoverflow.com

public class Utils {

 public static <T> JavaPairRDD<String, T> loadAvroFile(JavaSparkContext sc, String avroPath) {
  JavaPairRDD<AvroKey, NullWritable> records = sc.newAPIHadoopFile(avroPath, AvroKeyInputFormat.class, AvroKey.class, NullWritable.class, sc.hadoopConfiguration());
  return records.keys()
    .map(x -> (GenericRecord) x.datum())
    .mapToPair(pair -> new Tuple2<>((String) pair.get("key"), (T)pair.get("value")));
 }
}

Code example source: stackoverflow.com

JavaSparkContext sc = new JavaSparkContext(conf);
JavaPairRDD<Text, FileInfoWritable> rdd = sc.newAPIHadoopFile(
    inputPath,
    RichFileInputFormat.class,
    Text.class,
    FileInfoWritable.class,
    new Configuration());

Code example source: uber/hudi (a nearly identical snippet appears in com.uber.hoodie/hoodie-utilities)

@Override
 public Pair<Optional<JavaRDD<GenericRecord>>, String> fetchNewData(
   Optional<String> lastCheckpointStr, long sourceLimit) {
  try {
   // find the source commit to pull
   Optional<String> commitToPull = findCommitToPull(lastCheckpointStr);

   if (!commitToPull.isPresent()) {
    return new ImmutablePair<>(Optional.empty(),
        lastCheckpointStr.orElse(""));
   }

   // read the files out.
   List<FileStatus> commitDeltaFiles = Arrays.asList(
     fs.listStatus(new Path(incrPullRootPath, commitToPull.get())));
   String pathStr = commitDeltaFiles.stream().map(f -> f.getPath().toString())
     .collect(Collectors.joining(","));
   JavaPairRDD<AvroKey, NullWritable> avroRDD = sparkContext.newAPIHadoopFile(pathStr,
     AvroKeyInputFormat.class, AvroKey.class, NullWritable.class,
     sparkContext.hadoopConfiguration());
   return new ImmutablePair<>(Optional.of(avroRDD.keys().map(r -> ((GenericRecord) r.datum()))),
     String.valueOf(commitToPull.get()));
  } catch (IOException ioe) {
   throw new HoodieIOException(
     "Unable to read from source from checkpoint: " + lastCheckpointStr, ioe);
  }
 }
}

Code example source: cloudera-labs/envelope

@SuppressWarnings( {"rawtypes", "unchecked"})
private Dataset<Row> readInputFormat(String path) throws Exception {
 LOG.debug("Reading InputFormat[{}]: {}", inputType, path);
 Class<? extends InputFormat> typeClazz = Class.forName(inputType).asSubclass(InputFormat.class);
 Class<?> keyClazz = Class.forName(keyType);
 Class<?> valueClazz = Class.forName(valueType);
 @SuppressWarnings("resource")
 JavaSparkContext context = new JavaSparkContext(Contexts.getSparkSession().sparkContext());
 JavaPairRDD<?, ?> rdd = context.newAPIHadoopFile(path, typeClazz, keyClazz, valueClazz, new Configuration());
 TranslateFunction translateFunction = new TranslateFunction(translatorConfig);
 return Contexts.getSparkSession().createDataFrame(rdd.flatMap(translateFunction), translateFunction.getSchema());
}

Code example source: pl.edu.icm.spark-utils/spark-utils

/**
 * Returns a JavaRDD filled with records of the specified type (avroRecordClass). The records are read
 * from the Avro datastore directory specified by avroDatastorePath.
 */
public <T extends GenericRecord> JavaRDD<T> loadJavaRDD(JavaSparkContext sc, String avroDatastorePath, Class<T> avroRecordClass) {
  Preconditions.checkNotNull(sc);
  Preconditions.checkNotNull(avroDatastorePath);
  Preconditions.checkNotNull(avroRecordClass);
  Schema schema = AvroUtils.toSchema(avroRecordClass.getName());
  Job job = getJob(schema);
  @SuppressWarnings("unchecked")
  JavaPairRDD<AvroKey<T>, NullWritable> inputRecords = (JavaPairRDD<AvroKey<T>, NullWritable>)
      sc.newAPIHadoopFile(avroDatastorePath, AvroKeyInputFormat.class, avroRecordClass, NullWritable.class, job.getConfiguration());
  
  
  // Hadoop's RecordReader reuses the same Writable object for all records
  // which may lead to undesired behavior when caching RDD.
  // Cloning records solves this problem.
  JavaRDD<T> input = inputRecords.map(tuple -> AvroUtils.cloneAvroRecord(tuple._1.datum()));
  return input;
}

Code example source: DataSystemsLab/GeoSpark (identical code appears in org.datasyslab/geospark)

/**
 * ShapefileRDD.
 *
 * @param sparkContext the spark context
 * @param filePath the file path
 */
public ShapefileRDD(JavaSparkContext sparkContext, String filePath)
{
  boundBox = new BoundBox();
  JavaPairRDD<ShapeKey, PrimitiveShape> shapePrimitiveRdd = sparkContext.newAPIHadoopFile(
      filePath,
      ShapeInputFormat.class,
      ShapeKey.class,
      PrimitiveShape.class,
      sparkContext.hadoopConfiguration()
  );
  shapeRDD = shapePrimitiveRdd.map(PrimitiveToShape);
}

Code example source: com.uber.hoodie/hoodie-utilities (identical code appears in uber/hudi)

@Override
 protected JavaRDD<GenericRecord> fromFiles(AvroConvertor convertor, String pathStr) {
  JavaPairRDD<AvroKey, NullWritable> avroRDD = sparkContext.newAPIHadoopFile(pathStr,
    AvroKeyInputFormat.class, AvroKey.class, NullWritable.class,
    sparkContext.hadoopConfiguration());
  return avroRDD.keys().map(r -> ((GenericRecord) r.datum()));
 }
}

Code example source: DataSystemsLab/GeoSpark (identical code appears in org.datasyslab/geospark)

/**
 * Reads and merges the bounding boxes of all shapefiles under the input path; returns null if there are none.
 */
public static BoundBox readBoundBox(JavaSparkContext sc, String inputPath)
{
  // read bound boxes into memory
  JavaPairRDD<Long, BoundBox> bounds = sc.newAPIHadoopFile(
      inputPath,
      BoundaryInputFormat.class,
      Long.class,
      BoundBox.class,
      sc.hadoopConfiguration()
  );
  // merge all into one
  bounds = bounds.reduceByKey(new Function2<BoundBox, BoundBox, BoundBox>()
  {
    @Override
    public BoundBox call(BoundBox box1, BoundBox box2)
        throws Exception
    {
      return BoundBox.mergeBoundBox(box1, box2);
    }
  });
  // if there is a result assign it to variable : boundBox
  if (bounds.count() > 0) {
    return new BoundBox(bounds.collect().get(0)._2());
  }
  else { return null; }
}

Code example source: org.datasyslab/geospark (identical code appears in DataSystemsLab/GeoSpark)

/**
   * Reads and merges the bounding boxes of all shapefiles under the input path; returns null if there are none.
   */
  public BoundBox getBoundBox(JavaSparkContext sc, String inputPath)
  {
    // read bound boxes into memory
    JavaPairRDD<Long, BoundBox> bounds = sc.newAPIHadoopFile(
        inputPath,
        BoundaryInputFormat.class,
        Long.class,
        BoundBox.class,
        sc.hadoopConfiguration()
    );
    // merge all into one
    bounds = bounds.reduceByKey(new Function2<BoundBox, BoundBox, BoundBox>()
    {
      @Override
      public BoundBox call(BoundBox box1, BoundBox box2)
          throws Exception
      {
        return BoundBox.mergeBoundBox(box1, box2);
      }
    });
    // if there is a result assign it to variable : boundBox
    if (bounds.count() > 0) {
      return new BoundBox(bounds.collect().get(0)._2());
    }
    else { return null; }
  }
}
