限制spark上下文中记录的数量

d5vmydt9  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(443)

我想减少每个reducer的记录数,并保持结果变量a rdd 使用 takeSample 似乎是显而易见的选择,然而,它返回了一个 collection 而不是一个 SparkContext 对象。
我想到了这个方法:

rdd = rdd.zipWithIndex().filter(lambda x:x[1]<limit).map(lambda x:x[0])

然而,这种方法非常缓慢,效率不高。
有没有更聪明的方法来获取一个小样本并保持数据结构的完整性 rdd ?

7rtdyuoh

7rtdyuoh1#

如果您想要一个小的示例子集,那么就不能对数据进行任何额外的假设 take 结合 parallelize 可能是最佳解决方案:

sc.parallelize(rdd.take(n))

它将触及相对较少的分区(在最佳情况下只有一个分区),并且对于较小的n,网络通信量的成本应该可以忽略不计。
取样( randomSplit 或者 sample )将需要与相同的完整数据扫描 zipWithIndexfilter .
假设没有数据倾斜,您可以尝试这样的方法来解决:

from __future__ import division  # Python 2 only

def limitApprox(rdd, n, timeout):
    count = rdd.countApprox(timeout)
    if count <= n:
        return rdd
    else:
        rec_per_part = count // rdd.getNumPartitions()
        required_parts = n / rec_per_part if rec_per_part else 1
        return rdd.mapPartitionsWithIndex(
            lambda i, iter: iter if i < required_parts else []
        )

这仍将访问每个分区,但如果不需要,将尽量避免计算内容
如果存在大数据倾斜,则无法工作
如果分布是均匀的,则所需的时间可能远远超过所需时间,但n<<超过每个分区的平均记录数。
如果分布向高指数倾斜,则可能样本不足。
如果数据可以表示为 Row 你可以尝试另一个技巧:

rdd.toDF().limit(n).rdd

相关问题