我对pyspark有点陌生(更喜欢sparkscala),最近我遇到了下面的观察。当我使用parallelize()方法创建rdd时,返回类型是rdd类型。但是当我使用range()方法创建rdd时,它的类型是pipelinedrdd。例如:
>>> listRDD =sc.parallelize([1,2,3,4,5,6,7])
>>> print(listRDD.collect())
[1, 2, 3, 4, 5, 6, 7]
>>> print(type(listRDD))
<class 'pyspark.rdd.RDD'>
>>> rangeRDD =sc.range(1,8)
>>> print(rangeRDD.collect())
[1, 2, 3, 4, 5, 6, 7]
>>> print(type(rangeRDD))
<class 'pyspark.rdd.PipelinedRDD'>
我检查了这两个RDD是如何构造和找到的:
1) 在内部两者都只使用平行化方法。
>>> rangeRDD.toDebugString()
b'(8) PythonRDD[25] at collect at <stdin>:1 []\n | ParallelCollectionRDD[24] at parallelize at PythonRDD.scala:195 []'
>>> listRDD.toDebugString()
b'(8) PythonRDD[26] at RDD at PythonRDD.scala:53 []\n | ParallelCollectionRDD[21] at parallelize at PythonRDD.scala:195 []'
2) pipelinerdd是我所了解的rdd类的一个子类。
但是,当它将是pipelineeddd类型时,以及当它将是rdd类型时,是否存在任何泛型逻辑?提前谢谢大家。
1条答案
按热度按时间ercv8c1e1#
sc.range
他确实在打电话parallelize
方法-在这里定义。你看得出来sc.range
正在呼叫sc.parallelize
以xrange作为输入。以及sc.parallelize
使用xrange输入类型调用时有一个单独的代码分支:它使用空列表作为参数调用自己,然后应用mapPartitionsWithIndex
这里是sc.parallelize
呼叫和sc.range
依次打电话。所以您可以看到第一个常规对象是如何创建的,就像您使用sc.parallelize
(虽然对象是一个空列表),但最终输出是在其上应用Map函数的结果。这种行为的主要原因似乎是为了避免具体化否则会发生的数据(如果输入没有实现len,它会被迭代并立即转换为list)。