我想把结构化流数据集写入cassandra。我在脚本中使用水印,并使用与之类似的代码
地址:https://github.com/polomarcus/spark-structured-streaming-examples 或者https://github.com/fhuertas/cassandra-sink-spark-structured-streaming 接收数据。
脚本的流程是:kafka->watermark event time->groupby->sink to cassandra
val counts= words
.withWatermark("eventTime", "60 seconds")
.groupBy(
window($"eventTime","60 seconds", "30 seconds"),
$"word")
.count()
val query= counts
.writeStream
.format("streamsinkprovider.CassandraSinkProvider")
.outputMode("append")
.start()
我尝试了两种方法来解决这个问题:
import org.apache.spark.sql.cassandra._
df.select("word","count").as[(String,BigInt)]
.write
.format("org.apache.spark.sql.cassandra")
.mode(SaveMode.Append)
.cassandraFormat("test", CassandraLibrary.CASSANDRA_KEYSPACE)
.save()
以及
ds.rdd.saveToCassandra(
CassandraLibrary.CASSANDRA_KEYSPACE,
"test", SomeColumns("word","count"), cassandraWriteConf)
在这两种情况下,我得到以下例外:
java.lang.assertionerror:Assert失败:没有计划eventtimewatermark eventtime#272:时间戳,间隔1分钟
似乎savetocassandra试图创建一个rdd,这使得spark创建了一个新的计划。
jdbcsink(mysql)对相同的代码运行良好,没有错误。
我已经用spark版本2.2.0和2.3.2以及datastax.spark-cassandra-connector 2.0.5和2.3.2尝试过了。我尝试了dse cassandra 5.1和apache cassandra 3.11
你知道怎么避开这个问题吗?
暂无答案!
目前还没有任何答案,快来回答吧!