This article collects code examples of the Java method org.apache.spark.api.java.JavaRDD.sortBy(), showing how JavaRDD.sortBy() is used in practice. The examples are extracted from curated open-source projects found on platforms such as GitHub, Stack Overflow, and Maven, and should serve as useful references. Details of the JavaRDD.sortBy() method are as follows:
Package path: org.apache.spark.api.java.JavaRDD
Class name: JavaRDD
Method name: sortBy
Description: none available
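For reference, in current Spark versions the method's signature is <S> JavaRDD<T> sortBy(Function<T,S> f, boolean ascending, int numPartitions): it returns a new RDD with the same elements, ordered by the key that f extracts from each element, spread across numPartitions partitions. Below is a minimal sketch of a call, assuming a local-mode context; the app name, master setting, and sample data are illustrative assumptions, not taken from any of the projects below.

import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf conf = new SparkConf().setAppName("SortByDemo").setMaster("local[2]");  // illustrative config
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> words = sc.parallelize(Arrays.asList("pear", "fig", "apple"));
// Sort ascending by string length into 2 partitions; collect() yields [fig, pear, apple].
List<String> byLength = words.sortBy(String::length, true, 2).collect();
sc.close();

Like sortByKey(), sortBy() is lazy and incurs a shuffle; the sort is only performed once an action such as collect() runs, and passing false for ascending sorts in descending order.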
Code example source: OryxProject/oryx
private static JavaPairRDD<String,Collection<String>> knownsRDD(JavaRDD<String[]> allData,
                                                                boolean knownItems) {
  JavaRDD<String[]> sorted =
      allData.sortBy(datum -> Long.valueOf(datum[3]), true, allData.partitions().size());
  JavaPairRDD<String,Tuple2<String,Boolean>> tuples = sorted.mapToPair(datum -> {
    String user = datum[0];
    String item = datum[1];
    Boolean delete = datum[2].isEmpty();
    return knownItems ?
        new Tuple2<>(user, new Tuple2<>(item, delete)) :
        new Tuple2<>(item, new Tuple2<>(user, delete));
  });
  // TODO likely need to figure out a way to avoid groupByKey but collectByKey
  // won't work here -- doesn't guarantee enough about ordering
  return tuples.groupByKey().mapValues(idDeletes -> {
    Collection<String> ids = new HashSet<>();
    for (Tuple2<String,Boolean> idDelete : idDeletes) {
      if (idDelete._2()) {
        ids.remove(idDelete._1());
      } else {
        ids.add(idDelete._1());
      }
    }
    return ids;
  });
}
Code example source: OryxProject/oryx
private static Map<String,Integer> buildIDIndexMapping(JavaRDD<String[]> parsedRDD,
                                                       boolean user) {
  int offset = user ? 0 : 1;
  Map<String,Integer> reverseIDLookup = parsedRDD.map(tokens -> tokens[offset])
      .distinct().sortBy(s -> s, true, parsedRDD.getNumPartitions())
      .zipWithIndex().mapValues(Long::intValue)
      .collectAsMap();
  // Clone, due to some serialization problems with the result of collectAsMap?
  return new HashMap<>(reverseIDLookup);
}
Code example source: org.apache.spark/spark-core_2.10
@Test
public void sortBy() {
  List<Tuple2<Integer, Integer>> pairs = new ArrayList<>();
  pairs.add(new Tuple2<>(0, 4));
  pairs.add(new Tuple2<>(3, 2));
  pairs.add(new Tuple2<>(-1, 1));

  JavaRDD<Tuple2<Integer, Integer>> rdd = sc.parallelize(pairs);

  // compare on first value
  JavaRDD<Tuple2<Integer, Integer>> sortedRDD = rdd.sortBy(Tuple2::_1, true, 2);
  assertEquals(new Tuple2<>(-1, 1), sortedRDD.first());
  List<Tuple2<Integer, Integer>> sortedPairs = sortedRDD.collect();
  assertEquals(new Tuple2<>(0, 4), sortedPairs.get(1));
  assertEquals(new Tuple2<>(3, 2), sortedPairs.get(2));

  // compare on second value
  sortedRDD = rdd.sortBy(Tuple2::_2, true, 2);
  assertEquals(new Tuple2<>(-1, 1), sortedRDD.first());
  sortedPairs = sortedRDD.collect();
  assertEquals(new Tuple2<>(3, 2), sortedPairs.get(1));
  assertEquals(new Tuple2<>(0, 4), sortedPairs.get(2));
}
Code example source: org.apache.spark/spark-core_2.11 and org.apache.spark/spark-core
(The sortBy() test in these two artifacts is identical to the spark-core_2.10 example above.)
Code example source: OryxProject/oryx
// Declaration of sortedValues inferred from the surrounding (truncated) snippet.
JavaRDD<String> sortedValues =
    newData.values().sortBy(MLFunctions.TO_TIMESTAMP_FN, true, newData.partitions().size());
JavaPairRDD<Tuple2<String,String>,Double> tuples = sortedValues.mapToPair(line -> {
  try {
    // ... (snippet truncated in the source)
Code example source: com.davidbracewell/mango
@Override
public SparkStream<T> shuffle(@NonNull Random random) {
  // Shuffle the stream by sorting on a random key.
  return new SparkStream<>(rdd.sortBy(t -> random.nextDouble(),
                                      true,
                                      rdd.getNumPartitions()));
}
Code example source: uk.gov.gchq.gaffer/parquet-store
private Map<Object, Integer> calculateSplitsForColumn(final JavaRDD<Element> data, final IdentifierType colName) {
  // Sample the data, sort by the column value, and keep the first element of
  // each partition as a candidate split point.
  final List<Object> splits = data.sample(false, 1.0 / sampleRate)
      .map(element -> element.getIdentifier(colName))
      .sortBy(obj -> obj, true, numOfSplits)
      .mapPartitions(objectIterator -> {
        final List<Object> list = new ArrayList<>(1);
        if (objectIterator.hasNext()) {
          list.add(objectIterator.next());
        }
        return list.iterator();
      })
      .collect();
  final Map<Object, Integer> splitPoints = new TreeMap<>(COMPARATOR);
  int i = 0;
  for (final Object split : splits) {
    if (null != split) {
      splitPoints.put(split, i);
    }
    i++;
  }
  return splitPoints;
}
Code example source: com.davidbracewell/mango
@Override
public <R extends Comparable<R>> MStream<T> sorted(boolean ascending, @NonNull SerializableFunction<? super T, ? extends R> keyFunction) {
  return new SparkStream<>(rdd.sortBy(t -> {
    // Re-apply the broadcast configuration on the executor before invoking the key function.
    Configurator.INSTANCE.configure(configBroadcast.value());
    return keyFunction.apply(t);
  }, ascending, rdd.partitions().size()));
}
Code example source: com.davidbracewell/mango
@Override
public MDoubleStream sorted(boolean ascending) {
  return new SparkDoubleStream(doubleStream.map(Double::valueOf)
      .sortBy(d -> d, ascending, doubleStream.partitions().size())
      .mapToDouble(d -> d));
}
Code example source: uber/hudi
private JavaRDD<WriteStatus> bulkInsertInternal(JavaRDD<HoodieRecord<T>> dedupedRecords,
                                                String commitTime, HoodieTable<T> table,
                                                Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
  final JavaRDD<HoodieRecord<T>> repartitionedRecords;
  if (bulkInsertPartitioner.isDefined()) {
    repartitionedRecords = bulkInsertPartitioner.get()
        .repartitionRecords(dedupedRecords, config.getBulkInsertShuffleParallelism());
  } else {
    // Now, sort the records and line them up nicely for loading.
    repartitionedRecords = dedupedRecords.sortBy(record -> {
      // Let's use "partitionPath + key" as the sort key. Spark will ensure
      // the records split evenly across RDD partitions, such that small partitions fit
      // into 1 RDD partition, while big ones spread evenly across multiple RDD partitions.
      return String.format("%s+%s", record.getPartitionPath(), record.getRecordKey());
    }, true, config.getBulkInsertShuffleParallelism());
  }
  JavaRDD<WriteStatus> writeStatusRDD = repartitionedRecords
      .mapPartitionsWithIndex(new BulkInsertMapFunction<T>(commitTime, config, table), true)
      .flatMap(writeStatuses -> writeStatuses.iterator());
  return updateIndexAndCommitIfNeeded(writeStatusRDD, table, commitTime);
}
Code example source: com.uber.hoodie/hoodie-client
(bulkInsertInternal() in this artifact is identical to the uber/hudi example above.)
Code example source: com.cloudera.oryx/oryx-app
(This snippet is identical to the OryxProject/oryx sort-by-timestamp example above.)