我正在尝试从Kafka结构化流。我计划在hdfs中存储检查点。我读了一篇cloudera博客,建议不要在hdfs中存储spark流的检查点。对于结构流检查点也是同样的问题。https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/.
在结构化流媒体中,如果我的spark程序关闭了一段时间,如何从checkpoint目录获取最新的偏移量,并在该偏移量之后加载数据。我将检查点存储在一个目录中,如下所示。
df.writeStream\
.format("text")\
.option("path", '\files') \
.option("checkpointLocation", 'checkpoints\chkpt') \
.start()
更新:
这是我的结构化流媒体程序读取Kafka消息,解压并写入hdfs。
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", KafkaServer) \
.option("subscribe", KafkaTopics) \
.option("failOnDataLoss", "false")\
.load()
Transaction_DF = df.selectExpr("CAST(value AS STRING)")
Transaction_DF.printSchema()
decomp = Transaction_DF.select(zip_extract("value").alias("decompress"))
# zip_extract is a UDF to decompress the stream
query = decomp.writeStream\
.format("text")\
.option("path", \Data_directory_inHDFS) \
.option("checkpointLocation", \pathinDHFS\) \
.start()
query.awaitTermination()
4条答案
按热度按时间xa9qqrwz1#
在结构化流媒体中,如果我的spark程序关闭了一段时间,如何从checkpoint目录获取最新的偏移量,并在该偏移量之后加载数据。
在checkpointdir文件夹下,您将找到一个文件夹名“offsets”。文件夹“offsets”维护从kafka请求的下一个偏移量。打开“偏移量”文件夹下的最新文件(最新批处理文件),下一个预期偏移量的格式如下
要在偏移量之后加载数据,请将下面的属性设置为spark读取流
0、1、2是主题中的分区。
qyzbxkaa2#
最好在长期存储(hdfs、awss3等)上存储检查点。我想在这里补充一点,属性“failondataloss”不应该设置为false,因为这不是最佳实践。数据丢失是没有人愿意承担的。休息吧,你走对了路。
d8tt03nd3#
在查询中,尝试应用检查点,同时将结果以parquet等格式写入hdfs等持久性存储。这对我有好处。
oipij1gg4#
据我所知,它建议在hbase、kafka、hdfs或zookeeper中维护偏移管理。
“值得一提的是,您还可以在hdfs这样的存储系统中存储偏移量。与上述选项相比,在hdfs中存储偏移量是一种不太流行的方法,因为与zookeeper和hbase等其他系统相比,hdfs具有更高的延迟。”
您可以在spark文档中找到如何从以下位置的现有检查点重新启动查询:http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-从检查点失败