rdd和pipelinedrdd类型

ekqde3dh  于 2021-05-29  发布在  Spark
关注(0)|答案(1)|浏览(820)

我对pyspark有点陌生(更喜欢sparkscala),最近我遇到了下面的观察。当我使用parallelize()方法创建rdd时,返回类型是rdd类型。但是当我使用range()方法创建rdd时,它的类型是pipelinedrdd。例如:

>>> listRDD =sc.parallelize([1,2,3,4,5,6,7])
>>> print(listRDD.collect())
[1, 2, 3, 4, 5, 6, 7]
>>> print(type(listRDD))
<class 'pyspark.rdd.RDD'>

>>> rangeRDD =sc.range(1,8)
>>> print(rangeRDD.collect())
[1, 2, 3, 4, 5, 6, 7]
>>> print(type(rangeRDD))
<class 'pyspark.rdd.PipelinedRDD'>

我检查了这两个RDD是如何构造和找到的:
1) 在内部两者都只使用平行化方法。

>>> rangeRDD.toDebugString()
b'(8) PythonRDD[25] at collect at <stdin>:1 []\n |  ParallelCollectionRDD[24] at parallelize at PythonRDD.scala:195 []'
>>> listRDD.toDebugString()
b'(8) PythonRDD[26] at RDD at PythonRDD.scala:53 []\n |  ParallelCollectionRDD[21] at parallelize at PythonRDD.scala:195 []'

2) pipelinerdd是我所了解的rdd类的一个子类。
但是,当它将是pipelineeddd类型时,以及当它将是rdd类型时,是否存在任何泛型逻辑?提前谢谢大家。

ercv8c1e

ercv8c1e1#

sc.range 他确实在打电话 parallelize 方法-在这里定义。你看得出来 sc.range 正在呼叫 sc.parallelize 以xrange作为输入。以及 sc.parallelize 使用xrange输入类型调用时有一个单独的代码分支:它使用空列表作为参数调用自己,然后应用 mapPartitionsWithIndex 这里是 sc.parallelize 呼叫和 sc.range 依次打电话。所以您可以看到第一个常规对象是如何创建的,就像您使用 sc.parallelize (虽然对象是一个空列表),但最终输出是在其上应用Map函数的结果。
这种行为的主要原因似乎是为了避免具体化否则会发生的数据(如果输入没有实现len,它会被迭代并立即转换为list)。

相关问题