Spark之性能优化

x33g5p2x  于2020-11-02 转载在 Spark  
字(15.4k)|赞(0)|评价(0)|浏览(448)

1、诊断Spark****程序内存的消耗

A**、Spark程序内存都花费在哪里?**

Ø每个Java对象,都有一个对象头,会占用16个字节,主要是包括了一些对象的元信息,比如指向它的类的指针。如果一个对象本身很小,比如就包括了一个int类型的field,那么它的对象头实际上占用的内存比对象自己还要大;

ØJava的String对象,会比它内部的原始数据,要多出40个字节。因为它内部使用char数组来保存内部的字符序列,并且还得保存诸如数组长度之类的信息,而且由于String使用的是UTF-16编码,因此每个字符会占用2个字节。比如,包含10个字符的String,会占用60个字节;

ØJava中的集合类型,比如HashMap和LinkedList,内部使用的是链表数据结构,所以对链表中的每一个数据,都使用了Entry对象来包装。Entry对象不光有对象头,还有指向下一个Entry的指针,通常占用8个字节;

Ø元素类型为原始数据类型(比如int)的集合,内部通常会使用原始数据类型的包装类型,比如Integer,来存储元素。

B**、如何判断Spark程序消耗的内存?**

Ø首先,自己设置RDD的并行度,有两种方式:要不然,在parallelize()、textFile()等方法中,传入第二个参数,设置RDD的task/partition的数量;要不然,用SparkConf.set()方法,设置一个参数,spark.default.parallelism,可以统一设置这个application所有RDD的partition数量;

Ø其次,在程序中将RDD cache到内存中,调用RDD.cache()方法即可;

Ø接着,观察Driver的log,会发现类似于:“INFO BlockManagerMasterActor: Added rdd_0_1 in memory on mbk.local:50311 (size: 717.5 KB, free: 332.3 MB)”的日志信息,这就显示了每个partition占用了多少内存;

Ø最后,将这个内存信息乘以partition数量,即可得出RDD的内存占用量。

2、使用高性能序列化类库

A**、Java序列化机制&**Kryo序列化机制

默认情况下,Spark使用Java自身的ObjectInputStream和ObjectOutputStream机制进行对象的序列化。只要你的类实现了Serializable接口,那么都是可以序列化的,而且Java序列化机制是提供了自定义序列化支持的,只要你实现Externalizable接口即可实现自己的更高性能的序列化算法。Java序列化机制有两个缺陷,其一是速度比较慢,其二是序列化后的数据占用的内存空间比较大;

Spark同样支持使用Kryo类库来进行序列化。Kryo序列化机制比Java序列化机制更快,而且序列化后的数据占用的空间更小,通常比Java序列化的数据占用的空间要小10倍。Kryo序列化机制之所以不是Spark默认序列化机制的原因是,有些类型虽然实现了Seriralizable接口,但是它不一定能够进行Kryo序列化;此外,如果你要得到最佳的性能,Kryo还要求你在Spark应用程序中,对所有你需要序列化的类型都进行注册。

B**、如何使用Kryo序列化机制**

如果要使用Kryo序列化机制,首先要用SparkConf设置一个参数,使用new SparkConf().set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”)即可,将Spark的序列化器设置为KryoSerializer。这样,Spark在内部的一些操作,比如Shuffle,进行序列化时,就会使用Kryo类库进行高性能、快速、更低内存占用量的序列化了;

使用Kryo时,它要求是需要序列化的类,是要预先进行注册的,以获得最佳性能——如果不注册的话,那么Kryo必须时刻保存类型的全限定名,反而占用不少内存。Spark默认是对Scala中常用的类型自动注册了Kryo的,都在AllScalaRegistry类中。但是,比如自己的算子中,使用了外部的自定义类型的对象,那么还是需要将其进行注册。举例如下:

val counter = new Counter();

val numbers = sc.parallelize(Array(1, 2, 3, 4, 5))

numbers.foreach(num => counter.add(num));

(实际上,上面的写法是错误的,因为counter不是共享的,所以累加的功能是无法实现的)

如果要注册自定义的类型,那么就需要使用如下的代码:

Scala版本:

val conf = new SparkConf().setMaster(...).setAppName(...)

conf.registerKryoClasses(Array(classOf[Counter]))

val sc = new SparkContext(conf)

Java版本:

SparkConf conf = new SparkConf().setMaster(...).setAppName(...);

conf.registerKryoClasses(Counter.class);

JavaSparkContext sc = new JavaSparkContext(conf);

C、优化****Kryo类库使用

优化****缓存大小

如果注册的要序列化的自定义类型,本身特别大,比如包含了超过100个field。那么就会导致要序列化的对象过大,此时就需要对Kryo本身进行优化。因为Kryo内部的缓存可能不够存放那么大的class对象。此时就需要调用SparkConf.set()方法,设置spark.kryoserializer.buffer.mb参数的值,将其调大。默认情况下它的值是2,就是说最大能缓存2M的对象,然后进行序列化。可以在必要时将其调大,比如设置为10。

预先****注册自定义类型

虽然不注册自定义类型,Kryo类库也能正常工作,但是那样的话,对于它要序列化的每个对象,都会保存一份它的全限定类名。此时反而会耗费大量内存,因此通常都建议预先注册好要序列化的自定义类。

D、什么场景适合使用Kryo序列化类库

这里针对的Kryo序列化类库的使用场景,就是算子函数使用到了外部的大数据的情况。比如说,在外部定义了一个封装了应用所有配置信息的对象,比如自定义了一个MyConfiguration对象,里面包含了100m的数据,然后,在算子函数里面,使用到了这个外部的大对象。此时呢,如果默认情况下,让Spark用java序列化机制来序列化这种外部的大对象,那么就会导致,序列化速度非常缓慢,并且序列化以后的数据还是很大,比较占用内存空间。

因此,在这种情况下,比较适合切换到Kryo序列化类库,来对算子外部的大对象进行序列化操作。一是,序列化速度会变快;二是,会减少序列化后的数据占用的内存空间。

**3、**优化数据结构

要减少内存的消耗,除了使用高校的序列化类库以外,还有一个很重要的事情,就是优化数据结构,从而避免Java语法特性中所导致的额外内存开销,比如基于指针的Java数据结构以及包装类型。有一个关键问题,就是优化什么数据结构?其实主要就是优化你的算子函数内部使用到的局部数据,或者是算子函数外部的数据,都可以进行数据结构的优化。优化之后,都会减少其对内存的消耗的占用。

那么,如何优化数据结构呢?

优先使用数组以及字符串,而不是集合类。也就是说,优先用array,而不是ArrayList、LinkedList、HashMap等集合。比如,有个List list = new ArrayList(),将其替换为int[] arr = new int[]。这样的话,array既比List少了额外信息的存储开销,还能使用原始数据类型(int)来存储数据,比List中用Integer这种包装类型存储数据,要节省内存得多;还比如,通常企业级应用中的做法是,对于HashMap、List这种数据,统一用String拼接成特殊格式的字符串,比如Map<Integer, Person> persons = new HashMap<Integer, Person>(),可以优化为特殊的字符串格式,如:id:name,address|id:name,address...。

避免使用多层嵌套的对象结构,比如:public class Teacher { private List<Student> students = new ArrayList<Student>() },就是非常不好的例子,因为Teacher类的内部又嵌套了大量的小Student对象。对于上述例子,完全可以使用特殊的字符串来进行数据的存储,比如,用json字符串来存储数据,就是一个很好的选择:{"teacherId": 1, "teacherName": "leo", students:[{"studentId": 1, "studentName": "tom"},{"studentId":2, "studentName":"marry"}]}

例如mapreduce程序:

context.write(new Text(key),new person());

替换为:

Person p1 = new Person(1,"zahngsan");

String str = JSON.toJSON(p1);

context.write(new text(key),str);

对于有些能够避免的场景,尽量使用int替代String。因为String虽然比ArrayList、HashMap等数据结构高效一些,占用内存量少很多,但是之前分析过,还是有额外内存的消耗。比如之前用String表示id,那么现在完全可以用数字类型的int,来进行替代。这里提醒下,在spark应用中,id就不要用常用的uuid了,因为无法转成int,就用自增的int类型的id即可。(uuid如/////-234242342-/////)。

4、对多次操作的RDD进行持久化或CheckPoint

如果在程序中,对某一个RDD,基于它进行了多次transformation或action操作,那么就非常有必要进行持久化操作,以避免对一个RDD反复进行计算。此外,如果要保证RDD的持久化数据可能丢失的情况下,还要保证高性能,那么可以对RDD进行CheckPoint操作。整个过程及详细说明,如下图所示:

除了对多次使用的RDD进行持久化操作之外,还可以进一步优化其性能,因为很可能,RDD的数据是持久化到内存或者磁盘中的。那么,此时,如果内存大小不是特别充足,完全可以使用序列化的持久化级别,比如MEMORY_ONLY_SER、MEMORY_AND_DISK_SER等,使用RDD.persist(StorageLevel.MEMORY_ONLY_SER)这样的语法即可。这样的话,将数据序列化之后,再持久化,可以大大减小对内存的消耗。此外,数据量小了之后,如果要写入磁盘,那么磁盘io性能消耗也比较小。

对RDD持久化序列化后,RDD的每个partition的数据,都是序列化为一个巨大的字节数组,这样,对于内存的消耗就小得多了。但是唯一的缺点就是,获取RDD数据时,需要对其进行反序列,会增大其性能开销。因此,对于序列化的持久化级别,还可以进一步优化,就是说,使用Kryo序列化类库,这样可以获得更快的序列化速度,并且占用更小的内存空间。但是要记住,如果RDD的元素(如RDD<T>的泛型类型),是自定义类型的话,在Kryo中需要提前注册该自定义类型。

5、措施四:优化Java虚拟机****垃圾回收

如果在持久化RDD的时候,持久化了大量的数据,那么Java虚拟机垃圾回收就可能成为一个性能瓶颈。因为Java虚拟机会定期进行垃圾回收,此时就会追踪所有的Java对象,并且在垃圾回收时,找到那些已经不再使用的对象,然后清理旧的对象,来给新的对象腾出内存空间。垃圾回收的性能开销,是跟内存中的对象数量成正比的,所以,对于垃圾回收的性能问题,首先要做的就是使用更高效的数据结构,比如Array和String;其次就是在持久化RDD时,使用序列化级别,而且用Kryo序列化类库,这样每个Partition就只是一个对象——一个字节数组。

在优化Java虚拟机垃圾回收之前,可以对垃圾回收进行检测,包括多久进行一次垃圾回收,以及每次垃圾回收耗费的时间。只要在spark-submit脚本中,增加一个配置即可,即--conf spark.executor.extraJavaOptions=-verbose:gc –XX:+PrintGCDetails –XX:+PrintGCTimerStamps。但是要记住,这里虽然会打印出Java虚拟机的垃圾回收的相关信息,但是是输出到了worker节点上的日志中,而不是driver的日志中;其实也完全可以通过SparkUI(4044端口)来观察每个stage的垃圾回收的情况。

lgc本身有性能消耗,如果频繁发生,对性能的影响不能忽视;

l数据量过大,每次gc回收的数据量也会很大,gc的速度会比较慢;

lgc也是由一个线程来执行,而gc线程会阻塞工作线程,严重影响Spark性能。

A**、优化executor内存比例**

对于垃圾回收来说,最重要的就是调节RDD缓存占用的内存空间,与算子执行时创建的对象占用的内存空间的比例。默认情况下,Spark使用每个executor 60%的内存空间来缓存RDD,那么在task执行期间创建的对象,只有40%的内存空间来存放。在这种情况下,很有可能因为内存空间的不足,task创建的对象过大,那么一旦发现40%的内存空间不够用了,就会触发Java虚拟机的垃圾回收操作。因此,在极端情况下,垃圾回收操作可能会被频繁触发。

在上述情况下,如果发现垃圾回收频繁发生,就需要对那个比例进行调优,使用new SparkConf().set(“spark.storage.memoryFraction”,”0.5”)即可,可以将RDD缓存占用空间的比例降低,从而给更多的空间让task创建的对象进行使用。因此,对于RDD持久化,完全可以使用Kryo序列化,加上降低其executor内存占比的方式,来减少其内存消耗,给task提供更多的内存,从而避免task的执行频繁触发垃圾回收。

B**、高级垃圾回收****调优**

Java堆空间被划分成了两块空间,一个是年轻代,一个是老年代。年轻代存放的是短时间存活的对象,老年代存放的是长时间存活的对象。年轻代又被划分为了三块空间,Eden、Survivor1、Survivor2。首先,Eden区域和Survivor1区域用于存放对象,Survivor2区域备用。创建的对象,先放入Eden区域和Survivor1区域,如果Eden区域满了,那么就会触发一次Minor GC,进行年轻代的垃圾回收。Eden和Survivor1区域中存活的对象,会被移动到Survivor2区域中。然后Survivor1和Survivor2的角色调换,Survivor1变成了备用。如果一个对象,在年轻代中,撑过了多次垃圾回收,都没有被回收掉,那么会被认为是长时间存活的,此时就会被移入老年代。此外,如果在将Eden和Survivor1中的存活对象,尝试放入Survivor2中时,发现Survivor2满了,那么会直接放入老年代。此时就出现了,短时间存活的对象,进入老年代的问题。如果老年代的空间满了,那么就会触发Full GC,进行老年代的垃圾回收操作。

Spark中,垃圾回收调优的目标就是,只有真正长时间存活的对象,才能进入老年代,短时间存活的对象,只能待在年轻代。不能因为某个Survivor区域空间不够,在Minor GC时,就进入老年代,从而造成短时间存活的对象,长期待在老年代中占据空间,而且Full GC时要回收大量的短时间存活的对象,导致Full GC速度缓慢。如果发现,在task执行期间,大量full gc发生了,那么说明,年轻代的Eden区域,给的空间不够大。此时可以执行一些操作来优化垃圾回收行为:

n降低spark.storage.memoryFraction的比例,给年轻代更多的内存空间,来存放短时间存活的对象;

n给Eden区域分配更大的空间,使用-Xmn即可,通常建议给Eden区域分配预计大小的4/3;

n如果使用的是HDFS文件,那么很好估计Eden区域大小,如果每个executor有4个task,然后每个hdfs压缩块解压缩后大小是3倍,此外每个hdfs块的大小是64M,那么Eden区域的预计大小就是:4 /* 3 /* 64MB,然后呢,再通过-Xmn参数,将Eden区域大小设置为4 /* 3 /* 64 /* 4/3。

根据经验来看,对于垃圾回收的调优,尽量就是调节executor内存的比例就可以了。因为jvm调优是非常复杂和敏感的,除非真的到了万不得已的时候,而且开发者又对jvm相关的技术比较了解,那么此时可以进行eden区域的调节和调优。对应的2个高级参数说明如下:

**-XX:SurvivorRatio=4:**如果值为4,那么就是两个Survivor跟Eden的比例是2:4,也就是说每个Survivor占据的年轻代的比例是1/6,所以,可以尝试调大Survivor区域的大小;

**-XX:NewRatio=4:**新生代(eden+2/*s)和老年代(不包含永久区)的比值,4表示新生代:老年代=1:4,即年轻代占堆的1/5。

6、提高Spark并行度

实际上Spark集群的资源并不一定会被充分利用到,所以要尽量设置合理的并行度,来充分地利用集群的资源,才能充分提高Spark应用程序的性能。Spark会自动设置以文件作为输入源的RDD的并行度,依据其大小,比如HDFS,就会给每一个block创建一个partition,也依据这个设置并行度。对于reduceByKey等会发生shuffle的操作,就使用并行度最大的父RDD的并行度即可。

可以手动使用textFile()、parallelize()等方法的第二个参数来设置并行度;也可以使用spark.default.parallelism参数,来设置统一的并行度。Spark官方的推荐是,给集群中的每个cpu core设置2~3个task。比如说,spark-submit设置了executor数量是10个,每个executor要求分配2个core,那么application总共会有20个core。此时可以设置new SparkConf().set(“spark.default.parallelism”, “60”)来设置合理的并行度,从而充分利用集群资源。

**7、**广播共享数据

如果算子函数中使用到了特别大的数据,那么这个时候推荐将该数据进行广播。这样的话,就不至于将一个大数据拷贝到每一个task中,而是给每个节点拷贝一份该大数据,然后节点上的task共享该数据。如此,就可以减少大数据在节点上的内存消耗,并且可以减少数据到节点的网络传输开销。广播共享数据的原理,如下图所示:

**8、**数据本地化

数据本地化对于Spark Job的性能有着巨大的影响,如果数据以及要计算它的代码是在一起的,那么性能当然会非常高。但是,如果数据和计算它的代码是分开的,那么其中之一必须到另外一方的机器上。通常来说,移动代码到其他节点,会比移动数据到代码所在的节点上去,速度要快得多,因为代码比较小。Spark也正是基于这个数据本地化的原则来构建task调度算法的。

数据本地化,指的是,数据离计算它的代码有多近。基于数据距离代码的距离,有如下几种数据本地化级别:

nPROCESS_LOCAL:数据和计算它的代码在同一个JVM进程中;

nNODE_LOCAL:数据和计算它的代码在同一个节点上,但是不在同一个进程中,比如在不同的executor进程中,或者是数据在HDFS文件的block中;

nNO_PREF:数据从哪里过来,性能都是一样的;

nRACK_LOCAL:数据和计算它的代码在同一个机架上;

nANY:数据可能在任意地方,比如其他网络环境内,或者其他机架上。

Spark倾向于使用最好的本地化级别来调度task,但是这是不可能的。如果没有任何未处理的数据在空闲的executor上,那么Spark就会放低本地化级别。这时有两个选择:第一,等待,直到executor上的cpu释放出来,那么就分配task过去;第二,立即在任意一个executor上启动一个task。

Spark默认会等待一会儿,来期望task要处理的数据所在的节点上的executor空闲出一个cpu,从而将task分配过去。只要超过了时间,那么Spark就会将task分配到其他任意一个空闲的executor上。可以设置spark.locality系列参数,来调节Spark等待task可以进行数据本地化的时间,最好是采用分层设置的方式,对于最好的本地化级别,可以设置等待的时间长些,毕竟本地化级别越好数据传输的效率也越高。spark.locality.wait(默认3000毫秒)、spark.locality.wait.node、spark.locality.wait.process、spark.locality.wait.rack。数据本地化的原理,如下图所示:

9、用reduceByKey替代groupByKey

对于单词计数WordCount程序来说,同样可以使用groupByKey算子来实现对应的功能,但是它的效率会远远低于reduceByKey,程序代码参考如下:

//reduceByKey代码:

val counts = pairs.reduceByKey(_ + _)

//groupByKey代码:

val counts = pairs.groupByKey().map(wordCounts => (wordCounts._1, wordCounts._2.sum))

如果能用reduceByKey,那就用reduceByKey,因为它会在map端,先进行本地combine,可以大大减少要传输到reduce端的数据量,减小网络传输的开销。只有在reduceByKey处理不了时,才用groupByKey().map()来替代。groupByKey算子的原理,如下图所示:

reduceByKey算子的原理,如下图所示:

10、shuffle****性能优化

大多数Spark作业的性能主要消耗在了shuffle环节,因为该环节包含了大量的磁盘IO、序列化、网络数据传输等操作。因此,如果要让作业的性能更上一层楼,就有必要对shuffle过程进行调优。但是也必须提醒的是,影响一个Spark作业性能的因素,主要还是代码开发、资源参数以及数据倾斜,shuffle调优只能在整个Spark的性能调优中占到一小部分而已。因此务必把握住调优的基本原则,千万不要舍本逐末。下面详细讲解shuffle的原理,以及相关参数的说明,同时给出各个参数的调优建议。

A**、经优化的****HashShuffleManager**

下图说明了未经优化的HashShuffleManager的原理,这里先明确一个假设前提:每个executor只有1个cpu core,也就是说,无论这个executor上分配多少个task线程,同一时间都只能执行一个task线程。

从shuffle write说起,shuffle write阶段,主要就是在一个stage结束计算之后,为了下一个stage可以执行shuffle类的算子(比如reduceByKey),而将每个task处理的数据按key进行“分类”。所谓“分类”,就是对相同的key执行hash算法,从而将相同key都写入同一个磁盘文件中,而每一个磁盘文件都只属于下游stage的一个task。在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去。

那么每个执行shuffle write的task,要为下一个stage创建多少个磁盘文件呢?很简单,下一个stage的task有多少个,当前stage的每个task就要创建多少份磁盘文件。比如下一个stage总共有100个task,那么当前stage的每个task都要创建100份磁盘文件。如果当前stage有50个task,总共有10个executor,每个executor执行5个Task,那么每个executor上总共就要创建500个磁盘文件,所有executor上会创建5000个磁盘文件。由此可见,未经优化的shuffle write操作所产生的磁盘文件的数量是极其惊人的。

接着来说说shuffle read,shuffle read,通常就是一个stage刚开始时要做的事情。此时该stage的每一个task就需要将上一个stage的计算结果中的所有相同key,从各个节点上通过网络都拉取到自己所在的节点上,然后进行key的聚合或连接等操作。由于shuffle write的过程中,task给下游stage的每个task都创建了一个磁盘文件,因此shuffle read的过程中,每个task只要从上游stage的所有task所在节点上,拉取属于自己的那一个磁盘文件即可。

shuffle read的拉取过程是一边拉取一边进行聚合。每个shuffle read task都会有一个自己的buffer缓冲,每次都只能拉取与buffer缓冲相同大小的数据,然后通过内存中的一个Map进行聚合等操作。聚合完一批数据后,再拉取下一批数据,并放到buffer缓冲中进行聚合操作。以此类推,直到最后将所有数据拉取完,并得到最终的结果。

B**、优化后的HashShuffleManager**

下图说明了优化后的HashShuffleManager的原理,这里说的优化,是指可以设置一个参数,spark.shuffle.consolidateFiles。该参数默认值为false,将其设置为true即可开启优化机制。通常来说,如果使用HashShuffleManager,那么都建议开启这个选项。

开启consolidate机制之后,在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘文件了。此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stage的task数量是相同的。一个executor上有多少个cpu core,就可以并行执行多少个task。而第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件中。

当executor的cpu core执行完一批task,接着执行下一批task时,下一批task就会复用之前已有的shuffleFileGroup,包括其中的磁盘文件。也就是说,此时task会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。因此,consolidate机制允许不同的task复用同一批磁盘文件,这样就可以有效地将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能。

假设第二个stage有100个task,第一个stage有50个task,总共还是有10个executor,每个executor执行5个task。那么原本使用未经优化的HashShuffleManager时,每个executor会产生500个磁盘文件,所有executor会产生5000个磁盘文件。但是此时经过优化之后,每个executor创建的磁盘文件的数量的计算公式为:cpu core的数量 /* 下一个stage的task数量。也就是说,每个executor此时只会创建100个磁盘文件,所有executor只会创建1000个磁盘文件。

C**、SortShuffleManager运行原理**

下图说明了普通的SortShuffleManager的原理,在该模式下,数据会先写入一个内存数据结构中,此时根据不同的shuffle算子,可能选用不同的数据结构。如果是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边通过Map进行聚合,一边写入内存;如果是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。

在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的形式分批写入磁盘文件。写入磁盘文件是通过Java的BufferedOutputStream实现的。BufferedOutputStream是Java的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘IO次数,提升性能。

一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,这就是merge过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个task就只对应一个磁盘文件,也就意味着该task为下游stage的task准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset。

SortShuffleManager由于有一个磁盘文件merge的过程,因此大大减少了文件数量。比如第一个stage有50个task,总共有10个Executor,每个executor执行5个task,而第二个stage有100个task。由于每个task最终只有一个磁盘文件,因此,此时每个executor上只有5个磁盘文件,所有Executor只有50个磁盘文件。

D、bypassSortShuffleManager运行机制

下图说明了bypass SortShuffleManager的原理,bypass运行机制的触发条件如下:

lshuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值;

l不是聚合类的shuffle算子(比如reduceByKey)。

此时task会为每个下游task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。

该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来说,shuffle read的性能会更好。

而该机制与普通SortShuffleManager运行机制的不同在于:第一,磁盘写机制不同;第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

E**、shuffle相关参数****调优**

以下是Shuffle过程中的一些主要参数,这里详细讲解了各个参数的功能、默认值以及基于实践经验给出的调优建议。

spark.shuffle.file.buffer

默认值:32k

参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲区大小。将数据写到磁盘文件之前,会先写入buffer缓冲区中,待缓冲区写满之后,才会溢写到磁盘。

调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。

spark.reducer.maxSizeInFlight

默认值:48m

参数说明:该参数用于设置shuffle read task的buffer缓冲区大小,而这个buffer缓冲区决定了每次能够拉取多少数据。

调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。

spark.shuffle.io.maxRetries

默认值:3

参数说明:shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。

调优建议:对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。

spark.shuffle.io.retryWait

默认值:5s

参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是5s。

调优建议:建议加大间隔时长(比如60s),以增加shuffle操作的稳定性。

spark.shuffle.memoryFraction

默认值:0.2

参数说明:该参数代表了executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是20%。

调优建议:如果内存充足,而且很少使用持久化操作,建议调高这个比例,给shuffle read task的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升10%左右。

spark.shuffle.manager

默认值:sort

参数说明:该参数用于设置ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。

调优建议:由于SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。

spark.shuffle.sort.bypassMergeThreshold

默认值:200

参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。

调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。

spark.shuffle.consolidateFiles

默认值:false

参数说明:如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。

调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。

相关文章

微信公众号

最新文章

更多