本文整理了Java中org.apache.spark.api.java.JavaPairRDD.leftOuterJoin()方法的一些代码示例，展示了JavaPairRDD.leftOuterJoin()的具体用法。这些代码示例主要来源于GitHub、Stack Overflow、Maven等平台，是从一些精选项目中提取出来的，具有较强的参考意义，能在一定程度上帮助到你。JavaPairRDD.leftOuterJoin()方法的具体详情如下：
包路径:org.apache.spark.api.java.JavaPairRDD
类名称:JavaPairRDD
方法名:leftOuterJoin
方法介绍:暂无
代码示例来源:origin: org.apache.spark/spark-core_2.10
/**
 * Checks that {@code leftOuterJoin} keeps every left-side row and that a left key with no
 * right-side partner (key 3) is paired with an absent {@code Optional}.
 */
@Test
public void leftOuterJoin() {
  List<Tuple2<Integer, Integer>> leftPairs = Arrays.asList(
      new Tuple2<>(1, 1),
      new Tuple2<>(1, 2),
      new Tuple2<>(2, 1),
      new Tuple2<>(3, 1));
  List<Tuple2<Integer, Character>> rightPairs = Arrays.asList(
      new Tuple2<>(1, 'x'),
      new Tuple2<>(2, 'y'),
      new Tuple2<>(2, 'z'),
      new Tuple2<>(4, 'w'));
  JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(leftPairs);
  JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(rightPairs);
  JavaPairRDD<Integer, Tuple2<Integer, Optional<Character>>> joinedRdd = rdd1.leftOuterJoin(rdd2);

  // key 1: 2 left rows x 1 right row, key 2: 1 x 2, key 3: 1 x none -> 5 rows in total
  List<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>> joined = joinedRdd.collect();
  Assert.assertEquals(5, joined.size());

  // Key 3 is the only left key without a partner in rdd2.
  Tuple2<Integer, Tuple2<Integer, Optional<Character>>> firstUnmatched =
      joinedRdd.filter(tup -> !tup._2()._2().isPresent()).first();
  Assert.assertEquals(3, firstUnmatched._1().intValue());
}
代码示例来源:origin: org.apache.spark/spark-core_2.11
/**
 * Checks that {@code leftOuterJoin} keeps every left-side row and that a left key with no
 * right-side partner (key 3) is paired with an absent {@code Optional}.
 */
@Test
public void leftOuterJoin() {
  List<Tuple2<Integer, Integer>> leftPairs = Arrays.asList(
      new Tuple2<>(1, 1),
      new Tuple2<>(1, 2),
      new Tuple2<>(2, 1),
      new Tuple2<>(3, 1));
  List<Tuple2<Integer, Character>> rightPairs = Arrays.asList(
      new Tuple2<>(1, 'x'),
      new Tuple2<>(2, 'y'),
      new Tuple2<>(2, 'z'),
      new Tuple2<>(4, 'w'));
  JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(leftPairs);
  JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(rightPairs);
  JavaPairRDD<Integer, Tuple2<Integer, Optional<Character>>> joinedRdd = rdd1.leftOuterJoin(rdd2);

  // key 1: 2 left rows x 1 right row, key 2: 1 x 2, key 3: 1 x none -> 5 rows in total
  List<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>> joined = joinedRdd.collect();
  Assert.assertEquals(5, joined.size());

  // Key 3 is the only left key without a partner in rdd2.
  Tuple2<Integer, Tuple2<Integer, Optional<Character>>> firstUnmatched =
      joinedRdd.filter(tup -> !tup._2()._2().isPresent()).first();
  Assert.assertEquals(3, firstUnmatched._1().intValue());
}
代码示例来源:origin: org.apache.spark/spark-core_2.10
/**
 * Checks that {@code leftOuterJoin} keeps every left-side row and that a left key with no
 * right-side partner (key 3) is paired with an absent {@code Optional}.
 */
@SuppressWarnings("unchecked")
@Test
public void leftOuterJoin() {
  List<Tuple2<Integer, Integer>> leftPairs = Arrays.asList(
      new Tuple2<>(1, 1),
      new Tuple2<>(1, 2),
      new Tuple2<>(2, 1),
      new Tuple2<>(3, 1));
  List<Tuple2<Integer, Character>> rightPairs = Arrays.asList(
      new Tuple2<>(1, 'x'),
      new Tuple2<>(2, 'y'),
      new Tuple2<>(2, 'z'),
      new Tuple2<>(4, 'w'));
  JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(leftPairs);
  JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(rightPairs);
  JavaPairRDD<Integer, Tuple2<Integer, Optional<Character>>> joinedRdd = rdd1.leftOuterJoin(rdd2);

  // key 1: 2 left rows x 1 right row, key 2: 1 x 2, key 3: 1 x none -> 5 rows in total
  List<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>> joined = joinedRdd.collect();
  assertEquals(5, joined.size());

  // Key 3 is the only left key without a partner in rdd2.
  Tuple2<Integer, Tuple2<Integer, Optional<Character>>> firstUnmatched =
      joinedRdd.filter(tup -> !tup._2()._2().isPresent()).first();
  assertEquals(3, firstUnmatched._1().intValue());
}
代码示例来源:origin: org.apache.spark/spark-core
/**
 * Checks that {@code leftOuterJoin} keeps every left-side row and that a left key with no
 * right-side partner (key 3) is paired with an absent {@code Optional}.
 */
@Test
public void leftOuterJoin() {
  List<Tuple2<Integer, Integer>> leftPairs = Arrays.asList(
      new Tuple2<>(1, 1),
      new Tuple2<>(1, 2),
      new Tuple2<>(2, 1),
      new Tuple2<>(3, 1));
  List<Tuple2<Integer, Character>> rightPairs = Arrays.asList(
      new Tuple2<>(1, 'x'),
      new Tuple2<>(2, 'y'),
      new Tuple2<>(2, 'z'),
      new Tuple2<>(4, 'w'));
  JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(leftPairs);
  JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(rightPairs);
  JavaPairRDD<Integer, Tuple2<Integer, Optional<Character>>> joinedRdd = rdd1.leftOuterJoin(rdd2);

  // key 1: 2 left rows x 1 right row, key 2: 1 x 2, key 3: 1 x none -> 5 rows in total
  List<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>> joined = joinedRdd.collect();
  Assert.assertEquals(5, joined.size());

  // Key 3 is the only left key without a partner in rdd2.
  Tuple2<Integer, Tuple2<Integer, Optional<Character>>> firstUnmatched =
      joinedRdd.filter(tup -> !tup._2()._2().isPresent()).first();
  Assert.assertEquals(3, firstUnmatched._1().intValue());
}
代码示例来源:origin: org.apache.spark/spark-core_2.11
/**
 * Checks that {@code leftOuterJoin} keeps every left-side row and that a left key with no
 * right-side partner (key 3) is paired with an absent {@code Optional}.
 */
@SuppressWarnings("unchecked")
@Test
public void leftOuterJoin() {
  List<Tuple2<Integer, Integer>> leftPairs = Arrays.asList(
      new Tuple2<>(1, 1),
      new Tuple2<>(1, 2),
      new Tuple2<>(2, 1),
      new Tuple2<>(3, 1));
  List<Tuple2<Integer, Character>> rightPairs = Arrays.asList(
      new Tuple2<>(1, 'x'),
      new Tuple2<>(2, 'y'),
      new Tuple2<>(2, 'z'),
      new Tuple2<>(4, 'w'));
  JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(leftPairs);
  JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(rightPairs);
  JavaPairRDD<Integer, Tuple2<Integer, Optional<Character>>> joinedRdd = rdd1.leftOuterJoin(rdd2);

  // key 1: 2 left rows x 1 right row, key 2: 1 x 2, key 3: 1 x none -> 5 rows in total
  List<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>> joined = joinedRdd.collect();
  assertEquals(5, joined.size());

  // Key 3 is the only left key without a partner in rdd2.
  Tuple2<Integer, Tuple2<Integer, Optional<Character>>> firstUnmatched =
      joinedRdd.filter(tup -> !tup._2()._2().isPresent()).first();
  assertEquals(3, firstUnmatched._1().intValue());
}
代码示例来源:origin: org.apache.spark/spark-core
/**
 * Checks that {@code leftOuterJoin} keeps every left-side row and that a left key with no
 * right-side partner (key 3) is paired with an absent {@code Optional}.
 */
@SuppressWarnings("unchecked")
@Test
public void leftOuterJoin() {
  List<Tuple2<Integer, Integer>> leftPairs = Arrays.asList(
      new Tuple2<>(1, 1),
      new Tuple2<>(1, 2),
      new Tuple2<>(2, 1),
      new Tuple2<>(3, 1));
  List<Tuple2<Integer, Character>> rightPairs = Arrays.asList(
      new Tuple2<>(1, 'x'),
      new Tuple2<>(2, 'y'),
      new Tuple2<>(2, 'z'),
      new Tuple2<>(4, 'w'));
  JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(leftPairs);
  JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(rightPairs);
  JavaPairRDD<Integer, Tuple2<Integer, Optional<Character>>> joinedRdd = rdd1.leftOuterJoin(rdd2);

  // key 1: 2 left rows x 1 right row, key 2: 1 x 2, key 3: 1 x none -> 5 rows in total
  List<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>> joined = joinedRdd.collect();
  assertEquals(5, joined.size());

  // Key 3 is the only left key without a partner in rdd2.
  Tuple2<Integer, Tuple2<Integer, Optional<Character>>> firstUnmatched =
      joinedRdd.filter(tup -> !tup._2()._2().isPresent()).first();
  assertEquals(3, firstUnmatched._1().intValue());
}
代码示例来源:origin: apache/tinkerpop
/**
 * Writes the final computed views back into the cached graph.
 *
 * <p>Every vertex first has all existing compute-key properties dropped. If the vertex has an
 * incoming view, each view property whose key is not a transient compute key
 * ({@code VertexProgramHelper.isTransientVertexComputeKey}) is then attached to it. Vertices with
 * no incoming view are kept, because the join is a left outer join.
 *
 * @param graphRDD          the cached graph, keyed by vertex id
 * @param viewIncomingRDD   the final incoming views; expected to share graphRDD's partitioner
 * @param vertexComputeKeys the compute keys to drop and to filter against
 * @return graphRDD's vertices with the final view applied
 */
public static <M> JavaPairRDD<Object, VertexWritable> prepareFinalGraphRDD(
    final JavaPairRDD<Object, VertexWritable> graphRDD,
    final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD,
    final Set<VertexComputeKey> vertexComputeKeys) {
  // The graphRDD and the viewRDD must have the same partitioner.
  // This is only verified when assertions are enabled.
  if (graphRDD.partitioner().isPresent()) {
    assert (graphRDD.partitioner().get().equals(viewIncomingRDD.partitioner().get()));
  }
  final String[] keysToDrop = VertexProgramHelper.vertexComputeKeysAsArray(vertexComputeKeys);
  return graphRDD.leftOuterJoin(viewIncomingRDD).mapValues(vertexAndView -> {
    final StarGraph.StarVertex starVertex = vertexAndView._1().get();
    starVertex.dropVertexProperties(keysToDrop);
    final List<DetachedVertexProperty<Object>> finalView;
    if (vertexAndView._2().isPresent()) {
      finalView = vertexAndView._2().get().getView();
    } else {
      finalView = Collections.emptyList();
    }
    for (final DetachedVertexProperty<Object> viewProperty : finalView) {
      if (VertexProgramHelper.isTransientVertexComputeKey(viewProperty.key(), vertexComputeKeys)) {
        continue; // transient compute keys are not persisted into the final graph
      }
      viewProperty.attach(Attachable.Method.create(starVertex));
    }
    return vertexAndView._1();
  });
}
代码示例来源:origin: apache/tinkerpop
// NOTE(review): excerpt only. This statement is truncated here (no terminating ';'), so the
// declared ViewOutgoingPayload type is produced by code not shown.
// When viewIncomingRDD is null, every vertex is paired with an absent incoming payload.
// Otherwise graphRDD is left-outer-joined with viewIncomingRDD, so vertices without incoming
// views or messages are kept and paired with an absent Optional.
final JavaPairRDD<Object, ViewOutgoingPayload<M>> viewOutgoingRDD = ((null == viewIncomingRDD) ?
graphRDD.mapValues(vertexWritable -> new Tuple2<>(vertexWritable, Optional.<ViewIncomingPayload<M>>absent())) : // first iteration will not have any views or messages
graphRDD.leftOuterJoin(viewIncomingRDD)) // every other iteration may have views and messages
代码示例来源:origin: co.cask.cdap/hydrator-spark-core2
/**
 * Left-outer-joins {@code left} with {@code right} into {@code numPartitions} partitions.
 * Each right-side optional value is then converted with {@code ConvertOptional}.
 *
 * @param left          the side whose keys are all kept
 * @param right         the side whose value may be absent for a key
 * @param numPartitions number of partitions for the joined RDD
 * @return the joined RDD with converted optional right-side values
 */
public static <K, V1, V2> JavaPairRDD<K, Tuple2<V1, Optional<V2>>> leftOuterJoin(JavaPairRDD<K, V1> left,
                                                                                 JavaPairRDD<K, V2> right,
                                                                                 int numPartitions) {
  ConvertOptional<V1, V2> optionalConverter = new ConvertOptional<V1, V2>();
  return left
      .leftOuterJoin(right, numPartitions)
      .mapValues(optionalConverter);
}
代码示例来源:origin: rathboma/hadoop-framework-examples
/**
 * Left-outer-joins {@code t} with {@code u} on their integer keys.
 * The keys are dropped and only the distinct {@code (t value, optional u value)} pairs are kept.
 *
 * @param t left side; every one of its keys appears in the join
 * @param u right side; its value is absent where it has no matching key
 * @return the distinct joined value pairs
 */
public static JavaRDD<Tuple2<Integer,Optional<String>>> joinData(JavaPairRDD<Integer, Integer> t, JavaPairRDD<Integer, String> u){
  return t.leftOuterJoin(u)
      .values()
      .distinct();
}
代码示例来源:origin: co.cask.cdap/hydrator-spark-core2
/**
 * Left-outer-joins {@code left} with {@code right} using Spark's default partitioning.
 * Each right-side optional value is then converted with {@code ConvertOptional}.
 *
 * @param left  the side whose keys are all kept
 * @param right the side whose value may be absent for a key
 * @return the joined RDD with converted optional right-side values
 */
public static <K, V1, V2> JavaPairRDD<K, Tuple2<V1, Optional<V2>>> leftOuterJoin(JavaPairRDD<K, V1> left,
                                                                                 JavaPairRDD<K, V2> right) {
  ConvertOptional<V1, V2> optionalConverter = new ConvertOptional<V1, V2>();
  return left
      .leftOuterJoin(right)
      .mapValues(optionalConverter);
}
代码示例来源:origin: com.uber.hoodie/hoodie-client
/**
 * Resolves where each key in {@code hoodieKeys} is currently stored.
 *
 * <p>The index is looked up with (partition path, record key) pairs. Its result, presumably
 * record key to file name (verify against lookupIndex), is left-outer-joined back onto the keys
 * by record key. A matched key yields the path {@code <basePath>/<partitionPath>/<fileName>}.
 * An unmatched key yields an absent Optional.
 *
 * @param hoodieKeys  keys to locate
 * @param jsc         Spark context forwarded to the index lookup
 * @param hoodieTable table providing the base path
 * @return every input key paired with its optional file path
 */
public JavaPairRDD<HoodieKey, Optional<String>> fetchRecordLocation(JavaRDD<HoodieKey> hoodieKeys,
    JavaSparkContext jsc, HoodieTable<T> hoodieTable) {
  JavaPairRDD<String, String> recordKeyByPartition = hoodieKeys
      .mapToPair(key -> new Tuple2<>(key.getPartitionPath(), key.getRecordKey()));
  // Lookup indexes for all the partition/recordkey pair
  JavaPairRDD<String, String> fileNameByRecordKey = lookupIndex(recordKeyByPartition, jsc, hoodieTable);
  JavaPairRDD<String, HoodieKey> keysByRecordKey = hoodieKeys
      .mapToPair(key -> new Tuple2<>(key.getRecordKey(), key));
  return keysByRecordKey.leftOuterJoin(fileNameByRecordKey).mapToPair(joined -> {
    HoodieKey hoodieKey = joined._2._1;
    Optional<String> location = Optional.absent();
    if (joined._2._2.isPresent()) {
      String fileName = joined._2._2.get();
      String partitionPath = hoodieKey.getPartitionPath();
      Path partitionDir = new Path(hoodieTable.getMetaClient().getBasePath(), partitionPath);
      location = Optional.of(new Path(partitionDir, fileName).toUri().getPath());
    }
    return new Tuple2<>(hoodieKey, location);
  });
}
代码示例来源:origin: uber/hudi
/**
 * Resolves where each key in {@code hoodieKeys} is currently stored.
 *
 * <p>The index is looked up with (partition path, record key) pairs. Its result, presumably
 * record key to file name (verify against lookupIndex), is left-outer-joined back onto the keys
 * by record key. A matched key yields the path {@code <basePath>/<partitionPath>/<fileName>}.
 * An unmatched key yields an absent Optional.
 *
 * @param hoodieKeys  keys to locate
 * @param jsc         Spark context forwarded to the index lookup
 * @param hoodieTable table providing the base path
 * @return every input key paired with its optional file path
 */
public JavaPairRDD<HoodieKey, Optional<String>> fetchRecordLocation(JavaRDD<HoodieKey> hoodieKeys,
    JavaSparkContext jsc, HoodieTable<T> hoodieTable) {
  JavaPairRDD<String, String> recordKeyByPartition = hoodieKeys
      .mapToPair(key -> new Tuple2<>(key.getPartitionPath(), key.getRecordKey()));
  // Lookup indexes for all the partition/recordkey pair
  JavaPairRDD<String, String> fileNameByRecordKey = lookupIndex(recordKeyByPartition, jsc, hoodieTable);
  JavaPairRDD<String, HoodieKey> keysByRecordKey = hoodieKeys
      .mapToPair(key -> new Tuple2<>(key.getRecordKey(), key));
  return keysByRecordKey.leftOuterJoin(fileNameByRecordKey).mapToPair(joined -> {
    HoodieKey hoodieKey = joined._2._1;
    Optional<String> location = Optional.absent();
    if (joined._2._2.isPresent()) {
      String fileName = joined._2._2.get();
      String partitionPath = hoodieKey.getPartitionPath();
      Path partitionDir = new Path(hoodieTable.getMetaClient().getBasePath(), partitionPath);
      location = Optional.of(new Path(partitionDir, fileName).toUri().getPath());
    }
    return new Tuple2<>(hoodieKey, location);
  });
}
代码示例来源:origin: com.davidbracewell/mango
/**
 * Left-outer-joins this pair stream with {@code stream}.
 *
 * <p>Every key of this stream is kept. Its entry pairs this stream's value with the matching
 * value from {@code stream}, or with {@code null} when {@code stream} has no value for the key.
 *
 * @param stream the right-hand side of the join
 * @return a pair stream of key to (left value, right value or null)
 */
@Override
public <V> MPairStream<T, Map.Entry<U, V>> leftOuterJoin(@NonNull MPairStream<? extends T, ? extends V> stream) {
  // Use orNull() rather than or(null). Guava's Optional.or(T) throws NullPointerException for a
  // null default, whether or not a value is present. orNull() expresses "value or null" and also
  // exists on Spark's Optional.
  return new SparkPairStream<>(rdd.leftOuterJoin(toPairRDD(stream))
      .mapToPair(t -> Cast.as(
          new scala.Tuple2<>(t._1(), Tuple2.of(t._2()._1(), t._2()._2().orNull())))));
}
代码示例来源:origin: boneill42/spark-on-cassandra-quickstart
/**
 * Prints every product together with its optional summary.
 *
 * <p>Products are read from {@code java_api.products} and keyed by {@code getId()}. Summaries
 * are read from {@code java_api.summaries} and keyed by {@code getProduct()}. The two are
 * left-outer-joined so that products without a summary are printed with an absent Optional.
 * Each (product, optional summary) pair is printed to standard output.
 *
 * @param sc the Spark context used to read the Cassandra tables
 */
@SuppressWarnings("serial")
private void showResults(JavaSparkContext sc) {
  JavaPairRDD<Integer, Summary> summariesRdd = javaFunctions(sc)
      .cassandraTable("java_api", "summaries", summaryReader)
      .keyBy(new Function<Summary, Integer>() {
        @Override
        public Integer call(Summary summary) throws Exception {
          return summary.getProduct();
        }
      });
  JavaPairRDD<Integer, Product> productsRdd = javaFunctions(sc)
      .cassandraTable("java_api", "products", productReader)
      .keyBy(new Function<Product, Integer>() {
        @Override
        public Integer call(Product product) throws Exception {
          return product.getId();
        }
      });
  // Use collect() rather than toArray(): toArray() is deprecated and simply delegates to collect().
  List<Tuple2<Product, Optional<Summary>>> results =
      productsRdd.leftOuterJoin(summariesRdd).values().collect();
  for (Tuple2<Product, Optional<Summary>> result : results) {
    System.out.println(result);
  }
}
代码示例来源:origin: Stratio/deep-spark
/**
 * It tests if the extractor can left join two data sets.
 *
 * <p>Players are grouped by team id first, so each team matches at most one right-side row.
 * A left outer join therefore yields exactly one row per team, and the joined count must equal
 * the team count. The context is always stopped, even if the assertion fails.
 */
@Test
protected void testLeftOuterJoin() {
  DeepSparkContext context = getDeepSparkContext();
  try {
    JavaPairRDD<Long, TeamEntity> teamsRDD = prepareTeamRDD(context);
    JavaPairRDD<Long, Iterable<PlayerEntity>> playersByTeam = preparePlayerRDD(context).groupByKey();
    JavaPairRDD<Long, Tuple2<TeamEntity, Optional<Iterable<PlayerEntity>>>> joinRDD =
        teamsRDD.leftOuterJoin(playersByTeam);
    long joinedRows = joinRDD.count();
    long teamRows = teamsRDD.count();
    assertEquals(joinedRows, teamRows);
  } finally {
    context.stop();
  }
}
代码示例来源:origin: uber/hudi
/**
 * Tags each record in {@code recordRDD} with the file location found for its row key.
 *
 * <p>{@code rowKeyFilenamePairRDD} holds {@code <rowKey, filename>} pairs. recordRDD may contain
 * row keys with no entry there (their fileId is null), so a left outer join is used. Records
 * without a non-empty filename are returned untouched.
 *
 * @param rowKeyFilenamePairRDD row key to file name pairs
 * @param recordRDD             the records to tag
 * @return the records, tagged with a location where one was found
 */
private JavaRDD<HoodieRecord<T>> tagLocationBacktoRecords(
    JavaPairRDD<String, String> rowKeyFilenamePairRDD, JavaRDD<HoodieRecord<T>> recordRDD) {
  JavaPairRDD<String, HoodieRecord<T>> recordsByRowKey = recordRDD
      .mapToPair(record -> new Tuple2<>(record.getRecordKey(), record));
  return recordsByRowKey.leftOuterJoin(rowKeyFilenamePairRDD).values().map(recordAndFile -> {
    HoodieRecord<T> tagged = recordAndFile._1();
    String filename = recordAndFile._2().isPresent() ? recordAndFile._2().get() : null;
    if (filename == null || filename.isEmpty()) {
      return tagged;
    }
    // A record present in several files of the same partition appears once per file in the
    // join. Every such row shares the same in-memory HoodieRecord, and setting currentLocation a
    // second time on it fails. Work on a fresh copy when a location is already set.
    if (tagged.getCurrentLocation() != null) {
      tagged = new HoodieRecord<T>(tagged.getKey(), tagged.getData());
    }
    tagged.setCurrentLocation(
        new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename)));
    return tagged;
  });
}
代码示例来源:origin: com.uber.hoodie/hoodie-client
/**
 * Tags each record in {@code recordRDD} with the file location found for its row key.
 *
 * <p>{@code rowKeyFilenamePairRDD} holds {@code <rowKey, filename>} pairs. recordRDD may contain
 * row keys with no entry there (their fileId is null), so a left outer join is used. Records
 * without a non-empty filename are returned untouched.
 *
 * @param rowKeyFilenamePairRDD row key to file name pairs
 * @param recordRDD             the records to tag
 * @return the records, tagged with a location where one was found
 */
private JavaRDD<HoodieRecord<T>> tagLocationBacktoRecords(
    JavaPairRDD<String, String> rowKeyFilenamePairRDD, JavaRDD<HoodieRecord<T>> recordRDD) {
  JavaPairRDD<String, HoodieRecord<T>> recordsByRowKey = recordRDD
      .mapToPair(record -> new Tuple2<>(record.getRecordKey(), record));
  return recordsByRowKey.leftOuterJoin(rowKeyFilenamePairRDD).values().map(recordAndFile -> {
    HoodieRecord<T> tagged = recordAndFile._1();
    String filename = recordAndFile._2().isPresent() ? recordAndFile._2().get() : null;
    if (filename == null || filename.isEmpty()) {
      return tagged;
    }
    // A record present in several files of the same partition appears once per file in the
    // join. Every such row shares the same in-memory HoodieRecord, and setting currentLocation a
    // second time on it fails. Work on a fresh copy when a location is already set.
    if (tagged.getCurrentLocation() != null) {
      tagged = new HoodieRecord<T>(tagged.getKey(), tagged.getData());
    }
    tagged.setCurrentLocation(
        new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename)));
    return tagged;
  });
}
代码示例来源:origin: ai.grakn/grakn-kb
/**
 * Writes the final computed views back into the cached graph.
 *
 * <p>Every vertex first has all existing compute-key properties dropped. If the vertex has an
 * incoming view, each view property whose key is not a transient compute key
 * ({@code VertexProgramHelper.isTransientVertexComputeKey}) is then attached to it. Vertices with
 * no incoming view are kept, because the join is a left outer join.
 *
 * @param graphRDD          the cached graph, keyed by vertex id
 * @param viewIncomingRDD   the final incoming views; expected to share graphRDD's partitioner
 * @param vertexComputeKeys the compute keys to drop and to filter against
 * @return graphRDD's vertices with the final view applied
 */
public static <M> JavaPairRDD<Object, VertexWritable> prepareFinalGraphRDD(
    final JavaPairRDD<Object, VertexWritable> graphRDD,
    final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD,
    final Set<VertexComputeKey> vertexComputeKeys) {
  // The graphRDD and the viewRDD must have the same partitioner.
  // This is only verified when assertions are enabled.
  if (graphRDD.partitioner().isPresent()) {
    assert (graphRDD.partitioner().get().equals(viewIncomingRDD.partitioner().get()));
  }
  final String[] keysToDrop = VertexProgramHelper.vertexComputeKeysAsArray(vertexComputeKeys);
  return graphRDD.leftOuterJoin(viewIncomingRDD).mapValues(vertexAndView -> {
    final StarGraph.StarVertex starVertex = vertexAndView._1().get();
    starVertex.dropVertexProperties(keysToDrop);
    final List<DetachedVertexProperty<Object>> finalView;
    if (vertexAndView._2().isPresent()) {
      finalView = vertexAndView._2().get().getView();
    } else {
      finalView = Collections.emptyList();
    }
    for (final DetachedVertexProperty<Object> viewProperty : finalView) {
      if (VertexProgramHelper.isTransientVertexComputeKey(viewProperty.key(), vertexComputeKeys)) {
        continue; // transient compute keys are not persisted into the final graph
      }
      viewProperty.attach(Attachable.Method.create(starVertex));
    }
    return vertexAndView._1();
  });
}
代码示例来源:origin: org.apache.tinkerpop/spark-gremlin
/**
 * Writes the final computed views back into the cached graph.
 *
 * <p>Every vertex first has all existing compute-key properties dropped. If the vertex has an
 * incoming view, each view property whose key is not a transient compute key
 * ({@code VertexProgramHelper.isTransientVertexComputeKey}) is then attached to it. Vertices with
 * no incoming view are kept, because the join is a left outer join.
 *
 * @param graphRDD          the cached graph, keyed by vertex id
 * @param viewIncomingRDD   the final incoming views; expected to share graphRDD's partitioner
 * @param vertexComputeKeys the compute keys to drop and to filter against
 * @return graphRDD's vertices with the final view applied
 */
public static <M> JavaPairRDD<Object, VertexWritable> prepareFinalGraphRDD(
    final JavaPairRDD<Object, VertexWritable> graphRDD,
    final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD,
    final Set<VertexComputeKey> vertexComputeKeys) {
  // The graphRDD and the viewRDD must have the same partitioner.
  // This is only verified when assertions are enabled.
  if (graphRDD.partitioner().isPresent()) {
    assert (graphRDD.partitioner().get().equals(viewIncomingRDD.partitioner().get()));
  }
  final String[] keysToDrop = VertexProgramHelper.vertexComputeKeysAsArray(vertexComputeKeys);
  return graphRDD.leftOuterJoin(viewIncomingRDD).mapValues(vertexAndView -> {
    final StarGraph.StarVertex starVertex = vertexAndView._1().get();
    starVertex.dropVertexProperties(keysToDrop);
    final List<DetachedVertexProperty<Object>> finalView;
    if (vertexAndView._2().isPresent()) {
      finalView = vertexAndView._2().get().getView();
    } else {
      finalView = Collections.emptyList();
    }
    for (final DetachedVertexProperty<Object> viewProperty : finalView) {
      if (VertexProgramHelper.isTransientVertexComputeKey(viewProperty.key(), vertexComputeKeys)) {
        continue; // transient compute keys are not persisted into the final graph
      }
      viewProperty.attach(Attachable.Method.create(starVertex));
    }
    return vertexAndView._1();
  });
}
内容来源于网络，如有侵权，请联系作者删除！