Usage of the org.apache.spark.api.java.JavaPairRDD.join() method, with code examples

Reposted by x33g5p2x on 2022-01-21, under: Other

This article collects code examples of the Java method org.apache.spark.api.java.JavaPairRDD.join() and shows how JavaPairRDD.join() is used in practice. The examples are taken from selected open-source projects on platforms such as GitHub, Stack Overflow, and Maven, and should serve as a useful reference. Details of the JavaPairRDD.join() method are as follows:
Package path: org.apache.spark.api.java.JavaPairRDD
Class name: JavaPairRDD
Method name: join

Introduction to JavaPairRDD.join

join performs an inner join between this JavaPairRDD<K, V> and another JavaPairRDD<K, W> that shares the same key type, returning a JavaPairRDD<K, Tuple2<V, W>> with one output pair for each matching combination of values under a key present in both RDDs. Overloads additionally accept a number of partitions or a custom Partitioner for the resulting RDD.
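Because the examples below are fragments from larger projects, a minimal self-contained sketch of the basic call may help first. It is not taken from any of the referenced projects; the class name JoinDemo, the sample data, and the local master are illustrative assumptions only.

import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class JoinDemo {
  public static void main(String[] args) {
    // A local master is assumed for this sketch; any Spark deployment behaves the same way.
    JavaSparkContext sc = new JavaSparkContext("local[*]", "join-demo");

    // Two pair RDDs that share the same key type (a user name).
    List<Tuple2<String, Integer>> ageData = Arrays.asList(
        new Tuple2<>("alice", 30), new Tuple2<>("bob", 25));
    List<Tuple2<String, String>> cityData = Arrays.asList(
        new Tuple2<>("alice", "Paris"), new Tuple2<>("carol", "Oslo"));
    JavaPairRDD<String, Integer> ages = sc.parallelizePairs(ageData);
    JavaPairRDD<String, String> cities = sc.parallelizePairs(cityData);

    // Inner join: only keys present in both RDDs survive ("alice" here);
    // each match is emitted as (key, Tuple2(leftValue, rightValue)).
    JavaPairRDD<String, Tuple2<Integer, String>> joined = ages.join(cities);
    joined.collect().forEach(t ->
        System.out.println(t._1() + " -> " + t._2()._1() + " / " + t._2()._2()));

    sc.stop();
  }
}

Run locally, this prints a single line for "alice", the only key present in both RDDs, which is the inner-join semantics the examples below rely on.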

Code examples

Code example source: origin: databricks/learning-spark

public void run(String master, String csv1, String csv2) throws Exception {
  JavaSparkContext sc = new JavaSparkContext(
      master, "basicjoincsv", System.getenv("SPARK_HOME"), System.getenv("JARS"));
  JavaRDD<String> csvFile1 = sc.textFile(csv1);
  JavaRDD<String> csvFile2 = sc.textFile(csv2);
  // Key both CSV files by the parsed integer key, then inner-join them on that key.
  JavaPairRDD<Integer, String[]> keyedRDD1 = csvFile1.mapToPair(new ParseLine());
  JavaPairRDD<Integer, String[]> keyedRDD2 = csvFile2.mapToPair(new ParseLine());
  JavaPairRDD<Integer, Tuple2<String[], String[]>> result = keyedRDD1.join(keyedRDD2);
  List<Tuple2<Integer, Tuple2<String[], String[]>>> resultCollection = result.collect();
}

Code example source: origin: OryxProject/oryx

/**
 * Computes root mean squared error of {@link Rating#rating()} versus predicted value.
 */
static double rmse(MatrixFactorizationModel mfModel, JavaRDD<Rating> testData) {
 JavaPairRDD<Tuple2<Integer,Integer>,Double> testUserProductValues =
   testData.mapToPair(rating -> new Tuple2<>(new Tuple2<>(rating.user(), rating.product()), rating.rating()));
 @SuppressWarnings("unchecked")
 RDD<Tuple2<Object,Object>> testUserProducts =
   (RDD<Tuple2<Object,Object>>) (RDD<?>) testUserProductValues.keys().rdd();
 JavaRDD<Rating> predictions = testData.wrapRDD(mfModel.predict(testUserProducts));
 double mse = predictions.mapToPair(
   rating -> new Tuple2<>(new Tuple2<>(rating.user(), rating.product()), rating.rating())
 ).join(testUserProductValues).values().mapToDouble(valuePrediction -> {
  double diff = valuePrediction._1() - valuePrediction._2();
  return diff * diff;
 }).mean();
 return Math.sqrt(mse);
}

Code example source: origin: OryxProject/oryx

@Override
public void publishAdditionalModelData(JavaSparkContext sparkContext,
                    PMML pmml,
                    JavaRDD<String> newData,
                    JavaRDD<String> pastData,
                    Path modelParentPath,
                    TopicProducer<String, String> modelUpdateTopic) {
 // Send item updates first, before users. That way, user-based endpoints like /recommend
 // may take longer to not return 404, but when they do, the result will be more complete.
 log.info("Sending item / Y data as model updates");
 String yPathString = AppPMMLUtils.getExtensionValue(pmml, "Y");
 JavaPairRDD<String,float[]> productRDD = readFeaturesRDD(sparkContext, new Path(modelParentPath, yPathString));
 String updateBroker = modelUpdateTopic.getUpdateBroker();
 String topic = modelUpdateTopic.getTopic();
 // For now, there is no use in sending known users for each item
 productRDD.foreachPartition(new EnqueueFeatureVecsFn("Y", updateBroker, topic));
 log.info("Sending user / X data as model updates");
 String xPathString = AppPMMLUtils.getExtensionValue(pmml, "X");
 JavaPairRDD<String,float[]> userRDD = readFeaturesRDD(sparkContext, new Path(modelParentPath, xPathString));
 if (noKnownItems) {
  userRDD.foreachPartition(new EnqueueFeatureVecsFn("X", updateBroker, topic));
 } else {
  log.info("Sending known item data with model updates");
  JavaRDD<String[]> allData =
    (pastData == null ? newData : newData.union(pastData)).map(MLFunctions.PARSE_FN);
  JavaPairRDD<String,Collection<String>> knownItems = knownsRDD(allData, true);
  userRDD.join(knownItems).foreachPartition(
    new EnqueueFeatureVecsAndKnownItemsFn("X", updateBroker, topic));
 }
}

Code example source: origin: OryxProject/oryx

predictAll(mfModel, positiveData, negativeUserProducts);
return positivePredictions.join(negativePredictions).values().mapToDouble(t -> {

Code example source: origin: mahmoudparsian/data-algorithms-book

joinedRDD = usersRDD.join(usersRDD);

Code example source: origin: mahmoudparsian/data-algorithms-book

JavaPairRDD<String, Tuple2<Tuple2<String, Integer>, Integer>> joined = uniquePairs.join(totalByKey);

Code example source: origin: oeljeklaus-you/UserActionAnalyzePlatform

/**
 * Gets the full-information RDD for the filtered sessions.
 * @param filteredSessionRDD
 * @param sessionInfoPairRDD
 * @return
 */
private static JavaPairRDD<String, Row> getFilterFullInfoRDD(JavaPairRDD<String, String> filteredSessionRDD, JavaPairRDD<String, Row> sessionInfoPairRDD) {
  // 1. Get all categories within the range of the qualifying sessions
  return filteredSessionRDD.join(sessionInfoPairRDD).mapToPair(new PairFunction<Tuple2<String, Tuple2<String, Row>>, String, Row>() {
    @Override
    public Tuple2<String, Row> call(Tuple2<String, Tuple2<String, Row>> stringTuple2Tuple2) throws Exception {
      return new Tuple2<String, Row>(stringTuple2Tuple2._1, stringTuple2Tuple2._2._2);
    }
  });
}

Code example source: origin: Erik-ly/SprakProject

/**
 * Gets the visit-detail RDD for sessions that passed the filter conditions.
 * @param sessionid2aggrInfoRDD
 * @param sessionid2actionRDD
 * @return
 */
private static JavaPairRDD<String, Row> getSessionid2detailRDD(
    JavaPairRDD<String, String> sessionid2aggrInfoRDD,
    JavaPairRDD<String, Row> sessionid2actionRDD) {
  JavaPairRDD<String, Row> sessionid2detailRDD = sessionid2aggrInfoRDD
      .join(sessionid2actionRDD)
      .mapToPair(new PairFunction<Tuple2<String, Tuple2<String, Row>>, String, Row>() {
        private static final long serialVersionUID = 1L;
        public Tuple2<String, Row> call(
            Tuple2<String, Tuple2<String, Row>> tuple) throws Exception {
          return new Tuple2<String, Row>(tuple._1, tuple._2._2);
        }
    
      });
  return sessionid2detailRDD;
}

Code example source: origin: co.cask.cdap/hydrator-spark-core2

@SuppressWarnings("unchecked")
@Override
public <T> SparkPairCollection<K, Tuple2<V, T>> join(SparkPairCollection<K, T> other) {
 return wrap(pairRDD.join((JavaPairRDD<K, T>) other.getUnderlying()));
}

Code example source: origin: co.cask.cdap/hydrator-spark-core2

@SuppressWarnings("unchecked")
@Override
public <T> SparkPairCollection<K, Tuple2<V, T>> join(SparkPairCollection<K, T> other, int numPartitions) {
 return wrap(pairRDD.join((JavaPairRDD<K, T>) other.getUnderlying(), numPartitions));
}

Code example source: origin: scipr-lab/dizk

public static <FieldT extends AbstractFieldElementExpanded<FieldT>> FieldT
  distributedVariableBaseMSM(
      final JavaPairRDD<Long, FieldT> scalars,
      final JavaPairRDD<Long, FieldT> bases) {
    return scalars.join(bases).map(pair -> pair._2._1.mul(pair._2._2)).reduce(FieldT::add);
  }
}

Code example source: origin: com.davidbracewell/mango

@Override
public <V> MPairStream<T, Map.Entry<U, V>> join(@NonNull MPairStream<? extends T, ? extends V> stream) {
 return new SparkPairStream<>(rdd.join(toPairRDD(stream))
                 .mapToPair(
                   t -> Cast.as(new scala.Tuple2<>(t._1(), toMapEntry(t._2()))))
 );
}

Code example source: origin: uber/hudi

/**
 * Given a bunch of hoodie keys, fetches all the individual records out as a data frame
 *
 * @return a dataframe
 */
public Dataset<Row> read(JavaRDD<HoodieKey> hoodieKeys, int parallelism) throws Exception {
 assertSqlContext();
 JavaPairRDD<HoodieKey, Optional<String>> keyToFileRDD = index
   .fetchRecordLocation(hoodieKeys, jsc, hoodieTable);
 List<String> paths = keyToFileRDD.filter(keyFileTuple -> keyFileTuple._2().isPresent())
   .map(keyFileTuple -> keyFileTuple._2().get()).collect();
 // record locations might be same for multiple keys, so need a unique list
 Set<String> uniquePaths = new HashSet<>(paths);
 Dataset<Row> originalDF = sqlContextOpt.get().read()
   .parquet(uniquePaths.toArray(new String[uniquePaths.size()]));
 StructType schema = originalDF.schema();
 JavaPairRDD<HoodieKey, Row> keyRowRDD = originalDF.javaRDD().mapToPair(row -> {
  HoodieKey key = new HoodieKey(row.getAs(HoodieRecord.RECORD_KEY_METADATA_FIELD),
    row.getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD));
  return new Tuple2<>(key, row);
 });
 // Now, we need to further filter out, for only rows that match the supplied hoodie keys
 JavaRDD<Row> rowRDD = keyRowRDD.join(keyToFileRDD, parallelism).map(tuple -> tuple._2()._1());
 return sqlContextOpt.get().createDataFrame(rowRDD, schema);
}

Code example source: origin: Graphify/graphify

JavaPairRDD<String, Double> contribs = links.join(ranks).values()
    .flatMapToPair(s -> {
      int urlCount = Iterables.size(s._1());

Code example source: origin: org.apache.spark/spark-streaming_2.10

pairStream1.transformWithToPair(pairStream2, (x, y, z) -> x.join(y));

Code example source: origin: org.apache.spark/spark-streaming_2.11

PairFunction<Integer, Integer, Integer> mapToTuple =
  (Integer i) -> new Tuple2<>(i, i);
 return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3);
});
JavaTestUtils.attachTestOutputStream(transformed2);

Code example source: origin: Stratio/deep-spark

/**
 * It tests if the extractor can join two data sets
 */
@Test
protected void testInnerJoin() {
  DeepSparkContext context = getDeepSparkContext();
  try {
    JavaPairRDD<Long, TeamEntity> teamsRDD = prepareTeamRDD(context);
    JavaPairRDD<Long, Iterable<PlayerEntity>> playersRDD = preparePlayerRDD(context).groupByKey();
    JavaPairRDD<Long, Tuple2<TeamEntity, Iterable<PlayerEntity>>> joinRDD = teamsRDD.join(playersRDD);
    assertEquals(joinRDD.count(), 4);
  } finally {
    context.stop();
  }
}

Code example source: origin: org.apache.pig/pig

join(streamIndexedJavaPairRDD, partitioner);

Code example source: origin: org.qcri.rheem/rheem-spark

@Override
public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate(
    ChannelInstance[] inputs,
    ChannelInstance[] outputs,
    SparkExecutor sparkExecutor,
    OptimizationContext.OperatorContext operatorContext) {
  assert inputs.length == this.getNumInputs();
  assert outputs.length == this.getNumOutputs();
  final RddChannel.Instance input0 = (RddChannel.Instance) inputs[0];
  final RddChannel.Instance input1 = (RddChannel.Instance) inputs[1];
  final RddChannel.Instance output = (RddChannel.Instance) outputs[0];
  final JavaRDD<InputType0> inputRdd0 = input0.provideRdd();
  final JavaRDD<InputType1> inputRdd1 = input1.provideRdd();
  FunctionCompiler compiler = sparkExecutor.getCompiler();
  final PairFunction<InputType0, KeyType, InputType0> keyExtractor0 = compiler.compileToKeyExtractor(this.keyDescriptor0);
  final PairFunction<InputType1, KeyType, InputType1> keyExtractor1 = compiler.compileToKeyExtractor(this.keyDescriptor1);
  JavaPairRDD<KeyType, InputType0> pairStream0 = inputRdd0.mapToPair(keyExtractor0);
  JavaPairRDD<KeyType, InputType1> pairStream1 = inputRdd1.mapToPair(keyExtractor1);
  final JavaPairRDD<KeyType, scala.Tuple2<InputType0, InputType1>> outputPair =
      pairStream0.<InputType1>join(pairStream1, sparkExecutor.getNumDefaultPartitions());
  this.name(outputPair);
  // convert from scala tuple to rheem tuple
  final JavaRDD<Tuple2<InputType0, InputType1>> outputRdd = outputPair
      .map(new TupleConverter<>());
  this.name(outputRdd);
  output.accept(outputRdd, sparkExecutor);
  return ExecutionOperator.modelLazyExecution(inputs, outputs, operatorContext);
}
