由于Assert失败,将spark结构流数据写入cassandra失败

cotxawn7  于 2021-06-15  发布在  Cassandra
关注(0)|答案(0)|浏览(250)

我想把结构化流数据集写入cassandra。我在脚本中使用水印,并使用与之类似的代码
地址:https://github.com/polomarcus/spark-structured-streaming-examples 或者https://github.com/fhuertas/cassandra-sink-spark-structured-streaming 接收数据。
脚本的流程是:kafka->watermark event time->groupby->sink to cassandra

val counts= words
        .withWatermark("eventTime", "60 seconds")
        .groupBy(
            window($"eventTime","60 seconds", "30 seconds"),
            $"word")
        .count()

    val query= counts
        .writeStream
        .format("streamsinkprovider.CassandraSinkProvider")
        .outputMode("append")
        .start()

我尝试了两种方法来解决这个问题:

import org.apache.spark.sql.cassandra._

df.select("word","count").as[(String,BigInt)]
    .write
    .format("org.apache.spark.sql.cassandra")
    .mode(SaveMode.Append)
    .cassandraFormat("test", CassandraLibrary.CASSANDRA_KEYSPACE)
    .save()

以及

ds.rdd.saveToCassandra(
        CassandraLibrary.CASSANDRA_KEYSPACE, 
        "test", SomeColumns("word","count"), cassandraWriteConf)

在这两种情况下,我得到以下例外:
java.lang.assertionerror:Assert失败:没有计划eventtimewatermark eventtime#272:时间戳,间隔1分钟
似乎savetocassandra试图创建一个rdd,这使得spark创建了一个新的计划。
jdbcsink(mysql)对相同的代码运行良好,没有错误。
我已经用spark版本2.2.0和2.3.2以及datastax.spark-cassandra-connector 2.0.5和2.3.2尝试过了。我尝试了dse cassandra 5.1和apache cassandra 3.11
你知道怎么避开这个问题吗?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题