Spark Streaming in Practice: DStream Transformation Operators, Windows, and File Output

x33g5p2x, reposted under Spark on 2021-12-07

1. Start the cluster

Start ZooKeeper, Hadoop, and Flume.
# 1. On all three nodes
/usr/zookeeper/zookeeper-3.4.10/bin/zkServer.sh start 
/usr/zookeeper/zookeeper-3.4.10/bin/zkServer.sh status

# 2. On the master node, start Hadoop
/usr/hadoop/hadoop-2.7.3/sbin/start-all.sh

2. Add the project dependencies in IDEA

<properties>
        <scala.version>2.11.8</scala.version>
        <hadoop.version>2.7.3</hadoop.version>
        <spark.version>2.4.0</spark.version>
        <hbase.version>1.2.4</hbase.version>
        <hive.version>2.1.1</hive.version>
    </properties>

    <dependencies>
        <!--Scala-->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <!--Spark-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!--Spark & flume-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-flume_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!--Hadoop-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.3</version>
        </dependency>

        <!--Hbase-->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>${hbase.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-protocol</artifactId>
            <version>${hbase.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-annotations</artifactId>
            <version>${hbase.version}</version>
            <type>test-jar</type>
            <scope>test</scope>
        </dependency>

        <!--Hive-->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>${hive.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>${hive.version}</version>
        </dependency>

        <!--kafka-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>2.4.0</version>
        </dependency>

        <!--mysql-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.46</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.17</version>
        </dependency>

        <dependency>
            <groupId>com.huaban</groupId>
            <artifactId>jieba-analysis</artifactId>
            <version>1.0.2</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.12.4</version>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
        </plugins>
        <defaultGoal>compile</defaultGoal>
    </build>
</project>

If the other dependencies were already added in an earlier project, you only need to add:

<dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>2.4.0</version>
        </dependency>

2.1 Start the server-side socket listener

Command: nc -lk 9999

2.2 Using transform() to split each line into words

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object TransformTest {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkConf object
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("TransformTest").setMaster("local[2]")
    // 2. Create the SparkContext object, the entry point for all task computation
    val sc: SparkContext = new SparkContext(sparkConf)
    // 3. Set the log level
    sc.setLogLevel("WARN")
    // 4. Create the StreamingContext; it takes two parameters: the SparkContext and the batch interval
    val ssc: StreamingContext = new StreamingContext(sc,Seconds(5))
    // 5. Connect to the socket service; this needs the socket host, port and storage level (default here)
    // The steps above are the standard boilerplate
    val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.142.128",9999)

    // 6. Apply an RDD-to-RDD function that splits each line on spaces, returning a new DStream (words)
    val words: DStream[String] = dstream.transform(rdd => rdd.flatMap(_.split(" ")))
    // 7. Print the results
    words.print()
    // 8. Start the streaming computation
    ssc.start()
    // 9. Keep the program running until it is stopped manually
    ssc.awaitTermination()
  }
}
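
For example, typing hello spark hello into the nc window makes the next 5-second batch print hello, spark and hello on three separate lines.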

2.3 UpdateStateByKeyTest: updating state values across batches

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object UpdateStateByKeyTest {
  // newValues holds all the 1s for the same word among the (word, 1) pairs of the current batch
  // runningCount holds the accumulated value of that key over all previous batches
  def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount = runningCount.getOrElse(0) + newValues.sum
    Some(newCount)
  }
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkConf object and set the appName and master; local[2] runs the job locally with 2 threads
    val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[2]")
    // 2. Create the SparkContext object, the entry point for all task computation; it creates the DAGScheduler and TaskScheduler
    val sc: SparkContext = new SparkContext(sparkConf)
    // 3. Set the log level
    sc.setLogLevel("WARN")
    // 4. Create the StreamingContext; it takes two parameters: the SparkContext and the batch interval
    val ssc: StreamingContext = new StreamingContext(sc,Seconds(2))

    // 5. Configure a checkpoint directory; updateStateByKey requires one
    ssc.checkpoint("./")
    // 6. Create a DStream from the socket source; this needs the socket host, port and storage level (default here)
    val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("master",9999)

    // 7. Split each line on spaces and map every word to a count of 1
    val wordAndOne: DStream[(String, Int)] = dstream.flatMap(_.split(" ")).map(word => (word,1))
    // 8. Call updateStateByKey to count how many times each word has appeared globally
    val result: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunction)
    // 9. Print the results
    result.print()
    // 10. Start the streaming computation
    ssc.start()
    // 11. Keep the program running until it is stopped manually
    ssc.awaitTermination()
  }
}
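
For example, if the word spark has been counted 3 times in earlier batches and appears twice in the current batch, updateFunction is called with newValues = Seq(1, 1) and runningCount = Some(3) and returns Some(5), which becomes the new running count for spark.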

2.4 DStream window operations

  • Set the length of the sliding window in advance (i.e. the window's duration);
  • Set the slide interval of the window (how often the computation is triggered), so that the window slides over the source DStream at that interval;
  • At each position the window stops at, part of the DStream (i.e. a set of RDDs) falls inside the window, forming a small segment of the DStream;
  • The computation can then be run on this small DStream segment.

The commonly used window methods and their descriptions:

  • window(windowLength, slideInterval): returns a new DStream computed from windowed batches of the source DStream.
  • countByWindow(windowLength, slideInterval): returns a sliding-window count of the elements in the stream.
  • reduceByWindow(func, windowLength, slideInterval): returns a single-element stream created by aggregating the stream's elements over the sliding interval with the function func; func must be associative so the computation can run in parallel.
  • reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]): when applied to a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs in which the values for each key are aggregated with the given reduce function func. Note: by default this uses Spark's default number of parallel tasks for grouping; a different number can be set via the numTasks parameter.
  • reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]): a more efficient version of reduceByKeyAndWindow in which each window's reduce value is computed incrementally from the previous window's value: new data entering the window is reduced with func, and old data leaving the window is "inverse reduced" with invFunc. It can only be used with "invertible" reduce functions, i.e. reduce functions that have a corresponding inverse function (passed in as invFunc).
  • countByValueAndWindow(windowLength, slideInterval, [numTasks]): when applied to a DStream of (K, V) pairs, returns a new DStream of (K, Long) pairs in which each key's value is its frequency within the sliding window.
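
The incremental reduceByKeyAndWindow variant with an inverse function is not used in the example below, so here is a minimal sketch of a windowed word count built on it, reusing the same socket source as the other examples; the object name, window sizes and checkpoint path are illustrative choices, and note that this variant requires a checkpoint directory.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ReduceByKeyAndWindowSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ReduceByKeyAndWindowSketch").setMaster("local[2]")
    // 2-second batches; the window length and slide interval below are multiples of this
    val ssc = new StreamingContext(conf, Seconds(2))
    // The incremental (invFunc) variant requires a checkpoint directory
    ssc.checkpoint("./checkpoint")

    val wordAndOne = ssc.socketTextStream("master", 9999)
      .flatMap(_.split(" "))
      .map(word => (word, 1))

    // Word counts over the last 10 seconds, recomputed every 2 seconds:
    // batches entering the window are added with func, batches leaving it are removed with invFunc
    val windowedCounts = wordAndOne.reduceByKeyAndWindow(
      (a: Int, b: Int) => a + b,   // func: reduce new data entering the window
      (a: Int, b: Int) => a - b,   // invFunc: "inverse reduce" old data leaving the window
      Seconds(10),                 // window length
      Seconds(2)                   // slide interval
    )

    windowedCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

The WindowTest example below demonstrates the basic window() operation:
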
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream,ReceiverInputDStream}
object WindowTest {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkConf object
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("WindowTest").setMaster("local[2]")
    // 2. Create the SparkContext object, the entry point for all task computation
    val sc: SparkContext = new SparkContext(sparkConf)
    // 3. Set the log level
    sc.setLogLevel("WARN")
    // 4. Create the StreamingContext; it takes two parameters: the SparkContext and the batch interval
    val ssc: StreamingContext = new StreamingContext(sc,Seconds(1))
    // 5. Connect to the socket service; this needs the socket host, port and storage level (default here)
    val dstream: ReceiverInputDStream[String] = ssc
      .socketTextStream("master",9999)

    // 6. Split each line on spaces
    val words: DStream[String] = dstream.flatMap(_.split(" "))
    // 7. Call window, which takes two parameters: the window length and the slide interval
    val windowWords: DStream[String] = words.window(Seconds(3),Seconds(1))
    // 8. Print the results
    windowWords.print()
    // 9. Start the streaming computation
    ssc.start()
    // 10. Keep the program running until it is stopped manually
    ssc.awaitTermination()
  }
}

When you vary the batch interval, window length and slide interval, note that both the window length and the slide interval must be integer multiples of the batch interval; otherwise an error is reported.
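
A minimal sketch of this rule, reusing the socket source from the earlier examples and assuming a 2-second batch interval (the object name is illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowIntervalCheck {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WindowIntervalCheck").setMaster("local[2]")
    // 2-second batch interval
    val ssc = new StreamingContext(conf, Seconds(2))
    val words = ssc.socketTextStream("master", 9999).flatMap(_.split(" "))

    words.window(Seconds(6), Seconds(4))  // OK: 6s window and 4s slide are both multiples of the 2s batch interval
    words.window(Seconds(3), Seconds(2))  // should fail: a 3s window is not a multiple of the 2s batch interval
  }
}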

2.5 DStream output operations

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream

object SaveAsTextFilesTest {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    // 1. Create the SparkConf object and set the appName and master; local[2] runs the job locally with 2 threads
    val sparkConf: SparkConf = new SparkConf().setAppName("SaveAsTextFilesTest").setMaster("local[2]")
    // 2. Create the SparkContext object, the entry point for all task computation; it creates the DAGScheduler and TaskScheduler
    val sc: SparkContext = new SparkContext(sparkConf)
    // 3. Set the log level
    sc.setLogLevel("WARN")
    // 4. Create the StreamingContext; it takes two parameters: the SparkContext and the batch interval
    val ssc: StreamingContext = new StreamingContext(sc,Seconds(5))
    // 5. Create a DStream from the socket source; this needs the socket host, port and storage level (default here)
    val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.142.128",9999)

    // 6. Call saveAsTextFiles to save the text typed into the nc window to HDFS
    dstream.saveAsTextFiles("hdfs://master:8020/saveAsTextFiles/satf","txt")
    // 7. Start the streaming computation
    ssc.start()
    // 8. Keep the program running until it is stopped manually
    ssc.awaitTermination()
  }
}
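
saveAsTextFiles writes each batch to its own HDFS directory, named from the prefix and suffix as prefix-<batch time in milliseconds>.suffix, with the actual records stored in part-xxxxx files inside each directory; listing /saveAsTextFiles on HDFS should therefore show one new directory per 5-second batch.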
