Usage and code examples of the org.apache.spark.api.java.JavaPairRDD.collectAsMap() method

This article collects Java code examples for the org.apache.spark.api.java.JavaPairRDD.collectAsMap() method and shows how it is used in practice. The examples were extracted from selected open-source projects found on GitHub, Stack Overflow, Maven, and similar sources, so they should be useful references. Details of the JavaPairRDD.collectAsMap() method:

Package: org.apache.spark.api.java
Class: JavaPairRDD
Method: collectAsMap

About JavaPairRDD.collectAsMap

collectAsMap() is an action that returns all key-value pairs of the RDD to the driver as a java.util.Map. The result is not a multimap: if the same key appears more than once in the RDD, only one of its values is kept. Because the whole result is materialized in driver memory, it should only be called on RDDs that are expected to be small.
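
A minimal sketch of the basic call pattern (the local[2] master URL and the sample data are illustrative assumptions, not taken from any of the projects below):

import java.util.Arrays;
import java.util.Map;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class CollectAsMapExample {
  public static void main(String[] args) {
    // "local[2]" master and the sample pairs below are assumptions for illustration only.
    JavaSparkContext sc = new JavaSparkContext("local[2]", "CollectAsMapExample");
    JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(Arrays.asList(
      new Tuple2<>("a", 1),
      new Tuple2<>("b", 2),
      new Tuple2<>("a", 3)));  // duplicate key: only one of its values survives

    // Action: copies every pair to the driver and returns them as a java.util.Map.
    Map<String, Integer> asMap = pairs.collectAsMap();
    System.out.println(asMap);  // e.g. {b=2, a=3}; which value "a" keeps is not guaranteed

    sc.stop();
  }
}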

Code examples

Code example source: OryxProject/oryx
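This example counts, for each word, how many distinct other words appear on the same line with it, and returns the counts to the driver as a Map via collectAsMap():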

public static Map<String,Integer> countDistinctOtherWords(JavaPairRDD<String,String> data) {
 return data.values().flatMapToPair(line -> {
  Set<String> distinctTokens = new HashSet<>(Arrays.asList(line.split(" ")));
  return distinctTokens.stream().flatMap(a ->
   distinctTokens.stream().filter(b -> !a.equals(b)).map(b -> new Tuple2<>(a, b))
  ).iterator();
 }).distinct().mapValues(a -> 1).reduceByKey((c1, c2) -> c1 + c2).collectAsMap();
}

Code example source: databricks/learning-spark
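This example keys each input line by its first word, keeps the pairs whose value is shorter than 20 characters, and prints the Map returned by collectAsMap() on the driver: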

public static void main(String[] args) throws Exception {
  if (args.length != 2) {
    throw new Exception("Usage KeyValueMapFilter sparkMaster inputFile");
  }
  String master = args[0];
  String inputFile = args[1];

  JavaSparkContext sc = new JavaSparkContext(
      master, "KeyValueMapFilter", System.getenv("SPARK_HOME"), System.getenv("JARS"));
  JavaRDD<String> input = sc.textFile(inputFile);
  // Key each line by its first word.
  PairFunction<String, String, String> keyData = new PairFunction<String, String, String>() {
    @Override
    public Tuple2<String, String> call(String x) {
      return new Tuple2<>(x.split(" ")[0], x);
    }
  };
  // Keep only pairs whose value (the full line) is shorter than 20 characters.
  Function<Tuple2<String, String>, Boolean> longWordFilter =
      new Function<Tuple2<String, String>, Boolean>() {
        @Override
        public Boolean call(Tuple2<String, String> input) {
          return (input._2().length() < 20);
        }
      };
  JavaPairRDD<String, String> rdd = input.mapToPair(keyData);
  JavaPairRDD<String, String> result = rdd.filter(longWordFilter);
  // Bring the filtered pairs back to the driver as a local Map and print it.
  Map<String, String> resultMap = result.collectAsMap();
  for (Entry<String, String> entry : resultMap.entrySet()) {
    System.out.println(entry.getKey() + ":" + entry.getValue());
  }
}

Code example source: OryxProject/oryx
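Here a sample of the evaluation data is clustered, the clustered points are collected to the driver with collectAsMap(), and the silhouette coefficient is computed locally from the resulting Map: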

@Override
double evaluate(JavaRDD<Vector> evalData) {
 return silhouetteCoefficient(fetchClusteredPoints(fetchSampleData(evalData)).collectAsMap());
}

Code example source: org.apache.spark/spark-core_2.11 (the same test appears verbatim in spark-core_2.10 and spark-core)

@SuppressWarnings("unchecked")
@Test
public void collectAsMapAndSerialize() throws Exception {
 JavaPairRDD<String,Integer> rdd =
   sc.parallelizePairs(Arrays.asList(new Tuple2<>("foo", 1)));
 Map<String,Integer> map = rdd.collectAsMap();
 ByteArrayOutputStream bytes = new ByteArrayOutputStream();
 new ObjectOutputStream(bytes).writeObject(map);
 Map<String,Integer> deserializedMap = (Map<String,Integer>)
   new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray())).readObject();
 assertEquals(1, deserializedMap.get("foo").intValue());
}

Code example source: OryxProject/oryx
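This example assigns a distinct integer index to each user or item ID; the result of collectAsMap() is copied into a plain HashMap because, as the original comment notes, the Map implementation it returns can cause serialization problems: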

private static Map<String,Integer> buildIDIndexMapping(JavaRDD<String[]> parsedRDD,
                            boolean user) {
 int offset = user ? 0 : 1;
 Map<String,Integer> reverseIDLookup = parsedRDD.map(tokens -> tokens[offset])
   .distinct().sortBy(s -> s, true, parsedRDD.getNumPartitions())
   .zipWithIndex().mapValues(Long::intValue)
   .collectAsMap();
 // Clone, due to some serialization problems with the result of collectAsMap?
 return new HashMap<>(reverseIDLookup);
}

Code example source: org.apache.spark/spark-core_2.11 (the same test appears, with minor formatting differences, in spark-core_2.10 and spark-core)

@Test
public void collectAsMapWithIntArrayValues() {
 // Regression test for SPARK-1040
 JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1));
 JavaPairRDD<Integer, int[]> pairRDD = rdd.mapToPair(x -> new Tuple2<>(x, new int[]{x}));
 pairRDD.collect();  // Works fine
 pairRDD.collectAsMap();  // Used to crash with ClassCastException
}

Code example source: org.apache.spark/spark-core_2.10 (near-identical versions of this test appear in spark-core_2.11 and spark-core)

@Test
public void reduceByKey() {
 List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
  new Tuple2<>(2, 1),
  new Tuple2<>(2, 1),
  new Tuple2<>(1, 1),
  new Tuple2<>(3, 2),
  new Tuple2<>(3, 1)
 );
 JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
 JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey((a, b) -> a + b);
 Assert.assertEquals(1, counts.lookup(1).get(0).intValue());
 Assert.assertEquals(2, counts.lookup(2).get(0).intValue());
 Assert.assertEquals(3, counts.lookup(3).get(0).intValue());
 Map<Integer, Integer> localCounts = counts.collectAsMap();
 Assert.assertEquals(1, localCounts.get(1).intValue());
 Assert.assertEquals(2, localCounts.get(2).intValue());
 Assert.assertEquals(3, localCounts.get(3).intValue());
 localCounts = rdd.reduceByKeyLocally((a, b) -> a + b);
 Assert.assertEquals(1, localCounts.get(1).intValue());
 Assert.assertEquals(2, localCounts.get(2).intValue());
 Assert.assertEquals(3, localCounts.get(3).intValue());
}

Code example source: org.apache.spark/spark-core_2.10
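This test groups the integers 1 through 6 by value % 3, sums each group with combineByKey, and checks the Map returned by collectAsMap() against the expected result, first with the default settings and then with an explicitly supplied partitioner and a KryoSerializer: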

@Test
public void combineByKey() {
 JavaRDD<Integer> originalRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6));
 Function<Integer, Integer> keyFunction = v1 -> v1 % 3;
 Function<Integer, Integer> createCombinerFunction = v1 -> v1;
 Function2<Integer, Integer, Integer> mergeValueFunction = (v1, v2) -> v1 + v2;
 JavaPairRDD<Integer, Integer> combinedRDD = originalRDD.keyBy(keyFunction)
  .combineByKey(createCombinerFunction, mergeValueFunction, mergeValueFunction);
 Map<Integer, Integer> results = combinedRDD.collectAsMap();
 ImmutableMap<Integer, Integer> expected = ImmutableMap.of(0, 9, 1, 5, 2, 7);
 assertEquals(expected, results);
 Partitioner defaultPartitioner = Partitioner.defaultPartitioner(
  combinedRDD.rdd(),
  JavaConverters.collectionAsScalaIterableConverter(
   Collections.<RDD<?>>emptyList()).asScala().toSeq());
 combinedRDD = originalRDD.keyBy(keyFunction)
  .combineByKey(
   createCombinerFunction,
   mergeValueFunction,
   mergeValueFunction,
   defaultPartitioner,
   false,
   new KryoSerializer(new SparkConf()));
 results = combinedRDD.collectAsMap();
 assertEquals(expected, results);
}
