我试图将spark连接到cassandra,然后从flask查询keyspace和表。
问题是,当我运行web应用程序时,我得到一个错误,表示没有创建中的键空间。 cassandra.InvalidRequest: Error from server: code=2200 [Invalid query] message="Keyspace MyKeyspace does not exist"
在spark中,我运行以下命令:
val flightRecommendations = finalPredictions.writeStream.foreachBatch {
(batchDF: DataFrame, batchId: Long) =>
batchDF
.write
.cassandraFormat("MytableName", "MyKeyspace")
.option("cluster", "cassandra_cluster")
.mode("append")
.save
}.start()
我的问题是上面的代码是否自动生成键空间和表。
我认为这也可能是一个连接问题,因为我在docker工作,设置如下: spark.setCassandraConf("cassandra_cluster", CassandraConnectorConf.ConnectionHostParam.option("cassandra"))
在spark submit命令中,我还添加了以下两种配置:
--conf spark.cassandra.connection.host=cassandra \
--conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions \
这很奇怪,因为spark submit没有给出错误,但是keyspace没有被创建。
1条答案
按热度按时间ua4mk5z41#
是的,这是可能的,因为SparkCassandra连接器2.5.0。有一个新功能
createCassandraTableEx
它允许基于dataframe模式创建一个新的表,并且它有一个选项来处理表已经存在的情况(除了其他事情,比如,控制聚类列的排序、表选项等),在2.5.0之前有createCassandraTable
函数,但如果表已存在,则引发异常。下面是宣布2.5.0版本的博客文章中的示例。对于具有以下结构的Dataframe:
可以使用以下代码创建新表:
你不需要使用
foreachBatch
在新版本中,只有在2.5.0之前才需要该版本,您只需编写:使用spark3.x和scc3.x,您可以使用sparksql在cassandra中创建键空间和表-有关更多详细信息,请参阅文档。