来自Kafka检查点和确认的spark结构化流

lqfhib0f  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(345)

在我的spark结构化流媒体应用程序中,我正在阅读来自kafka的消息,对它们进行过滤,然后最终坚持使用cassandra。我正在使用spark 2.4.1。从结构化流媒体文档
容错语义提供一次端到端的精确语义是结构化流设计背后的关键目标之一。为了实现这一点,我们设计了结构化流源、接收器和执行引擎,以可靠地跟踪处理的确切进度,从而使其能够通过重新启动和/或重新处理来处理任何类型的故障。假设每个流源都有偏移量(类似于kafka偏移量或kinesis序列号)来跟踪流中的读取位置。引擎使用检查点和预写日志来记录每个触发器中正在处理的数据的偏移范围。流汇被设计成处理后处理的幂等元。同时,使用可重放源和幂等接收器,结构化流可以确保在任何失败情况下端到端只使用一次语义。
但我不确定spark究竟是如何做到这一点的。在我的例子中,如果cassandra集群关闭导致写操作失败,那么kafka的检查点不会记录这些偏移量。
Kafka检查点偏移量是仅基于对Kafka的成功读取,还是每个消息都考虑了包括写入在内的整个操作?

kuuvgm7e

kuuvgm7e1#

spark使用多个日志文件来确保容错性。与查询相关的是偏移日志和提交日志。从streamexecution类文档:

/**
   * A write-ahead-log that records the offsets that are present in each batch. In order to ensure
   * that a given batch will always consist of the same data, we write to this log *before* any
   * processing is done.  Thus, the Nth record in this log indicated data that is currently being
   * processed and the N-1th entry indicates which offsets have been durably committed to the sink.
   */
  val offsetLog = new OffsetSeqLog(sparkSession, checkpointFile("offsets"))

  /**
   * A log that records the batch ids that have completed. This is used to check if a batch was
   * fully processed, and its output was committed to the sink, hence no need to process it again.
   * This is used (for instance) during restart, to help identify which batch to run next.
   */
  val commitLog = new CommitLog(sparkSession, checkpointFile("commits"))

因此,当它从Kafka读取时,它会将偏移量写入 offsetLog 只有在处理数据并将其写入接收器(在您的cassandra中)之后,它才会将偏移量写入 commitLog .

0ve6wy6x

0ve6wy6x2#

spark结构化流媒体并不像一个“正常”的Kafka消费者那样向Kafka提供补偿。spark使用检查点机制在内部管理偏移量。
请看下面这个问题的第一个回答,它很好地解释了如何使用检查点和commitslog管理状态:如何为结构化查询获取kafka偏移量,以便进行手动可靠的偏移量管理?

相关问题