我使用spark结构化流媒体和cassandra作为Flume。下面的代码段:
override def start(): StreamingQuery = {
sparkContext.getSparkSession()
.readStream
.option("header", "false")
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServer)
.option("failOnDataLoss","false")
.option("subscribe", topicName)
.load()
.writeStream
.option("checkpointLocation",checkpointLocation)
.foreachBatch(forEachFunction.arceusForEachFunction(_,_))
.start()
我正在用下面的文字给Cassandra写信:
RDD.saveToCassandra(keyspace, tableName)
在这样做的时候,我想知道如何处理像casssandra倒下等问题。假设,在要加载的3m数据中,2m是在问题发生之前写的。现在我要么撤销2m要么只处理1m。我不确定会发生什么情况。
这是怎么处理的吗?还是有什么我要写的来处理?
我还查看了spark文档,对于“foreach batch”,它表示“取决于实现”
感谢您的帮助。谢谢
1条答案
按热度按时间ego6inou1#
首先,如果你使用
foreachBatch
您可以只按原样编写Dataframe,而不使用rdd(以下是示例):关于恢复—您无法撤消对cassandra的写入—它不是事务性数据库,因此,如果写入了一些数据,则会写入它们。但在大多数情况下,写入应该是幂等的(除非您对列表或lwt使用操作),并且您可以再次写入数据。spark cassandra连接器确实尝试自动重复写操作,如果它检测到节点关闭,那么您应该对此有所了解。
p、 新版本的spark cassandra连接器(目前在alpha中)将支持spark结构化流数据本机写入cassandra。