官方给出的介绍如下:
Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive. You can use the Dataset/DataFrame API in Scala, Java, Python or R to express streaming aggregations, event-time windows, stream-to-batch joins, etc. The computation is executed on the same optimized Spark SQL engine. Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write-Ahead Logs. In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.
SparkStreaming入门案例也是词频统计,这里也是,先看下代码,再说下模型
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
val spark = SparkSession.builder.appName("WordCount).master("local[*]").getOrCreate()
import spark.implicits._ //隐式转换
// 创建一个持续从localhost:9999端口接收数据的 流式DataFrame,一个包含流数据的无限数据框,其有一个String类型的名为value的Column
val line = spark.readStream.format("socket")
.option("host","localhost")
.option("port",9999)
.load()
val words = line.as[String].flatMap(_.split(" "))
val wordCount = words.groupBy("value").count
//开始数据接收和计算,并有结果更新就打印到控制台
//相关方法下文会谈及
val query = wordCount.writeStream.outputMode("complete").format("console").start
//等待计算结束
query.awaitTermination()
使用netcat命令向localhost:9999发送数据,没有nc命令的话,sudo apt-get install netcat
安装下
nc命令详细使用,可参考这里
$ nc -lk 9999
hello world
hello spark
...
# 运行上面哪个程序
$ ./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount localhost 9999
# 打印
-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|hello| 1|
| world| 1|
+------+-----+
-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|hello| 2|
| world| 1|
|spark| 1|
+------+-----+
...
如图所示,输入流数据会被视为输入表,一张未绑定的表,数据流中每一个到达的数据项(有一个触发间隔trigger interval的概念,比如1s)都会被视为一个追加到该输入表中的数据行(Row)。而基于该输入表的查询会产生一张结果表,每经过一个触发间隔就会更新结果表。结果表一旦更新,就会希望将新的结果保存到外部存储设备上。如下是官网给出的Programming Model
上面词频统计的编程模型,如图:
上面例子中有outputMode("complete")
,完全模式。一共有Append mode(默认)、Complete mode、Update mode(从Spark 2.1.1开始使用)
select,where,map,flatMap,filter,join
等具体适合哪种输出模式,官网给了个表格,如图:
Spark 2.0后,静态的绑定数据,或者流式的未绑定数据都可以使用DataSets和DataFrames API,一样是使用SparkSession作为入口。而流DataFrame通过DataStreamReader
接口创建,即SparkSession.readStream()
的返回类型,像Spark SQL中创建DataFrame一样可以使用format(数据源格式)、schema(数据模式)、option(可选配置)等
format("csv")
maxFilesPerTrigger
,每次触发最大考虑的新文件数,默认无穷latestFirst
,是否首先处理最新的文件,当有大量积压的文件时很有用(默认 false)fileNameOnly
,是否只是基于文件名检测新文件,而非依据路径名,默认false val mySchema = StructType(Array(
StructField("name",StringType,true),
StructField("age",IntegerType,true)
))
val df = spark.readStream.format("csv").option("sep",",")
.schema(mySchema).load("/path")
// 必须给定host、port
// 像上面的wordcount
val df = spark.readStream.format("socket").option("host","localhost").option("port",9999).load
rowsPerSecond
rampUpTime
numPartitions
从文件源创建的Structured Streaming默认是需要自己定义模式(Schema)的,这能确保即使出错也可以使用一致的模式进行流查询,但学过Spark SQL的就知道会有inferSchema
这个配置参数,Structured Streaming中也是有的不过要手动设置spark.sql.streaming.schemaInference
为true
流式DataFrame/DataSet的分区,指的是分区发现(不懂的话可以搜下Hive分区表的概念),形如/key=value
的子目录存在的话,会递归这些子目录自动发现分区。而且当这些分区字段也存在Schema中时,Spark还会自动填充该字段。要想构成分区目录,必须在查询开始时就存在静态的分区目录,文档上举了个例子,当/data/year=2015/存在时,添加/data/year=2016/是可以的,但是更改分区列是无效的(如通过创建目录/data/date=2016-04-17/)
大多数API还是可以用在流式DataFrame/DataSet上的,除了一些,先列出这些不能使用的API:
limit/take(n)
Stream +left outer/right outer join Static
不支持Static left outer/full outer join Stream
不支持Stream full outer join Stream
不支持count()
,稍微想下也知道流式数据不能统计个数,但还是可以使用ds.groupBy().count()
统计运行中的流式DataSets的计数foreach()
,可替换成ds.writeStream.foreach()
使用show()
,可替换成ds.writeStream.outputMode("complete").format("console")
在控制台打印总的来看,就是因为流式数据不完全,而上面这些操作都是适合于哪些数据确定的操作
想了想,真没必要再次介绍,我之前翻译《Spark 权威指南》时,Chapter 5就是介绍DataSets/DataFrames API的使用,点击跳转
不过要列出一个方法,df.isStreaming
,返回布尔值来判断是否是流数据
内容来源于网络,如有侵权,请联系作者删除!