This article collects code examples of the Java method org.apache.spark.api.java.JavaPairRDD.join() and shows how JavaPairRDD.join() is used in practice. The examples are extracted from selected projects hosted on platforms such as GitHub, Stack Overflow, and Maven, so they are fairly representative and should serve as a useful reference. The details of JavaPairRDD.join() are as follows:
Package path: org.apache.spark.api.java.JavaPairRDD
Class name: JavaPairRDD
Method name: join
Description: none available
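Since no description is available here, a minimal sketch of the method's semantics may help before the project examples: join() performs an inner join of two JavaPairRDDs by key, producing a pair RDD whose values are Tuple2 objects holding one value from each side for every matching key. The class name JoinSketch, the sample data, and the variable names below are illustrative assumptions, not code taken from the projects quoted later.

import java.util.Arrays;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class JoinSketch {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[*]", "join-sketch");
        // Two small pair RDDs keyed by user id (illustrative data).
        JavaPairRDD<Integer, String> names = sc.parallelizePairs(Arrays.asList(
                new Tuple2<>(1, "alice"), new Tuple2<>(2, "bob"), new Tuple2<>(3, "carol")));
        JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(Arrays.asList(
                new Tuple2<>(1, 90), new Tuple2<>(3, 75)));
        // Inner join by key: only keys present in both RDDs (1 and 3) survive,
        // each mapped to a Tuple2 of (name, score).
        JavaPairRDD<Integer, Tuple2<String, Integer>> joined = names.join(scores);
        joined.collect().forEach(t ->
                System.out.println(t._1() + " -> " + t._2()._1() + " / " + t._2()._2()));
        sc.stop();
    }
}

Keys that appear in only one of the two RDDs (key 2 here) are dropped from the result; the examples below all rely on this inner-join behavior, and some also pass a second argument to join() to control the number of partitions of the joined RDD.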
Code example source: origin: databricks/learning-spark
public void run(String master, String csv1, String csv2) throws Exception {
    JavaSparkContext sc = new JavaSparkContext(
        master, "basicjoincsv", System.getenv("SPARK_HOME"), System.getenv("JARS"));
    JavaRDD<String> csvFile1 = sc.textFile(csv1);
    JavaRDD<String> csvFile2 = sc.textFile(csv2);
    // Key each CSV file by an integer id so the two can be joined.
    JavaPairRDD<Integer, String[]> keyedRDD1 = csvFile1.mapToPair(new ParseLine());
    JavaPairRDD<Integer, String[]> keyedRDD2 = csvFile2.mapToPair(new ParseLine());
    // Inner join: for every shared key, pair up the parsed fields from both files.
    JavaPairRDD<Integer, Tuple2<String[], String[]>> result = keyedRDD1.join(keyedRDD2);
    List<Tuple2<Integer, Tuple2<String[], String[]>>> resultCollection = result.collect();
  }
}
Code example source: origin: OryxProject/oryx
/**
* Computes root mean squared error of {@link Rating#rating()} versus predicted value.
*/
static double rmse(MatrixFactorizationModel mfModel, JavaRDD<Rating> testData) {
JavaPairRDD<Tuple2<Integer,Integer>,Double> testUserProductValues =
testData.mapToPair(rating -> new Tuple2<>(new Tuple2<>(rating.user(), rating.product()), rating.rating()));
@SuppressWarnings("unchecked")
RDD<Tuple2<Object,Object>> testUserProducts =
(RDD<Tuple2<Object,Object>>) (RDD<?>) testUserProductValues.keys().rdd();
JavaRDD<Rating> predictions = testData.wrapRDD(mfModel.predict(testUserProducts));
double mse = predictions.mapToPair(
rating -> new Tuple2<>(new Tuple2<>(rating.user(), rating.product()), rating.rating())
).join(testUserProductValues).values().mapToDouble(valuePrediction -> {
double diff = valuePrediction._1() - valuePrediction._2();
return diff * diff;
}).mean();
return Math.sqrt(mse);
}
Code example source: origin: OryxProject/oryx
@Override
public void publishAdditionalModelData(JavaSparkContext sparkContext,
PMML pmml,
JavaRDD<String> newData,
JavaRDD<String> pastData,
Path modelParentPath,
TopicProducer<String, String> modelUpdateTopic) {
// Send item updates first, before users. That way, user-based endpoints like /recommend
// may take longer to not return 404, but when they do, the result will be more complete.
log.info("Sending item / Y data as model updates");
String yPathString = AppPMMLUtils.getExtensionValue(pmml, "Y");
JavaPairRDD<String,float[]> productRDD = readFeaturesRDD(sparkContext, new Path(modelParentPath, yPathString));
String updateBroker = modelUpdateTopic.getUpdateBroker();
String topic = modelUpdateTopic.getTopic();
// For now, there is no use in sending known users for each item
productRDD.foreachPartition(new EnqueueFeatureVecsFn("Y", updateBroker, topic));
log.info("Sending user / X data as model updates");
String xPathString = AppPMMLUtils.getExtensionValue(pmml, "X");
JavaPairRDD<String,float[]> userRDD = readFeaturesRDD(sparkContext, new Path(modelParentPath, xPathString));
if (noKnownItems) {
userRDD.foreachPartition(new EnqueueFeatureVecsFn("X", updateBroker, topic));
} else {
log.info("Sending known item data with model updates");
JavaRDD<String[]> allData =
(pastData == null ? newData : newData.union(pastData)).map(MLFunctions.PARSE_FN);
JavaPairRDD<String,Collection<String>> knownItems = knownsRDD(allData, true);
userRDD.join(knownItems).foreachPartition(
new EnqueueFeatureVecsAndKnownItemsFn("X", updateBroker, topic));
}
}
Code example source: origin: OryxProject/oryx
predictAll(mfModel, positiveData, negativeUserProducts);
return positivePredictions.join(negativePredictions).values().mapToDouble(t -> {
Code example source: origin: mahmoudparsian/data-algorithms-book
joinedRDD = usersRDD.join(usersRDD);
Code example source: origin: mahmoudparsian/data-algorithms-book
JavaPairRDD<String, Tuple2<Tuple2<String, Integer>, Integer>> joined = uniquePairs.join(totalByKey);
Code example source: origin: oeljeklaus-you/UserActionAnalyzePlatform
/**
 * Get the full-info RDD for the filtered sessions.
 * @param filteredSessionRDD
 * @param sessionInfoPairRDD
 * @return
 */
private static JavaPairRDD<String, Row> getFilterFullInfoRDD(JavaPairRDD<String, String> filteredSessionRDD,
        JavaPairRDD<String, Row> sessionInfoPairRDD) {
    // 1. Join to keep only the data for sessions that meet the filter conditions.
    return filteredSessionRDD.join(sessionInfoPairRDD).mapToPair(new PairFunction<Tuple2<String, Tuple2<String, Row>>, String, Row>() {
        @Override
        public Tuple2<String, Row> call(Tuple2<String, Tuple2<String, Row>> stringTuple2Tuple2) throws Exception {
            return new Tuple2<String, Row>(stringTuple2Tuple2._1, stringTuple2Tuple2._2._2);
        }
    });
}
Code example source: origin: Erik-ly/SprakProject
/**
 * Get the RDD of detailed action data for the sessions that passed the filter conditions.
 * @param sessionid2aggrInfoRDD
 * @param sessionid2actionRDD
 * @return
 */
private static JavaPairRDD<String, Row> getSessionid2detailRDD(
        JavaPairRDD<String, String> sessionid2aggrInfoRDD,
        JavaPairRDD<String, Row> sessionid2actionRDD) {
    JavaPairRDD<String, Row> sessionid2detailRDD = sessionid2aggrInfoRDD
            .join(sessionid2actionRDD)
            .mapToPair(new PairFunction<Tuple2<String, Tuple2<String, Row>>, String, Row>() {
                private static final long serialVersionUID = 1L;
                public Tuple2<String, Row> call(
                        Tuple2<String, Tuple2<String, Row>> tuple) throws Exception {
                    return new Tuple2<String, Row>(tuple._1, tuple._2._2);
                }
            });
    return sessionid2detailRDD;
}
Code example source: origin: co.cask.cdap/hydrator-spark-core2
@SuppressWarnings("unchecked")
@Override
public <T> SparkPairCollection<K, Tuple2<V, T>> join(SparkPairCollection<K, T> other) {
return wrap(pairRDD.join((JavaPairRDD<K, T>) other.getUnderlying()));
}
Code example source: origin: co.cask.cdap/hydrator-spark-core2
@SuppressWarnings("unchecked")
@Override
public <T> SparkPairCollection<K, Tuple2<V, T>> join(SparkPairCollection<K, T> other, int numPartitions) {
return wrap(pairRDD.join((JavaPairRDD<K, T>) other.getUnderlying(), numPartitions));
}
Code example source: origin: scipr-lab/dizk
public static <FieldT extends AbstractFieldElementExpanded<FieldT>> FieldT
distributedVariableBaseMSM(
final JavaPairRDD<Long, FieldT> scalars,
final JavaPairRDD<Long, FieldT> bases) {
return scalars.join(bases).map(pair -> pair._2._1.mul(pair._2._2)).reduce(FieldT::add);
}
}
Code example source: origin: com.davidbracewell/mango
@Override
public <V> MPairStream<T, Map.Entry<U, V>> join(@NonNull MPairStream<? extends T, ? extends V> stream) {
return new SparkPairStream<>(rdd.join(toPairRDD(stream))
.mapToPair(
t -> Cast.as(new scala.Tuple2<>(t._1(), toMapEntry(t._2()))))
);
}
Code example source: origin: uber/hudi
/**
* Given a bunch of hoodie keys, fetches all the individual records out as a data frame
*
* @return a dataframe
*/
public Dataset<Row> read(JavaRDD<HoodieKey> hoodieKeys, int parallelism) throws Exception {
assertSqlContext();
JavaPairRDD<HoodieKey, Optional<String>> keyToFileRDD = index
.fetchRecordLocation(hoodieKeys, jsc, hoodieTable);
List<String> paths = keyToFileRDD.filter(keyFileTuple -> keyFileTuple._2().isPresent())
.map(keyFileTuple -> keyFileTuple._2().get()).collect();
// record locations might be same for multiple keys, so need a unique list
Set<String> uniquePaths = new HashSet<>(paths);
Dataset<Row> originalDF = sqlContextOpt.get().read()
.parquet(uniquePaths.toArray(new String[uniquePaths.size()]));
StructType schema = originalDF.schema();
JavaPairRDD<HoodieKey, Row> keyRowRDD = originalDF.javaRDD().mapToPair(row -> {
HoodieKey key = new HoodieKey(row.getAs(HoodieRecord.RECORD_KEY_METADATA_FIELD),
row.getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD));
return new Tuple2<>(key, row);
});
// Now, we need to further filter out, for only rows that match the supplied hoodie keys
JavaRDD<Row> rowRDD = keyRowRDD.join(keyToFileRDD, parallelism).map(tuple -> tuple._2()._1());
return sqlContextOpt.get().createDataFrame(rowRDD, schema);
}
Code example source: origin: Graphify/graphify
JavaPairRDD<String, Double> contribs = links.join(ranks).values()
.flatMapToPair(s -> {
int urlCount = Iterables.size(s._1());
Code example source: origin: org.apache.spark/spark-streaming_2.10
pairStream1.transformWithToPair(pairStream2, (x, y, z) -> x.join(y));
Code example source: origin: org.apache.spark/spark-streaming_2.11
PairFunction<Integer, Integer, Integer> mapToTuple =
(Integer i) -> new Tuple2<>(i, i);
return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3);
});
JavaTestUtils.attachTestOutputStream(transformed2);
Code example source: origin: Stratio/deep-spark
/**
* It tests if the extractor can join two data sets
*/
@Test
protected void testInnerJoin() {
DeepSparkContext context = getDeepSparkContext();
try {
JavaPairRDD<Long, TeamEntity> teamsRDD = prepareTeamRDD(context);
JavaPairRDD<Long, Iterable<PlayerEntity>> playersRDD = preparePlayerRDD(context).groupByKey();
JavaPairRDD<Long, Tuple2<TeamEntity, Iterable<PlayerEntity>>> joinRDD = teamsRDD.join(playersRDD);
assertEquals(joinRDD.count(), 4);
} finally {
context.stop();
}
}
Code example source: origin: org.apache.pig/pig
join(streamIndexedJavaPairRDD, partitioner);
Code example source: origin: org.qcri.rheem/rheem-spark
@Override
public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate(
ChannelInstance[] inputs,
ChannelInstance[] outputs,
SparkExecutor sparkExecutor,
OptimizationContext.OperatorContext operatorContext) {
assert inputs.length == this.getNumInputs();
assert outputs.length == this.getNumOutputs();
final RddChannel.Instance input0 = (RddChannel.Instance) inputs[0];
final RddChannel.Instance input1 = (RddChannel.Instance) inputs[1];
final RddChannel.Instance output = (RddChannel.Instance) outputs[0];
final JavaRDD<InputType0> inputRdd0 = input0.provideRdd();
final JavaRDD<InputType1> inputRdd1 = input1.provideRdd();
FunctionCompiler compiler = sparkExecutor.getCompiler();
final PairFunction<InputType0, KeyType, InputType0> keyExtractor0 = compiler.compileToKeyExtractor(this.keyDescriptor0);
final PairFunction<InputType1, KeyType, InputType1> keyExtractor1 = compiler.compileToKeyExtractor(this.keyDescriptor1);
JavaPairRDD<KeyType, InputType0> pairStream0 = inputRdd0.mapToPair(keyExtractor0);
JavaPairRDD<KeyType, InputType1> pairStream1 = inputRdd1.mapToPair(keyExtractor1);
final JavaPairRDD<KeyType, scala.Tuple2<InputType0, InputType1>> outputPair =
pairStream0.<InputType1>join(pairStream1, sparkExecutor.getNumDefaultPartitions());
this.name(outputPair);
// convert from scala tuple to rheem tuple
final JavaRDD<Tuple2<InputType0, InputType1>> outputRdd = outputPair
.map(new TupleConverter<>());
this.name(outputRdd);
output.accept(outputRdd, sparkExecutor);
return ExecutionOperator.modelLazyExecution(inputs, outputs, operatorContext);
}