spark sql任务性能优化(基础)

x33g5p2x  于2021-11-22 转载在 Spark  
字(3.9k)|赞(0)|评价(0)|浏览(723)

优化任务的意义

  • 对于项目而言,可以节省机器计算资源,资源就是时间就是钱
  • 执行时间可能大幅度缩短,对于长链条任务依赖减少等待时间,尤其于上游任务而言,从而数据稳定性增加
  • 执行高频次的数据任务保证其及时性

spark UI

在上面的导航栏:

  • 1 代表job页面,在里面可以看到当前应用分析出来的所有任务,以及所有的excutors中action的执行时间。
  • 2 代表stage页面,在里面可以看到应用的所有stage,stage是按照宽依赖来区分的,因此粒度上要比job更细一些
  • 3 代表storage页面,我们所做的cache persist等操作,都会在这里看到,可以看出来应用目前使用了多少缓存
  • 4 代表environment页面,里面展示了当前spark所依赖的环境,比如jdk,lib等等
  • 5 代表executors页面,这里可以看到执行者申请使用的内存以及shuffle中input和output等数据
  • 6 这是应用的名字,代码中如果使用setAppName,就会显示在这里
  • 7 是job的主页面。

job页面

这里是任务执行Job的信息,一个算子会出发一个job,比如count或者insert into等

**在spark中rdd的计算分为两类,一类是transform转换操作,一类是action操作,**只有action操作才会触发真正的rdd计算。具体的有哪些action可以触发计算,

还包括提交时间,执行时间,每个job得到stage数量以及成功的stage数量,每个job所有的task数量和成功的task数量

找出执行最长的job
  • job页面的duration是支持排序的,通过排序我们能够知道,那个作业执行最久,方便后续从stage和task方面做进一步的分析

  • 注意job之间的间隔,定位是否有文件数目不合理问题或者集群rpc是否有问题

  • 如果观察到spark job本身并没有特别慢的作业,那么需要关注一下作业之间的间隔。间隔指的是 上一个job的完成时间和下一个job的开始时间的时间差
    这也是引起作业性能瓶颈的原因。这时候可以通过driver的日志做进一步的定位。(文件数目过多,集群压力稍大的情况下文件写入会很耗时。)

stage列表 页面

在Spark中job是根据action操作来区分的,另外任务还有一个级别是stage,它是根据宽窄依赖来区分的。

展示RDD的依赖图,通过sql可以找到对应的RDD逻辑,主要参照Exchange(产生shuffle),ps:gby、join语句都会产生exchange。

每两次需要shuffle 操作之间是一个stage,如join,group by

在这里可以看到spark 任务最重转换成rdd的stage信息,已完成的stage,正在执行的stage,跳过的stage

可以查看每个stage的task数量和`成功的task数量``

  • Input从hadoop或者spark storage读取的数据大小
  • Output写入hadoop的数据大小
  • shuffle read每个stage读取的数据大小
  • shuffle write每个stage写入磁盘的数据大小供于未来某个stage读取。

一般而言task数量(partition)等于spark.sql.shuffle.partitions数量,当spark从hive,kudu读取数据时,task数量和数据表的分区数保持一致。

task列表 页面

  • 关注数据分布

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-6l7UTBGL-1637583901499)(D:\Users\lawrence.w\AppData\Roaming\Typora\typora-user-images\image-20211105142221669.png)]

上图说明了stage的所有task的数据分布,能有效的帮助我们发现数据倾斜(时间大,数据量多)

excutors页面

这个页面比较常用了,一方面通过它可以看出来每个excutor是否发生了数据倾斜,另一方面可以具体分析目前的应用是否产生了大量的shuffle,是否可以通过数据的本地性或者减小数据的传输来减少shuffle的数据量。

查看SQL的执行计划

scan:从hive,kudu读取数据
Filter:过滤操作,包括去除缺失值,条件筛选等
Project:映射,选择需要的列
HashAggregate:聚合
wholestagecodegen:全阶段代码生成,用来将多个处理逻辑整合到单个代码模块中,是Spark的新的SQL代码生成模型
Exchange:数据重分区
Sort:数据根据某个key排序
SortMergeJoin:大表和大表join的策略
shuffle阶段:将两张大表根据join key进行重新分区,对应执行计划中的Exchange
sort阶段:对单个分区节点的两表数据,分别进行排序,对应执行计划中的Sort
merge阶段:对排好序的两张分区表数据执行join操作。

![img](https://upload-images.jianshu.io/upload_images/22206660-25788f9274185047.png

调优方法(常见)

性能调优其实也比较符合二八定律,常见20%的优化方法基本可满足80%需优化任务的效率。运行性能与开发效率以及可读性逻辑在日常工作中也需要考虑从中取得一个平衡。

读表慢 I/0

  • 文件格式的问题:不可切割存储格式 改为 orc (线上默认存储格式)

parquet和orc这类的存储格式实现了按列进行读写,大部分的情况下,我们其实不会需要把全部字段给查出来,按列式存储可以减少每次读取的数据量,另一方面列式存储在减少读取方面还做了一些文件下推操作的优化,可以按照文件读取的范围进行筛选。

  • 扫描量过大—单分区为全量表未提前选定分区

图示为 错把分区当业务过滤

  • 避免多次读同一大表,基本上资源都用作了读大表的IO上

解决方案:一次读用case when 条件判断进行改写,where (条件1=A or 条件2 = B)

cacheTable(’’)再进行多次读是否可行? 若是cache大表本身也需要大量时间,还得考虑内存参数是否足够cache。

另一种常出现的案例是根据 同一个大表根据 不同group by 字段 再进行union all——不同维度去合并数据指标

解决方案:可以用窗口函数 groupping_set() 避免多次读,复用groupby操作的shuffle数据,同时也是减少shuffle操作

尽可能地filter

获取到初始RDD后,应该考虑尽早地过滤掉不需要的数据,进而减少对内存的占用,从而提升Spark作业的运行效率。

常见于A表 join B表 A表进行过滤,而B表没有filter

减少落地操作

内存计算的时候频繁的落地显得会比较耗时,可以通过使用中间的临时视图进行中转结果,当然这种场景限于不是计算量很大的中间结果。

create table temp.t1 as select ....;

计算的复用

计算的复用是通过执行策略进行操作的,Spark比较大的操作其实就是shuffle本身,spark对表的bucket存储可以把表的分桶信息进行物化,使用表的时候使用相同的bucket shuffle操作的时候可以复用这一次shuffle操作,不再需要进行shuffle的动作了,这块可以加速join 、group by、 over()这些操作是生产实践比较多的操作

合理利用缓存

在Spark的计算中,不太建议直接使用cache,万一cache的量很大,可能导致内存溢出。可以采用persist的方式,指定缓存的级别为MEMORY_AND_DISK,这样在内存不够的时候,可以把数据缓存到磁盘上。另外,要合理的设计代码,恰当地使用广播和缓存,广播的数据量太大会对传输带来压力,缓存过多未及时释放,也会导致内存占用。一般来说,你的代码在需要重复使用某一个rdd的时候,才需要考虑进行缓存,并且在不使用的时候,要及时unpersist释放。

数据倾斜

数据倾斜是常见的导致Spark SQL性能变差的问题。数据倾斜是指某一个partition的数据量远远大于其它partition的数据,导致个别任务的运行时间远远大于其它任务,因此拖累了整个SQL的运行时间。

  • 在源端过滤无用数据

  • 手工操作

  • 打散倾斜key,进行两端聚合(针对聚合)或者拆分倾斜key进行打散然后union

  • 对特殊key处理:空值映射为特定Key,然后分发到不同节点,对空值不做处理。

  • 广播较小的表(默认大小),调大BroadcastHashJoin的阈值,在某些场景下可以把SortMergeJoin转化成BroadcastHashJoin而避免shuffle产生的数据倾斜

磁盘溢写严重(shuffle阶段)

shuffle过程中,各个节点上的相同key都会先写入本地磁盘文件中,然后其他节点需要通过网络传输拉取各个节点上的磁盘文件中的相同key。而且相同key都拉取到同一个节点进行聚合操作时,还有可能会因为一个节点上处理的key过多,导致内存不够存放,进而溢写到磁盘文件中。因此在shuffle过程中,可能会发生大量的磁盘文件读写的IO操作,以及数据的网络传输操作。磁盘IO和网络数据传输也是shuffle性能较差的主要原因。

  • 尽量避免shuffle操作(好像在说废话)
  • 适当调高内存参数-executor-memory
  • 数坊参数追加spark.sql.shuffle.partitions=200; 在SQL 里面追加参数 set spark.sql.shuffle.partitions=200;

待续

相关文章