This article collects code examples of the Java method org.apache.spark.api.java.JavaPairRDD.flatMapValues(), showing how JavaPairRDD.flatMapValues() is used in practice. The examples are extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven, so they serve as useful references. Details of JavaPairRDD.flatMapValues() are as follows:
Package path: org.apache.spark.api.java.JavaPairRDD
Class name: JavaPairRDD
Method name: flatMapValues
Description: none available
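Before the project snippets, it helps to pin down the semantics: flatMapValues applies a function to each pair's value, the function yields zero or more output values, and each output is re-paired with the original key (a key whose function returns an empty iterable is dropped). Below is a minimal, Spark-free sketch of those semantics; the class and method names are invented for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of JavaPairRDD.flatMapValues: for each (key, value)
// pair, the function maps the value to zero or more new values; each new
// value is emitted under the original key. Keys whose function returns an
// empty iterable produce no output pairs at all.
public class FlatMapValuesDemo {
    static <K, V, U> List<Map.Entry<K, U>> flatMapValues(
            List<Map.Entry<K, V>> pairs, Function<V, Iterable<U>> f) {
        List<Map.Entry<K, U>> out = new ArrayList<>();
        for (Map.Entry<K, V> e : pairs) {
            for (U u : f.apply(e.getValue())) {
                out.add(Map.entry(e.getKey(), u));
            }
        }
        return out;
    }
}
```

The same key appears once per output value, which is exactly why the cogroup-based examples below can drop a key entirely by returning an empty collection.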
Code example source: origin: databricks/learning-spark
public static <K, V> JavaPairRDD<K, V> intersectByKey(JavaPairRDD<K, V> rdd1, JavaPairRDD<K, V> rdd2) {
  JavaPairRDD<K, Tuple2<Iterable<V>, Iterable<V>>> grouped = rdd1.cogroup(rdd2);
  return grouped.flatMapValues(new Function<Tuple2<Iterable<V>, Iterable<V>>, Iterable<V>>() {
    @Override
    public Iterable<V> call(Tuple2<Iterable<V>, Iterable<V>> input) {
      ArrayList<V> al = new ArrayList<V>();
      if (!Iterables.isEmpty(input._1()) && !Iterables.isEmpty(input._2())) {
        Iterables.addAll(al, input._1());
        Iterables.addAll(al, input._2());
      }
      return al;
    }
  });
}
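The intersectByKey example above relies on flatMapValues dropping keys whose function returns an empty list: cogroup produces an empty iterable on the side that lacks the key, so values are kept only when both inputs contain the key. A Spark-free model of that logic (class and method names are invented for illustration) looks like this:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Non-Spark model of the cogroup-based intersectByKey: a key's values are
// kept (and the two value lists concatenated) only when the key is present
// with values on both sides; otherwise the key produces no output, just as
// returning an empty list from flatMapValues drops the key.
public class IntersectByKeyDemo {
    static <K, V> Map<K, List<V>> intersectByKey(Map<K, List<V>> m1, Map<K, List<V>> m2) {
        Map<K, List<V>> out = new LinkedHashMap<>();
        for (Map.Entry<K, List<V>> e : m1.entrySet()) {
            List<V> other = m2.get(e.getKey());
            if (other != null && !e.getValue().isEmpty() && !other.isEmpty()) {
                List<V> merged = new ArrayList<>(e.getValue());
                merged.addAll(other);
                out.put(e.getKey(), merged);
            }
        }
        return out;
    }
}
```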
Code example source: origin: mahmoudparsian/data-algorithms-book
JavaPairRDD<String, Tuple2<String, Integer>> uniquePairs = grouped.flatMapValues(
new Function<Iterable<Tuple2<String, Integer>>, Iterable<Tuple2<String, Integer>>>() {
private static final long serialVersionUID = 5790208031487657081L;
Code example source: origin: io.zipkin.dependencies/zipkin-dependencies-cassandra3
JavaPairRDD<String, DependencyLink> flatMapToLinksByTraceId(
    CassandraTableScanJavaRDD<CassandraRow> spans,
    long microsUpper, long microsLower, boolean inTest
) {
  if (strictTraceId) {
    return spans.spanBy(ROW_TRACE_ID, String.class)
        .flatMapValues(
            new CassandraRowsToDependencyLinks(logInitializer, microsLower, microsUpper, inTest));
  }
  return spans.map(new CassandraRowToSpan(inTest))
      .groupBy(SPAN_TRACE_ID) // groupBy instead of spanBy because trace_id is mixed length
      .flatMapValues(new SpansToDependencyLinks(logInitializer, microsLower, microsUpper));
}
Code example source: origin: locationtech/geowave
joinedTiers.flatMapValues(
(Function<Tuple2<Iterable<Tuple2<GeoWaveInputKey, Geometry>>, Iterable<Tuple2<GeoWaveInputKey, Geometry>>>, Iterable<GeoWaveInputKey>>) t -> {
final GeomFunction predicate = geomPredicate.value();
Code example source: origin: io.zipkin.dependencies/zipkin-dependencies-mysql
.toJavaRDD()
.groupBy(rowTraceId)
.flatMapValues(new RowsToDependencyLinks(logInitializer, hasTraceIdHigh))
.values()
.mapToPair(LINK_TO_PAIR)
Code example source: origin: io.zipkin.dependencies/zipkin-dependencies-cassandra
public void run() {
  long microsLower = day * 1000;
  long microsUpper = (day * 1000) + TimeUnit.DAYS.toMicros(1) - 1;
  log.info("Running Dependencies job for {}: {} ≤ Span.timestamp {}", dateStamp, microsLower,
      microsUpper);
  SparkContext sc = new SparkContext(conf);
  List<DependencyLink> links = javaFunctions(sc)
      .cassandraTable(keyspace, "traces")
      .spanBy(ROW_TRACE_ID, Long.class)
      .flatMapValues(new CassandraRowsToDependencyLinks(logInitializer, microsLower, microsUpper))
      .values()
      .mapToPair(LINK_TO_PAIR)
      .reduceByKey(MERGE_LINK)
      .values()
      .collect();
  sc.stop();
  saveToCassandra(links);
}
Code example source: origin: jaegertracing/spark-dependencies
/**
 * Derives dependency links from the supplied spans (e.g. multiple traces). If a link A->B appears
 * in multiple traces, just one {@link Dependency} link is returned with the correct {@link Dependency#callCount}.
 * Note that RDDs are grouped on traceId, so if a span contains multiple references from different traces,
 * the job does not produce a correct result.
 *
 * @param traceIdSpans <traceId, trace> {@link org.apache.spark.api.java.JavaRDD} with trace id and a collection of
 *     spans with that traceId.
 * @return Aggregated dependency links for all traces.
 */
public static List<Dependency> derive(JavaPairRDD<String, Iterable<Span>> traceIdSpans) {
  return traceIdSpans.flatMapValues(new SpansToDependencyLinks())
      .values()
      .mapToPair(dependency -> new Tuple2<>(new Tuple2<>(dependency.getParent(), dependency.getChild()), dependency))
      .reduceByKey((v1, v2) -> new Dependency(v1.getParent(), v1.getChild(), v1.getCallCount() + v2.getCallCount()))
      .values()
      .collect();
}
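The derive() pipeline boils down to: expand each trace into dependency links with flatMapValues, key each link by its (parent, child) pair, and sum call counts per pair with reduceByKey. A minimal, Spark-free model of that final aggregation step (all names below are invented for illustration):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Non-Spark model of derive()'s reduceByKey step: dependency links that
// share the same (parent, child) pair are merged by summing call counts,
// mirroring reduceByKey((v1, v2) -> new Dependency(parent, child,
// v1.getCallCount() + v2.getCallCount())).
public class DependencyAggregation {
    record Dependency(String parent, String child, long callCount) {}

    static Map<String, Long> aggregate(List<Dependency> links) {
        Map<String, Long> out = new LinkedHashMap<>();
        for (Dependency d : links) {
            // Key by "parent->child"; merge sums counts for repeated pairs.
            out.merge(d.parent() + "->" + d.child(), d.callCount(), Long::sum);
        }
        return out;
    }
}
```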
Code example source: origin: seznam/euphoria
.mapPartitionsToPair(ReduceByKeyIterator::new)
    .setName(operator.getName() + "::create-iterator")
    .flatMapValues(new Reducer<>(reducer, accumulatorProvider))
    .setName(operator.getName() + "::apply-udf");
} else {
  .groupByKey()
      .setName(operator.getName() + "::group-by-key")
      .flatMapValues(new Reducer<>(reducer, accumulatorProvider))
      .setName(operator.getName() + "::apply-udf");
Content is sourced from the Internet; if there is any infringement, please contact the author for removal!