This article collects code examples of the Java method org.apache.spark.api.java.JavaRDD.mapPartitions() and shows how JavaRDD.mapPartitions() is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven, and similar platforms, extracted from selected open-source projects, so they should serve as useful references. The details of JavaRDD.mapPartitions() are as follows:
Package: org.apache.spark.api.java
Class name: JavaRDD
Method name: mapPartitions
Method description: applies a function to each partition of the RDD. The function receives an Iterator over all elements of one partition and returns an Iterator over the output elements, so per-partition setup work (parsers, connections, accumulators) can be done once per partition instead of once per element. Note that in Spark 1.x the FlatMapFunction passed to mapPartitions returned an Iterable, while since Spark 2.0 it returns an Iterator; both variants appear in the examples below.
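Before the project excerpts, here is a minimal, self-contained sketch of the call pattern. It assumes the Spark 2.x+ Java API (the partition function returns an Iterator) and uses a local master purely for illustration:

import java.util.Arrays;
import java.util.Collections;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class MapPartitionsSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("mapPartitions-sketch").setMaster("local[2]");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 3);
    // The lambda is called once per partition and receives an Iterator over that
    // partition's elements; it must return an Iterator of output elements.
    JavaRDD<Integer> partitionSums = numbers.mapPartitions(iter -> {
      int sum = 0;
      while (iter.hasNext()) {
        sum += iter.next();
      }
      return Collections.singletonList(sum).iterator();
    });
    System.out.println(partitionSums.collect()); // e.g. [3, 7, 11]
    sc.stop();
  }
}

Each partition of [1, 2], [3, 4], [5, 6] collapses to a single sum, so the collected result has exactly one element per partition.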
Code example source: OryxProject/oryx
// Collect the distinct values of each categorical feature: one HashMap per partition, then merge
private Map<Integer,Collection<String>> getDistinctValues(JavaRDD<String[]> parsedRDD) {
  int[] categoricalIndices = IntStream.range(0, inputSchema.getNumFeatures()).
      filter(inputSchema::isCategorical).toArray();
  return parsedRDD.mapPartitions(data -> {
    Map<Integer,Collection<String>> categoryValues = new HashMap<>();
    for (int i : categoricalIndices) {
      categoryValues.put(i, new HashSet<>());
    }
    data.forEachRemaining(datum ->
        categoryValues.forEach((category, values) -> values.add(datum[category]))
    );
    return Collections.singleton(categoryValues).iterator();
  }).reduce((v1, v2) -> {
    // Assumes both have the same key set
    v1.forEach((category, values) -> values.addAll(v2.get(category)));
    return v1;
  });
}
Code example source: databricks/learning-spark
public static void main(String[] args) throws Exception {
  if (args.length != 3) {
    throw new Exception("Usage BasicLoadJson [sparkMaster] [jsoninput] [jsonoutput]");
  }
  String master = args[0];
  String fileName = args[1];
  String outfile = args[2];
  JavaSparkContext sc = new JavaSparkContext(
      master, "basicloadjson", System.getenv("SPARK_HOME"), System.getenv("JARS"));
  JavaRDD<String> input = sc.textFile(fileName);
  JavaRDD<Person> result = input.mapPartitions(new ParseJson()).filter(new LikesPandas());
  JavaRDD<String> formatted = result.mapPartitions(new WriteJson());
  formatted.saveAsTextFile(outfile);
}
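ParseJson, WriteJson, and LikesPandas are helper classes from the learning-spark repository and are not part of the excerpt above. The sketch below shows roughly what the two mapPartitions functions look like; it assumes a simple Person bean and Jackson's ObjectMapper, and targets the Spark 1.x FlatMapFunction API (call() returns an Iterable), so treat it as illustrative rather than the book's exact code:

import java.util.ArrayList;
import java.util.Iterator;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.spark.api.java.function.FlatMapFunction;

class ParseJson implements FlatMapFunction<Iterator<String>, Person> {
  @Override
  public Iterable<Person> call(Iterator<String> lines) throws Exception {
    // One ObjectMapper per partition instead of one per record
    ObjectMapper mapper = new ObjectMapper();
    ArrayList<Person> people = new ArrayList<>();
    while (lines.hasNext()) {
      String line = lines.next();
      try {
        people.add(mapper.readValue(line, Person.class));
      } catch (Exception e) {
        // skip malformed records
      }
    }
    return people;
  }
}

class WriteJson implements FlatMapFunction<Iterator<Person>, String> {
  @Override
  public Iterable<String> call(Iterator<Person> people) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    ArrayList<String> text = new ArrayList<>();
    while (people.hasNext()) {
      text.add(mapper.writeValueAsString(people.next()));
    }
    return text;
  }
}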
Code example source: databricks/learning-spark
JavaRDD<String> rdd = sc.parallelize(
    Arrays.asList("KK6JKQ", "Ve3UoW", "kk6jlk", "W6BB"));
JavaRDD<String> result = rdd.mapPartitions(
    new FlatMapFunction<Iterator<String>, String>() {
      public Iterable<String> call(Iterator<String> input) {
Code example source: databricks/learning-spark
AvgCount result = rdd.mapPartitions(setup).reduce(combine);
System.out.println(result.avg());
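The AvgCount class and the setup and combine functions are defined elsewhere in the learning-spark code and are not part of the excerpt. A sketch of the usual shape of this pattern follows; the field and method names are assumptions inferred from the call above (result.avg()), and it uses the Spark 1.x API where FlatMapFunction returns an Iterable:

// Assumed imports: java.util.*, org.apache.spark.api.java.function.FlatMapFunction,
// org.apache.spark.api.java.function.Function2
class AvgCount implements java.io.Serializable {
  int total;
  int num;
  AvgCount(int total, int num) { this.total = total; this.num = num; }
  double avg() { return total / (double) num; }
}

// setup: one accumulator per partition, emitted as a single summary element
FlatMapFunction<Iterator<Integer>, AvgCount> setup = new FlatMapFunction<Iterator<Integer>, AvgCount>() {
  @Override
  public Iterable<AvgCount> call(Iterator<Integer> input) {
    AvgCount acc = new AvgCount(0, 0);
    while (input.hasNext()) {
      acc.total += input.next();
      acc.num += 1;
    }
    return Collections.singletonList(acc);
  }
};

// combine: merge the per-partition summaries into one result
Function2<AvgCount, AvgCount, AvgCount> combine = new Function2<AvgCount, AvgCount, AvgCount>() {
  @Override
  public AvgCount call(AvgCount a, AvgCount b) {
    a.total += b.total;
    a.num += b.num;
    return a;
  }
};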
Code example source: org.apache.spark/spark-core_2.10 (the same test appears, with only the assertion style differing, in spark-core and spark-core_2.11; one copy is shown here)
@Test
public void mapPartitions() {
  JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
  JavaRDD<Integer> partitionSums = rdd.mapPartitions(iter -> {
    int sum = 0;
    while (iter.hasNext()) {
      sum += iter.next();
    }
    return Collections.singletonList(sum).iterator();
  });
  assertEquals("[3, 7]", partitionSums.collect().toString());
}
Code example source: OryxProject/oryx
return trainPointData.mapPartitions(data -> {
  DecisionTreeModel[] trees = model.trees();
  List<IntLongHashMap> treeNodeIDCounts = IntStream.range(0, trees.length).
Code example source: OryxProject/oryx
/**
 * @param trainPointData data to run down trees
 * @param model random decision forest model to count on
 * @return map of predictor index to the number of training examples that reached a
 *  node whose decision is based on that feature. The index is among predictors, not all
 *  features, since there are fewer predictors than features. That is, the index will
 *  match the one used in the {@link RandomForestModel}.
 */
private static IntLongHashMap predictorExampleCounts(JavaRDD<? extends LabeledPoint> trainPointData,
                                                     RandomForestModel model) {
  return trainPointData.mapPartitions(data -> {
    IntLongHashMap featureIndexCount = new IntLongHashMap();
    data.forEachRemaining(datum -> {
      double[] featureVector = datum.features().toArray();
      for (DecisionTreeModel tree : model.trees()) {
        org.apache.spark.mllib.tree.model.Node node = tree.topNode();
        // This logic cloned from Node.predict:
        while (!node.isLeaf()) {
          Split split = node.split().get();
          int featureIndex = split.feature();
          // Count feature
          featureIndexCount.addToValue(featureIndex, 1);
          node = nextNode(featureVector, node, split, featureIndex);
        }
      }
    });
    return Collections.singleton(featureIndexCount).iterator();
  }).reduce(RDFUpdate::merge);
}
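Both nextNode(...) and RDFUpdate::merge are defined elsewhere in the Oryx codebase and are not shown in the excerpt. A plausible sketch of the merge reducer, assuming the Eclipse Collections IntLongHashMap that the excerpt appears to use (addToValue), would be:

// Sketch only: the real implementation is Oryx's RDFUpdate.merge. It adds the second
// map's per-feature counts into the first map and returns it.
private static IntLongHashMap merge(IntLongHashMap a, IntLongHashMap b) {
  b.forEachKeyValue((int featureIndex, long count) -> a.addToValue(featureIndex, count));
  return a;
}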
Code example source: mahmoudparsian/data-algorithms-book
JavaRDD<Map<Character, Integer>> partitions = lines.mapPartitions(
    new FlatMapFunction<Iterator<String>, Map<Character, Integer>>() {
      @Override
Code example source: mahmoudparsian/data-algorithms-book
JavaRDD<Map<Character, Long>> partitions = fastaRDD.mapPartitions(
    new FlatMapFunction<Iterator<String>, Map<Character, Long>>() {
      @Override
Code example source: mahmoudparsian/data-algorithms-book
JavaRDD<Map<Character, Long>> partitions = fastqRDD.mapPartitions(
    new FlatMapFunction<Iterator<String>, Map<Character, Long>>() {
      @Override
Code example source: mahmoudparsian/data-algorithms-book
JavaRDD<Tuple2<Integer, Integer>> partitions = numbers.mapPartitions((Iterator<String> iter) -> {
  int min = 0; // 0 is never used
  int max = 0; // 0 is never used
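The excerpt stops right after the accumulator setup. A sketch of how the per-partition min/max pattern typically continues (assuming one integer per input line and the Spark 2.x FlatMapFunction that returns an Iterator; this is not the book's exact code):

// Scan each partition once, emit a single (min, max) pair, then reduce the pairs.
JavaRDD<Tuple2<Integer, Integer>> minMaxPerPartition = numbers.mapPartitions((Iterator<String> iter) -> {
  int min = Integer.MAX_VALUE;
  int max = Integer.MIN_VALUE;
  while (iter.hasNext()) {
    int value = Integer.parseInt(iter.next().trim());
    min = Math.min(min, value);
    max = Math.max(max, value);
  }
  // (an empty partition would need special handling; omitted for brevity)
  return Collections.singletonList(new Tuple2<>(min, max)).iterator();
});
Tuple2<Integer, Integer> minMax = minMaxPerPartition.reduce((a, b) ->
    new Tuple2<>(Math.min(a._1(), b._1()), Math.max(a._2(), b._2())));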
Code example source: mahmoudparsian/data-algorithms-book
JavaRDD<Tuple2<Integer, Integer>> partitions = numbers.mapPartitions(
    new FlatMapFunction<Iterator<String>, Tuple2<Integer, Integer>>() {
      @Override
Code example source: DataSystemsLab/GeoSpark
/**
 * Instantiates a new polygon RDD.
 *
 * @param sparkContext the spark context
 * @param InputLocation the input location
 * @param userSuppliedMapper the user supplied mapper
 */
public PolygonRDD(JavaSparkContext sparkContext, String InputLocation, FlatMapFunction userSuppliedMapper)
{
    this.setRawSpatialRDD(sparkContext.textFile(InputLocation).mapPartitions(userSuppliedMapper));
}
Code example source: DataSystemsLab/GeoSpark
/**
 * Instantiates a new line string RDD.
 *
 * @param sparkContext the spark context
 * @param InputLocation the input location
 * @param partitions the partitions
 * @param userSuppliedMapper the user supplied mapper
 */
public LineStringRDD(JavaSparkContext sparkContext, String InputLocation, Integer partitions, FlatMapFunction userSuppliedMapper)
{
    this.setRawSpatialRDD(sparkContext.textFile(InputLocation, partitions).mapPartitions(userSuppliedMapper));
}
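In both constructors the user-supplied mapper is a FlatMapFunction applied once per partition of the raw text input. A hedged usage sketch follows, assuming a file with one WKT polygon per line and the Spark 2.x FlatMapFunction (returns an Iterator); the JTS geometry classes GeoSpark builds on live under com.vividsolutions.jts in older releases and org.locationtech.jts in newer ones, so treat the imports and the file path as illustrative:

// Hypothetical mapper: parse one WKT polygon per line, creating one WKTReader per partition.
FlatMapFunction<Iterator<String>, Polygon> wktMapper = lines -> {
  WKTReader reader = new WKTReader();
  List<Polygon> polygons = new ArrayList<>();
  while (lines.hasNext()) {
    polygons.add((Polygon) reader.read(lines.next()));
  }
  return polygons.iterator();
};

PolygonRDD polygonRDD = new PolygonRDD(sparkContext, "/path/to/polygons.wkt", wktMapper);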
Code example source: apache/tinkerpop
final GroupStep.GroupBiOperator<Object, Object> biOperator = (GroupStep.GroupBiOperator) endStep.getBiOperator();
result = ((GroupStep) endStep).generateFinalResult(nextRDD.
    mapPartitions(partitions -> {
      final GroupStep<Object, Object, Object> clone = (GroupStep) endStep.clone();
      return IteratorUtils.map(partitions, clone::projectTraverser);

final GroupCountStep.GroupCountBiOperator<Object> biOperator = GroupCountStep.GroupCountBiOperator.instance();
result = nextRDD
    .mapPartitions(partitions -> {
      final GroupCountStep<Object, Object> clone = (GroupCountStep) endStep.clone();
      return IteratorUtils.map(partitions, clone::projectTraverser);