什么会影响spark中被洗牌的数据量

mec1mxoz  于 2021-06-02  发布在  Hadoop
关注(0)|答案(2)|浏览(344)

例如,我在spark上执行一些查询,在spark的ui中,我可以看到一些查询有更多的shuffle,而这个shuffle似乎是本地读取和执行器之间读取的数据量。
但是我不明白一件事,例如下面这个查询从hdfs加载了7gb,但是suffle read+shuffled write超过了10gb。但我看到其他的查询也从hdfs加载7gb,洗牌是500kb。所以我不明白,你能帮个忙吗?洗牌的数据量与从hdfs读取的数据无关?

select 
  nation, o_year, sum(amount) as sum_profit
from 
  (
select 
  n_name as nation, year(o_orderdate) as o_year, 
  l_extendedprice * (1 - l_discount) -  ps_supplycost * l_quantity as amount
    from
      orders o join
      (select l_extendedprice, l_discount, l_quantity, l_orderkey, n_name, ps_supplycost 
       from part p join
         (select l_extendedprice, l_discount, l_quantity, l_partkey, l_orderkey, 
                 n_name, ps_supplycost 
          from partsupp ps join
            (select l_suppkey, l_extendedprice, l_discount, l_quantity, l_partkey, 
                    l_orderkey, n_name 
             from
               (select s_suppkey, n_name 
                from nation n join supplier s on n.n_nationkey = s.s_nationkey
               ) s1 join lineitem l on s1.s_suppkey = l.l_suppkey
            ) l1 on ps.ps_suppkey = l1.l_suppkey and ps.ps_partkey = l1.l_partkey
         ) l2 on p.p_name like '%green%' and p.p_partkey = l2.l_partkey
     ) l3 on o.o_orderkey = l3.l_orderkey
  )profit
group by nation, o_year
order by nation, o_year desc;
sqougxex

sqougxex1#

shuffle是spark重新分发数据的机制,以便在分区之间对数据进行不同的分组。这通常涉及跨执行器和机器复制数据。所以这里很清楚,无序数据实际上并不依赖于输入数据的数量。但是,这取决于您对输入数据执行的操作,这会导致数据在执行器(以及机器)之间移动。请通过http://spark.apache.org/docs/latest/programming-guide.html#shuffle-了解和理解为什么洗牌是一个代价高昂的过程。
查看您粘贴的查询,似乎您正在执行许多连接操作(尚未深入了解您正在执行的最终操作)。这无疑需要跨分区移动数据。这个问题可以通过重新访问查询并优化相同的查询来解决,或者以减少数据移动的方式操作或预处理输入数据(例如:将已连接的数据合并到同一分区中)。同样,这只是一个例子,您必须从您的用例中确定什么最适合您。

ykejflvf

ykejflvf2#

我强烈建议阅读我认为是关于解释mapreduce编程模型的论文。
基本上不,不是hdfs上的数据量(或任何源)决定了有多少数据被洗牌。我将试着用三个例子来解释:
例1。洗牌的数据量小于输入数据量:

val wordCounts = words.map((_, 1)).reduceByKey(_ + _)

在这里,我们计算每个分区中的字数(每个键),然后只对结果进行洗牌。一旦我们洗牌了子计数,我们就把它们加起来。所以我们洗牌的数据量,与计数的数量有关。所以在这种情况下,它与唯一单词的数量有关。
如果我们只有一个唯一的字,我们将洗牌比输入少得多的数据。事实上,线程的数量和线程的数量一样多(所以数量很少)。
假设每一个单词都是唯一的,那么我们会重新整理更多的数据(阅读文章了解详情)。所以在这个例子中,被洗牌的数据量与我们有多少个唯一的键(唯一的单词)有关。
例2。洗牌的数据量与输入数据量相同:

val wordCounts = words.map((_, 1)).groupByKey().mapValues(_.size)

在这里,我们把所有的单词组合在一起,然后计算有多少个单词。所以我们需要把所有的数据重新整理一下。
例3。洗牌的数据量大于输入数据量:

val silly = 
  words.map(word => 
    (word, generateReallyLongString()))
  .groupByKey()

在这里,我们的Map阶段将每个单词Map到一个非常长的随机字符串,然后我们将它们按单词分组。在这里,我们生成的数据比输入的数据多,并且将洗牌的数据比输入的数据多。

相关问题