Apache Spark 将RDD初始化为空

bcs8qyzn  于 8个月前  发布在  Apache
关注(0)|答案(7)|浏览(75)

我有一个RDD叫

JavaPairRDD<String, List<String>> existingRDD;

现在我需要将existingRDD初始化为空,这样当我得到实际的RDD时,我就可以与existingRDD进行联合。除了将existingRDD初始化为null之外,我如何将其初始化为空RDD?下面是我的代码:

JavaPairRDD<String, List<String>> existingRDD;
if(ai.get()%10==0)
{
    existingRDD.saveAsNewAPIHadoopFile("s3://manthan-impala-test/kinesis-dump/" + startTime + "/" + k + "/" + System.currentTimeMillis() + "/",
    NullWritable.class, Text.class, TextOutputFormat.class); //on worker failure this will get overwritten                                  
}
else
{
    existingRDD.union(rdd);
}
7bsow1i6

7bsow1i61#

要在Java中创建一个空RDD,您只需执行以下操作:

// Get an RDD that has no partitions or elements.
JavaSparkContext jsc;
...
JavaRDD<T> emptyRDD = jsc.emptyRDD();

我相信你知道如何使用泛型,否则,对于你的情况,你需要:

JavaRDD<Tuple2<String,List<String>>> emptyRDD = jsc.emptyRDD();
JavaPairRDD<String,List<String>> emptyPairRDD = JavaPairRDD.fromJavaRDD(
  existingRDD
);

您也可以使用mapToPair方法将JavaRDD转换为JavaPairRDD
Scala中:

val sc: SparkContext = ???
... 
val emptyRDD = sc.emptyRDD
// emptyRDD: org.apache.spark.rdd.EmptyRDD[Nothing] = EmptyRDD[1] at ...
dgsult0t

dgsult0t2#

val emptyRdd=sc.emptyRDD[String]

上面的语句将创建空的RDD,类型为String
SparkContext类:
获取没有分区或元素的RDD

def emptyRDD[T: ClassTag]: EmptyRDD[T] = new EmptyRDD[T] (this)
vshtjzan

vshtjzan3#

在scala中,我使用“parallelize”命令。

val emptyRDD = sc.parallelize(Seq(""))
rvpgvaaj

rvpgvaaj4#

@eliasah的回答非常有用,我提供了创建空对RDD的代码。考虑一个需要创建空对RDD(key,value)的场景。下面的scala代码演示了如何创建空对RDD,key为String,value为Int。

type pairRDD = (String,Int)
var resultRDD = sparkContext.emptyRDD[pairRDD]

RDD将按如下方式创建:

resultRDD: org.apache.spark.rdd.EmptyRDD[(String, Int)] = EmptyRDD[0] at emptyRDD at <console>:29
dfddblmv

dfddblmv5#

在Java中,创建空RDD有点复杂。我尝试使用scala.reflect.classTag,但它也不起作用。经过多次测试,工作的代码更加简单。

private JavaRDD<Foo> getEmptyJavaRdd() {

/* this code does not compile because require <T> as parameter into emptyRDD */
//        JavaRDD<Foo> emptyRDD = sparkContext.emptyRDD();
//        return emptyRDD;

/* this should be the solution that try to emulate the scala <T> */
/* but i could not make it work too */
//        ClassTag<Foo> tag = scala.reflect.ClassTag$.MODULE$.apply(Foo.class);
//        return sparkContext.emptyRDD(tag);

/* this alternative worked into java 8 */
    return SparkContext.parallelize(
            java.util.Arrays.asList()
    );

}
vfh0ocws

vfh0ocws6#

在Java中,创建空对RDD如下:

JavaPairRDD<T, T> emptyPairRDD = JavaPairRDD.fromJavaRDD(SparkContext.emptyRDD());
vsnjm48y

vsnjm48y7#

你可以试试下面的代码片段:

val emptyRDD = spark.emptyDataset[T].rdd

相关问题