Spark之SparkSQL

x33g5p2x  于2020-10-30 发布在 SparkSQL  
字(16.4k)|赞(0)|评价(0)|浏览(874)

1**、更多SparkSQL数据源**

除了使用Parquet文件和JSON文件作为SparkSQL的数据源,SparkSQL还支持通过JDBC读取其他数据库表中的数据作为数据源,以及使用Hive中的Table表数据作为数据源。下面分别针对这两种SparkSQL数据源进行介绍。

A**、使用JDBC数据源**

如何在SparkSQL中使用JDBC数据源呢?通过JDBC直接读取关系型数据库表中的数据,加载成一张表(DataFrame),然后便可以使用SparkSQL进行分析和处理。特别要注意的是,在SparkSQL中使用JDBC数据源,需要将相应的JDBC驱动jar包拷贝到对应目录下,下面以Oracle 10g数据库版本为例进行介绍。

在Oracle的安装目录oracle\product\10.2.0\db_1\jdbc\lib下,有很多驱动jar包,拷贝ojdbc14.jar上传到主机hadoop221的/root/temp目录下。进入到/root/training/spark-2.1.0-bin-hadoop2.7目录,运行命令bin/spark-shell --master spark://192.168.12.221:7077 --jars /root/temp/ojdbc14.jar --driver-class-path /root/temp/ojdbc14.jar,其中--jars参数用于指定Oracle数据库驱动jar包,--driver-class-path参数用于指定Oracle数据库驱动类的路径。

运行命令读取Oracle数据库表中的数据,各个参数的详细解释如下图所示:

这里简单介绍两种读取Oracle数据库表中数据的方式:

(1)直接使用option参数的方式:

val oracleDataDF = spark.read.format("jdbc").option("driver", "oracle.jdbc.OracleDriver").option("url", "jdbc:oracle:thin:@192.168.12.142:1521/orcl").option("dbtable", "scott.emp").option("user", "scott").option("password", "tiger").load

注意:由于使用的Oracle数据库版本是10g,故这里必须要指定driver参数和url参数,而如果使用11g版本,则该命令有所不同:

val oracleDataDF =spark.read.format("jdbc").option("url","jdbc:oracle:thin:@192.168.12.142:1521/orcl.example.com").option("dbtable","scott.emp").option("user","scott").option("password","tiger").load

然后运行命令oracleDataDF.show,查看从Oracle数据库中获取的数据内容,运行结果如下图所示:

(2)使用Properties属性类的方式:

首先,导入相应的类:import java.util.Properties

然后,设置需要使用到的属性:

val oracleProperties = new Properties() //定义属性

oracleProperties.setProperty("user","scott") //设置用户名

oracleProperties.setProperty("password","tiger") //设置密码

再运行命令val oracleDataDF1 = spark.read.jdbc("jdbc:oracle:thin:@192.168.12.142:1521/orcl", "scott.emp", oracleProperties),从Oracle数据库表中获取数据,最后运行命令oracleDataDF1.show,查看从Oracle数据库表中获取的数据内容,运行结果如下图所示:

B**、使用Hive Table数据源**

在SparkSQL中使用Hive Table数据源,需要将SparkSQL和Hive进行集成,把Hive作为SparkSQL的数据源,类似于将Oracle数据库表作为SparkSQL的数据源。SparkSQL和Hive的集成,操作步骤如下:

(1)需要搭建好Hive的环境(这个的前提需要搭建Hadoop环境);

(2)配置SparkSQL以支持Hive,需要将Hive的配置文件和Hadoop的配置文件拷贝到$SPARK_HOME/conf目录下,需要拷贝的文件列表如下:

$HIVE_HOME/conf/hive-site.xml
*
$HADOOP_CONF_DIR/core-site.xml
*
$HADOOP_CONF_DIR/hdfs-site.xml

进入到/root/training/spark-2.1.0-bin-hadoop2.7目录下,分别运行命令:

cp ~/training/hadoop-2.7.3/etc/hadoop/core-site.xmlconf

cp ~/training/hadoop-2.7.3/etc/hadoop/hdfs-site.xmlconf

cp ~/training/apache-hive-2.3.0-bin/conf/hive-site.xmlconf/

(3)在启动spark-shell的时候,加入MySQL的驱动jar包,运行命令sbin/start-all.sh,重新启动Spark,运行命令bin/spark-shell --master spark://hadoop221:7077 --jars /root/training/apache-hive-2.3.0-bin/lib/mysql-connector-java-5.1.46.jar,重新启动spark-shell。

(4)使用spark-shell操作Hive,准备测试数据,进入目录/root/data,运行命令vi data.txt创建data.txt文件,往其中添加如下内容:

1,Special

2,Iris

3,Sherry

4,Cherry

5,Marry

6,Chris

保存退出。运行命令spark.sql("create table name(id INT, name STRING) row format delimited fields terminated by ','"),在Hive中创建对应的人名表name,然后往新创建的表name中导入数据,运行命令spark.sql("load data local inpath '/root/data/data.txt' into table name"),将测试数据文件data.txt中的内容导入到name表中,最后运行命令spark.sql("select /* from name").show,查询表name中的所有数据,运行结果如下图所示:

2**、SparkSQL编程实践,将结果保存到Oracle**

在生产实践中,大多数时候,需要编写一个完整的SparkSQL程序,然后导出为jar包,并提交到Spark上运行,而不是像前面那样直接登录到spark-shell客户端运行程序,进行数据的处理和分析。从本质上来说,这两种方式,其核心代码实现的功能是一致的,只不过完整的SparkSQL程序做了更多的额外处理,程序更完整、更健壮。

上一篇文章中讲过一个SparkSQL编程实例,实现的功能是从C:\MyDocuments\TestData\students.txt文件中读取数据,然后进行分析处理,最后将查询的结果显示到控制台。下面重新实现该SparkSQL程序,只不过这里将最终查询的结果保存到Oracle数据库表中。并且,这里简单介绍下该SparkSQL程序的两种运行方式,其一是本地运行(直接在Scala IDE中运行);其二是提交任务到Spark集群上运行。

本地运行

程序代码如下图所示,注意本地运行需要设置运行模式master为local。

程序执行完后,到Oracle的sqlplus命令行进行查看,结果如下图所示。事先未在Oracle数据库中创建相应的mystudent表,而是由SparkSQL程序自动创建,这个从下图中也可以看出来。同时,由于mystudent表是SparkSQL程序自动创建,表的格式会存在一些问题,导致显示不太友好,通过设置列宽(col stuName for a20,表示将列stuName设置为最多显示20个字符)可以进行调整,使得最终查询结果的显示更完美。

集群模式

将SparkSQL程序提交到Spark集群中运行,首先,需要在程序中做些调整,将设置运行模式master为local的代码去掉;由于此时程序不是运行在Windows系统中,需要将设置系统参数hadoop.home.dir的代码也去掉;数据文件students.txt的路径也需要做调整,此时直接在命令行中使用参数传递数据文件的全路径。

程序代码如下图所示:

编写完程序后,导出为SparkOracle.jar包,然后上传到主机hadoop221的/root/temp目录下,进入到/root/training/spark-2.1.0-bin-hadoop2.7目录下,最后运行命令bin/spark-submit --master spark://hadoop221:7077 --class day0615.SparkSQLForOracleDemo /root/temp/SparkOracle.jar hdfs://hadoop221:9000/data/students.txt,等待程序执行完成即可。输出的log如下图所示,程序的运行结果跟前面一致,这里不再给出。

3**、高性能Spark调优**

Spark的性能调优实际上是由很多部分组成的,不是调节几个参数就可以立竿见影提升作业性能的。需要根据不同的业务场景以及数据情况,对Spark作业进行综合性的分析,然后进行多个方面的调节和优化,才能获得最佳性能。Spark作业的性能优化主要分为开发调优、资源调优、数据倾斜调优、shuffle调优几个部分:

开发调优和资源调优是所有Spark作业都需要注意和遵循的一些基本原则,是高性能Spark作业的基础;
*
数据倾斜调优,主要是各种解决Spark作业数据倾斜的技术解决方案;
*
shuffle调优,主要是对Spark作业的shuffle运行过程以及细节进行调优,这需要对Spark的原理有较深层次的研究和掌握。

****这里主要讲解开发调优和资源调优。

A**、开发调优**

原则一:避免创建重复的RDD

通常来说,在开发一个Spark作业时,首先是基于某个数据源(比如Hive表或HDFS文件)创建一个初始的RDD;接着对这个RDD执行某个算子操作,然后得到下一个RDD,以此类推,循环往复,直到计算出最终需要的结果。在这个过程中,多个RDD会通过不同的算子操作(比如map、reduce等)串起来,这个“RDD串”,就是RDD lineage,也就是“RDD的血缘关系链”。在开发过程中要注意:对于同一份数据,只应该创建一个RDD,不能创建多个RDD来代表同一份数据。

举例:

原则二:尽可能复用同一个RDD

除了要避免在开发过程中对一份完全相同的数据创建多个RDD之外,在对不同的数据执行算子操作时还要尽可能地复用同一个RDD。比如说,有一个RDD的数据格式是key-value类型的,另一个是单value类型的,这两个RDD的value数据是完全一样的,那么此时可以只使用key-value类型的那个RDD,因为它已经包含了另一个RDD的数据。对于类似这种多个RDD的数据有重叠或者包含的情况,应该尽量复用一个RDD,这样可以尽可能地减少RDD的数量,从而尽可能减少算子执行的次数。

举例:

原则三:对多次使用的RDD进行持久化

当在Spark代码中多次对同一个RDD做了算子操作后,很棒,已经实现Spark作业第一步的优化了,也就是尽可能复用RDD。此时就该在这个基础之上,进行第二步优化了,也就是要保证对一个RDD执行多次算子操作时,这个RDD本身仅仅被计算一次。Spark中对于同一个RDD执行多次算子的默认原理是这样的:每次对一个RDD执行一个算子操作时,都会重新从源头处计算一遍,计算出那个RDD来,然后再对这个RDD执行该算子操作,很显然,这种方式的性能是很差的。

因此对于这种情况,给出的建议是:对多次使用的RDD进行持久化。此时Spark就会根据你的持久化策略,将RDD中的数据保存到内存或者磁盘中,以后每次对这个RDD进行算子操作时,都会直接从内存或磁盘中提取持久化的RDD数据,然后执行算子,而不会从源头处重新计算一遍这个RDD,再执行算子操作。

举例:

对于persist()方法而言,可以根据不同的业务场景选择不同的持久化级别,这样更具有灵活性。Spark的持久化级别如下图所示:

原则四:尽量避免使用shuffle类算子

如果有可能的话,要尽量避免使用shuffle类算子,因为Spark作业运行过程中,最消耗性能的地方就是shuffle过程。shuffle过程,简单来说,就是将分布在集群中多个节点上的同一个key拉取到同一个节点上,进行聚合或join等操作。比如reduceByKey、join等算子,都会触发shuffle操作。

shuffle过程中,各个节点上的相同key都会先写入本地磁盘文件中,然后其它节点需要通过网络传输拉取各个节点上的磁盘文件中的相同key。而且相同key都拉取到同一个节点进行聚合操作时,还有可能会因为一个节点上处理的key过多,导致内存不够存放,进而溢写到磁盘文件中。因此在shuffle过程中,可能会发生大量的磁盘文件读写的IO操作,以及数据的网络传输操作。磁盘IO和网络数据传输也是shuffle性能较差的罪魁祸首。

所以在平时的开发过程中,能避免则尽可能避免使用reduceByKey、join、distinct、repartition等会进行shuffle的算子,尽量使用map类的非shuffle算子。这样的话,没有shuffle操作或者仅有较少shuffle操作的Spark作业,可以大大减少性能开销。如下示例使用Broadcast与map进行join操作。

举例:

原则五:使用map-side预聚合的shuffle操作

如果因为业务需要,一定要使用shuffle操作,无法用map类的算子来替代,那么尽量使用可以map-side预聚合的算子。所谓的map-side预聚合,说的是在每个节点本地对相同的key进行一次聚合操作,类似于MapReduce中的本地combiner。map-side预聚合之后,每个节点本地就只会有一条相同的key,因为多条相同的key都被聚合起来了。其它节点在拉取所有节点上的相同key时,就会大大减少需要拉取的数据数量,从而也就减少了磁盘IO以及网络传输开销。

通常来说,在可能的情况下,建议使用reduceByKey或者aggregateByKey算子来替代掉groupByKey算子。因为reduceByKey和aggregateByKey算子都会使用用户自定义的函数对每个节点本地的相同key进行预聚合。而groupByKey算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分发和传输,性能相对来说比较差。

如下两幅图就是一个比较典型的例子,分别基于reduceByKey和groupByKey进行单词计数。其中第一张图是groupByKey的原理图,可以看到,没有进行任何本地聚合时,所有数据都会在集群节点之间传输;第二张图是reduceByKey的原理图,可以看到,每个节点本地的相同key数据,都进行了预聚合,然后才传输到其他节点上进行全局聚合。

原则六:使用高性能的算子

除了shuffle相关的算子有优化原则之外,其它的算子也都有着相应的优化原则:

使用reduceByKey/aggregateByKey替代groupByKey,这在原则五中已经进行了详细的阐述,可以参考;
*
使用mapPartitions替代普通map;

mapPartitions类的算子,一次函数调用会处理一个partition所有的数据,而不是一次函数调用处理一条,性能相对来说会高一些。但是有的时候,使用mapPartitions会出现OOM(内存溢出)的问题。因为单次函数调用就要处理掉一个partition所有的数据,如果内存不够,垃圾回收时是无法回收掉太多对象的,很可能出现OOM异常。所以使用这类操作时要慎重。

使用foreachPartitions替代foreach;

这个的原理类似于“使用mapPartitions替代map”,也是一次函数调用处理一个partition的所有数据,而不是一次函数调用处理一条数据。在实践中发现,foreachPartitions类的算子,对性能的提升还是很有帮助的。比如在foreach函数中,将RDD中所有数据写MySQL,那么如果是普通的foreach算子,就会一条数据一条数据地写,每次函数调用可能就会创建一个数据库连接,此时就势必会频繁地创建和销毁数据库连接,性能是非常低下的;但是此时如果使用foreachPartitions算子一次性处理一个partition的数据,那么对于每个partition,只要创建一个数据库连接即可,然后执行批量插入操作,此时性能是比较高的。实践中发现,对于1万条左右的数据量写MySQL,性能可以提升30%以上。

使用filter之后进行coalesce操作;

通常对一个RDD执行filter算子过滤掉RDD中较多数据后(比如30%以上的数据),建议使用coalesce算子,手动减少RDD的partition数量,将RDD中的数据压缩到更少的partition中去。因为filter之后,RDD的每个partition中都会有很多数据被过滤掉,此时如果照常进行后续的计算,其实每个task处理的partition中的数据量并不是很多,有一点资源浪费,而且此时处理的task越多,可能速度反而越慢。因此用coalesce减少partition数量,将RDD中的数据压缩到更少的partition之后,只要使用更少的task即可处理完所有的partition。在某些场景下,对于性能的提升会有一定的帮助。

使用repartitionAndSortWithinPartitions替代repartition与sort类操作;

repartitionAndSortWithinPartitions是Spark官网推荐的一个算子。官方建议,如果是需要在repartition重分区之后还要进行排序,就可以直接使用repartitionAndSortWithinPartitions算子。因为该算子可以一边进行重分区的shuffle操作,一边进行排序。shuffle与sort两个操作同时进行,比先shuffle再sort来说,性能可能是更高的。

原则七:广播大变量

有时在开发过程中,会遇到需要在算子函数中使用外部变量的场景(尤其是大变量,比如100M以上的大集合),那么此时就应该使用Spark的广播(Broadcast)功能来提升性能。在算子函数中使用到外部变量时,默认情况下,Spark会将该变量复制多个副本,通过网络传输到task中,此时每个task都有一个变量副本。如果变量本身比较大的话(比如100M,甚至1G),那么大量的变量副本在网络中传输的性能开销,以及在各个节点的Executor中占用过多内存导致的频繁GC,都会极大地影响性能。

因此对于上述情况,如果使用的外部变量比较大,建议使用Spark的广播功能,对该变量进行广播。广播后的变量,会保证每个Executor的内存中,只驻留一份变量副本,而Executor中的task执行时共享该Executor中的那份变量副本。这样的话,可以大大减少变量副本的数量,从而减少网络传输的性能开销,并减少对Executor内存的占用开销,降低GC的频率。

举例:

原则八:使用Kryo优化序列化功能

在Spark中,主要有三个地方涉及到了序列化:

在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输,这在原则七中进行了相关介绍,可参考;
*
将自定义的类型作为RDD的泛型类型时(比如JavaRDD、Student是自定义类型),所有自定义类型对象,都会进行序列化,因此这种情况下,也要求自定义的类必须实现Serializable接口;
*
使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个partition都序列化成一个大的字节数组。

对于这三种出现序列化的地方,都可以通过使用Kryo序列化类库,来优化序列化和反序列化的性能。Spark默认使用的是Java的序列化机制,也就是ObjectOutputStream/ObjectInputStreamAPI来进行序列化和反序列化。但是Spark同时支持使用Kryo序列化库,Kryo序列化类库的性能比Java序列化类库的性能要高很多。

官方介绍,Kryo序列化机制比Java序列化机制,性能高10倍左右。Spark之所以默认没有使用Kryo作为序列化类库,是因为Kryo要求最好要注册所有需要进行序列化的自定义类型,因此对于开发者来说,这种方式比较麻烦。

下面是使用Kryo的代码示例,只要设置序列化类,再注册要序列化的自定义类型即可(比如算子函数中使用到的外部变量类型、作为RDD泛型类型的自定义类型等)。

举例:

原则九:优化数据结构

Java中,有三种数据类型比较耗费内存:

对象,每个Java对象都有对象头、引用等额外的信息,因此比较占用内存空间;
*
字符串,每个字符串内部都有一个字符数组以及长度等额外信息;
*
集合类型,比如HashMap、LinkedList等,因为集合类型内部通常会使用一些内部类来封装集合元素,比如Map.Entry。

因此Spark官方建议,在Spark编码实现中,特别是对于算子函数中的代码,尽量不要使用上述三种数据结构,尽量使用字符串替代对象,使用原始类型(比如Int、Long)替代字符串,使用数组替代集合类型,这样可以尽可能地减少内存占用,从而降低GC频率,提升性能。

但在实践中,往往会发现,要做到该原则其实并不容易,因为同时要考虑到代码的可维护性,如果一个代码中,完全没有任何对象抽象,全部是字符串拼接的方式,那么对于后续的代码维护和修改,无疑是一场巨大的灾难。

同理,如果所有操作都基于数组实现,而不使用HashMap、LinkedList等集合类型,那么对于开发者的编码难度以及代码的可维护性,也是一个极大的挑战。因此建议,在可能以及合适的情况下,使用占用内存较少的数据结构,但是前提是要保证代码的可维护性。

B**、资源调优**

在开发完Spark作业之后,就该为作业配置合适的资源了。Spark的资源参数,基本都可以在spark-submit命令中作为参数设置。资源参数设置的不合理,可能会导致没有充分利用集群资源,作业运行会极其缓慢;或者设置的资源过大,队列没有足够的资源来提供,进而导致各种异常。

总之,无论是哪种情况,都会导致Spark作业的运行效率低下,甚至根本无法运行。因此必须对Spark作业的资源使用原理有一个清晰的认识,并知道在Spark作业运行过程中,有哪些资源参数是可以设置的,以及如何设置合适的参数值。

Spark****作业运行原理

Spark作业的运行原理,如下图所示:

使用spark-submit提交一个Spark作业之后,这个作业就会启动一个对应的Driver进程。根据使用的部署模式(deploy-mode)不同,Driver进程可能在本地启动,也可能在集群中某个工作节点上启动。Driver进程本身会根据设置的参数,占有一定数量的内存和CPU core。而Driver进程要做的第一件事情,就是向集群管理器(可以是Spark Standalone集群,也可以是其他的资源管理集群,如美团-大众点评使用的是YARN作为资源管理集群)申请运行Spark作业需要使用的资源,这里的资源指的就是Executor进程。

YARN集群管理器会根据用户为Spark作业设置的资源参数,在各个工作节点上,启动一定数量的Executor进程,每个Executor进程都占有一定数量的内存和CPU core。在申请到了作业执行所需的资源之后,Driver进程就会开始调度和执行编写的作业代码了。Driver进程会将编写的Spark作业代码分拆为多个stage,每个stage执行一部分代码片段,并为每个stage创建一批task,然后将这些task分配到各个Executor进程中执行。

task是最小的计算单元,负责执行一模一样的计算逻辑(也就是编写的某个代码片段),只是每个task处理的数据不同而已。一个stage的所有task都执行完毕之后,会在各个节点本地的磁盘文件中写入中间计算结果,然后Driver就会调度运行下一个stage。下一个stage的task的输入数据就是上一个stage输出的中间结果。

如此循环往复,直到将编写的代码逻辑全部执行完,并且计算完所有的数据,得到最终想要的结果为止。Spark是根据shuffle类算子来进行stage的划分。如果代码中执行了某个shuffle类算子(比如reduceByKey、join等),那么就会在该算子处,划分出一个stage界限来。可以大致理解为shuffle算子执行之前的代码会被划分为一个stage,shuffle算子执行以及之后的代码会被划分为下一个stage。

因此,一个stage刚开始执行的时候,它的每个task可能都会从上一个stage的task所在的节点,去通过网络传输拉取需要自己处理的所有key,然后对拉取到的所有相同的key使用编写的算子函数执行聚合操作(比如reduceByKey()算子接收的函数),这个过程就是shuffle。当在代码中执行了cache/persist等持久化操作时,根据选择的持久化级别的不同,每个task计算出来的数据也会保存到Executor进程的内存或者所在节点的磁盘文件中。

故Executor的内存主要分为三块:

第一块是让task执行编写的代码时使用,默认是占Executor总内存的20%;
*
第二块是让task通过shuffle过程拉取上一个stage的task的输出结果后,进行聚合等操作时使用,默认也是占Executor总内存的20%;
*
第三块是让RDD持久化时使用,默认占Executor总内存的60%。

task的执行速度是跟每个Executor进程的CPU core数量有直接关联的。一个CPU core同一时间只能执行一个线程,而每个Executor进程上分配到的多个task,都是以每个task一个线程的方式,多线程并发运行的。如果CPU core数量比较充足,而且分配到的task数量比较合理,那么通常来说,可以比较快速和高效地执行完这些task线程。

以上就是对Spark作业的基本运行原理的说明,可以结合原理图来进行理解。充分理解Spark作业的基本运行原理,是进行资源参数调优的基本前提。

资源参数调优

了解完Spark作业运行的基本原理后,对资源相关的参数就比较容易理解了。所谓的Spark资源参数调优,其实主要就是对Spark运行过程中各个使用资源的地方,通过调节各种参数来优化资源使用的效率,从而提升Spark作业的执行性能。以下就是Spark中主要的资源参数,每个参数都对应着作业运行原理中的某个部分,同时也给出了一个调优的参考值:

num-executors

**参数说明:**该参数用于设置Spark作业总共要用多少个Executor进程来执行。Driver在向YARN集群管理器申请资源时,YARN集群管理器会尽可能按照设置在集群的各个工作节点上,启动相应数量的Executor进程。这个参数非常重要,如果不设置的话,默认只会启动少量的Executor进程,此时Spark作业的运行速度是非常慢的。

**参数调优建议:**每个Spark作业的运行一般设置50~100个左右的Executor进程比较合适,设置太少或太多的Executor进程都不好。设置的太少,无法充分利用集群资源;设置得太多的话,大部分队列可能无法给予充分的资源。

executor-memory

**参数说明:**该参数用于设置每个Executor进程的内存。Executor内存的大小,很多时候直接决定了Spark作业的性能,而且跟常见的JVM OOM异常也有直接的关联。

**参数调优建议:**每个Executor进程的内存设置4G~8G较为合适。但是这只是一个参考值,具体的设置还是得根据不同部门的资源队列来定。可以看看自己团队的资源队列的最大内存限制是多少,num-executors乘以executor-memory,就代表了你的Spark作业申请到的总内存量(也就是所有Executor进程的内存总和),这个量是不能超过队列的最大内存量的。此外,如果你是跟团队里其他人共享这个资源队列,那么申请的总内存量最好不要超过资源队列最大总内存的1/3~1/2,避免你自己的Spark作业占用了队列所有的资源,导致别人的作业无法运行。

executor-cores

**参数说明:**该参数用于设置每个Executor进程的CPU core数量。这个参数决定了每个Executor进程并行执行task线程的能力。因为每个CPU core同一时间只能执行一个task线程,因此每个Executor进程的CPU core数量越多,越能够快速地执行完分配给自己的所有task线程。

**参数调优建议:*Executor的CPU core数量设置为2~4个较为合适。同样得根据不同部门的资源队列来定,可以看看自己的资源队列的最大CPUcore限制是多少,再依据设置的Executor数量,来决定每个Executor进程可以分配到几个CPU core。同样建议,如果是跟他人共享这个队列,那么num-executors / executor-cores不要超过队列总CPU core的1/3~1/2左右比较合适,也是避免影响其他人作业的运行。

driver-memory

**参数说明:**该参数用于设置Driver进程的内存。

**参数调优建议:**Driver的内存通常来说不设置,或者设置1G左右应该就够了。唯一需要注意的一点是,如果需要使用collect算子将RDD的数据全部拉取到Driver上进行处理,那么必须确保Driver的内存足够大,否则会出现OOM内存溢出的问题。

spark.default.parallelism

**参数说明:**该参数用于设置每个stage的默认task数量。这个参数极为重要,如果不设置可能会直接影响Spark作业性能。

**参数调优建议:*Spark作业的默认task数量为500~1000个较为合适。很多人常犯的一个错误就是不去设置这个参数,那么此时就会导致Spark自己根据底层HDFS的block数量来设置task的数量,默认是一个HDFS block对应一个task。通常来说,Spark默认设置的数量是偏少的(比如就几十个task),如果task数量偏少的话,就会导致前面设置好的Executor的参数都前功尽弃。试想一下,无论Executor进程有多少个,内存和CPU有多大,但是task只有1个或者10个,那么90%的Executor进程可能根本就没有task执行,也就白白浪费了资源。因此Spark官网建议的设置原则是,设置该参数为num-executors / executor-cores的2~3倍较为合适,比如Executor的总CPU core数量为300个,那么设置1000个task是可以的,此时可以充分地利用Spark集群的资源。

spark.storage.memoryFraction

**参数说明:**该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6。也就是说,默认Executor 60%的内存,可以用来保存持久化的RDD数据。根据你选择的不同的持久化策略,如果内存不够时,可能数据就不会持久化,或者数据会写入磁盘。

**参数调优建议:**如果Spark作业中,有较多的RDD持久化操作,该参数的值可以适当提高一些,保证持久化的数据能够容纳在内存中。避免内存不够缓存所有的数据,导致数据只能写入磁盘中,降低了性能。但是如果Spark作业中的shuffle类操作比较多,而持久化操作比较少,那么这个参数的值适当降低一些比较合适。此外,如果发现作业由于频繁的gc导致运行缓慢(通过spark web ui可以观察到作业的gc耗时),意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。

spark.shuffle.memoryFraction

**参数说明:**该参数用于设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2。也就是说,Executor默认只有20%的内存用来进行该操作。shuffle操作在进行聚合时,如果发现使用的内存超出了这个20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地降低性能。

**参数调优建议:**如果Spark作业中的RDD持久化操作较少,shuffle操作较多时,建议降低持久化操作的内存占比,提高shuffle操作的内存占比比例,避免shuffle过程中数据过多时内存不够用,必须溢写到磁盘上,降低性能。此外,如果发现作业由于频繁的gc导致运行缓慢,意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。

资源参数的调优,没有一个固定的值,需要大家根据自己的实际情况(包括Spark作业中的shuffle操作数量、RDD持久化操作数量以及spark web ui中显示的作业gc情况),同时参考本篇文章中给出的原理以及调优建议,合理地设置上述参数。

资源参数参考实例

下图是一份spark-submit命令的示例,可供参考一下,并根据自己的实际情况进行调节:

根据实践经验来看,大部分Spark作业经过本文所讲的开发调优与资源调优之后,一般都能以较高的性能运行,足以满足实际的基本需求。但在不同的生产环境和项目背景下,还是有可能会遇到其他更加棘手的问题(比如各种数据倾斜),也可能会遇到更高的性能要求。

4**、SparkSQL性能优化**

A**、在内存中缓存数据**

SparkSQL性能调优主要是将数据存入内存中,后续操作直接从内存中获得数据以提高性能。通过spark.cacheTable("tableName")或者dataFrame.cache()缓存表数据,使用spark.uncacheTable("tableName")来从内存中去除table数据。

举例如下:

(1)运行命令bin/spark-shell--master spark://192.168.12.221:7077 --jars /root/temp/ojdbc14.jar--driver-class-path /root/temp/ojdbc14.jar,登录上spark-shell;

(2)从Oracle数据库中读取数据,生成DataFrame

val oracleDataDF =spark.read.format("jdbc").option("driver","oracle.jdbc.OracleDriver").option("url","jdbc:oracle:thin:@192.168.12.142:1521/orcl").option("dbtable","scott.emp").option("user","scott").option("password","tiger").load

(3)将DataFrame注册成表:oracleDataDF.registerTempTable("emp")

(4)将表进行缓存,并查询两次,通过WebConsole(http://192.168.12.221:8080)监控执行的时间:

spark.sqlContext.cacheTable("emp")

spark.sql("select /* from emp").show

spark.sql("select /* from emp").show

(5)清空缓存:

spark.sqlContext.cacheTable("emp");

spark.sqlContext.clearCache

从Web Console监控到两次查询所耗费的时间,如下图所示,很明显地看到,经过缓存表数据之后,第二次查询所耗费的时间大大减少。

B**、SparkSQL性能优化相关参数**

将数据缓存到内存中的相关参数优化:

spark.sql.inMemoryColumnarStorage.compressed,默认为true,SparkSQL将会基于统计信息自动地为每一列选择一种压缩编码方式;
*
park.sql.inMemoryColumnarStorage.batchSize,默认值10000,缓存批处理大小。缓存数据时, 较大的批处理大小可以提高内存利用率和压缩率,但同时也会带来OOM(Out Of Memory)的风险。

其他性能相关的配置选项:

spark.sql.files.maxPartitionBytes,默认值128 MB,读取文件时单个分区可容纳的最大字节数;
*
spark.sql.files.openCostInBytes,默认值4M,打开文件的估算成本, 按照同一时间能够扫描的字节数来测量。当往一个分区写入多个文件的时候会使用。高估更好,这样的话,小文件分区将比大文件分区更快 (优先被调度);
*
spark.sql.autoBroadcastJoinThreshold,默认值10M,用于配置一个表在执行join操作时能够广播给所有worker节点的最大字节大小。通过将这个值设置为-1可以禁用广播。注意,当前数据统计仅支持已经运行了ANALYZE TABLE<tableName> COMPUTE STATISTICS noscan命令的HiveMetastore表;
*
spark.sql.shuffle.partitions,默认值200,用于配置join或聚合操作混洗(shuffle)数据时使用的分区数。

参考文献:

——《CSDN其他博文》

——《ItStar 走进大数据|高性能Spark你必须知道的调优原则及建议》

——《潭州大数据课程课件》

转自https://mp.weixin.qq.com/s/-b0UILqi5AJKJksu8_OYnw

相关文章

微信公众号

最新文章

更多