在spark cassandra中使用Dataframe创建键空间时出错

a64a0gku  于 2021-05-16  发布在  Spark
关注(0)|答案(1)|浏览(509)

我试图将spark连接到cassandra,然后从flask查询keyspace和表。
问题是,当我运行web应用程序时,我得到一个错误,表示没有创建中的键空间。 cassandra.InvalidRequest: Error from server: code=2200 [Invalid query] message="Keyspace MyKeyspace does not exist" 在spark中,我运行以下命令:

val flightRecommendations = finalPredictions.writeStream.foreachBatch {
      (batchDF: DataFrame, batchId: Long) =>
        batchDF
            .write
            .cassandraFormat("MytableName", "MyKeyspace") 
            .option("cluster", "cassandra_cluster")
            .mode("append")
            .save
    }.start()

我的问题是上面的代码是否自动生成键空间和表。
我认为这也可能是一个连接问题,因为我在docker工作,设置如下: spark.setCassandraConf("cassandra_cluster", CassandraConnectorConf.ConnectionHostParam.option("cassandra")) 在spark submit命令中,我还添加了以下两种配置:

--conf spark.cassandra.connection.host=cassandra \
--conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions \

这很奇怪,因为spark submit没有给出错误,但是keyspace没有被创建。

ua4mk5z4

ua4mk5z41#

是的,这是可能的,因为SparkCassandra连接器2.5.0。有一个新功能 createCassandraTableEx 它允许基于dataframe模式创建一个新的表,并且它有一个选项来处理表已经存在的情况(除了其他事情,比如,控制聚类列的排序、表选项等),在2.5.0之前有 createCassandraTable 函数,但如果表已存在,则引发异常。
下面是宣布2.5.0版本的博客文章中的示例。对于具有以下结构的Dataframe:

root
 |-- id: integer (nullable = false)
 |-- c: integer (nullable = false)
 |-- t: string (nullable = true)

可以使用以下代码创建新表:

import com.datastax.spark.connector.cql.ClusteringColumn
import org.apache.spark.sql.cassandra._
import com.datastax.spark.connector._

data.createCassandraTableEx("test", "test_new", Seq("id"), 
  Seq(("c", ClusteringColumn.Descending)),
  ifNotExists = true, tableOptions = Map("gc_grace_seconds" -> "1000"))

你不需要使用 foreachBatch 在新版本中,只有在2.5.0之前才需要该版本,您只需编写:

val query = streamingCountsDF.writeStream
      .outputMode(OutputMode.Update)
      .option("checkpointLocation", ".../checkpoint")
      .option("table", "tablename")
      .("keyspace", "ksname")
      .start()

使用spark3.x和scc3.x,您可以使用sparksql在cassandra中创建键空间和表-有关更多详细信息,请参阅文档。

相关问题