This article collects code examples of the Java method org.apache.spark.api.java.JavaPairRDD.collectAsMap(), showing how JavaPairRDD.collectAsMap() is used in practice. The examples are extracted from selected open-source projects hosted on platforms such as GitHub, Stack Overflow, and Maven, and should serve as useful references. Details of the JavaPairRDD.collectAsMap() method are as follows:
Package: org.apache.spark.api.java
Class: JavaPairRDD
Method: collectAsMap
Description: returns the key-value pairs of this RDD to the driver as a local java.util.Map. Note that the result is not a multimap: if the RDD contains several values for the same key, only one value per key survives in the returned Map.
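Before the collected examples, here is a minimal self-contained sketch of a typical call. The class name and dataset are made up for illustration and run against a local master; this is not taken from any of the projects below. It also demonstrates the one-value-per-key behavior described above:

import java.util.Arrays;
import java.util.Map;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class CollectAsMapExample {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext("local[*]", "CollectAsMapExample");
    // Two of the three pairs share the key "a"; collectAsMap() keeps only one value per key.
    JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(Arrays.asList(
        new Tuple2<>("a", 1),
        new Tuple2<>("a", 2),
        new Tuple2<>("b", 3)));
    Map<String, Integer> map = pairs.collectAsMap(); // pulls the whole RDD to the driver
    System.out.println(map.size()); // prints 2, not 3
    sc.stop();
  }
}

Because the entire result is materialized on the driver, collectAsMap() is only appropriate for RDDs whose key set is known to be small.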
Code example source: OryxProject/oryx
public static Map<String,Integer> countDistinctOtherWords(JavaPairRDD<String,String> data) {
  // For each value line, emit (a, b) for every pair of distinct co-occurring tokens,
  // then count for each word how many distinct other words appear alongside it.
  return data.values().flatMapToPair(line -> {
    Set<String> distinctTokens = new HashSet<>(Arrays.asList(line.split(" ")));
    return distinctTokens.stream().flatMap(a ->
        distinctTokens.stream().filter(b -> !a.equals(b)).map(b -> new Tuple2<>(a, b))
    ).iterator();
  }).distinct().mapValues(a -> 1).reduceByKey((c1, c2) -> c1 + c2).collectAsMap();
}
Code example source: databricks/learning-spark
public static void main(String[] args) throws Exception {
  if (args.length != 2) {
    throw new Exception("Usage KeyValueMapFilter sparkMaster inputFile");
  }
  String master = args[0];
  String inputFile = args[1];
  JavaSparkContext sc = new JavaSparkContext(
      master, "KeyValueMapFilter", System.getenv("SPARK_HOME"), System.getenv("JARS"));
  JavaRDD<String> input = sc.textFile(inputFile);
  // Key each line by its first word
  PairFunction<String, String, String> keyData = new PairFunction<String, String, String>() {
    @Override
    public Tuple2<String, String> call(String x) {
      return new Tuple2<>(x.split(" ")[0], x);
    }
  };
  // Keep only pairs whose value (the full line) is shorter than 20 characters
  Function<Tuple2<String, String>, Boolean> longWordFilter =
      new Function<Tuple2<String, String>, Boolean>() {
    @Override
    public Boolean call(Tuple2<String, String> input) {
      return (input._2().length() < 20);
    }
  };
  JavaPairRDD<String, String> rdd = input.mapToPair(keyData);
  JavaPairRDD<String, String> result = rdd.filter(longWordFilter);
  Map<String, String> resultMap = result.collectAsMap();
  for (Entry<String, String> entry : resultMap.entrySet()) {
    System.out.println(entry.getKey() + ":" + entry.getValue());
  }
}
Code example source: OryxProject/oryx
@Override
double evaluate(JavaRDD<Vector> evalData) {
  return silhouetteCoefficient(fetchClusteredPoints(fetchSampleData(evalData)).collectAsMap());
}
Code example source: org.apache.spark/spark-core_2.11 (the same test appears verbatim in spark-core_2.10 and spark-core)
@SuppressWarnings("unchecked")
@Test
public void collectAsMapAndSerialize() throws Exception {
  JavaPairRDD<String,Integer> rdd =
      sc.parallelizePairs(Arrays.asList(new Tuple2<>("foo", 1)));
  Map<String,Integer> map = rdd.collectAsMap();
  ByteArrayOutputStream bytes = new ByteArrayOutputStream();
  new ObjectOutputStream(bytes).writeObject(map);
  Map<String,Integer> deserializedMap = (Map<String,Integer>)
      new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray())).readObject();
  assertEquals(1, deserializedMap.get("foo").intValue());
}
Code example source: OryxProject/oryx
private static Map<String,Integer> buildIDIndexMapping(JavaRDD<String[]> parsedRDD,
                                                       boolean user) {
  int offset = user ? 0 : 1;
  Map<String,Integer> reverseIDLookup = parsedRDD.map(tokens -> tokens[offset])
      .distinct().sortBy(s -> s, true, parsedRDD.getNumPartitions())
      .zipWithIndex().mapValues(Long::intValue)
      .collectAsMap();
  // Clone, due to some serialization problems with the result of collectAsMap?
  return new HashMap<>(reverseIDLookup);
}
Code example source: org.apache.spark/spark-core_2.11 (the same test appears verbatim in spark-core_2.10 and spark-core)
@Test
public void collectAsMapWithIntArrayValues() {
  // Regression test for SPARK-1040
  JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1));
  JavaPairRDD<Integer, int[]> pairRDD =
      rdd.mapToPair(x -> new Tuple2<>(x, new int[]{x}));
  pairRDD.collect(); // Works fine
  pairRDD.collectAsMap(); // Used to crash with ClassCastException
}
Code example source: org.apache.spark/spark-core_2.10 (the same test appears verbatim in spark-core_2.11 and spark-core)
@Test
public void reduceByKey() {
  List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
      new Tuple2<>(2, 1),
      new Tuple2<>(2, 1),
      new Tuple2<>(1, 1),
      new Tuple2<>(3, 2),
      new Tuple2<>(3, 1)
  );
  JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
  JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey((a, b) -> a + b);
  Assert.assertEquals(1, counts.lookup(1).get(0).intValue());
  Assert.assertEquals(2, counts.lookup(2).get(0).intValue());
  Assert.assertEquals(3, counts.lookup(3).get(0).intValue());
  Map<Integer, Integer> localCounts = counts.collectAsMap();
  Assert.assertEquals(1, localCounts.get(1).intValue());
  Assert.assertEquals(2, localCounts.get(2).intValue());
  Assert.assertEquals(3, localCounts.get(3).intValue());
  localCounts = rdd.reduceByKeyLocally((a, b) -> a + b);
  Assert.assertEquals(1, localCounts.get(1).intValue());
  Assert.assertEquals(2, localCounts.get(2).intValue());
  Assert.assertEquals(3, localCounts.get(3).intValue());
}
Code example source: org.apache.spark/spark-core_2.10
@Test
public void combineByKey() {
  JavaRDD<Integer> originalRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6));
  Function<Integer, Integer> keyFunction = v1 -> v1 % 3;
  Function<Integer, Integer> createCombinerFunction = v1 -> v1;
  Function2<Integer, Integer, Integer> mergeValueFunction = (v1, v2) -> v1 + v2;
  JavaPairRDD<Integer, Integer> combinedRDD = originalRDD.keyBy(keyFunction)
      .combineByKey(createCombinerFunction, mergeValueFunction, mergeValueFunction);
  Map<Integer, Integer> results = combinedRDD.collectAsMap();
  ImmutableMap<Integer, Integer> expected = ImmutableMap.of(0, 9, 1, 5, 2, 7);
  assertEquals(expected, results);
  Partitioner defaultPartitioner = Partitioner.defaultPartitioner(
      combinedRDD.rdd(),
      JavaConverters.collectionAsScalaIterableConverter(
          Collections.<RDD<?>>emptyList()).asScala().toSeq());
  combinedRDD = originalRDD.keyBy(keyFunction)
      .combineByKey(
          createCombinerFunction,
          mergeValueFunction,
          mergeValueFunction,
          defaultPartitioner,
          false,
          new KryoSerializer(new SparkConf()));
  results = combinedRDD.collectAsMap();
  assertEquals(expected, results);
}