Usage of org.apache.spark.api.java.JavaPairRDD.mapValues() with code examples

This article collects code examples for the Java method org.apache.spark.api.java.JavaPairRDD.mapValues(), showing how JavaPairRDD.mapValues() is used in practice. The examples are extracted from selected projects hosted on platforms such as GitHub, Stack Overflow, and Maven, and are intended as practical reference material. Details of the method are as follows:
Package: org.apache.spark.api.java.JavaPairRDD
Class: JavaPairRDD
Method: mapValues

Introduction to JavaPairRDD.mapValues

mapValues passes each value of the pair RDD through a map function without changing the keys; the resulting RDD keeps the original RDD's partitioning.
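
Before the collected examples, here is a minimal, self-contained sketch of the method. The local master, app name, and sample data are illustrative assumptions, not taken from any of the projects below:

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class MapValuesDemo {
  public static void main(String[] args) {
    // Hypothetical local context and sample data, used only to illustrate mapValues.
    JavaSparkContext sc = new JavaSparkContext(
        new SparkConf().setAppName("MapValuesDemo").setMaster("local[*]"));
    JavaPairRDD<String, Integer> counts = sc.parallelizePairs(Arrays.asList(
        new Tuple2<>("a", 1),
        new Tuple2<>("b", 2)));
    // Only the values are transformed; the keys (and partitioning) are left untouched.
    JavaPairRDD<String, String> labeled = counts.mapValues(v -> "count=" + v);
    System.out.println(labeled.collectAsMap()); // {a=count=1, b=count=2}
    sc.stop();
  }
}

In the project snippets that follow, the call typically comes after groupByKey, join, or zipWithIndex, transforming the grouped or joined values while leaving the keys alone.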

Code examples

Code example source: OryxProject/oryx

public static Map<String,Integer> countDistinctOtherWords(JavaPairRDD<String,String> data) {
 return data.values().flatMapToPair(line -> {
  Set<String> distinctTokens = new HashSet<>(Arrays.asList(line.split(" ")));
  return distinctTokens.stream().flatMap(a ->
   distinctTokens.stream().filter(b -> !a.equals(b)).map(b -> new Tuple2<>(a, b))
  ).iterator();
 }).distinct().mapValues(a -> 1).reduceByKey((c1, c2) -> c1 + c2).collectAsMap();
}

Code example source: OryxProject/oryx

private static JavaPairRDD<String,Collection<String>> knownsRDD(JavaRDD<String[]> allData,
                                boolean knownItems) {
 JavaRDD<String[]> sorted = allData.sortBy(datum -> Long.valueOf(datum[3]), true, allData.partitions().size());
 JavaPairRDD<String,Tuple2<String,Boolean>> tuples = sorted.mapToPair(datum -> {
   String user = datum[0];
   String item = datum[1];
   Boolean delete = datum[2].isEmpty();
   return knownItems ?
     new Tuple2<>(user, new Tuple2<>(item, delete)) :
     new Tuple2<>(item, new Tuple2<>(user, delete));
  });
 // TODO likely need to figure out a way to avoid groupByKey but collectByKey
 // won't work here -- doesn't guarantee enough about ordering
 return tuples.groupByKey().mapValues(idDeletes -> {
   Collection<String> ids = new HashSet<>();
   for (Tuple2<String,Boolean> idDelete : idDeletes) {
    if (idDelete._2()) {
     ids.remove(idDelete._1());
    } else {
     ids.add(idDelete._1());
    }
   }
   return ids;
  });
}

Code example source: apache/kylin

private Long getRDDCountSum(JavaPairRDD<ByteArray, Object[]> rdd, final int countMeasureIndex) {
  final ByteArray ONE = new ByteArray();
  Long count = rdd.mapValues(new Function<Object[], Long>() {
    @Override
    public Long call(Object[] objects) throws Exception {
      return (Long) objects[countMeasureIndex];
    }
  }).reduce(new Function2<Tuple2<ByteArray, Long>, Tuple2<ByteArray, Long>, Tuple2<ByteArray, Long>>() {
    @Override
    public Tuple2<ByteArray, Long> call(Tuple2<ByteArray, Long> longTuple2, Tuple2<ByteArray, Long> longTuple22)
        throws Exception {
      return new Tuple2<>(ONE, longTuple2._2() + longTuple22._2());
    }
  })._2();
  return count;
}

Code example source: OryxProject/oryx

private static Map<String,Integer> buildIDIndexMapping(JavaRDD<String[]> parsedRDD,
                            boolean user) {
 int offset = user ? 0 : 1;
 Map<String,Integer> reverseIDLookup = parsedRDD.map(tokens -> tokens[offset])
   .distinct().sortBy(s -> s, true, parsedRDD.getNumPartitions())
   .zipWithIndex().mapValues(Long::intValue)
   .collectAsMap();
 // Clone, due to some serialization problems with the result of collectAsMap?
 return new HashMap<>(reverseIDLookup);
}

Code example source: OryxProject/oryx

private static RDD<Tuple2<Object,double[]>> readAndConvertFeatureRDD(
  JavaPairRDD<String,float[]> javaRDD,
  Broadcast<? extends Map<String,Integer>> bIdToIndex) {
 RDD<Tuple2<Integer,double[]>> scalaRDD = javaRDD.mapToPair(t ->
   new Tuple2<>(bIdToIndex.value().get(t._1()), t._2())
 ).mapValues(f -> {
   double[] d = new double[f.length];
   for (int i = 0; i < d.length; i++) {
    d[i] = f[i];
   }
   return d;
  }
 ).rdd();
 // This mimics the persistence level established by ALS training methods
 scalaRDD.persist(StorageLevel.MEMORY_AND_DISK());
 @SuppressWarnings("unchecked")
 RDD<Tuple2<Object,double[]>> objKeyRDD = (RDD<Tuple2<Object,double[]>>) (RDD<?>) scalaRDD;
 return objKeyRDD;
}

Code example source: OryxProject/oryx

return targetsByTreeAndID.mapValues(categoricalTargets ->
 StreamSupport.stream(categoricalTargets.spliterator(), false)
   .collect(Collectors.groupingBy(f -> ((CategoricalFeature) f).getEncoding(), Collectors.counting()))
return targetsByTreeAndID.mapValues(numericTargets ->
 StreamSupport.stream(numericTargets.spliterator(), false)
   .collect(Collectors.summarizingDouble(f -> ((NumericFeature) f).getValue()))

Code example source: OryxProject/oryx

if (model.isImplicit()) {
 aggregated = tuples.groupByKey().mapValues(MLFunctions.SUM_WITH_NAN);
} else {

Code example source: OryxProject/oryx

massageToIntKey(model.userFeatures()).mapValues(doubleArrayToFloats);
JavaPairRDD<Integer,float[]> itemFeaturesRDD =
  massageToIntKey(model.productFeatures()).mapValues(doubleArrayToFloats);

Code example source: OryxProject/oryx

/**
 * Combines {@link Rating}s with the same user/item into one, with score as the sum of
 * all of the scores.
 */
private JavaRDD<Rating> aggregateScores(JavaRDD<? extends Rating> original, double epsilon) {
 JavaPairRDD<Tuple2<Integer,Integer>,Double> tuples =
   original.mapToPair(rating -> new Tuple2<>(new Tuple2<>(rating.user(), rating.product()), rating.rating()));
 JavaPairRDD<Tuple2<Integer,Integer>,Double> aggregated;
 if (implicit) {
  // TODO can we avoid groupByKey? reduce, combine, fold don't seem viable since
  // they don't guarantee the delete elements are properly handled
  aggregated = tuples.groupByKey().mapValues(MLFunctions.SUM_WITH_NAN);
 } else {
  // For non-implicit, last wins.
  aggregated = tuples.foldByKey(Double.NaN, (current, next) -> next);
 }
 JavaPairRDD<Tuple2<Integer,Integer>,Double> noNaN =
   aggregated.filter(kv -> !Double.isNaN(kv._2()));
 if (logStrength) {
  return noNaN.map(userProductScore -> new Rating(
    userProductScore._1()._1(),
    userProductScore._1()._2(),
    Math.log1p(userProductScore._2() / epsilon)));
 } else {
  return noNaN.map(userProductScore -> new Rating(
    userProductScore._1()._1(),
    userProductScore._1()._2(),
    userProductScore._2()));
 }
}

Code example source: mahmoudparsian/data-algorithms-book

static Map<Integer, Vector> getNewCentroids(JavaPairRDD<Integer, Tuple2<Vector, Integer>> pointsGroup) {
  Map<Integer, Vector> newCentroids = pointsGroup.mapValues(new Function<Tuple2<Vector, Integer>, Vector>() {
    @Override
    public Vector call(Tuple2<Vector, Integer> arg0) throws Exception {
      return average(arg0._1, arg0._2);
    }
  }).collectAsMap();
  return newCentroids;
}

Code example source: mahmoudparsian/data-algorithms-book

static Map<Integer, Vector> getNewCentroids(
    JavaPairRDD<Integer, Iterable<Vector>> pointsGroup) {
  //
  Map<Integer, Vector> newCentroids = pointsGroup.mapValues(
      new Function<Iterable<Vector>, Vector>() {
    @Override
    public Vector call(Iterable<Vector> ps) throws Exception {
      return Util.average(ps);
    }
  }).collectAsMap();
  return newCentroids;
}

Code example source: mahmoudparsian/data-algorithms-book

static JavaPairRDD<String, Tuple2<Double, Integer>> computeRankedProducts(
    JavaSparkContext context,
    JavaPairRDD<String, Long>[] ranks) {
  JavaPairRDD<String, Long> unionRDD = context.union(ranks);
  
  // next find unique keys, with their associated copa scores
  JavaPairRDD<String, Iterable<Long>> groupedByGeneRDD = unionRDD.groupByKey();
  
  // next calculate ranked products and the number of elements
  JavaPairRDD<String, Tuple2<Double, Integer>> rankedProducts = 
      groupedByGeneRDD.mapValues((Iterable<Long> values) -> {
    int N = 0;
    long products = 1;
    for (Long v : values) {
      products *= v;
      N++;
    }
    
    double rankedProduct = Math.pow( (double) products, 1.0/((double) N));
    return new Tuple2<Double, Integer>(rankedProduct, N);
  } // input: copa scores for all studies
  // output: (rankedProduct, N)
  );                
  return rankedProducts;
}

Code example source: mahmoudparsian/data-algorithms-book

static JavaPairRDD<String, Tuple2<Double, Integer>> computeRankedProducts(
    JavaSparkContext context,
    JavaPairRDD<String, Long>[] ranks) {
  JavaPairRDD<String, Long> unionRDD = context.union(ranks);
  
  // next find unique keys, with their associated copa scores
  JavaPairRDD<String, Iterable<Long>> groupedByGeneRDD = unionRDD.groupByKey();
  
  // next calculate ranked products and the number of elements
  JavaPairRDD<String, Tuple2<Double, Integer>> rankedProducts = groupedByGeneRDD.mapValues(
      new Function<
             Iterable<Long>,            // input: copa scores for all studies
             Tuple2<Double, Integer>    // output: (rankedProduct, N)
            >() {
      @Override
      public Tuple2<Double, Integer> call(Iterable<Long> values) {
        int N = 0;
        long products = 1;
        for (Long v : values) {
          products *= v;
          N++;
        }
        
        double rankedProduct = Math.pow( (double) products, 1.0/((double) N));
        return new Tuple2<Double, Integer>(rankedProduct, N);
      }
  });                
  return rankedProducts;
}

Code example source: mahmoudparsian/data-algorithms-book

JavaPairRDD<String, Double> meanRDD = groupedByGene.mapValues((Iterable<Double> values) -> {
  double sum = 0.0;
  int count = 0;

Code example source: mahmoudparsian/data-algorithms-book

JavaPairRDD<String, Double> meanRDD = groupedByGene.mapValues(
    new Function<

Code example source: mahmoudparsian/data-algorithms-book

JavaPairRDD<String, Tuple2<Double, Integer>> rankedProducts = combinedByGeneRDD.mapValues(
    new Function<

Code example source: mahmoudparsian/data-algorithms-book

reduced.mapValues((Tuple2<Long, Integer> s) -> ( (double) s._1 / (double) s._2 ) 
);

Code example source: mahmoudparsian/data-algorithms-book

combinedByGeneRDD.mapValues((RankProduct value) -> {
double theRankedProduct = value.rank();
return new Tuple2<Double, Integer>(theRankedProduct, value.count);

Code example source: apache/tinkerpop

@Override
public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD) {
  if (!configuration.getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false))
    LOGGER.warn("The SparkContext should be persisted in order for the RDD to persist across jobs. To do so, set " + Constants.GREMLIN_SPARK_PERSIST_CONTEXT + " to true");
  if (!configuration.containsKey(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))
    throw new IllegalArgumentException("There is no provided " + Constants.GREMLIN_HADOOP_OUTPUT_LOCATION + " to write the persisted RDD to");
  SparkContextStorage.open(configuration).rm(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION));  // this might be bad because it unpersists the job RDD
  // determine which storage level to persist the RDD as with MEMORY_ONLY being the default cache()
  final StorageLevel storageLevel = StorageLevel.fromString(configuration.getString(Constants.GREMLIN_SPARK_PERSIST_STORAGE_LEVEL, "MEMORY_ONLY"));
  if (!configuration.getBoolean(Constants.GREMLIN_HADOOP_GRAPH_WRITER_HAS_EDGES, true))
    graphRDD.mapValues(vertex -> {
      vertex.get().dropEdges(Direction.BOTH);
      return vertex;
    }).setName(Constants.getGraphLocation(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))).persist(storageLevel)
        // call action to eager store rdd
        .count();
  else
    graphRDD.setName(Constants.getGraphLocation(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))).persist(storageLevel)
        // call action to eager store rdd
        .count();
  Spark.refresh(); // necessary to do really fast so the Spark GC doesn't clear out the RDD
}

Code example source: apache/tinkerpop

public static <M> JavaPairRDD<Object, VertexWritable> prepareFinalGraphRDD(
    final JavaPairRDD<Object, VertexWritable> graphRDD,
    final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD,
    final Set<VertexComputeKey> vertexComputeKeys) {
  // the graphRDD and the viewRDD must have the same partitioner
  if (graphRDD.partitioner().isPresent())
    assert (graphRDD.partitioner().get().equals(viewIncomingRDD.partitioner().get()));
  final String[] vertexComputeKeysArray = VertexProgramHelper.vertexComputeKeysAsArray(vertexComputeKeys); // the compute keys as an array
  return graphRDD.leftOuterJoin(viewIncomingRDD)
      .mapValues(tuple -> {
        final StarGraph.StarVertex vertex = tuple._1().get();
        vertex.dropVertexProperties(vertexComputeKeysArray); // drop all existing compute keys
        // attach the final computed view to the cached graph
        final List<DetachedVertexProperty<Object>> view = tuple._2().isPresent() ? tuple._2().get().getView() : Collections.emptyList();
        for (final DetachedVertexProperty<Object> property : view) {
          if (!VertexProgramHelper.isTransientVertexComputeKey(property.key(), vertexComputeKeys))
            property.attach(Attachable.Method.create(vertex));
        }
        return tuple._1();
      });
}
