Usage of org.apache.spark.api.java.JavaPairRDD.reduceByKey() with code examples

x33g5p2x | reposted on 2022-01-21

This article collects code examples of the Java method org.apache.spark.api.java.JavaPairRDD.reduceByKey() and shows how JavaPairRDD.reduceByKey() is used in practice. The examples are extracted from selected projects hosted on platforms such as GitHub, Stack Overflow, and Maven, and should serve as useful references. Details of the JavaPairRDD.reduceByKey() method:
Package path: org.apache.spark.api.java.JavaPairRDD
Class name: JavaPairRDD
Method name: reduceByKey

About JavaPairRDD.reduceByKey

reduceByKey merges the values for each key using an associative and commutative reduce function. Values are first combined locally within each partition (a map-side combine) before the partial results are shuffled, which typically makes it more efficient than grouping all values for a key and reducing afterwards. Overloads also accept a target number of partitions or a Partitioner for the resulting RDD.
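
To set the stage for the collected examples below, here is a minimal, self-contained sketch of the single-argument overload summing the values per key. The local-mode SparkConf, class name, and sample data are illustrative assumptions rather than code from any of the projects referenced in this article.

import java.util.Arrays;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class ReduceByKeyExample {
 public static void main(String[] args) {
  // Local-mode context, for illustration only.
  SparkConf conf = new SparkConf().setAppName("reduceByKeyExample").setMaster("local[2]");
  try (JavaSparkContext sc = new JavaSparkContext(conf)) {
   JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(Arrays.asList(
    new Tuple2<>("a", 1),
    new Tuple2<>("b", 1),
    new Tuple2<>("a", 1)));
   // Sum the values for each key; the reduce function must be associative and commutative.
   JavaPairRDD<String, Integer> counts = pairs.reduceByKey((x, y) -> x + y);
   Map<String, Integer> result = counts.collectAsMap(); // {a=2, b=1}
   System.out.println(result);
  }
 }
}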

Code examples

Code example source: OryxProject/oryx

public static Map<String,Integer> countDistinctOtherWords(JavaPairRDD<String,String> data) {
 return data.values().flatMapToPair(line -> {
  Set<String> distinctTokens = new HashSet<>(Arrays.asList(line.split(" ")));
  return distinctTokens.stream().flatMap(a ->
   distinctTokens.stream().filter(b -> !a.equals(b)).map(b -> new Tuple2<>(a, b))
  ).iterator();
 }).distinct().mapValues(a -> 1).reduceByKey((c1, c2) -> c1 + c2).collectAsMap();
}

Code example source: org.apache.spark/spark-core_2.11

@Test
public void reduceByKey() {
 List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
  new Tuple2<>(2, 1),
  new Tuple2<>(2, 1),
  new Tuple2<>(1, 1),
  new Tuple2<>(3, 2),
  new Tuple2<>(3, 1)
 );
 JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
 JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey((a, b) -> a + b);
 Assert.assertEquals(1, counts.lookup(1).get(0).intValue());
 Assert.assertEquals(2, counts.lookup(2).get(0).intValue());
 Assert.assertEquals(3, counts.lookup(3).get(0).intValue());
 Map<Integer, Integer> localCounts = counts.collectAsMap();
 Assert.assertEquals(1, localCounts.get(1).intValue());
 Assert.assertEquals(2, localCounts.get(2).intValue());
 Assert.assertEquals(3, localCounts.get(3).intValue());
 localCounts = rdd.reduceByKeyLocally((a, b) -> a + b);
 Assert.assertEquals(1, localCounts.get(1).intValue());
 Assert.assertEquals(2, localCounts.get(2).intValue());
 Assert.assertEquals(3, localCounts.get(3).intValue());
}

Code example source: Alluxio/alluxio

/**
 * Spark job to check whether Spark executors can recognize Alluxio filesystem.
 *
 * @param sc current JavaSparkContext
 * @param reportWriter save user-facing messages to a generated file
 * @return Spark job result
 */
private Status runSparkJob(JavaSparkContext sc, PrintWriter reportWriter) {
 // Generate a list of integer for testing
 List<Integer> nums = IntStream.rangeClosed(1, mPartitions).boxed().collect(Collectors.toList());
 JavaRDD<Integer> dataSet = sc.parallelize(nums, mPartitions);
 // Run a Spark job to check whether Spark executors can recognize Alluxio
 JavaPairRDD<Status, String> extractedStatus = dataSet
   .mapToPair(s -> new Tuple2<>(CheckerUtils.performIntegrationChecks(),
     CheckerUtils.getLocalAddress()));
 // Merge the IP addresses that can/cannot recognize Alluxio
 JavaPairRDD<Status, String> mergeStatus = extractedStatus.reduceByKey((a, b)
   -> a.contains(b) ?  a : (b.contains(a) ? b : a + " " + b),
   (mPartitions < 10 ? 1 : mPartitions / 10));
 mSparkJobResult = mergeStatus.collect();
 Map<Status, List<String>> resultMap = new HashMap<>();
 for (Tuple2<Status, String> op : mSparkJobResult) {
  List<String> addresses = resultMap.getOrDefault(op._1, new ArrayList<>());
  addresses.add(op._2);
  resultMap.put(op._1, addresses);
 }
 return CheckerUtils.printNodesResults(resultMap, reportWriter);
}

Code example source: org.apache.spark/spark-core_2.10

@SuppressWarnings("unchecked")
@Test
public void reduceByKey() {
 List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
  new Tuple2<>(2, 1),
  new Tuple2<>(2, 1),
  new Tuple2<>(1, 1),
  new Tuple2<>(3, 2),
  new Tuple2<>(3, 1)
 );
 JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
 JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey((a, b) -> a + b);
 assertEquals(1, counts.lookup(1).get(0).intValue());
 assertEquals(2, counts.lookup(2).get(0).intValue());
 assertEquals(3, counts.lookup(3).get(0).intValue());
 Map<Integer, Integer> localCounts = counts.collectAsMap();
 assertEquals(1, localCounts.get(1).intValue());
 assertEquals(2, localCounts.get(2).intValue());
 assertEquals(3, localCounts.get(3).intValue());
 localCounts = rdd.reduceByKeyLocally((a, b) -> a + b);
 assertEquals(1, localCounts.get(1).intValue());
 assertEquals(2, localCounts.get(2).intValue());
 assertEquals(3, localCounts.get(3).intValue());
}

Code example source: databricks/learning-spark

public static final JavaPairRDD<Integer, Long> responseCodeCount(
  JavaRDD<ApacheAccessLog> accessLogRDD) {
 return accessLogRDD
  .mapToPair(new ResponseCodeTuple())
  .reduceByKey(new LongSumReducer());
}

Code example source: org.apache.spark/spark-core

@SuppressWarnings("unchecked")
@Test
public void reduceByKey() {
 List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
  new Tuple2<>(2, 1),
  new Tuple2<>(2, 1),
  new Tuple2<>(1, 1),
  new Tuple2<>(3, 2),
  new Tuple2<>(3, 1)
 );
 JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
 JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey((a, b) -> a + b);
 assertEquals(1, counts.lookup(1).get(0).intValue());
 assertEquals(2, counts.lookup(2).get(0).intValue());
 assertEquals(3, counts.lookup(3).get(0).intValue());
 Map<Integer, Integer> localCounts = counts.collectAsMap();
 assertEquals(1, localCounts.get(1).intValue());
 assertEquals(2, localCounts.get(2).intValue());
 assertEquals(3, localCounts.get(3).intValue());
 localCounts = rdd.reduceByKeyLocally((a, b) -> a + b);
 assertEquals(1, localCounts.get(1).intValue());
 assertEquals(2, localCounts.get(2).intValue());
 assertEquals(3, localCounts.get(3).intValue());
}

Code example source: databricks/learning-spark

public static final JavaPairRDD<String, Long> endpointCount(
   JavaRDD<ApacheAccessLog> accessLogRDD) {
  return accessLogRDD
   .mapToPair(new EndPointTuple())
   .reduceByKey(new LongSumReducer());
 }
}

Code example source: org.apache.spark/spark-core_2.10

@Test
public void reduceByKey() {
 List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
  new Tuple2<>(2, 1),
  new Tuple2<>(2, 1),
  new Tuple2<>(1, 1),
  new Tuple2<>(3, 2),
  new Tuple2<>(3, 1)
 );
 JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
 JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey((a, b) -> a + b);
 Assert.assertEquals(1, counts.lookup(1).get(0).intValue());
 Assert.assertEquals(2, counts.lookup(2).get(0).intValue());
 Assert.assertEquals(3, counts.lookup(3).get(0).intValue());
 Map<Integer, Integer> localCounts = counts.collectAsMap();
 Assert.assertEquals(1, localCounts.get(1).intValue());
 Assert.assertEquals(2, localCounts.get(2).intValue());
 Assert.assertEquals(3, localCounts.get(3).intValue());
 localCounts = rdd.reduceByKeyLocally((a, b) -> a + b);
 Assert.assertEquals(1, localCounts.get(1).intValue());
 Assert.assertEquals(2, localCounts.get(2).intValue());
 Assert.assertEquals(3, localCounts.get(3).intValue());
}

Code example source: databricks/learning-spark

public static final JavaPairRDD<String, Long> ipAddressCount(
  JavaRDD<ApacheAccessLog> accessLogRDD) {
 return accessLogRDD
  .mapToPair(new IpTuple())
  .reduceByKey(new LongSumReducer());
}

Code example source: org.apache.spark/spark-core

@Test
public void reduceByKey() {
 List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
  new Tuple2<>(2, 1),
  new Tuple2<>(2, 1),
  new Tuple2<>(1, 1),
  new Tuple2<>(3, 2),
  new Tuple2<>(3, 1)
 );
 JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
 JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey((a, b) -> a + b);
 Assert.assertEquals(1, counts.lookup(1).get(0).intValue());
 Assert.assertEquals(2, counts.lookup(2).get(0).intValue());
 Assert.assertEquals(3, counts.lookup(3).get(0).intValue());
 Map<Integer, Integer> localCounts = counts.collectAsMap();
 Assert.assertEquals(1, localCounts.get(1).intValue());
 Assert.assertEquals(2, localCounts.get(2).intValue());
 Assert.assertEquals(3, localCounts.get(3).intValue());
 localCounts = rdd.reduceByKeyLocally((a, b) -> a + b);
 Assert.assertEquals(1, localCounts.get(1).intValue());
 Assert.assertEquals(2, localCounts.get(2).intValue());
 Assert.assertEquals(3, localCounts.get(3).intValue());
}

Code example source: databricks/learning-spark

public static void main(String[] args) throws Exception {
 String master = args[0];
 JavaSparkContext sc = new JavaSparkContext(
  master, "wordcount", System.getenv("SPARK_HOME"), System.getenv("JARS"));
 JavaRDD<String> rdd = sc.textFile(args[1]);
 JavaPairRDD<String, Integer> counts = rdd.flatMap(
  new FlatMapFunction<String, String>() {
   public Iterable<String> call(String x) {
    return Arrays.asList(x.split(" "));
   }
  }).mapToPair(new PairFunction<String, String, Integer>() {
   public Tuple2<String, Integer> call(String x) {
    return new Tuple2<>(x, 1);
   }
  }).reduceByKey(new Function2<Integer, Integer, Integer>() {
   public Integer call(Integer x, Integer y) {
    return x + y;
   }
  });
 counts.saveAsTextFile(args[2]);
}

Code example source: org.apache.spark/spark-core_2.11

@SuppressWarnings("unchecked")
@Test
public void reduceByKey() {
 List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
  new Tuple2<>(2, 1),
  new Tuple2<>(2, 1),
  new Tuple2<>(1, 1),
  new Tuple2<>(3, 2),
  new Tuple2<>(3, 1)
 );
 JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
 JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey((a, b) -> a + b);
 assertEquals(1, counts.lookup(1).get(0).intValue());
 assertEquals(2, counts.lookup(2).get(0).intValue());
 assertEquals(3, counts.lookup(3).get(0).intValue());
 Map<Integer, Integer> localCounts = counts.collectAsMap();
 assertEquals(1, localCounts.get(1).intValue());
 assertEquals(2, localCounts.get(2).intValue());
 assertEquals(3, localCounts.get(3).intValue());
 localCounts = rdd.reduceByKeyLocally((a, b) -> a + b);
 assertEquals(1, localCounts.get(1).intValue());
 assertEquals(2, localCounts.get(2).intValue());
 assertEquals(3, localCounts.get(3).intValue());
}

Code example source: OryxProject/oryx

/**
 * @param evalData points to cluster for evaluation
 * @return cluster IDs as keys, and metrics for each cluster like the count, sum of distances to centroid,
 *  and sum of squared distances
 */
JavaPairRDD<Integer,ClusterMetric> fetchClusterMetrics(JavaRDD<Vector> evalData) {
 return evalData.mapToPair(vector -> {
  double closestDist = Double.POSITIVE_INFINITY;
  int minClusterID = Integer.MIN_VALUE;
  double[] vec = vector.toArray();
  for (ClusterInfo cluster : clusters.values()) {
   double distance = distanceFn.applyAsDouble(cluster.getCenter(), vec);
   if (distance < closestDist) {
    closestDist = distance;
    minClusterID = cluster.getID();
   }
  }
  Preconditions.checkState(!Double.isInfinite(closestDist) && !Double.isNaN(closestDist));
  return new Tuple2<>(minClusterID, new ClusterMetric(1L, closestDist, closestDist * closestDist));
 }).reduceByKey(ClusterMetric::add);
}

Code example source: OryxProject/oryx

throw e;
}).reduceByKey((t1, t2) -> {
 double[] vec1 = t1._1();
 double[] vec2 = t2._1();

Code example source: databricks/learning-spark

public Tuple2<String, Integer> call(String callSign) {
   return new Tuple2(callSign, 1);
  }}).reduceByKey(new Function2<Integer, Integer, Integer>() {
    public Integer call(Integer x, Integer y) {
     return x + y;
   String country = lookupCountry(sign, signPrefixes.value());
   return new Tuple2(country, callSignCount._2());
  }}).reduceByKey(new SumInts());
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt");
System.out.println("Saved country contact counts as a file");

Code example source: apache/kylin

.reduceByKey(reduceFunction, SparkUtil.estimateTotalPartitionNum(cubeStatsReader, envConfig))
.mapToPair(convertTextFunction).saveAsNewAPIHadoopDataset(job.getConfiguration());
  .reduceByKey(reduceFunction,
      SparkUtil.estimateLayerPartitionNum(level, cubeStatsReader, envConfig))
  .mapToPair(convertTextFunction).saveAsNewAPIHadoopDataset(job.getConfiguration());

Code example source: apache/kylin

allRDDs[0] = encodedBaseRDD.reduceByKey(baseCuboidReducerFunction, partition).persist(storageLevel);
  partition = SparkUtil.estimateLayerPartitionNum(level, cubeStatsReader, envConfig);
  allRDDs[level] = allRDDs[level - 1].flatMapToPair(flatMapFunction).reduceByKey(reducerFunction2, partition)
      .persist(storageLevel);
  allRDDs[level - 1].unpersist();
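
Both Kylin excerpts above, like the earlier Alluxio example, use the two-argument overload reduceByKey(func, numPartitions), which also sets how many partitions the reduced RDD has. A minimal sketch of that overload follows; the data, method name, and partition counts are illustrative assumptions.

import java.util.Arrays;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class ReduceByKeyPartitionsExample {
 // 'sc' is assumed to be an already-created JavaSparkContext.
 static JavaPairRDD<String, Long> sumWithPartitions(JavaSparkContext sc) {
  JavaPairRDD<String, Long> pairs = sc.parallelizePairs(Arrays.asList(
   new Tuple2<>("x", 1L),
   new Tuple2<>("y", 2L),
   new Tuple2<>("x", 3L)), 4);
  // The second argument (8 here, chosen arbitrarily) controls the number of
  // partitions of the result, independently of the input's partitioning.
  return pairs.reduceByKey((a, b) -> a + b, 8);
 }
}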

Code example source: mahmoudparsian/data-algorithms-book

JavaPairRDD<String, Integer> uniqueKeys = kv.reduceByKey((Integer i1, Integer i2) -> i1 + i2);
uniqueKeys.saveAsTextFile("/output/3");

Code example source: mahmoudparsian/data-algorithms-book

ones.reduceByKey((Integer i1, Integer i2) -> i1 + i2);

Code example source: mahmoudparsian/data-algorithms-book

filtered.reduceByKey((LogStatistics stats, LogStatistics stats2) -> stats.merge(stats2));
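
The last two excerpts call reduceByKey without using its return value. Since reduceByKey is a transformation, it returns a new JavaPairRDD and leaves the original RDD untouched, so such a call has no observable effect until the result is kept and an action is run on it. A minimal sketch of the usual pattern, with illustrative names that are not from the excerpted project:

import java.util.Map;

import org.apache.spark.api.java.JavaPairRDD;

class ReduceByKeyUsage {
 // 'ones' stands for any existing JavaPairRDD of (word, 1) pairs.
 static Map<String, Integer> wordTotals(JavaPairRDD<String, Integer> ones) {
  // reduceByKey builds a new RDD; 'ones' itself is unchanged.
  JavaPairRDD<String, Integer> totals = ones.reduceByKey((i1, i2) -> i1 + i2);
  // collectAsMap() is an action that triggers the computation and returns the result.
  return totals.collectAsMap();
 }
}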
