kafka结构化流检查点

deyfvvtc  于 2021-06-02  发布在  Hadoop
关注(0)|答案(4)|浏览(464)

我正在尝试从Kafka结构化流。我计划在hdfs中存储检查点。我读了一篇cloudera博客,建议不要在hdfs中存储spark流的检查点。对于结构流检查点也是同样的问题。https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/.
在结构化流媒体中,如果我的spark程序关闭了一段时间,如何从checkpoint目录获取最新的偏移量,并在该偏移量之后加载数据。我将检查点存储在一个目录中,如下所示。

df.writeStream\
        .format("text")\
        .option("path", '\files') \
        .option("checkpointLocation", 'checkpoints\chkpt') \
        .start()

更新:
这是我的结构化流媒体程序读取Kafka消息,解压并写入hdfs。

df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", KafkaServer) \
        .option("subscribe", KafkaTopics) \
        .option("failOnDataLoss", "false")\
         .load()
Transaction_DF = df.selectExpr("CAST(value AS STRING)")
Transaction_DF.printSchema()

decomp = Transaction_DF.select(zip_extract("value").alias("decompress"))

# zip_extract is a UDF to decompress the stream

query = decomp.writeStream\
    .format("text")\
    .option("path", \Data_directory_inHDFS) \
    .option("checkpointLocation", \pathinDHFS\) \
    .start()

query.awaitTermination()
xa9qqrwz

xa9qqrwz1#

在结构化流媒体中,如果我的spark程序关闭了一段时间,如何从checkpoint目录获取最新的偏移量,并在该偏移量之后加载数据。
在checkpointdir文件夹下,您将找到一个文件夹名“offsets”。文件夹“offsets”维护从kafka请求的下一个偏移量。打开“偏移量”文件夹下的最新文件(最新批处理文件),下一个预期偏移量的格式如下

{"kafkatopicname":{"2":16810618,"1":16810853,"0":91332989}}

要在偏移量之后加载数据,请将下面的属性设置为spark读取流

.option("startingOffsets", "{\""+topic+"\":{\"0\":91332989,\"1\":16810853,\"2\":16810618}}")

0、1、2是主题中的分区。

qyzbxkaa

qyzbxkaa2#

最好在长期存储(hdfs、awss3等)上存储检查点。我想在这里补充一点,属性“failondataloss”不应该设置为false,因为这不是最佳实践。数据丢失是没有人愿意承担的。休息吧,你走对了路。

d8tt03nd

d8tt03nd3#

在查询中,尝试应用检查点,同时将结果以parquet等格式写入hdfs等持久性存储。这对我有好处。

oipij1gg

oipij1gg4#

据我所知,它建议在hbase、kafka、hdfs或zookeeper中维护偏移管理。
“值得一提的是,您还可以在hdfs这样的存储系统中存储偏移量。与上述选项相比,在hdfs中存储偏移量是一种不太流行的方法,因为与zookeeper和hbase等其他系统相比,hdfs具有更高的延迟。”
您可以在spark文档中找到如何从以下位置的现有检查点重新启动查询:http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-从检查点失败

相关问题