如何使用pyspark将结构化流数据写入cassandra表?

qcbq4gxm  于 2021-06-10  发布在  Cassandra
关注(0)|答案(1)|浏览(365)

这是运行strm.py文件的终端命令
$spark_home/bin/spark submit--master local--driver memory 4g--num executors 2--executor memory 4g--org.apache包。spark:spark-sql-kafka-0-10_2.11:2.4.0 org.apache。spark:spark-cassandra-connector_2.11:2.4.0标准副本
错误:
无法从jar org.apache加载主类。spark:spark-cassandra-connector_2.11:2.4.0,uri为org.apache.spark。请通过--class指定一个类。在org.apache.spark.deploy.sparksubmitarguments.error(sparksubmitarguments。scala:657)atorg.apache.spark.deploy.sparksubmitarguments.loadenvironmentarguments(sparksubmitarguments)。scala:224)在org.apache.spark.deploy.sparksubmitarguments。scala:116)在org.apache.spark.deploy.sparksubmit$$anon$2$$anon$1上。scala:907) 在org.apache.spark.deploy.sparksubmit$$anon$2.parsearguments(sparksubmit。scala:907)在org.apache.spark.deploy.sparksubmit.dosubmit(sparksubmit。scala:81)在org.apache.spark.deploy.sparksubmit$$anon$2.dosubmit(sparksubmit。scala:920)位于org.apache.spark.deploy.sparksubmit$.main(sparksubmit.com)。scala:929)在org.apache.spark.deploy.sparksubmit.main(sparksubmit.scala)
有谁能帮我解决这个问题,为什么它不能加载。

mctunoxg

mctunoxg1#

你有两个问题:
你提交的申请不正确-两者之间没有逗号 org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 以及 org.apache.spark:spark-cassandra-connector_2.11:2.4.0 ,所以 spark-submit 将cassandra连接器视为jar,而不是使用python文件。
当前版本的spark cassandra connector不支持直接写入spark结构化流数据-此功能仅在dse analytics中可用。但是你可以通过使用 foreachBatch ,类似于这样的内容(未测试,可在此处获得工作的scala代码):

def foreach_batch_function(df, epoch_id):
    df.format("org.apache.spark.sql.cassandra").option("keyspace","test")\
       .option("table", "my_tables").mode('append').save()

query.writeStream.foreachBatch(foreach_batch_function).start()

相关问题