This article collects code examples of the Java method org.apache.spark.api.java.JavaSparkContext.newAPIHadoopFile() and shows how JavaSparkContext.newAPIHadoopFile() is used in practice. The examples are drawn mainly from selected projects on GitHub, Stack Overflow, Maven, and similar platforms, so they are useful references. The details of the JavaSparkContext.newAPIHadoopFile() method are as follows:
Package path: org.apache.spark.api.java.JavaSparkContext
Class name: JavaSparkContext
Method name: newAPIHadoopFile
Description: none available.
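Since no description is available, here is a minimal sketch of a typical newAPIHadoopFile() call that reads a plain-text file with the new Hadoop MapReduce API. The local[*] master, input path, and class/variable names below are illustrative assumptions, not taken from the examples that follow.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class NewAPIHadoopFileSketch {
  public static void main(String[] args) {
    // local[*] master and the input path are illustrative assumptions
    SparkConf conf = new SparkConf().setAppName("newAPIHadoopFileSketch").setMaster("local[*]");
    JavaSparkContext sc = new JavaSparkContext(conf);

    // newAPIHadoopFile(path, inputFormatClass, keyClass, valueClass, hadoopConf) returns a
    // JavaPairRDD<K, V>; with TextInputFormat the key is the byte offset of each line and
    // the value is the line text.
    JavaPairRDD<LongWritable, Text> lines = sc.newAPIHadoopFile(
        "hdfs:///tmp/input.txt",   // assumed input path
        TextInputFormat.class,     // new-API input format
        LongWritable.class,        // key class
        Text.class,                // value class
        new Configuration());      // Hadoop configuration

    System.out.println("lines read: " + lines.count());
    sc.stop();
  }
}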
Code example source: org.apache.spark/spark-core_2.11
@SuppressWarnings("unchecked")
@Test
public void readWithNewAPIHadoopFile() throws IOException {
  String outputDir = new File(tempDir, "output").getAbsolutePath();
  List<Tuple2<Integer, String>> pairs = Arrays.asList(
    new Tuple2<>(1, "a"),
    new Tuple2<>(2, "aa"),
    new Tuple2<>(3, "aaa")
  );
  JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
  rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
    .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
  JavaPairRDD<IntWritable, Text> output = sc.newAPIHadoopFile(outputDir,
    org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class,
    IntWritable.class, Text.class, Job.getInstance().getConfiguration());
  assertEquals(pairs.toString(), output.map(Tuple2::toString).collect().toString());
}
Code example source: org.apache.spark/spark-core_2.10
@SuppressWarnings("unchecked")
@Test
public void readWithNewAPIHadoopFile() throws IOException {
  String outputDir = new File(tempDir, "output").getAbsolutePath();
  List<Tuple2<Integer, String>> pairs = Arrays.asList(
    new Tuple2<>(1, "a"),
    new Tuple2<>(2, "aa"),
    new Tuple2<>(3, "aaa")
  );
  JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
  rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
    .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
  JavaPairRDD<IntWritable, Text> output = sc.newAPIHadoopFile(outputDir,
    org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class,
    IntWritable.class, Text.class, Job.getInstance().getConfiguration());
  assertEquals(pairs.toString(), output.map(Tuple2::toString).collect().toString());
}
Code example source: org.apache.spark/spark-core
@SuppressWarnings("unchecked")
@Test
public void readWithNewAPIHadoopFile() throws IOException {
  String outputDir = new File(tempDir, "output").getAbsolutePath();
  List<Tuple2<Integer, String>> pairs = Arrays.asList(
    new Tuple2<>(1, "a"),
    new Tuple2<>(2, "aa"),
    new Tuple2<>(3, "aaa")
  );
  JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
  rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
    .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
  JavaPairRDD<IntWritable, Text> output = sc.newAPIHadoopFile(outputDir,
    org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class,
    IntWritable.class, Text.class, Job.getInstance().getConfiguration());
  assertEquals(pairs.toString(), output.map(Tuple2::toString).collect().toString());
}
Code example source: mahmoudparsian/data-algorithms-book
JavaPairRDD<LongWritable, Text> fastqRDD = ctx.newAPIHadoopFile(
    inputPath,
    FastqInputFormat.class,
    // the remaining arguments are truncated in the source snippet; the key and value
    // classes below are reconstructed from the declared pair type, and the Hadoop
    // configuration is an assumption
    LongWritable.class,
    Text.class,
    ctx.hadoopConfiguration());
Code example source: stackoverflow.com
SparkConf conf = new SparkConf().setMaster(""); // master URL left blank in the original example
JavaSparkContext jsc = new JavaSparkContext(conf);
// read the content of the file using the Hadoop input format
JavaPairRDD<LongWritable, Text> data = jsc.newAPIHadoopFile(
    "file_path",            // input path
    TextInputFormat.class,  // input format class
    LongWritable.class,     // class of the key
    Text.class,             // class of the value
    new Configuration());
JavaRDD<String> mapped = data.map(new Function<Tuple2<LongWritable, Text>, String>() {
  @Override
  public String call(Tuple2<LongWritable, Text> tuple) throws Exception {
    // each line arrives as a tuple (offset, text)
    long pos = tuple._1().get();         // extract the offset
    String line = tuple._2().toString(); // extract the text
    return pos + " " + line;
  }
});
Code example source: stackoverflow.com
public class Utils {
  public static <T> JavaPairRDD<String, T> loadAvroFile(JavaSparkContext sc, String avroPath) {
    JavaPairRDD<AvroKey, NullWritable> records = sc.newAPIHadoopFile(
        avroPath, AvroKeyInputFormat.class, AvroKey.class, NullWritable.class, sc.hadoopConfiguration());
    return records.keys()
        .map(x -> (GenericRecord) x.datum())
        .mapToPair(pair -> new Tuple2<>((String) pair.get("key"), (T) pair.get("value")));
  }
}
Code example source: stackoverflow.com
JavaSparkContext sc = new JavaSparkContext(conf);
JavaPairRDD<Text, FileInfoWritable> rdd = sc.newAPIHadoopFile(
    inputPath, RichFileInputFormat.class, Text.class, FileInfoWritable.class, new Configuration());
Code example source: uber/hudi
@Override
public Pair<Optional<JavaRDD<GenericRecord>>, String> fetchNewData(
    Optional<String> lastCheckpointStr, long sourceLimit) {
  try {
    // find the source commit to pull
    Optional<String> commitToPull = findCommitToPull(lastCheckpointStr);
    if (!commitToPull.isPresent()) {
      return new ImmutablePair<>(Optional.empty(), lastCheckpointStr.orElse(""));
    }
    // read the files out.
    List<FileStatus> commitDeltaFiles = Arrays.asList(
        fs.listStatus(new Path(incrPullRootPath, commitToPull.get())));
    String pathStr = commitDeltaFiles.stream().map(f -> f.getPath().toString())
        .collect(Collectors.joining(","));
    JavaPairRDD<AvroKey, NullWritable> avroRDD = sparkContext.newAPIHadoopFile(pathStr,
        AvroKeyInputFormat.class, AvroKey.class, NullWritable.class,
        sparkContext.hadoopConfiguration());
    return new ImmutablePair<>(Optional.of(avroRDD.keys().map(r -> ((GenericRecord) r.datum()))),
        String.valueOf(commitToPull.get()));
  } catch (IOException ioe) {
    throw new HoodieIOException(
        "Unable to read from source from checkpoint: " + lastCheckpointStr, ioe);
  }
}
}
Code example source: cloudera-labs/envelope
@SuppressWarnings({"rawtypes", "unchecked"})
private Dataset<Row> readInputFormat(String path) throws Exception {
  LOG.debug("Reading InputFormat[{}]: {}", inputType, path);

  Class<? extends InputFormat> typeClazz = Class.forName(inputType).asSubclass(InputFormat.class);
  Class<?> keyClazz = Class.forName(keyType);
  Class<?> valueClazz = Class.forName(valueType);

  @SuppressWarnings("resource")
  JavaSparkContext context = new JavaSparkContext(Contexts.getSparkSession().sparkContext());
  JavaPairRDD<?, ?> rdd = context.newAPIHadoopFile(path, typeClazz, keyClazz, valueClazz, new Configuration());

  TranslateFunction translateFunction = new TranslateFunction(translatorConfig);

  return Contexts.getSparkSession().createDataFrame(rdd.flatMap(translateFunction), translateFunction.getSchema());
}
Code example source: com.uber.hoodie/hoodie-utilities
@Override
public Pair<Optional<JavaRDD<GenericRecord>>, String> fetchNewData(
    Optional<String> lastCheckpointStr, long sourceLimit) {
  try {
    // find the source commit to pull
    Optional<String> commitToPull = findCommitToPull(lastCheckpointStr);
    if (!commitToPull.isPresent()) {
      return new ImmutablePair<>(Optional.empty(),
          lastCheckpointStr.isPresent() ? lastCheckpointStr.get() : "");
    }
    // read the files out.
    List<FileStatus> commitDeltaFiles = Arrays.asList(
        fs.listStatus(new Path(incrPullRootPath, commitToPull.get())));
    String pathStr = commitDeltaFiles.stream().map(f -> f.getPath().toString())
        .collect(Collectors.joining(","));
    JavaPairRDD<AvroKey, NullWritable> avroRDD = sparkContext.newAPIHadoopFile(pathStr,
        AvroKeyInputFormat.class, AvroKey.class, NullWritable.class,
        sparkContext.hadoopConfiguration());
    return new ImmutablePair<>(Optional.of(avroRDD.keys().map(r -> ((GenericRecord) r.datum()))),
        String.valueOf(commitToPull.get()));
  } catch (IOException ioe) {
    throw new HoodieIOException(
        "Unable to read from source from checkpoint: " + lastCheckpointStr, ioe);
  }
}
}
Code example source: pl.edu.icm.spark-utils/spark-utils
/**
 * Returns a Java RDD filled with records of the specified type (avroRecordClass). The records are
 * read from the Avro datastore directory specified by avroDatastorePath.
 */
public <T extends GenericRecord> JavaRDD<T> loadJavaRDD(JavaSparkContext sc, String avroDatastorePath, Class<T> avroRecordClass) {
  Preconditions.checkNotNull(sc);
  Preconditions.checkNotNull(avroDatastorePath);
  Preconditions.checkNotNull(avroRecordClass);

  Schema schema = AvroUtils.toSchema(avroRecordClass.getName());
  Job job = getJob(schema);

  @SuppressWarnings("unchecked")
  JavaPairRDD<AvroKey<T>, NullWritable> inputRecords = (JavaPairRDD<AvroKey<T>, NullWritable>)
      sc.newAPIHadoopFile(avroDatastorePath, AvroKeyInputFormat.class, avroRecordClass, NullWritable.class, job.getConfiguration());

  // Hadoop's RecordReader reuses the same Writable object for all records,
  // which may lead to undesired behavior when caching the RDD.
  // Cloning the records solves this problem.
  JavaRDD<T> input = inputRecords.map(tuple -> AvroUtils.cloneAvroRecord(tuple._1.datum()));

  return input;
}
Code example source: DataSystemsLab/GeoSpark
/**
 * ShapefileRDD.
 *
 * @param sparkContext the spark context
 * @param filePath the file path
 */
public ShapefileRDD(JavaSparkContext sparkContext, String filePath)
{
    boundBox = new BoundBox();
    JavaPairRDD<ShapeKey, PrimitiveShape> shapePrimitiveRdd = sparkContext.newAPIHadoopFile(
            filePath,
            ShapeInputFormat.class,
            ShapeKey.class,
            PrimitiveShape.class,
            sparkContext.hadoopConfiguration()
    );
    shapeRDD = shapePrimitiveRdd.map(PrimitiveToShape);
}
Code example source: org.datasyslab/geospark
/**
 * ShapefileRDD.
 *
 * @param sparkContext the spark context
 * @param filePath the file path
 */
public ShapefileRDD(JavaSparkContext sparkContext, String filePath)
{
    boundBox = new BoundBox();
    JavaPairRDD<ShapeKey, PrimitiveShape> shapePrimitiveRdd = sparkContext.newAPIHadoopFile(
            filePath,
            ShapeInputFormat.class,
            ShapeKey.class,
            PrimitiveShape.class,
            sparkContext.hadoopConfiguration()
    );
    shapeRDD = shapePrimitiveRdd.map(PrimitiveToShape);
}
Code example source: com.uber.hoodie/hoodie-utilities
@Override
protected JavaRDD<GenericRecord> fromFiles(AvroConvertor convertor, String pathStr) {
  JavaPairRDD<AvroKey, NullWritable> avroRDD = sparkContext.newAPIHadoopFile(pathStr,
      AvroKeyInputFormat.class, AvroKey.class, NullWritable.class,
      sparkContext.hadoopConfiguration());
  return avroRDD.keys().map(r -> ((GenericRecord) r.datum()));
}
}
Code example source: uber/hudi
@Override
protected JavaRDD<GenericRecord> fromFiles(AvroConvertor convertor, String pathStr) {
  JavaPairRDD<AvroKey, NullWritable> avroRDD = sparkContext.newAPIHadoopFile(pathStr,
      AvroKeyInputFormat.class, AvroKey.class, NullWritable.class,
      sparkContext.hadoopConfiguration());
  return avroRDD.keys().map(r -> ((GenericRecord) r.datum()));
}
}
Code example source: DataSystemsLab/GeoSpark
/**
 * Reads and merges the bound boxes of all shapefiles in the user input; if there are none, returns null.
 */
public static BoundBox readBoundBox(JavaSparkContext sc, String inputPath)
{
    // read bound boxes into memory
    JavaPairRDD<Long, BoundBox> bounds = sc.newAPIHadoopFile(
            inputPath,
            BoundaryInputFormat.class,
            Long.class,
            BoundBox.class,
            sc.hadoopConfiguration()
    );
    // merge all bound boxes into one
    bounds = bounds.reduceByKey(new Function2<BoundBox, BoundBox, BoundBox>()
    {
        @Override
        public BoundBox call(BoundBox box1, BoundBox box2)
                throws Exception
        {
            return BoundBox.mergeBoundBox(box1, box2);
        }
    });
    // if there is a result, return it; otherwise return null
    if (bounds.count() > 0) {
        return new BoundBox(bounds.collect().get(0)._2());
    }
    else { return null; }
}
Code example source: org.datasyslab/geospark
/**
 * Reads and merges the bound boxes of all shapefiles in the user input; if there are none, returns null.
 */
public BoundBox getBoundBox(JavaSparkContext sc, String inputPath)
{
    // read bound boxes into memory
    JavaPairRDD<Long, BoundBox> bounds = sc.newAPIHadoopFile(
            inputPath,
            BoundaryInputFormat.class,
            Long.class,
            BoundBox.class,
            sc.hadoopConfiguration()
    );
    // merge all bound boxes into one
    bounds = bounds.reduceByKey(new Function2<BoundBox, BoundBox, BoundBox>()
    {
        @Override
        public BoundBox call(BoundBox box1, BoundBox box2)
                throws Exception
        {
            return BoundBox.mergeBoundBox(box1, box2);
        }
    });
    // if there is a result, return it; otherwise return null
    if (bounds.count() > 0) {
        return new BoundBox(bounds.collect().get(0)._2());
    }
    else { return null; }
}
}
Code example source: DataSystemsLab/GeoSpark
/**
 * Reads and merges the bound boxes of all shapefiles in the user input; if there are none, returns null.
 */
public BoundBox getBoundBox(JavaSparkContext sc, String inputPath)
{
    // read bound boxes into memory
    JavaPairRDD<Long, BoundBox> bounds = sc.newAPIHadoopFile(
            inputPath,
            BoundaryInputFormat.class,
            Long.class,
            BoundBox.class,
            sc.hadoopConfiguration()
    );
    // merge all bound boxes into one
    bounds = bounds.reduceByKey(new Function2<BoundBox, BoundBox, BoundBox>()
    {
        @Override
        public BoundBox call(BoundBox box1, BoundBox box2)
                throws Exception
        {
            return BoundBox.mergeBoundBox(box1, box2);
        }
    });
    // if there is a result, return it; otherwise return null
    if (bounds.count() > 0) {
        return new BoundBox(bounds.collect().get(0)._2());
    }
    else { return null; }
}
}
Code example source: org.datasyslab/geospark
/**
 * Reads and merges the bound boxes of all shapefiles in the user input; if there are none, returns null.
 */
public static BoundBox readBoundBox(JavaSparkContext sc, String inputPath)
{
    // read bound boxes into memory
    JavaPairRDD<Long, BoundBox> bounds = sc.newAPIHadoopFile(
            inputPath,
            BoundaryInputFormat.class,
            Long.class,
            BoundBox.class,
            sc.hadoopConfiguration()
    );
    // merge all bound boxes into one
    bounds = bounds.reduceByKey(new Function2<BoundBox, BoundBox, BoundBox>()
    {
        @Override
        public BoundBox call(BoundBox box1, BoundBox box2)
                throws Exception
        {
            return BoundBox.mergeBoundBox(box1, box2);
        }
    });
    // if there is a result, return it; otherwise return null
    if (bounds.count() > 0) {
        return new BoundBox(bounds.collect().get(0)._2());
    }
    else { return null; }
}