spark结构化流检查点在生产中的使用

gr8qqesn  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(411)

在使用spark结构化流媒体时,我很难理解检查点是如何工作的。
我有一个spark进程,它生成一些事件,我将这些事件记录在一个配置单元表中。对于这些事件,我在kafka流中接收一个确认事件。
我创造了一个新的Spark过程
将配置单元日志表中的事件读取到Dataframe中
使用spark结构化流将这些事件与确认事件流连接起来
将联接的Dataframe写入hbase表。
我在sparkshell中测试了代码,在伪代码(我使用的是scala)下面,它运行良好。

val tableA = spark.table("tableA")

val startingOffset = "earliest"

val streamOfData = .readStream 
  .format("kafka") 
  .option("startingOffsets", startingOffsets)
  .option("otherOptions", otherOptions)

val joinTableAWithStreamOfData = streamOfData.join(tableA, Seq("a"), "inner")

joinTableAWithStreamOfData 
  .writeStream
  .foreach(
    writeDataToHBaseTable()
  ).start()
  .awaitTermination()

现在,我想将此代码安排为定期运行,例如每15分钟运行一次,我正在努力理解如何在这里使用检查点。
每次运行这段代码时,我都希望只从流中读取上一次运行中尚未读取的事件,并将这些新事件与日志表进行内部连接,以便只向最终的hbase表中写入新数据。
我在hdfs中创建了一个目录来存储检查点文件。我将该位置提供给spark submit命令,我使用该命令调用spark代码。

spark-submit --conf spark.sql.streaming.checkpointLocation=path_to_hdfs_checkpoint_directory 
--all_the_other_settings_and_libraries

此时,代码每15分钟运行一次,没有任何错误,但它基本上什么都不做,因为它没有将新事件转储到hbase表中。另外,检查点目录是空的,而我假设一些文件必须写入那里?
readstream函数是否需要调整以便从最新的检查点开始读取?

val streamOfData = .readStream 
  .format("kafka") 
  .option("startingOffsets", startingOffsets) ??
  .option("otherOptions", otherOptions)

我真的很难理解关于这个的spark文档。
提前谢谢!

ldfqzlk8

ldfqzlk81#

触发器

“现在,我希望将此代码安排为定期运行,例如每15分钟运行一次,我正在努力理解如何在这里使用检查点。
如果您希望每15分钟触发一次作业,可以使用触发器。
您不需要专门“使用”检查点,只需提供一个可靠的(例如hdfs)检查点位置,如下所示。

检查点

每次运行这段代码时,我只想从流中读取上一次运行中尚未读取的事件[…]”
在spark结构化流应用程序中从kafka读取数据时,最好直接在您的应用程序中设置检查点位置 StreamingQuery . spark使用此位置创建检查点文件,用于跟踪应用程序的状态,并记录已从kafka读取的偏移量。
重新启动应用程序时,它将检查这些检查点文件,以了解从何处继续读取Kafka,因此它不会跳过或错过任何消息。您不需要手动设置启动偏移量。
请务必记住,只允许对应用程序代码进行特定更改,以便检查点文件可用于安全重新启动。在structured streaming programming guide中可以找到一个很好的概述,该指南介绍了流查询更改后的恢复语义。
总的来说,对于高效的spark结构化流媒体应用程序,我建议采用以下结构:

val spark = SparkSession.builder().[...].getOrCreate()

val streamOfData = spark.readStream 
  .format("kafka") 
// option startingOffsets is only relevant for the very first time this application is running. After that, checkpoint files are being used.
  .option("startingOffsets", startingOffsets) 
  .option("otherOptions", otherOptions)
  .load()

// perform any kind of transformations on streaming DataFrames
val processedStreamOfData = streamOfData.[...]

val streamingQuery = processedStreamOfData 
  .writeStream
  .foreach(
    writeDataToHBaseTable()
  )
  .option("checkpointLocation", "/path/to/checkpoint/dir/in/hdfs/"
  .trigger(Trigger.ProcessingTime("15 minutes"))
  .start()

streamingQuery.awaitTermination()

相关问题