org.apache.spark.api.java.JavaRDD.getNumPartitions()方法的使用及代码示例

x33g5p2x  于2022-01-21 转载在 其他  
字(9.7k)|赞(0)|评价(0)|浏览(125)

本文整理了Java中org.apache.spark.api.java.JavaRDD.getNumPartitions()方法的一些代码示例,展示了JavaRDD.getNumPartitions()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。JavaRDD.getNumPartitions()方法的具体详情如下:
包路径:org.apache.spark.api.java.JavaRDD
类名称:JavaRDD
方法名:getNumPartitions

JavaRDD.getNumPartitions介绍

暂无

代码示例

代码示例来源:origin: OryxProject/oryx

private static Map<String,Integer> buildIDIndexMapping(JavaRDD<String[]> parsedRDD,
                            boolean user) {
 int offset = user ? 0 : 1;
 Map<String,Integer> reverseIDLookup = parsedRDD.map(tokens -> tokens[offset])
   .distinct().sortBy(s -> s, true, parsedRDD.getNumPartitions())
   .zipWithIndex().mapValues(Long::intValue)
   .collectAsMap();
 // Clone, due to some serialization problems with the result of collectAsMap?
 return new HashMap<>(reverseIDLookup);
}

代码示例来源:origin: org.apache.spark/spark-core_2.11

@Test
public void getNumPartitions(){
 JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 3);
 JavaDoubleRDD rdd2 = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0), 2);
 JavaPairRDD<String, Integer> rdd3 = sc.parallelizePairs(
  Arrays.asList(
   new Tuple2<>("a", 1),
   new Tuple2<>("aa", 2),
   new Tuple2<>("aaa", 3)
  ),
  2);
 assertEquals(3, rdd1.getNumPartitions());
 assertEquals(2, rdd2.getNumPartitions());
 assertEquals(2, rdd3.getNumPartitions());
}

代码示例来源:origin: org.apache.spark/spark-core_2.10

@Test
public void getNumPartitions(){
 JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 3);
 JavaDoubleRDD rdd2 = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0), 2);
 JavaPairRDD<String, Integer> rdd3 = sc.parallelizePairs(
  Arrays.asList(
   new Tuple2<>("a", 1),
   new Tuple2<>("aa", 2),
   new Tuple2<>("aaa", 3)
  ),
  2);
 assertEquals(3, rdd1.getNumPartitions());
 assertEquals(2, rdd2.getNumPartitions());
 assertEquals(2, rdd3.getNumPartitions());
}

代码示例来源:origin: org.apache.spark/spark-core

@Test
public void getNumPartitions(){
 JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 3);
 JavaDoubleRDD rdd2 = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0), 2);
 JavaPairRDD<String, Integer> rdd3 = sc.parallelizePairs(
  Arrays.asList(
   new Tuple2<>("a", 1),
   new Tuple2<>("aa", 2),
   new Tuple2<>("aaa", 3)
  ),
  2);
 assertEquals(3, rdd1.getNumPartitions());
 assertEquals(2, rdd2.getNumPartitions());
 assertEquals(2, rdd3.getNumPartitions());
}

代码示例来源:origin: org.datasyslab/geospark

private static <U extends Geometry, T extends Geometry> void verifyPartitioningMatch(SpatialRDD<T> spatialRDD, SpatialRDD<U> queryRDD)
    throws Exception
{
  Objects.requireNonNull(spatialRDD.spatialPartitionedRDD, "[JoinQuery] spatialRDD SpatialPartitionedRDD is null. Please do spatial partitioning.");
  Objects.requireNonNull(queryRDD.spatialPartitionedRDD, "[JoinQuery] queryRDD SpatialPartitionedRDD is null. Please use the spatialRDD's grids to do spatial partitioning.");
  final SpatialPartitioner spatialPartitioner = spatialRDD.getPartitioner();
  final SpatialPartitioner queryPartitioner = queryRDD.getPartitioner();
  if (!queryPartitioner.equals(spatialPartitioner)) {
    throw new IllegalArgumentException("[JoinQuery] queryRDD is not partitioned by the same grids with spatialRDD. Please make sure they both use the same grids otherwise wrong results will appear.");
  }
  final int spatialNumPart = spatialRDD.spatialPartitionedRDD.getNumPartitions();
  final int queryNumPart = queryRDD.spatialPartitionedRDD.getNumPartitions();
  if (spatialNumPart != queryNumPart) {
    throw new IllegalArgumentException("[JoinQuery] numbers of partitions in queryRDD and spatialRDD don't match: " + queryNumPart + " vs. " + spatialNumPart + ". Please make sure they both use the same partitioning otherwise wrong results will appear.");
  }
}

代码示例来源:origin: DataSystemsLab/GeoSpark

private static <U extends Geometry, T extends Geometry> void verifyPartitioningMatch(SpatialRDD<T> spatialRDD, SpatialRDD<U> queryRDD)
    throws Exception
{
  Objects.requireNonNull(spatialRDD.spatialPartitionedRDD, "[JoinQuery] spatialRDD SpatialPartitionedRDD is null. Please do spatial partitioning.");
  Objects.requireNonNull(queryRDD.spatialPartitionedRDD, "[JoinQuery] queryRDD SpatialPartitionedRDD is null. Please use the spatialRDD's grids to do spatial partitioning.");
  final SpatialPartitioner spatialPartitioner = spatialRDD.getPartitioner();
  final SpatialPartitioner queryPartitioner = queryRDD.getPartitioner();
  if (!queryPartitioner.equals(spatialPartitioner)) {
    throw new IllegalArgumentException("[JoinQuery] queryRDD is not partitioned by the same grids with spatialRDD. Please make sure they both use the same grids otherwise wrong results will appear.");
  }
  final int spatialNumPart = spatialRDD.spatialPartitionedRDD.getNumPartitions();
  final int queryNumPart = queryRDD.spatialPartitionedRDD.getNumPartitions();
  if (spatialNumPart != queryNumPart) {
    throw new IllegalArgumentException("[JoinQuery] numbers of partitions in queryRDD and spatialRDD don't match: " + queryNumPart + " vs. " + spatialNumPart + ". Please make sure they both use the same partitioning otherwise wrong results will appear.");
  }
}

代码示例来源:origin: com.davidbracewell/mango

@Override
public SparkStream<T> shuffle(@NonNull Random random) {
 return new SparkStream<>(rdd.sortBy(t -> random.nextDouble(),
                   true,
                   rdd.getNumPartitions()
                   ));
}

代码示例来源:origin: cloudera-labs/envelope

@Test
public void testInputRepartitionColumnsAndPartitionCount() throws Exception {
 Map<String, Object> configMap = Maps.newHashMap();
 configMap.put(BatchStep.REPARTITION_COLUMNS_PROPERTY, Lists.newArrayList("modulo"));
 configMap.put(BatchStep.REPARTITION_NUM_PARTITIONS_PROPERTY, 5);
 configMap.put(DataStep.INPUT_TYPE + "." + InputFactory.TYPE_CONFIG_NAME, DummyInput.class.getName());
 configMap.put(DataStep.INPUT_TYPE + "." + "starting.partitions", 10);
 Config config = ConfigFactory.parseMap(configMap);
 BatchStep batchStep = new BatchStep("test");
 batchStep.configure(config);
 batchStep.submit(Sets.<Step>newHashSet());
 Dataset<Row> df = batchStep.getData();
 int numPartitions = df.javaRDD().getNumPartitions();
 assertEquals(5, numPartitions);
}

代码示例来源:origin: cloudera-labs/envelope

@Test
public void testInputCoalesce() throws Exception {
 Map<String, Object> configMap = Maps.newHashMap();
 configMap.put(DataStep.INPUT_TYPE + "." + InputFactory.TYPE_CONFIG_NAME, DummyInput.class.getName());
 configMap.put(DataStep.INPUT_TYPE + "." + "starting.partitions", 10);
 configMap.put(BatchStep.COALESCE_NUM_PARTITIONS_PROPERTY, 5);
 Config config = ConfigFactory.parseMap(configMap);
 
 BatchStep batchStep = new BatchStep("test");
 batchStep.configure(config);
 batchStep.submit(Sets.<Step>newHashSet());
 Dataset<Row> df = batchStep.getData();
 int numPartitions = df.javaRDD().getNumPartitions(); 
 
 assertEquals(numPartitions, 5);
}

代码示例来源:origin: cloudera-labs/envelope

@Test
public void testInputRepartition() throws Exception {
 Map<String, Object> configMap = Maps.newHashMap();
 configMap.put(DataStep.INPUT_TYPE + "." + InputFactory.TYPE_CONFIG_NAME, DummyInput.class.getName());
 configMap.put(DataStep.INPUT_TYPE + "." + "starting.partitions", 5);
 configMap.put(BatchStep.REPARTITION_NUM_PARTITIONS_PROPERTY, 10);
 Config config = ConfigFactory.parseMap(configMap);
 
 BatchStep batchStep = new BatchStep("test");
 batchStep.configure(config);
 batchStep.submit(Sets.<Step>newHashSet());
 Dataset<Row> df = batchStep.getData();
 int numPartitions = df.javaRDD().getNumPartitions(); 
 
 assertEquals(numPartitions, 10);
}

代码示例来源:origin: cloudera-labs/envelope

@Test
public void testInputRepartitionColumns() throws Exception {
 Map<String, Object> configMap = Maps.newHashMap();
 configMap.put(DataStep.INPUT_TYPE + "." + InputFactory.TYPE_CONFIG_NAME, DummyInput.class.getName());
 configMap.put(DataStep.INPUT_TYPE + "." + "starting.partitions", 10);
 configMap.put(BatchStep.REPARTITION_COLUMNS_PROPERTY, Lists.newArrayList("modulo"));
 Config config = ConfigFactory.parseMap(configMap);
 BatchStep batchStep = new BatchStep("test");
 batchStep.configure(config);
 batchStep.submit(Sets.<Step>newHashSet());
 Dataset<Row> df = batchStep.getData();
 int numPartitions = df.javaRDD().getNumPartitions();
 assertEquals(Contexts.getSparkSession().sqlContext().getConf("spark.sql.shuffle.partitions"),
   Integer.toString(numPartitions));
}

代码示例来源:origin: uber/hudi

@Test
public void testsHBasePutAccessParallelism() {
 HoodieWriteConfig config = getConfig();
 HBaseIndex index = new HBaseIndex(config);
 final JavaRDD<WriteStatus> writeStatusRDD = jsc.parallelize(
   Arrays.asList(
     getSampleWriteStatus(1, 2),
     getSampleWriteStatus(0, 3),
     getSampleWriteStatus(10, 0)),
   10);
 final int hbasePutAccessParallelism = index.getHBasePutAccessParallelism(writeStatusRDD);
 Assert.assertEquals(10, writeStatusRDD.getNumPartitions());
 Assert.assertEquals(2, hbasePutAccessParallelism);
}

代码示例来源:origin: org.apache.beam/beam-runners-spark

/** An implementation of {@link Reshuffle} for the Spark runner. */
 public static <K, V> JavaRDD<WindowedValue<KV<K, V>>> reshuffle(
   JavaRDD<WindowedValue<KV<K, V>>> rdd, Coder<K> keyCoder, WindowedValueCoder<V> wvCoder) {

  // Use coders to convert objects in the PCollection to byte arrays, so they
  // can be transferred over the network for the shuffle.
  return rdd.map(new ReifyTimestampsAndWindowsFunction<>())
    .map(WindowingHelpers.unwindowFunction())
    .mapToPair(TranslationUtils.toPairFunction())
    .mapToPair(CoderHelpers.toByteFunction(keyCoder, wvCoder))
    .repartition(rdd.getNumPartitions())
    .mapToPair(CoderHelpers.fromByteFunction(keyCoder, wvCoder))
    .map(TranslationUtils.fromPairFunction())
    .map(TranslationUtils.toKVByWindowInValue());
 }
}

代码示例来源:origin: seznam/euphoria

final JavaRDD<SparkElement> right = (JavaRDD<SparkElement>) inputs.get(1);
final int numPartitions = right.getNumPartitions();

代码示例来源:origin: seznam/euphoria

final Partitioner partitioner = new HashPartitioner(input.getNumPartitions());

相关文章

微信公众号

最新文章

更多