Spark之Spark Streaming(初级篇)

x33g5p2x  于2020-11-02 转载在 Spark  
字(8.5k)|赞(0)|评价(0)|浏览(805)

1、Spark Streaming概述

Spark官网对Spark Streaming做出了精准的定义,如上图所示,简单翻译成汉语的意思就是Spark Streaming使得构建规模性的、可容错的流式应用变得更容易。Spark Streaming是核心Spark API的扩展,能够实现可扩展、高吞吐量、可容错的实时数据流处理。数据可以从诸如Kafka,Flume,Kinesis或TCP套接字等众多来源获取,并且可以使用由高级函数(如map,reduce,join和window)开发的复杂算法进行流数据处理。最后,处理后的数据可以被推送到文件系统,数据库和实时仪表盘。而且,还可以在数据流上进一步运用Spark提供的机器学习和图处理算子。总体上来说,可以从三个方面对Spark Streaming进行考量:输入——计算——输出,这三者的关系简单描述如下图所示:

**输入:**可以从Kafka,Flume,HDFS等获取数据;

**计算:**可以运用map,reduce,join等一系列算子,通过Spark计算引擎进行计算(基本和普通的RDD一样,使用起来非常方便);

**输出:**可以输出到HDFS,数据库,HBase等。

从Spark官网上可以看到关于Spark Streaming的详细介绍,总结起来,Spark Streaming具有如下几个特点:

A、易用

B、容错

C、易与Spark整合

2、Spark Streaming数据处理的特点

在Spark Streaming的内部,其工作原理如下图所示。Spark Streaming接收实时的输入数据流,并将数据流进行分批,然后由Spark引擎进行处理,以批量生成最终的数据结果流。

从图中可以看出,Spark Streaming将输入的数据分成多个batch进行处理。因此,从严格意义上来说,Spark Streaming并不是一个真正的实时计算框架,因为它是分批次处理数据。Spark Streaming提供了一个高层抽象,称为discretized stream或DStream,DStream可以通过Kafka,Flume和Kinesis等来源的输入数据流进行创建,也可以通过在其他DStream上应用高级算子操作来进行创建。在Spark Streaming内部,DStream表示为一系列的RDD。

接下来实际运行Spark Streaming自带的一个小示例程序:NetworkWordCount,看看Spark Streaming程序的运行效果。由于在该示例的运行过程中需要使用netcat网络工具(netcat是一个用于TCP/UDP连接和监听的Linux工具,主要用于网络传输和调试领域),所以要事先进行安装,该示例程序的工作流程是通过网络工具newcat发送一个字符串给Spark Streaming进行处理,通过单词计数操作后,将单词计数的结果进行输出显示。

A、安装netcat工具

将netcat安装包(这里使用的是netcat-0.7.1.tar.gz)上传到主机hadoop221的/root/tools/目录下,然后进入到该目录下,运行命令tar -zxvf netcat-0.7.1.tar.gz -C /root/training/,将netcat安装包解压到/root/training/目录下,进入到/root/training/netcat-0.7.1目录,创建opt/netcat目录,用于存放编译产生的文件(即安装目录),运行命令./configure --prefix=/root/training/netcat-0.7.1/opt/netcat,进行配置,指定刚刚创建的目录为安装目录;运行命令make,进行编译;运行命令make install,进行安装,等待安装完成即可。

B、启动netcat工具

在启动netcat和运行Spark Streaming示例程序之前,有一个非常关键的问题需要注意,那就是执行Spark Streaming程序要确保机器的cpu核数大于等于2,其原因在Spark官网上给出了说明,如下图所示。简单解释就是,在执行Spark Streaming程序时,需要有一个线程来专门获取数据,另一个线程专门用于进行计算处理,而如果只有一个线程的话,就无法同时做这两样事情,因而就无法正常执行该示例程序。

一切准备就绪后,运行命令nc –l –p 1234,启动netcat数据流服务器,并绑定1234号端口;新开一个命令行窗口,进入到/root/training/spark-2.1.0-bin-hadoop2.7目录下,运行命令bin/run-example streaming.NetworkWordCount localhost 1234,执行Spark Streaming示例程序并监听1234号端口以获取数据,在netcat运行窗口输入一串字符:Hello World I love Beijing and ShenZhen,输出结果如下图所示。从示例程序运行的结果可以看到,Spark Streaming程序一直在周期性地获取数据并进行计算,一旦netcat端输入数据,Spark Streaming程序便能够迅速地在采样时间周期内获取到数据并进行单词计数处理,最后将结果显示到屏幕上。

3、Spark Streaming编程初体验

上面运行了Spark Streaming自带的NetworkWordCount示例程序,大概了解了Spark Streaming程序的执行特点。下面来实现自己的NetworkWordCount程序,整个程序的代码如下图所示:

运行结果如下图所示,可以看到,该程序的输出结果跟前文运行Spark Streaming自带的NetworkWordCount示例程序几乎是一样的。

4、StreamingContext对象讲解

在Spark Core中有SparkContext实例对象可供开发者使用,在SparkSQL中有SQLContext实例对象可供开发者使用,类似地,在Spark Streaming中也有StreamingContext实例对象可供开发者使用。对于Spark Streaming程序的开发来说,一个StreamingContext对象的创建是非常有必要的,它是Spark Streaming所有流操作的基本入口。而一个StreamingContext对象可以使用SparkConf对象进行创建,使用SparkConf对象创建StreamingContext对象,其程序代码如下图所示:

除此之外,还可以通过一个现有的SparkContext实例创建StreamingContext对象,如下图所示:

在这里,有几点需要进行说明:

appName参数是Spark应用程序在集群UI上显示的名称;

master是Spark,Mesos或YARN集群的URL地址,或者一个特殊的“local[/*]”字符串来让程序以本地模式运行;

当在集群上运行Spark程序时,不需要在程序中硬性指定master参数,而是使用spark-submit提交应用程序并将master的URL以脚本参数的形式传入。但是,对于本地测试和单元测试,可以通过“local[/*]”来运行Spark Streaming程序(需要确保本地系统中的cpu核心数够用);

StreamingContext会在内部创建一个SparkContext实例(它是所有Spark功能的起点),可以通过ssc.sparkContext访问到这个SparkContext实例;

批处理的时间窗口长度必须根据应用程序的延迟要求和可用的集群资源进行设置。

这里还有几点需要特别注意:

一旦一个StreamingContext实例对象开始运行,就不能再设置或添加新的流式计算;

一旦一个上下文被停止,它将无法重新启动;

同一时刻,一个JVM中只能有一个StreamingContext实例处于活动状态;

StreamingContext上的stop()方法也会停止SparkContext实例对象的运行,如果要仅停止StreamingContext(保持SparkContext活跃),需要将stop()方法的可选参数stopSparkContext设置为false;

只要前一个StreamingContext在下一个StreamingContext被创建之前停止(不停止SparkContext),SparkContext就可以被重新用来创建多个StreamingContext。

5、离散流(DStream)——Discretized Stream解析

DiscretizedStream或DStream是Spark Streaming对流式数据的基本抽象,它表示连续的数据流,这些连续的数据流可以是从数据源接收的输入数据流,也可以是通过对输入数据流执行转换操作而生成的经过处理之后的数据流。在Spark内部,DStream由一系列连续的RDD表示,如下图所示:

举个栗子:在前面的NetworkWordCount示例中,将一行行文本组成的流转换为单词流,具体做法为:将flatMap算子操作应用于名为lines的DStream中的每个RDD上,以生成words DStream的RDD。如下图所示:

但其实呢,DStream和RDD也存在一定的区别,具体参考如下两张图:

6、DStream的转换操作(transformation)

前面说过,通常情况下,可以简单地把DStream当做RDD看待,如同RDD一样,DStream也支持很多的转换算子操作,简单总结如下表所示:

可以看到,上表中的绝大多数算子跟前面文章中介绍的RDD算子是一致的,对于最后两个transformation算子,这里着重简单讲解下:

transform(func)

**功能:**通过RDD to RDD函数作用于源DStream中的各个RDD,可以是任意的RDD操作,最终得到一个新的RDD;

**举例:**在MyNetworkWordCount程序示例中,也可以使用transform算子来生成元组对,代码如下图所示。这里需要注意的是,在words.transform(x = > x.map(x => (x, 1)))这行代码中,第一个x表示RDD(DStream中的每个RDD),而第三个x表示单词word(RDD中的每个单词)。

updateStateByKey(func)

**功能:**该算子操作允许在不断使用新数据进行更新的同时,还能够保持任意状态;其中有两个需要注意的关键点:定义状态——状态可以是任意的数据类型;定义状态更新函数——如何使用更新前的状态和从输入流里面获取新的值来更新状态;

**举例:**重写MyNetworkWordCount程序,累计每个单词出现的次数,代码如下图所示:

运行结果如下图所示,可以看到,相继输入几个不同的字符串,该示例程序将每个单词出现的次数进行了累计并输出显示到了控制台。

7、Spark Streaming的窗口操作

Spark Streaming作为一种伪流式计算框架,它还提供了窗口计算功能,允许在数据流的滑动窗口上应用转换算子操作。下图简单描述了滑动窗口的工作原理:

如上图所示,每当窗口滑过originalDStream时,落在窗口内的源RDD被组合并被执行操作以产生windowed DStream的RDD。在上图的例子中,操作应用于最近3个时间单位的数据,并以2个时间单位进行滑动。这表明任何窗口操作都需要指定以下两个参数:

窗口长度(windowlength)——窗口的时间长度(上图的示例中为:3);

滑动间隔(slidinginterval)——两次相邻的窗口操作的时间间隔(即每次滑动的时间长度)(上图示例中为:2)。

这两个参数都必须是源DStream的批间隔的倍数(上图示例中为:1)。

下面举一个例子来进一步解释窗口操作。假设希望对之前的单词计数的示例程序进行扩展,每10秒钟对过去30秒的数据进行wordcount。为此,必须在最近30秒的pairs DStream数据中对(word, 1)键值对应用reduceByKey算子操作。这是通过使用reduceByKeyAndWindow算子操作来完成的,代码如下图所示:

运行结果如下图所示,这里需要强调的是,reduceByKeyAndWindow算子操作中的两个参数Duration和slideDuration有一些限制,在程序的开头处使用了new StreamingContext(sparkConf, Seconds(2))来创建StreamingContext对象,Seconds(2)中的参数值为2,Duration和slideDuration的值都必须是这个参数值的整数倍,否则程序会运行出错,不要求Duration参数值是slideDuration参数值的整数倍。

还有一些常见的窗口操作列举如下,所有这些操作都用到了上述两个参数,即windowLength和slideInterval。

window(windowLength, slideInterval)

基于源DStream产生的窗口化的批数据,来计算一个新的DStream;

countByWindow(windowLength, slideInterval)

返回数据流中元素的一个滑动窗口数;

reduceByWindow(func, windowLength, slideInterval)

返回一个单元素流,利用函数func聚集滑动时间间隔内的流元素,创建这个单元素流,函数必须是相关联的,以使计算能够正确的并行计算;

reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])

应用到一个(K,V)对组成的DStream上,返回一个由(K,V)对组成的新的DStream。每一个key的值均由给定的reduce函数聚集起来,需要注意的是:在默认情况下,这个算子利用了Spark默认的并发任务数去进行分组,可以使用numTasks参数设置不同的任务数;

reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])

上述reduceByKeyAndWindow算子的更高效的版本,其中使用前一窗口的reduce计算结果递增地计算每个窗口的reduce值。这是通过对进入滑动窗口的新数据进行reduce操作,以及“逆减(inverse reducing)”离开窗口的旧数据来完成的。一个例子是当窗口滑动时对键对应的值进行“一加一减”操作。但是,它仅适用于“可逆减函数(invertible reduce functions)”,即具有相应“反减”功能的减函数(作为参数invFunc)。 像reduceByKeyAndWindow一样,通过可选参数可以配置reduce任务的数量。 注意,使用此操作必须启用检查点;

countByValueAndWindow(windowLength, slideInterval, [numTasks])

应用到一个(K,V)对组成的DStream上,返回一个由(K,V)对组成的新的DStream,每个key的值都是它们在滑动窗口中出现的频率。

8、DStream输入

DStream输入,表示从数据源获取输入数据流的DStream。在MyNetworkWordCount示例程序中,lines表示输入DStream,它代表从netcat服务器获取的数据流。每一个输入流DStream和一个Receiver对象相关联,这个Receiver从数据源中获取数据,并将数据存入内存中用于进一步的处理。

输入DStream表示从数据源获取的原始数据流,简单进行分类,Spark Streaming拥有两类数据源:

基本源(Basic Source):这些源在StreamingContext API中直接就可以使用,例如文件系统、套接字连接、Akka的actor等;

高级源(Advanced Source):这些源包括Kafka,Flume,Kinesis,Twitter等等。

下面通过具体的实例程序,来详细介绍这些基本数据源的使用,高级数据源将在下一篇文章进行介绍。

A、文件流

通过监控文件系统的变化(这里指的是文件目录),当有新文件添加时,则将它读入并作为数据流,需要注意如下几个关键点:

这些文件需要具有相同的格式;

这些文件通过原子移动或重命名文件的方式在dataDirectory目录中创建;

如果在文件中追加内容,这些追加的新数据是不会被读取的。

在Spark Streaming程序中使用文件流,代码如下图所示:

运行结果如下图所示,这里需要解释的是,该SparkStreaming程序示例,要想实验成功,不能直接拷贝文件到对应目录下,也不能直接在该目录下的文件中追加数据。这里采用的策略是,在该目录下,首先创建空文件MyFile1.txt,然后在MyFile1.txt文件中追加一行字符串“Hello World, I love China”,保存后复制粘贴该文件得到MyFile1 - 副本.txt文件,这样SparkStreaming程序便能够检测到该目录发生了变化;同理,然后在MyFile1.txt文件末尾再追加一行字符串“Hello World, I love Beijing”,保存后复制粘贴该文件得到MyFile1 - 副本 (2).txt文件,如此操作多次,便能够看到如下图的输出结果。

B、RDD队列流

使用streamingContext.queueStream(queueOfRDD)创建基于RDD队列的DStream,用于调试Spark Streaming应用程序。示例程序的代码如下图所示:

运行结果如下图所示:

C、套接字流

通过监听Socket套接字端口来获取数据,在前文第3节中,编写自己的NetworkWordCount程序示例,便是使用的套接字流,这里不再重复赘述。

9、DStream输出

DStream的输出操作允许将程序的输出结果推到如数据库、文件系统等外部系统中。因为输出操作实际上是允许外部系统消费转换后的数据,它们触发的实际操作是DStream转换。目前,SparkStreaming定义了如下几种输出操作:

其它的输出操作跟普通RDD的输出操作一致,这里不再重复讲述,可以参考前面文章的介绍,这里着重介绍下foreachRDD算子。DStream.foreachRDD是一个强大的原语,它将数据发送到外部系统中。这里举个使用foreachRDD算子将输出结果保存到数据库系统中的程序示例,第一步:创建数据库连接,将数据保存到外部数据库(借鉴前面的NetworkWordCount程序示例,改写输出结果的部分,示例程序的代码如下图所示):

如果直接这样运行程序,将会出现如下图所示的Exception。其原因是:Connection对象不是一个可被序列化的对象,它不能在RDD的每个Worker进程中运行,即Connection不能在RDD分布式环境中的每个分区上运行,因为不同的分区可能运行在不同的Worker进程中,所以需要在每个RDD分区上单独创建Connection对象。

对上述程序进行改造后,第二步:在每个RDD分区上单独创建Connection对象,示例程序的代码如下图所示:

10、DataFrame和SQL操作

在SparkStreaming程序中,还可以很方便地使用DataFrame和SQL操作来处理流数据。但有个前提条件,必须使用当前的StreamingContext对象对应的SparkContext对象,来创建一个SparkSession对象。此外,必须这样做的另一个原因是,使得应用可以在driver程序故障时能够重新启动,这是通过创建一个可以延迟实例化的单例SparkSession对象来实现的。

在下面的示例程序中,使用DataFrame和SQL操作来改造之前的NetworkWordCount示例程序,并对单词进行计数。将每个RDD转换为DataFrame,并注册为临时表,然后在这张表上执行SQL查询操作。示例程序的代码如下图所示:

在主机hadoop221中运行命令nc -l -p 1234,然后运行MyNetworkWordCountDataFrame.scala程序,再在主机hadoop221中输入字符串Hello World, I love China and Beijing and Shenzhen,程序输出的结果如下图所示:

本文对于Spark Streaming的初级知识的介绍就到这里,在Spark Streaming中还有些比较有用的知识,将放在下一篇文章中进行介绍,敬请关注!

参考文献:

——《百度百科》

——《CSDN博文》

——《潭州大数据课程课件》

相关文章

微信公众号

最新文章

更多