This article collects code examples of the Java method org.apache.spark.api.java.JavaPairRDD.mapValues(), showing how JavaPairRDD.mapValues() is used in practice. The examples are drawn from selected projects on platforms such as GitHub, Stack Overflow, and Maven, so they have real reference value and should be helpful. The details of JavaPairRDD.mapValues() are as follows:
Package path: org.apache.spark.api.java.JavaPairRDD
Class name: JavaPairRDD
Method name: mapValues
Method description: none available
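Before the collected examples, here is a minimal, self-contained sketch of what mapValues does: it transforms only the values of a pair RDD, leaving the keys (and the partitioning) untouched. The data and variable names below are illustrative, not taken from the projects cited later:

import java.util.Arrays;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

JavaSparkContext sc = new JavaSparkContext("local[*]", "mapValuesDemo");
JavaPairRDD<String, Integer> counts = sc.parallelizePairs(
    Arrays.asList(new Tuple2<>("apple", 2), new Tuple2<>("pear", 3)));
// Double each value; keys are unchanged
JavaPairRDD<String, Integer> doubled = counts.mapValues(v -> v * 2);
// doubled contains: ("apple", 4), ("pear", 6)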
Code example source: OryxProject/oryx
public static Map<String,Integer> countDistinctOtherWords(JavaPairRDD<String,String> data) {
  return data.values().flatMapToPair(line -> {
    Set<String> distinctTokens = new HashSet<>(Arrays.asList(line.split(" ")));
    return distinctTokens.stream().flatMap(a ->
        distinctTokens.stream().filter(b -> !a.equals(b)).map(b -> new Tuple2<>(a, b))
    ).iterator();
  }).distinct().mapValues(a -> 1).reduceByKey((c1, c2) -> c1 + c2).collectAsMap();
}
Code example source: OryxProject/oryx
private static JavaPairRDD<String,Collection<String>> knownsRDD(JavaRDD<String[]> allData,
                                                                boolean knownItems) {
  JavaRDD<String[]> sorted = allData.sortBy(datum -> Long.valueOf(datum[3]), true, allData.partitions().size());
  JavaPairRDD<String,Tuple2<String,Boolean>> tuples = sorted.mapToPair(datum -> {
    String user = datum[0];
    String item = datum[1];
    Boolean delete = datum[2].isEmpty();
    return knownItems ?
        new Tuple2<>(user, new Tuple2<>(item, delete)) :
        new Tuple2<>(item, new Tuple2<>(user, delete));
  });
  // TODO likely need to figure out a way to avoid groupByKey but collectByKey
  // won't work here -- doesn't guarantee enough about ordering
  return tuples.groupByKey().mapValues(idDeletes -> {
    Collection<String> ids = new HashSet<>();
    for (Tuple2<String,Boolean> idDelete : idDeletes) {
      if (idDelete._2()) {
        ids.remove(idDelete._1());
      } else {
        ids.add(idDelete._1());
      }
    }
    return ids;
  });
}
Code example source: apache/kylin
private Long getRDDCountSum(JavaPairRDD<ByteArray, Object[]> rdd, final int countMeasureIndex) {
  final ByteArray ONE = new ByteArray();
  Long count = rdd.mapValues(new Function<Object[], Long>() {
    @Override
    public Long call(Object[] objects) throws Exception {
      return (Long) objects[countMeasureIndex];
    }
  }).reduce(new Function2<Tuple2<ByteArray, Long>, Tuple2<ByteArray, Long>, Tuple2<ByteArray, Long>>() {
    @Override
    public Tuple2<ByteArray, Long> call(Tuple2<ByteArray, Long> longTuple2, Tuple2<ByteArray, Long> longTuple22)
        throws Exception {
      return new Tuple2<>(ONE, longTuple2._2() + longTuple22._2());
    }
  })._2();
  return count;
}
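For comparison, the same count can be computed far more compactly with Java 8 lambdas. A minimal sketch, assuming the same rdd and countMeasureIndex (this is not Kylin's actual code):

long count = rdd.mapValues(objects -> (Long) objects[countMeasureIndex])
    .values()
    .reduce(Long::sum);

Reducing over values() also avoids the placeholder ONE key entirely, since no key is needed for a plain sum.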
Code example source: OryxProject/oryx
private static Map<String,Integer> buildIDIndexMapping(JavaRDD<String[]> parsedRDD,
                                                       boolean user) {
  int offset = user ? 0 : 1;
  Map<String,Integer> reverseIDLookup = parsedRDD.map(tokens -> tokens[offset])
      .distinct().sortBy(s -> s, true, parsedRDD.getNumPartitions())
      .zipWithIndex().mapValues(Long::intValue)
      .collectAsMap();
  // Clone, due to some serialization problems with the result of collectAsMap?
  return new HashMap<>(reverseIDLookup);
}
Code example source: OryxProject/oryx
private static RDD<Tuple2<Object,double[]>> readAndConvertFeatureRDD(
    JavaPairRDD<String,float[]> javaRDD,
    Broadcast<? extends Map<String,Integer>> bIdToIndex) {
  RDD<Tuple2<Integer,double[]>> scalaRDD = javaRDD.mapToPair(t ->
      new Tuple2<>(bIdToIndex.value().get(t._1()), t._2())
  ).mapValues(f -> {
    double[] d = new double[f.length];
    for (int i = 0; i < d.length; i++) {
      d[i] = f[i];
    }
    return d;
  }).rdd();
  // This mimics the persistence level established by ALS training methods
  scalaRDD.persist(StorageLevel.MEMORY_AND_DISK());
  @SuppressWarnings("unchecked")
  RDD<Tuple2<Object,double[]>> objKeyRDD = (RDD<Tuple2<Object,double[]>>) (RDD<?>) scalaRDD;
  return objKeyRDD;
}
Code example source: OryxProject/oryx
// Two separate fragments (closing parentheses reconstructed):
// 1) count occurrences of each categorical target encoding per key
return targetsByTreeAndID.mapValues(categoricalTargets ->
    StreamSupport.stream(categoricalTargets.spliterator(), false)
        .collect(Collectors.groupingBy(f -> ((CategoricalFeature) f).getEncoding(),
                                       Collectors.counting())));
// 2) summarize numeric target values per key
return targetsByTreeAndID.mapValues(numericTargets ->
    StreamSupport.stream(numericTargets.spliterator(), false)
        .collect(Collectors.summarizingDouble(f -> ((NumericFeature) f).getValue())));
Code example source: OryxProject/oryx
// Fragment: for implicit-feedback models, sum grouped scores (NaN-aware)
if (model.isImplicit()) {
  aggregated = tuples.groupByKey().mapValues(MLFunctions.SUM_WITH_NAN);
} else {
  // ... (else branch truncated in source; see the full aggregateScores example below)
}
Code example source: OryxProject/oryx
// Fragment: convert user and item feature vectors from double[] to float[]
// (the first declaration is truncated in source; reconstructed here)
JavaPairRDD<Integer,float[]> userFeaturesRDD =
    massageToIntKey(model.userFeatures()).mapValues(doubleArrayToFloats);
JavaPairRDD<Integer,float[]> itemFeaturesRDD =
    massageToIntKey(model.productFeatures()).mapValues(doubleArrayToFloats);
Code example source: OryxProject/oryx
/**
 * Combines {@link Rating}s with the same user/item into one, with score as the sum of
 * all of the scores.
 */
private JavaRDD<Rating> aggregateScores(JavaRDD<? extends Rating> original, double epsilon) {
  JavaPairRDD<Tuple2<Integer,Integer>,Double> tuples =
      original.mapToPair(rating -> new Tuple2<>(new Tuple2<>(rating.user(), rating.product()), rating.rating()));
  JavaPairRDD<Tuple2<Integer,Integer>,Double> aggregated;
  if (implicit) {
    // TODO can we avoid groupByKey? reduce, combine, fold don't seem viable since
    // they don't guarantee the delete elements are properly handled
    aggregated = tuples.groupByKey().mapValues(MLFunctions.SUM_WITH_NAN);
  } else {
    // For non-implicit, last wins.
    aggregated = tuples.foldByKey(Double.NaN, (current, next) -> next);
  }
  JavaPairRDD<Tuple2<Integer,Integer>,Double> noNaN =
      aggregated.filter(kv -> !Double.isNaN(kv._2()));
  if (logStrength) {
    return noNaN.map(userProductScore -> new Rating(
        userProductScore._1()._1(),
        userProductScore._1()._2(),
        Math.log1p(userProductScore._2() / epsilon)));
  } else {
    return noNaN.map(userProductScore -> new Rating(
        userProductScore._1()._1(),
        userProductScore._1()._2(),
        userProductScore._2()));
  }
}
Code example source: mahmoudparsian/data-algorithms-book
static Map<Integer, Vector> getNewCentroids(JavaPairRDD<Integer, Tuple2<Vector, Integer>> pointsGroup) {
  Map<Integer, Vector> newCentroids = pointsGroup.mapValues(new Function<Tuple2<Vector, Integer>, Vector>() {
    @Override
    public Vector call(Tuple2<Vector, Integer> arg0) throws Exception {
      return average(arg0._1, arg0._2);
    }
  }).collectAsMap();
  return newCentroids;
}
Code example source: mahmoudparsian/data-algorithms-book
static Map<Integer, Vector> getNewCentroids(JavaPairRDD<Integer, Iterable<Vector>> pointsGroup) {
  Map<Integer, Vector> newCentroids = pointsGroup.mapValues(
      new Function<Iterable<Vector>, Vector>() {
        @Override
        public Vector call(Iterable<Vector> ps) throws Exception {
          return Util.average(ps);
        }
      }).collectAsMap();
  return newCentroids;
}
Code example source: mahmoudparsian/data-algorithms-book
static JavaPairRDD<String, Tuple2<Double, Integer>> computeRankedProducts(
    JavaSparkContext context,
    JavaPairRDD<String, Long>[] ranks) {
  JavaPairRDD<String, Long> unionRDD = context.union(ranks);
  // next find unique keys, with their associated copa scores
  JavaPairRDD<String, Iterable<Long>> groupedByGeneRDD = unionRDD.groupByKey();
  // next calculate ranked products and the number of elements
  JavaPairRDD<String, Tuple2<Double, Integer>> rankedProducts =
      groupedByGeneRDD.mapValues((Iterable<Long> values) -> {
        // input: copa scores for all studies; output: (rankedProduct, N)
        int N = 0;
        long products = 1;
        for (Long v : values) {
          products *= v;
          N++;
        }
        double rankedProduct = Math.pow((double) products, 1.0 / ((double) N));
        return new Tuple2<Double, Integer>(rankedProduct, N);
      });
  return rankedProducts;
}
Code example source: mahmoudparsian/data-algorithms-book
static JavaPairRDD<String, Tuple2<Double, Integer>> computeRankedProducts(
    JavaSparkContext context,
    JavaPairRDD<String, Long>[] ranks) {
  JavaPairRDD<String, Long> unionRDD = context.union(ranks);
  // next find unique keys, with their associated copa scores
  JavaPairRDD<String, Iterable<Long>> groupedByGeneRDD = unionRDD.groupByKey();
  // next calculate ranked products and the number of elements
  JavaPairRDD<String, Tuple2<Double, Integer>> rankedProducts = groupedByGeneRDD.mapValues(
      new Function<
          Iterable<Long>,          // input: copa scores for all studies
          Tuple2<Double, Integer>  // output: (rankedProduct, N)
      >() {
        @Override
        public Tuple2<Double, Integer> call(Iterable<Long> values) {
          int N = 0;
          long products = 1;
          for (Long v : values) {
            products *= v;
            N++;
          }
          double rankedProduct = Math.pow((double) products, 1.0 / ((double) N));
          return new Tuple2<Double, Integer>(rankedProduct, N);
        }
      });
  return rankedProducts;
}
Code example source: mahmoudparsian/data-algorithms-book
// Fragment (body truncated in source; completed here as the obvious mean computation)
JavaPairRDD<String, Double> meanRDD = groupedByGene.mapValues((Iterable<Double> values) -> {
  double sum = 0.0;
  int count = 0;
  for (Double value : values) { sum += value; count++; }
  return sum / count;
});
Code example source: mahmoudparsian/data-algorithms-book
// Fragment (truncated in source): anonymous-class version of the mean computation above;
// type parameters inferred from meanRDD's declared type
JavaPairRDD<String, Double> meanRDD = groupedByGene.mapValues(
    new Function<Iterable<Double>, Double>() {
      // ... (remainder truncated in source)
Code example source: mahmoudparsian/data-algorithms-book
// Fragment (truncated in source): anonymous-class version of a ranked-product
// computation over combinedByGeneRDD; compare the full computeRankedProducts example above
JavaPairRDD<String, Tuple2<Double, Integer>> rankedProducts = combinedByGeneRDD.mapValues(
    new Function< // ... (type parameters and body truncated in source)
Code example source: mahmoudparsian/data-algorithms-book
// Fragment: compute an average from a (sum, count) pair; the assignment target is truncated in source
reduced.mapValues((Tuple2<Long, Integer> s) -> ((double) s._1 / (double) s._2));
Code example source: mahmoudparsian/data-algorithms-book
// Fragment: map each RankProduct value to a (rankedProduct, count) pair
// (closing brace reconstructed)
combinedByGeneRDD.mapValues((RankProduct value) -> {
  double theRankedProduct = value.rank();
  return new Tuple2<Double, Integer>(theRankedProduct, value.count);
});
Code example source: apache/tinkerpop
@Override
public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD) {
  if (!configuration.getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false))
    LOGGER.warn("The SparkContext should be persisted in order for the RDD to persist across jobs. To do so, set " + Constants.GREMLIN_SPARK_PERSIST_CONTEXT + " to true");
  if (!configuration.containsKey(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))
    throw new IllegalArgumentException("There is no provided " + Constants.GREMLIN_HADOOP_OUTPUT_LOCATION + " to write the persisted RDD to");
  SparkContextStorage.open(configuration).rm(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION)); // this might be bad cause it unpersists the job RDD
  // determine which storage level to persist the RDD as with MEMORY_ONLY being the default cache()
  final StorageLevel storageLevel = StorageLevel.fromString(configuration.getString(Constants.GREMLIN_SPARK_PERSIST_STORAGE_LEVEL, "MEMORY_ONLY"));
  if (!configuration.getBoolean(Constants.GREMLIN_HADOOP_GRAPH_WRITER_HAS_EDGES, true))
    graphRDD.mapValues(vertex -> {
      vertex.get().dropEdges(Direction.BOTH);
      return vertex;
    }).setName(Constants.getGraphLocation(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))).persist(storageLevel)
        // call action to eager store rdd
        .count();
  else
    graphRDD.setName(Constants.getGraphLocation(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))).persist(storageLevel)
        // call action to eager store rdd
        .count();
  Spark.refresh(); // necessary to do really fast so the Spark GC doesn't clear out the RDD
}
Code example source: apache/tinkerpop
public static <M> JavaPairRDD<Object, VertexWritable> prepareFinalGraphRDD(
    final JavaPairRDD<Object, VertexWritable> graphRDD,
    final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD,
    final Set<VertexComputeKey> vertexComputeKeys) {
  // the graphRDD and the viewRDD must have the same partitioner
  if (graphRDD.partitioner().isPresent())
    assert (graphRDD.partitioner().get().equals(viewIncomingRDD.partitioner().get()));
  final String[] vertexComputeKeysArray = VertexProgramHelper.vertexComputeKeysAsArray(vertexComputeKeys); // the compute keys as an array
  return graphRDD.leftOuterJoin(viewIncomingRDD)
      .mapValues(tuple -> {
        final StarGraph.StarVertex vertex = tuple._1().get();
        vertex.dropVertexProperties(vertexComputeKeysArray); // drop all existing compute keys
        // attach the final computed view to the cached graph
        final List<DetachedVertexProperty<Object>> view = tuple._2().isPresent() ? tuple._2().get().getView() : Collections.emptyList();
        for (final DetachedVertexProperty<Object> property : view) {
          if (!VertexProgramHelper.isTransientVertexComputeKey(property.key(), vertexComputeKeys))
            property.attach(Attachable.Method.create(vertex));
        }
        return tuple._1();
      });
}
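A property that the TinkerPop example above relies on (note its assertion that the two joined RDDs share a partitioner) is that mapValues preserves the parent RDD's partitioner, whereas mapToPair does not, since Spark cannot know the keys were left unchanged. A minimal sketch, assuming an existing JavaSparkContext sc:

import java.util.Arrays;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;

JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(
        Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("b", 2)))
    .partitionBy(new HashPartitioner(4));

// mapValues keeps the HashPartitioner, so a later reduceByKey or join avoids a shuffle
JavaPairRDD<String, Integer> stillPartitioned = pairs.mapValues(v -> v + 1);
// stillPartitioned.partitioner() is present (the HashPartitioner)

// mapToPair drops the partitioner, even though the key is unchanged here
JavaPairRDD<String, Integer> notPartitioned =
    pairs.mapToPair(t -> new Tuple2<>(t._1(), t._2() + 1));
// notPartitioned.partitioner() is empty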