This article collects code examples of the Java method org.apache.spark.api.java.JavaPairRDD.reduceByKey(), showing how JavaPairRDD.reduceByKey() is used in practice. The examples are extracted from selected projects hosted on platforms such as GitHub, Stack Overflow, and Maven, so they should serve as useful references. Details of the JavaPairRDD.reduceByKey() method:

Package: org.apache.spark.api.java
Class: JavaPairRDD
Method: reduceByKey
Description: none provided
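
Before the examples, a quick orientation: reduceByKey merges the values of each key with an associative, commutative reduce function and returns a new JavaPairRDD; overloads also accept a target partition count or a custom Partitioner. The following minimal sketch is for illustration only; the local master URL, app name, and sample data are arbitrary choices, not taken from the projects cited below.

import java.util.Arrays;
import java.util.Map;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class ReduceByKeyExample {
  public static void main(String[] args) {
    // "local[2]" runs Spark inside this JVM; any deployment-specific master works too.
    try (JavaSparkContext sc = new JavaSparkContext("local[2]", "reduceByKeyExample")) {
      JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(Arrays.asList(
          new Tuple2<>("a", 1),
          new Tuple2<>("b", 1),
          new Tuple2<>("a", 1)));
      // Merge the values of each key; the function must be associative and commutative.
      JavaPairRDD<String, Integer> summed = pairs.reduceByKey(Integer::sum);
      Map<String, Integer> result = summed.collectAsMap();
      System.out.println(result); // {a=2, b=1}
    }
  }
}

Because reduceByKey combines values on each partition before shuffling, it usually moves far less data across the network than grouping all values per key first.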
Code example source: OryxProject/oryx

  public static Map<String,Integer> countDistinctOtherWords(JavaPairRDD<String,String> data) {
    return data.values().flatMapToPair(line -> {
      Set<String> distinctTokens = new HashSet<>(Arrays.asList(line.split(" ")));
      return distinctTokens.stream().flatMap(a ->
          distinctTokens.stream().filter(b -> !a.equals(b)).map(b -> new Tuple2<>(a, b))
      ).iterator();
    }).distinct().mapValues(a -> 1).reduceByKey((c1, c2) -> c1 + c2).collectAsMap();
  }
Code example source: org.apache.spark/spark-core_2.11

  @Test
  public void reduceByKey() {
    List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
      new Tuple2<>(2, 1),
      new Tuple2<>(2, 1),
      new Tuple2<>(1, 1),
      new Tuple2<>(3, 2),
      new Tuple2<>(3, 1)
    );
    JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
    JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey((a, b) -> a + b);
    Assert.assertEquals(1, counts.lookup(1).get(0).intValue());
    Assert.assertEquals(2, counts.lookup(2).get(0).intValue());
    Assert.assertEquals(3, counts.lookup(3).get(0).intValue());
    Map<Integer, Integer> localCounts = counts.collectAsMap();
    Assert.assertEquals(1, localCounts.get(1).intValue());
    Assert.assertEquals(2, localCounts.get(2).intValue());
    Assert.assertEquals(3, localCounts.get(3).intValue());
    localCounts = rdd.reduceByKeyLocally((a, b) -> a + b);
    Assert.assertEquals(1, localCounts.get(1).intValue());
    Assert.assertEquals(2, localCounts.get(2).intValue());
    Assert.assertEquals(3, localCounts.get(3).intValue());
  }
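
Note the contrast in the test above: reduceByKey is a lazy transformation that returns a new JavaPairRDD (nothing is computed until an action such as lookup or collectAsMap runs), whereas reduceByKeyLocally is an action that executes immediately and returns a java.util.Map to the driver.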
Code example source: Alluxio/alluxio

  /**
   * Spark job to check whether Spark executors can recognize the Alluxio filesystem.
   *
   * @param sc current JavaSparkContext
   * @param reportWriter saves user-facing messages to a generated file
   * @return Spark job result
   */
  private Status runSparkJob(JavaSparkContext sc, PrintWriter reportWriter) {
    // Generate a list of integers for testing
    List<Integer> nums = IntStream.rangeClosed(1, mPartitions).boxed().collect(Collectors.toList());
    JavaRDD<Integer> dataSet = sc.parallelize(nums, mPartitions);
    // Run a Spark job to check whether Spark executors can recognize Alluxio
    JavaPairRDD<Status, String> extractedStatus = dataSet
        .mapToPair(s -> new Tuple2<>(CheckerUtils.performIntegrationChecks(),
            CheckerUtils.getLocalAddress()));
    // Merge the IP addresses that can/cannot recognize Alluxio
    JavaPairRDD<Status, String> mergeStatus = extractedStatus.reduceByKey((a, b)
        -> a.contains(b) ? a : (b.contains(a) ? b : a + " " + b),
        (mPartitions < 10 ? 1 : mPartitions / 10));
    mSparkJobResult = mergeStatus.collect();
    Map<Status, List<String>> resultMap = new HashMap<>();
    for (Tuple2<Status, String> op : mSparkJobResult) {
      List<String> addresses = resultMap.getOrDefault(op._1, new ArrayList<>());
      addresses.add(op._2);
      resultMap.put(op._1, addresses);
    }
    return CheckerUtils.printNodesResults(resultMap, reportWriter);
  }
Code example source: org.apache.spark/spark-core_2.10

  @SuppressWarnings("unchecked")
  @Test
  public void reduceByKey() {
    List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
      new Tuple2<>(2, 1),
      new Tuple2<>(2, 1),
      new Tuple2<>(1, 1),
      new Tuple2<>(3, 2),
      new Tuple2<>(3, 1)
    );
    JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
    JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey((a, b) -> a + b);
    assertEquals(1, counts.lookup(1).get(0).intValue());
    assertEquals(2, counts.lookup(2).get(0).intValue());
    assertEquals(3, counts.lookup(3).get(0).intValue());
    Map<Integer, Integer> localCounts = counts.collectAsMap();
    assertEquals(1, localCounts.get(1).intValue());
    assertEquals(2, localCounts.get(2).intValue());
    assertEquals(3, localCounts.get(3).intValue());
    localCounts = rdd.reduceByKeyLocally((a, b) -> a + b);
    assertEquals(1, localCounts.get(1).intValue());
    assertEquals(2, localCounts.get(2).intValue());
    assertEquals(3, localCounts.get(3).intValue());
  }
Code example source: databricks/learning-spark

  public static final JavaPairRDD<Integer, Long> responseCodeCount(
      JavaRDD<ApacheAccessLog> accessLogRDD) {
    return accessLogRDD
        .mapToPair(new ResponseCodeTuple())
        .reduceByKey(new LongSumReducer());
  }
Code example source: org.apache.spark/spark-core

  @SuppressWarnings("unchecked")
  @Test
  public void reduceByKey() {
    List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
      new Tuple2<>(2, 1),
      new Tuple2<>(2, 1),
      new Tuple2<>(1, 1),
      new Tuple2<>(3, 2),
      new Tuple2<>(3, 1)
    );
    JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
    JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey((a, b) -> a + b);
    assertEquals(1, counts.lookup(1).get(0).intValue());
    assertEquals(2, counts.lookup(2).get(0).intValue());
    assertEquals(3, counts.lookup(3).get(0).intValue());
    Map<Integer, Integer> localCounts = counts.collectAsMap();
    assertEquals(1, localCounts.get(1).intValue());
    assertEquals(2, localCounts.get(2).intValue());
    assertEquals(3, localCounts.get(3).intValue());
    localCounts = rdd.reduceByKeyLocally((a, b) -> a + b);
    assertEquals(1, localCounts.get(1).intValue());
    assertEquals(2, localCounts.get(2).intValue());
    assertEquals(3, localCounts.get(3).intValue());
  }
Code example source: databricks/learning-spark

  public static final JavaPairRDD<String, Long> endpointCount(
      JavaRDD<ApacheAccessLog> accessLogRDD) {
    return accessLogRDD
        .mapToPair(new EndPointTuple())
        .reduceByKey(new LongSumReducer());
  }
}
Code example source: org.apache.spark/spark-core_2.10

  @Test
  public void reduceByKey() {
    List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
      new Tuple2<>(2, 1),
      new Tuple2<>(2, 1),
      new Tuple2<>(1, 1),
      new Tuple2<>(3, 2),
      new Tuple2<>(3, 1)
    );
    JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
    JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey((a, b) -> a + b);
    Assert.assertEquals(1, counts.lookup(1).get(0).intValue());
    Assert.assertEquals(2, counts.lookup(2).get(0).intValue());
    Assert.assertEquals(3, counts.lookup(3).get(0).intValue());
    Map<Integer, Integer> localCounts = counts.collectAsMap();
    Assert.assertEquals(1, localCounts.get(1).intValue());
    Assert.assertEquals(2, localCounts.get(2).intValue());
    Assert.assertEquals(3, localCounts.get(3).intValue());
    localCounts = rdd.reduceByKeyLocally((a, b) -> a + b);
    Assert.assertEquals(1, localCounts.get(1).intValue());
    Assert.assertEquals(2, localCounts.get(2).intValue());
    Assert.assertEquals(3, localCounts.get(3).intValue());
  }
Code example source: databricks/learning-spark

  public static final JavaPairRDD<String, Long> ipAddressCount(
      JavaRDD<ApacheAccessLog> accessLogRDD) {
    return accessLogRDD
        .mapToPair(new IpTuple())
        .reduceByKey(new LongSumReducer());
  }
Code example source: org.apache.spark/spark-core

  @Test
  public void reduceByKey() {
    List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
      new Tuple2<>(2, 1),
      new Tuple2<>(2, 1),
      new Tuple2<>(1, 1),
      new Tuple2<>(3, 2),
      new Tuple2<>(3, 1)
    );
    JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
    JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey((a, b) -> a + b);
    Assert.assertEquals(1, counts.lookup(1).get(0).intValue());
    Assert.assertEquals(2, counts.lookup(2).get(0).intValue());
    Assert.assertEquals(3, counts.lookup(3).get(0).intValue());
    Map<Integer, Integer> localCounts = counts.collectAsMap();
    Assert.assertEquals(1, localCounts.get(1).intValue());
    Assert.assertEquals(2, localCounts.get(2).intValue());
    Assert.assertEquals(3, localCounts.get(3).intValue());
    localCounts = rdd.reduceByKeyLocally((a, b) -> a + b);
    Assert.assertEquals(1, localCounts.get(1).intValue());
    Assert.assertEquals(2, localCounts.get(2).intValue());
    Assert.assertEquals(3, localCounts.get(3).intValue());
  }
Code example source: databricks/learning-spark

  public static void main(String[] args) throws Exception {
    String master = args[0];
    JavaSparkContext sc = new JavaSparkContext(
        master, "wordcount", System.getenv("SPARK_HOME"), System.getenv("JARS"));
    JavaRDD<String> rdd = sc.textFile(args[1]);
    JavaPairRDD<String, Integer> counts = rdd.flatMap(
      new FlatMapFunction<String, String>() {
        public Iterable<String> call(String x) {
          return Arrays.asList(x.split(" "));
        }
      }).mapToPair(new PairFunction<String, String, Integer>() {
        public Tuple2<String, Integer> call(String x) {
          return new Tuple2(x, 1);
        }
      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
        public Integer call(Integer x, Integer y) { return x + y; }
      });
    counts.saveAsTextFile(args[2]);
  }
}
Code example source: org.apache.spark/spark-core_2.11

  @SuppressWarnings("unchecked")
  @Test
  public void reduceByKey() {
    List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
      new Tuple2<>(2, 1),
      new Tuple2<>(2, 1),
      new Tuple2<>(1, 1),
      new Tuple2<>(3, 2),
      new Tuple2<>(3, 1)
    );
    JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
    JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey((a, b) -> a + b);
    assertEquals(1, counts.lookup(1).get(0).intValue());
    assertEquals(2, counts.lookup(2).get(0).intValue());
    assertEquals(3, counts.lookup(3).get(0).intValue());
    Map<Integer, Integer> localCounts = counts.collectAsMap();
    assertEquals(1, localCounts.get(1).intValue());
    assertEquals(2, localCounts.get(2).intValue());
    assertEquals(3, localCounts.get(3).intValue());
    localCounts = rdd.reduceByKeyLocally((a, b) -> a + b);
    assertEquals(1, localCounts.get(1).intValue());
    assertEquals(2, localCounts.get(2).intValue());
    assertEquals(3, localCounts.get(3).intValue());
  }
Code example source: OryxProject/oryx

  /**
   * @param evalData points to cluster for evaluation
   * @return cluster IDs as keys, and metrics for each cluster, like the count, the sum of
   *  distances to the centroid, and the sum of squared distances
   */
  JavaPairRDD<Integer,ClusterMetric> fetchClusterMetrics(JavaRDD<Vector> evalData) {
    return evalData.mapToPair(vector -> {
      double closestDist = Double.POSITIVE_INFINITY;
      int minClusterID = Integer.MIN_VALUE;
      double[] vec = vector.toArray();
      for (ClusterInfo cluster : clusters.values()) {
        double distance = distanceFn.applyAsDouble(cluster.getCenter(), vec);
        if (distance < closestDist) {
          closestDist = distance;
          minClusterID = cluster.getID();
        }
      }
      Preconditions.checkState(!Double.isInfinite(closestDist) && !Double.isNaN(closestDist));
      return new Tuple2<>(minClusterID, new ClusterMetric(1L, closestDist, closestDist * closestDist));
    }).reduceByKey(ClusterMetric::add);
  }
Code example source: OryxProject/oryx

    throw e;
  }).reduceByKey((t1, t2) -> {
    double[] vec1 = t1._1();
    double[] vec2 = t2._1();
Code example source: databricks/learning-spark

public Tuple2<String, Integer> call(String callSign) {
return new Tuple2(callSign, 1);
}}).reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer x, Integer y) {
return x + y;
String country = lookupCountry(sign, signPrefixes.value());
return new Tuple2(country, callSignCount._2());
}}).reduceByKey(new SumInts());
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt");
System.out.println("Saved country contact counts as a file");
Code example source: apache/kylin

.reduceByKey(reduceFunction, SparkUtil.estimateTotalPartitionNum(cubeStatsReader, envConfig))
.mapToPair(convertTextFunction).saveAsNewAPIHadoopDataset(job.getConfiguration());
.reduceByKey(reduceFunction,
SparkUtil.estimateLayerPartitionNum(level, cubeStatsReader, envConfig))
.mapToPair(convertTextFunction).saveAsNewAPIHadoopDataset(job.getConfiguration());
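
Both Kylin fragments above use the reduceByKey overload whose second argument sets the number of partitions of the shuffled result. Below is a minimal, self-contained sketch of that overload; the sample data and the partition counts 8 and 4 are invented for illustration and are not Kylin's values.

import java.util.Arrays;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class ReduceByKeyPartitionsExample {
  public static void main(String[] args) {
    try (JavaSparkContext sc = new JavaSparkContext("local[2]", "reduceByKeyPartitions")) {
      // Parallelize the input across 8 partitions.
      JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(Arrays.asList(
          new Tuple2<>("x", 1), new Tuple2<>("y", 2), new Tuple2<>("x", 3)), 8);
      // The second argument fixes the partition count of the result,
      // here shrinking it from 8 input partitions to 4.
      JavaPairRDD<String, Integer> sums = pairs.reduceByKey(Integer::sum, 4);
      System.out.println(sums.getNumPartitions()); // 4
      System.out.println(sums.collectAsMap());     // {x=4, y=2}
    }
  }
}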
Code example source: apache/kylin

allRDDs[0] = encodedBaseRDD.reduceByKey(baseCuboidReducerFunction, partition).persist(storageLevel);
partition = SparkUtil.estimateLayerPartitionNum(level, cubeStatsReader, envConfig);
allRDDs[level] = allRDDs[level - 1].flatMapToPair(flatMapFunction).reduceByKey(reducerFunction2, partition)
.persist(storageLevel);
allRDDs[level - 1].unpersist();
Code example source: mahmoudparsian/data-algorithms-book

  JavaPairRDD<String, Integer> uniqueKeys = kv.reduceByKey((Integer i1, Integer i2) -> i1 + i2);
  uniqueKeys.saveAsTextFile("/output/3");
Code example source: mahmoudparsian/data-algorithms-book

  ones.reduceByKey((Integer i1, Integer i2) -> i1 + i2);
Code example source: mahmoudparsian/data-algorithms-book

  filtered.reduceByKey((LogStatistics stats, LogStatistics stats2) -> stats.merge(stats2));