错误:“数据源org.apache.spark.sql.cassandra不支持流式读取”

hi3rlvi2  于 2021-05-24  发布在  Spark
关注(0)|答案(1)|浏览(411)

数据源org.apache.spark.sql.cassandra不支持流式读取

val spark = SparkSession
  .builder()
  .appName("SparkCassandraApp")
  .config("spark.cassandra.connection.host", "localhost")
  .config("spark.cassandra.connection.port", "9042")
  .config("spark.cassandra.auth.username", "xxxxx")
  .config("spark.cassandra.auth.password", "yyyyy")
  .master("local[*]")
  .getOrCreate();

val tableDf3 = spark.**readStream**
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "aaaaa", "keyspace" -> "bbbbb"))
  .load()
  .filter("deviceid='XYZ'")

tableDf3.show(10)
ngynwnxp

ngynwnxp1#

正确-spark cassandra连接器只能用作流接收器,不能用作流源。
如果您想从cassandra获得更改,那么这是一个相当复杂的任务,这取决于cassandra的版本(它是否实现了cdc)和其他因素。
对于spark,可以通过定期重新读取数据来实现某种流,使用timestamp列过滤掉已经读取的数据。您可以在下面的答案中找到有关该方法的更多信息。

相关问题