Spark常用算子内部原理剖析

x33g5p2x  于2020-11-02 转载在 Spark  
字(4.4k)|赞(0)|评价(0)|浏览(1750)

1、union算子

使用 union算子时需要保证两个RDD中的数据类型相同,返回的RDD数据类型和被合并的RDD数据类型相同,而且不会进行去重操作,保存所有元素。如果想去重可以叠加使用distinct算子。同时Spark还提供更为简洁的使用union算子的API,使用++符号相当于union算子操作。union算子原理如下图所示:

实例程序代码如下图所示:

public class UnionTest {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("UnionTest").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<Integer> numbers1 = Arrays.asList(1,2,3,4,5);
        List<Integer> numbers2 = Arrays.asList(6,7,8,9,10);
        JavaRDD<Integer> numbersRDD1 = sc.parallelize(numbers1,2);
        JavaRDD<Integer> numbersRDD2 = sc.parallelize(numbers2,3);
        JavaRDD<Integer> unionRDD = numbersRDD1.union(numbersRDD2);
        JavaRDD<Integer> resultRDD = unionRDD.mapPartitionsWithIndex((id,iter) -> {
                System.out.println("partition id is:" + id + "\tvalue is:");
                while(iter.hasNext()){
                    int val = iter.next();
                    System.out.print(val + "\t");
                }
                System.out.println();
                return iter;
        },false);
        resultRDD.collect();
        sc.close();
    }
}

运行结果如下图所示,新RDD的partition数目为两个旧RDD的partition数目之和。

2、groupByKey

groupByKey算子作用在一个PairRDD或(k,v)RDD上,返回一个(k,Iterable<v>)。主要作用是将相同的所有键值对分组到一个集合序列当中,其顺序是不确定的。groupByKey在计算过程中会把所有的键值对集合都加载到内存中,若一个键对应的数据太多,则容易导致内存溢出。groupByKey算子的原理如下图所示:

程序实例代码如下图所示:

运行结果如下图所示,对一个RDD执行groupByKey算子操作,不会改变其partition数目,但可以在groupByKey()算子中人为地指定分区数,用以改变算子执行的并行度。

3、reduceByKey

同groupByKey类似,却又有所不同。reduceByKey主要作用是聚合,而groupByKey主要作用是分组(算子中的function函数参数,用于对key对应的value值进行聚合操作)。reduceByKey算子的原理如下图所示:

程序实例代码如下图所示,对一个RDD执行reduceByKey算子操作,默认情况下不会改变其partition数目,可以在reduceByKey(func,partitionNumber)算子中人为地指定分区数,便可以改变结果RDD的分区数。

4、distinct算子

distinct算子用于返回一个源数据集去重之后的新数据集,即去重。distinct算子的原理如下图所示:

程序实例代码如下图所示,distinct算子有个numTasks参数,这个参数不仅仅跟分区数有关系(代表操作后的分区数),它还可以理解为一个数学概念中的“因子”。如果设置的numTasks能被数据集中元素数目整除,那么排序就按先无序地排因子,后无序排非因子的组合(即相当于局部无序);如果设置的numTasks不能被数据集中所有元素数目整除,那么排序会按照去重之前RDD排序的顺序返回。

运行结果如下图所示,从这个numTasks=5中和numTasks=10中仔细观察,可以确定这个“任务数”是将任务均分了,如3个任务数,那么一个任务集中元素个数为10个,从数据集中选择能被3整除的10个元素作为第一个数据集的结果,再选择能被3除之后余数为1的作为第二个数据集...以此类推,局部无序,而整体有序。

5、cogroup算子

cogroup算子将两个RDD进行合并,生成一个新RDD,cogroup算子的定义如下:

cogroup[W](other:RDD[(K, W)],numPartitions:Int): RDD[(K, (Iterable[V],Iterable[W]))]

对两个RDD中的Key-Value类型的元素,每个RDD相同Key的元素分别聚合为一个集合,并且返回两个RDD中对应Key的元素集合的迭代器。

(K,(Iterable[V],Iterable[W]))

其中,Key和Value,Value是两个RDD下相同Key的两个数据集合的迭代器所构成的元组。cogroup算子的原理如下图所示:

程序实例代码如下图所示:

运行结果如下图所示,可以看到,在cogroup算子中传入numPartitions参数,可以改变结果RDD的分区数。

6、intersection算子

该算子返回两个RDD的交集,并且去重,在其内部实现中,先使用map算子进行操作,将RDD中的数据转换为<key,null>形式,然后再对转换后的数据进行cogroup算子操作,最后筛选出两个子集合均非空的元素,并使用map算子进行操作还原为初始数据形式intersection算子的原理如下图所示:

程序实例代码如下图所示:

运行结果如下图所示,intersection算子只有一个参数,即other(RDD),没有第二个分区数参数,最终结果RDD的分区数为两个进行操作RDD的分区数较多的一个RDD的分区数。

7、join算子

Spark中的join算子,跟SQL语言中的inner join操作很相似,join算子的返回结果是前一个RDD和后一个RDD中key相同的数据组合成的一个新RDD。join算子对两个需要连接的RDD进行内连接操作,然后对每个key下的元素进行笛卡尔积操作,返回的结果再使用flatMap算子进行操作展平。join算子的原理如下图所示:

程序实例代码如下所示:

运行结果如下图所示:

8、sortByKey算子

sortByKey算子作用于Key-Value类型的RDD,并对key进行排序,它主要接收两个参数,第一个参数是boolean类型的,true表示升序,false表示降序;第二个参数是Int类型的,表示分区数。该算子返回的RDD一定是ShuffledRDD类型的,因为对源RDD进行sortByKey排序操作,必须进行Shuffle操作,而Shuffle操作的结果RDD就是ShuffledRDD。这个算子的实现很优雅,内部使用到了RangePartitioner,它可以使得相应范围Key对应的数据分到同一个partition中,然后内部再使用mapPartitions对每个partition中的数据进行排序,而每个partition中数据的排序用到了标准的sort机制,避免了大量数据的shuffle。sortByKey算子的原理如下图所示:

程序实例代码如下图所示:

运行结果如下图所示:

9、cartesian算子

笛卡尔积实现对两个RDD中所有元素的直接拼接,操作后,内部实现返回CartesianRDD。由于笛卡尔积的连接结果与连接顺序无关,即:没有驱动RDD和从动RDD的区别,因而它们的位置只会影响结果集中每一行记录的左右顺序,不影响整个结果的最终意义。cartesian算子的原理如下图所示:

程序实例代码如下图所示:

运行结果如下图所示:

10、coalesce算子repartition算子

这两个算子都是对RDD的分区进行重新划分,repartition只是coalesce算子中参数shuffle为true的简易实现(假设RDD有N个分区,需要重新划分成M个分区):

A、如果N<M,即分区数变多,一般情况下N个分区有数据分布不均匀的状况,利用HashPartitioner函数将数据重新分区为M个,这时需要将参数shuffle设置为true;如果没有设置参数shuffle设置为true,则计算后的RDD分区数不变;

B、如果N>M,即分区数变少,并且N和M相差不大,假如N是1000,M是100,那么就可以将N个分区中的若干个分区合并成一个新的分区,最终合并为M个分区,这时可以将参数shuffle设置为false,整个过程不进行shuffle操作;也可以将参数shuffle设置为true,则会进行shuffle操作;在参数shuffle为false的情况下,当M>N时,coalesce是无效的,不进行shuffle过程,父RDD和子RDD之间是窄依赖关系; 

C、如果N>M,即分区数变少,并且两者相差悬殊,比如M=1(合并成一个分区),这时如果将参数shuffle设置为false,父子RDD之间是窄依赖关系,他们同处在一个Stage中,就可能造成spark程序的并行度不够,导致计算在少数几个集群节点上计算,从而影响性能。为了避免这种情况,可以将第二个shuffle参数设置为true,这样会在重新分区过程中多一步shuffle操作,意味着上游的分区可以并行运行;

coalesce算子的原理如下图所示:

当使用repartition算子进行重分区操作后,会返回一个指定numPartitions分区数的RDD,在Spark内部,这将使用shuffle重新分布数据(无论是增加分区数还是减少分区数,repartition算子都会进行shuffle操作),如果要减少分区数,考虑使用coalesce算子,这样可以避免执行shuffle。repartition算子的原理如下图所示:

程序实例代码如下图所示:

运行结果如下图所示,可以看到,没有给coalesce算子设置参数shuffle为true,即使numPartitions设置为4,不会进行shuffle操作,结果RDD的分区数不变。

相关文章