spark结构化流媒体与cassandra

az31mfrm  于 2021-06-13  发布在  Cassandra
关注(0)|答案(1)|浏览(433)

我使用spark结构化流媒体和cassandra作为Flume。下面的代码段:

override def start(): StreamingQuery = {
    sparkContext.getSparkSession()
      .readStream
      .option("header", "false")
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServer)
      .option("failOnDataLoss","false")
      .option("subscribe", topicName)
      .load()
      .writeStream
      .option("checkpointLocation",checkpointLocation)
      .foreachBatch(forEachFunction.arceusForEachFunction(_,_))
      .start()

我正在用下面的文字给Cassandra写信:

RDD.saveToCassandra(keyspace, tableName)

在这样做的时候,我想知道如何处理像casssandra倒下等问题。假设,在要加载的3m数据中,2m是在问题发生之前写的。现在我要么撤销2m要么只处理1m。我不确定会发生什么情况。
这是怎么处理的吗?还是有什么我要写的来处理?
我还查看了spark文档,对于“foreach batch”,它表示“取决于实现”

感谢您的帮助。谢谢

ego6inou

ego6inou1#

首先,如果你使用 foreachBatch 您可以只按原样编写Dataframe,而不使用rdd(以下是示例):

.foreachBatch((df, batchId) =>
        df.write.cassandraFormat("sttest", "test")
          .mode(SaveMode.Append).save()
      )

关于恢复—您无法撤消对cassandra的写入—它不是事务性数据库,因此,如果写入了一些数据,则会写入它们。但在大多数情况下,写入应该是幂等的(除非您对列表或lwt使用操作),并且您可以再次写入数据。spark cassandra连接器确实尝试自动重复写操作,如果它检测到节点关闭,那么您应该对此有所了解。
p、 新版本的spark cassandra连接器(目前在alpha中)将支持spark结构化流数据本机写入cassandra。

相关问题