ApacheSpark结构化流媒体:水印与精确一次语义

tmb3ates  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(415)

《编程指南》指出,结构化流式传输保证使用适当的源/汇实现端到端的精确一次语义。
然而,我不明白这是如何工作时,工作崩溃,我们有一个水印应用。
下面是一个例子,我目前如何想象它的工作,请纠正我的任何一点,我误解。提前谢谢!
例子:
spark作业:在每个1小时窗口中计数#个事件,并带有1小时水印。
信息:
a-时间戳上午10点
b-时间戳10:10am
c-时间戳10:20am
x-时间戳12pm
y-时间戳12:50pm
z-时间戳8pm
我们开始作业,从源代码中读取a,b,c,作业在上午10:30崩溃,然后我们把它们写到我们的Flume中。
下午6点,作业返回,并知道使用保存的检查点/wal重新处理a、b、c。上午10点到11点窗口的最后计数是3。
接下来,它并行读取来自kafka,x,y,z的新消息,因为它们属于不同的分区。首先处理z,因此最大事件时间戳设置为8pm。当作业读取x和y时,它们现在位于水印后面(8pm-1小时=7pm),因此它们作为旧数据被丢弃。晚上8点到9点的最后一次计数是1,在12点到1点的时间窗口内,作业不报告任何内容。我们丢失了x和y的数据。
---结束示例---
这个情景准确吗?如果是这样,1小时水印可能足以处理从kafka sspark正常流动时的延迟/无序数据,但在spark作业停止/kafka连接长时间丢失时则不足以处理。避免数据丢失的唯一选择是使用水印的时间比您预期的工作时间长吗?

frebpwbc

frebpwbc1#

在迷你批处理过程中,水印是一个固定值。在您的示例中,由于x、y和z在同一个minibatch中处理,因此用于此记录的水印将是上午9:20。完成后,小批量水印将更新到晚上7点。
下面引用了设计文档中有关实现水印功能的功能部件spark-18124的内容:
要在基于触发器的执行中计算拖放边界,我们必须执行以下操作。
在每个触发器中,在聚合数据的同时,我们还扫描触发器数据中事件时间的最大值
触发器完成后,计算水印=max(触发器前的事件时间,触发器中的最大事件时间)-阈值
可能模拟会更详细地描述:

import org.apache.hadoop.fs.Path
import java.sql.Timestamp
import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming.ProcessingTime

val dir = new Path("/tmp/test-structured-streaming")
val fs = dir.getFileSystem(sc.hadoopConfiguration)
fs.mkdirs(dir)

val schema = StructType(StructField("vilue", StringType) ::
                        StructField("timestamp", TimestampType) ::
                        Nil)

val eventStream = spark
  .readStream
  .option("sep", ";")
  .option("header", "false")
  .schema(schema)
  .csv(dir.toString)

// Watermarked aggregation
val eventsCount = eventStream
  .withWatermark("timestamp", "1 hour")
  .groupBy(window($"timestamp", "1 hour"))
  .count

def writeFile(path: Path, data: String) {
  val file = fs.create(path)
  file.writeUTF(data)
  file.close()
}

// Debug query
val query = eventsCount.writeStream
  .format("console")
  .outputMode("complete")
  .option("truncate", "false")
  .trigger(ProcessingTime("5 seconds"))
  .start()

writeFile(new Path(dir, "file1"), """
  |A;2017-08-09 10:00:00
  |B;2017-08-09 10:10:00
  |C;2017-08-09 10:20:00""".stripMargin)

query.processAllAvailable()
val lp1 = query.lastProgress

// -------------------------------------------
// Batch: 0
// -------------------------------------------
// +---------------------------------------------+-----+
// |window                                       |count|
// +---------------------------------------------+-----+
// |[2017-08-09 10:00:00.0,2017-08-09 11:00:00.0]|3    |
// +---------------------------------------------+-----+

// lp1: org.apache.spark.sql.streaming.StreamingQueryProgress =
// {
//   ...
//   "numInputRows" : 3,
//   "eventTime" : {
//     "avg" : "2017-08-09T10:10:00.000Z",
//     "max" : "2017-08-09T10:20:00.000Z",
//     "min" : "2017-08-09T10:00:00.000Z",
//     "watermark" : "1970-01-01T00:00:00.000Z"
//   },
//   ...
// }

writeFile(new Path(dir, "file2"), """
  |Z;2017-08-09 20:00:00
  |X;2017-08-09 12:00:00
  |Y;2017-08-09 12:50:00""".stripMargin)

query.processAllAvailable()
val lp2 = query.lastProgress

// -------------------------------------------
// Batch: 1
// -------------------------------------------
// +---------------------------------------------+-----+
// |window                                       |count|
// +---------------------------------------------+-----+
// |[2017-08-09 10:00:00.0,2017-08-09 11:00:00.0]|3    |
// |[2017-08-09 12:00:00.0,2017-08-09 13:00:00.0]|2    |
// |[2017-08-09 20:00:00.0,2017-08-09 21:00:00.0]|1    |
// +---------------------------------------------+-----+

// lp2: org.apache.spark.sql.streaming.StreamingQueryProgress =
// {
//   ...
//   "numInputRows" : 3,
//   "eventTime" : {
//     "avg" : "2017-08-09T14:56:40.000Z",
//     "max" : "2017-08-09T20:00:00.000Z",
//     "min" : "2017-08-09T12:00:00.000Z",
//     "watermark" : "2017-08-09T09:20:00.000Z"
//   },
//   "stateOperators" : [ {
//     "numRowsTotal" : 3,
//     "numRowsUpdated" : 2
//   } ],
//   ...
// }

writeFile(new Path(dir, "file3"), "")

query.processAllAvailable()
val lp3 = query.lastProgress

// -------------------------------------------
// Batch: 2
// -------------------------------------------
// +---------------------------------------------+-----+
// |window                                       |count|
// +---------------------------------------------+-----+
// |[2017-08-09 10:00:00.0,2017-08-09 11:00:00.0]|3    |
// |[2017-08-09 12:00:00.0,2017-08-09 13:00:00.0]|2    |
// |[2017-08-09 20:00:00.0,2017-08-09 21:00:00.0]|1    |
// +---------------------------------------------+-----+

// lp3: org.apache.spark.sql.streaming.StreamingQueryProgress =
// {
//   ...
//   "numInputRows" : 0,
//   "eventTime" : {
//     "watermark" : "2017-08-09T19:00:00.000Z"
//   },
//   "stateOperators" : [ ],
//   ...
// }

query.stop()
fs.delete(dir, true)

请注意批处理0是如何以水印开头的 1970-01-01 00:00:00 而第1批是从水印开始的 2017-08-09 09:20:00 (第0批的最大事件时间减去1小时)。第2批为空时使用水印 2017-08-09 19:00:00 .

dced5bon

dced5bon2#

首先处理z,因此最大事件时间戳设置为8pm。
没错。尽管 Z 首先,从当前查询迭代中的最大时间戳中减去水印。这意味着08:00pm将被设置为减去水印时间的时间,意味着12:00和12:50将被丢弃。
根据文件:
对于从时间t开始的特定窗口,引擎将保持状态并允许延迟数据更新状态,直到(引擎看到的最大事件时间-延迟阈值>t)
避免数据丢失的唯一选择是使用水印的时间比您预期的工作时间长吗
不一定。假设您将每个kafka查询要读取的最大数据量设置为100项。如果读取的是小批量,并且从每个分区连续读取,则每个批的每个最大时间戳可能不是代理中最新消息的最长时间,这意味着您不会丢失这些消息。

相关问题