Usage of org.apache.spark.api.java.JavaPairRDD.leftOuterJoin() with Code Examples

This article collects code examples of the Java method org.apache.spark.api.java.JavaPairRDD.leftOuterJoin() and shows how the method is used in practice. The examples were extracted from selected open-source projects hosted on platforms such as GitHub, Stack Overflow, and Maven, and should serve as useful references. Details of JavaPairRDD.leftOuterJoin() are as follows:
Package: org.apache.spark.api.java
Class: JavaPairRDD
Method: leftOuterJoin

Introduction to JavaPairRDD.leftOuterJoin

Performs a left outer join of this pair RDD with another pair RDD. For each element (k, v) in the left RDD, the result contains one pair (k, (v, Optional.of(w))) for every w with key k in the right RDD, or a single pair (k, (v, absent)) if the key has no match on the right; keys that appear only in the right RDD are dropped. Overloads additionally accept a target number of partitions or a custom Partitioner. Note that Spark 1.x returned Guava's com.google.common.base.Optional here, while Spark 2.0+ returns org.apache.spark.api.java.Optional, which offers both Guava-style (absent(), or()) and java.util-style (empty(), orElse()) accessors; both styles appear in the examples below.
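
Before the collected examples, here is a minimal, self-contained sketch of the method. The users/orders data and the local-mode setup are illustrative (not taken from any of the projects below), assuming a Spark 2.x classpath:

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;

import scala.Tuple2;

public class LeftOuterJoinExample {
 public static void main(String[] args) {
  SparkConf conf = new SparkConf().setAppName("leftOuterJoin-demo").setMaster("local[*]");
  try (JavaSparkContext sc = new JavaSparkContext(conf)) {
   JavaPairRDD<Integer, String> users = sc.parallelizePairs(Arrays.asList(
    new Tuple2<>(1, "alice"),
    new Tuple2<>(2, "bob")));
   JavaPairRDD<Integer, String> orders = sc.parallelizePairs(Arrays.asList(
    new Tuple2<>(1, "order-42")));
   // Every key of `users` is kept; keys missing from `orders` get an empty Optional.
   List<Tuple2<Integer, Tuple2<String, Optional<String>>>> joined =
    users.leftOuterJoin(orders).collect();
   joined.forEach(System.out::println); // key 2 carries an empty Optional
  }
 }
}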

Code examples

Code example source: org.apache.spark/spark-core_2.10 (the same test also appears in the spark-core_2.11 and spark-core artifacts)

@Test
public void leftOuterJoin() {
 JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList(
  new Tuple2<>(1, 1),
  new Tuple2<>(1, 2),
  new Tuple2<>(2, 1),
  new Tuple2<>(3, 1)
 ));
 JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(Arrays.asList(
  new Tuple2<>(1, 'x'),
  new Tuple2<>(2, 'y'),
  new Tuple2<>(2, 'z'),
  new Tuple2<>(4, 'w')
 ));
 List<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>> joined =
  rdd1.leftOuterJoin(rdd2).collect();
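 // 5 results: key 1 pairs each of its two left values with 'x', key 2 pairs 1 with both 'y' and 'z', and key 3 has no match (absent Optional)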
 Assert.assertEquals(5, joined.size());
 Tuple2<Integer, Tuple2<Integer, Optional<Character>>> firstUnmatched =
  rdd1.leftOuterJoin(rdd2).filter(tup -> !tup._2()._2().isPresent()).first();
 Assert.assertEquals(3, firstUnmatched._1().intValue());
}

Code example source: apache/tinkerpop (the same method also appears in org.apache.tinkerpop/spark-gremlin and ai.grakn/grakn-kb)

public static <M> JavaPairRDD<Object, VertexWritable> prepareFinalGraphRDD(
    final JavaPairRDD<Object, VertexWritable> graphRDD,
    final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD,
    final Set<VertexComputeKey> vertexComputeKeys) {
  // the graphRDD and the viewRDD must have the same partitioner
  if (graphRDD.partitioner().isPresent())
    assert (graphRDD.partitioner().get().equals(viewIncomingRDD.partitioner().get()));
  final String[] vertexComputeKeysArray = VertexProgramHelper.vertexComputeKeysAsArray(vertexComputeKeys); // the compute keys as an array
  return graphRDD.leftOuterJoin(viewIncomingRDD)
      .mapValues(tuple -> {
        final StarGraph.StarVertex vertex = tuple._1().get();
        vertex.dropVertexProperties(vertexComputeKeysArray); // drop all existing compute keys
        // attach the final computed view to the cached graph
        final List<DetachedVertexProperty<Object>> view = tuple._2().isPresent() ? tuple._2().get().getView() : Collections.emptyList();
        for (final DetachedVertexProperty<Object> property : view) {
          if (!VertexProgramHelper.isTransientVertexComputeKey(property.key(), vertexComputeKeys))
            property.attach(Attachable.Method.create(vertex));
        }
        return tuple._1();
      });
}

Code example source: apache/tinkerpop

final JavaPairRDD<Object, ViewOutgoingPayload<M>> viewOutgoingRDD = ((null == viewIncomingRDD) ?
    graphRDD.mapValues(vertexWritable -> new Tuple2<>(vertexWritable, Optional.<ViewIncomingPayload<M>>absent())) : // first iteration will not have any views or messages
    graphRDD.leftOuterJoin(viewIncomingRDD))                                                   // every other iteration may have views and messages
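
This fragment unifies two iterations into one value type: on the first iteration there is no incoming view, so every vertex is paired with an absent Optional; on later iterations a left outer join attaches whatever views arrived (the declared ViewOutgoingPayload type implies further chained transformations that the excerpt omits). A generic sketch of the same pattern, with illustrative names and assuming Spark 2.x:

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.Optional;
import scala.Tuple2;

public final class OptionalViewPattern {
 // Pair every left value with an Optional lookup result, whether or not the
 // lookup RDD exists yet; both branches yield the same value type.
 public static <K, V, W> JavaPairRDD<K, Tuple2<V, Optional<W>>> withOptionalView(
   JavaPairRDD<K, V> left, JavaPairRDD<K, W> viewOrNull) {
  return viewOrNull == null
    ? left.mapValues(v -> new Tuple2<>(v, Optional.<W>empty()))
    : left.leftOuterJoin(viewOrNull);
 }
}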

Code example source: co.cask.cdap/hydrator-spark-core2

public static <K, V1, V2> JavaPairRDD<K, Tuple2<V1, Optional<V2>>> leftOuterJoin(JavaPairRDD<K, V1> left,
                                         JavaPairRDD<K, V2> right,
                                         int numPartitions) {
 return left.leftOuterJoin(right, numPartitions).mapValues(new ConvertOptional<V1, V2>());
}

Code example source: rathboma/hadoop-framework-examples

public static JavaRDD<Tuple2<Integer,Optional<String>>> joinData(JavaPairRDD<Integer, Integer> t, JavaPairRDD<Integer, String> u){
  JavaRDD<Tuple2<Integer,Optional<String>>> leftJoinOutput = t.leftOuterJoin(u).values().distinct();
  return leftJoinOutput;
}

Code example source: co.cask.cdap/hydrator-spark-core2

public static <K, V1, V2> JavaPairRDD<K, Tuple2<V1, Optional<V2>>> leftOuterJoin(JavaPairRDD<K, V1> left,
                                         JavaPairRDD<K, V2> right) {
 return left.leftOuterJoin(right).mapValues(new ConvertOptional<V1, V2>());
}
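
Both overloads delegate the Optional conversion to ConvertOptional, a CDAP-internal class that the snippets do not show. A hypothetical reconstruction, assuming its job is to turn the org.apache.spark.api.java.Optional produced by Spark's leftOuterJoin into the Guava Optional exposed by this API:

import com.google.common.base.Optional;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;

// Hypothetical sketch of ConvertOptional (not the actual CDAP implementation):
// rewraps the right-hand Optional while leaving the left value untouched.
public class ConvertOptional<V1, V2>
  implements Function<Tuple2<V1, org.apache.spark.api.java.Optional<V2>>, Tuple2<V1, Optional<V2>>> {
 @Override
 public Tuple2<V1, Optional<V2>> call(Tuple2<V1, org.apache.spark.api.java.Optional<V2>> value) {
  Optional<V2> converted = value._2().isPresent()
    ? Optional.of(value._2().get())
    : Optional.<V2>absent();
  return new Tuple2<>(value._1(), converted);
 }
}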

Code example source: com.uber.hoodie/hoodie-client (the same code also appears in uber/hudi)

public JavaPairRDD<HoodieKey, Optional<String>> fetchRecordLocation(JavaRDD<HoodieKey> hoodieKeys,
  JavaSparkContext jsc, HoodieTable<T> hoodieTable) {
 JavaPairRDD<String, String> partitionRecordKeyPairRDD = hoodieKeys
   .mapToPair(key -> new Tuple2<>(key.getPartitionPath(), key.getRecordKey()));
 // Lookup indexes for all the partition/recordkey pair
 JavaPairRDD<String, String> rowKeyFilenamePairRDD = lookupIndex(partitionRecordKeyPairRDD, jsc, hoodieTable);
 JavaPairRDD<String, HoodieKey> rowKeyHoodieKeyPairRDD = hoodieKeys
   .mapToPair(key -> new Tuple2<>(key.getRecordKey(), key));
 return rowKeyHoodieKeyPairRDD.leftOuterJoin(rowKeyFilenamePairRDD).mapToPair(keyPathTuple -> {
  Optional<String> recordLocationPath;
  if (keyPathTuple._2._2.isPresent()) {
   String fileName = keyPathTuple._2._2.get();
   String partitionPath = keyPathTuple._2._1.getPartitionPath();
   recordLocationPath = Optional
     .of(new Path(new Path(hoodieTable.getMetaClient().getBasePath(), partitionPath), fileName)
       .toUri().getPath());
  } else {
   recordLocationPath = Optional.absent();
  }
  return new Tuple2<>(keyPathTuple._2._1, recordLocationPath);
 });
}

Code example source: com.davidbracewell/mango

@Override
public <V> MPairStream<T, Map.Entry<U, V>> leftOuterJoin(@NonNull MPairStream<? extends T, ? extends V> stream) {
 return new SparkPairStream<>(rdd.leftOuterJoin(toPairRDD(stream))
                 .mapToPair(t -> Cast.as(
                   new scala.Tuple2<>(t._1(), Tuple2.of(t._2()._1(), t._2()._2().or(null))))));
}

Code example source: boneill42/spark-on-cassandra-quickstart

@SuppressWarnings("serial")
private void showResults(JavaSparkContext sc) {
  JavaPairRDD<Integer, Summary> summariesRdd = javaFunctions(sc)
      .cassandraTable("java_api", "summaries", summaryReader)
      .keyBy(new Function<Summary, Integer>() {
        @Override
        public Integer call(Summary summary) throws Exception {
          return summary.getProduct();
        }
      });
  JavaPairRDD<Integer, Product> productsRdd = javaFunctions(sc)
      .cassandraTable("java_api", "products", productReader)
      .keyBy(new Function<Product, Integer>() {
        @Override
        public Integer call(Product product) throws Exception {
          return product.getId();
        }
      });
  List<Tuple2<Product, Optional<Summary>>> results = productsRdd.leftOuterJoin(summariesRdd).values().toArray();
  for (Tuple2<Product, Optional<Summary>> result : results) {
    System.out.println(result);
  }
}

Code example source: Stratio/deep-spark

/**
 * Tests that the extractor can left outer join two data sets.
 */
@Test
protected void testLeftOuterJoin() {
  DeepSparkContext context = getDeepSparkContext();
  try {
    JavaPairRDD<Long, TeamEntity> teamsRDD = prepareTeamRDD(context);
    JavaPairRDD<Long, Iterable<PlayerEntity>> playersRDD = preparePlayerRDD(context).groupByKey();
    JavaPairRDD<Long, Tuple2<TeamEntity, Optional<Iterable<PlayerEntity>>>> joinRDD =
        teamsRDD.leftOuterJoin(playersRDD);
    assertEquals(joinRDD.count(), teamsRDD.count());
  } finally {
    context.stop();
  }
}

Code example source: uber/hudi (the same code also appears in com.uber.hoodie/hoodie-client)

/**
 * Tag the <rowKey, filename> back to the original HoodieRecord RDD.
 */
private JavaRDD<HoodieRecord<T>> tagLocationBacktoRecords(
  JavaPairRDD<String, String> rowKeyFilenamePairRDD, JavaRDD<HoodieRecord<T>> recordRDD) {
 JavaPairRDD<String, HoodieRecord<T>> rowKeyRecordPairRDD = recordRDD
   .mapToPair(record -> new Tuple2<>(record.getRecordKey(), record));
 // Here as the recordRDD might have more data than rowKeyRDD (some rowKeys' fileId is null),
 // so we do left outer join.
 return rowKeyRecordPairRDD.leftOuterJoin(rowKeyFilenamePairRDD).values().map(v1 -> {
  HoodieRecord<T> record = v1._1();
  if (v1._2().isPresent()) {
   String filename = v1._2().get();
   if (filename != null && !filename.isEmpty()) {
    // When you have a record in multiple files in the same partition, then rowKeyRecordPairRDD will have 2
    // entries with the same exact in memory copy of the HoodieRecord and the 2 separate filenames that the
    // record is found in. This will result in setting currentLocation 2 times and it will fail the second time.
    // This check will create a new in memory copy of the hoodie record.
    if (record.getCurrentLocation() != null) {
     record = new HoodieRecord<T>(record.getKey(), record.getData());
    }
    record.setCurrentLocation(new HoodieRecordLocation(FSUtils.getCommitTime(filename),
      FSUtils.getFileId(filename)));
   }
  }
  return record;
 });
}
