如何使用pyspark将结构化流数据写入cassandra?

q35jwt9p  于 2021-06-14  发布在  Cassandra
关注(0)|答案(1)|浏览(376)

我想把spark结构化流数据写入cassandra。我的spark版本是2.4.0。
我研究了一些post和datasax企业平台的一些用法。我没用过,找到了方法 foreachBatch 这有助于将流数据写入接收器。
我审查了一个基于databricks网站的文档。自己试试。
这是我写的代码:

parsed = parsed_opc \
    .withWatermark("sourceTimeStamp", "10 minutes") \
    .dropDuplicates(["id", "sourceTimeStamp"]) \
    .groupBy(
        window(parsed_opc.sourceTimeStamp, "4 seconds"),
        parsed_opc.id
    ) \
    .agg({"value": "avg"}) \
    .withColumnRenamed("avg(value)", "avg")\
    .withColumnRenamed("window", "sourceTime") 

def writeToCassandra(writeDF, epochId):
  writeDF.write \
    .format("org.apache.spark.sql.cassandra")\
    .mode('append')\
    .options(table="opc", keyspace="poc")\
    .save()

parsed.writeStream \
    .foreachBatch(writeToCassandra) \
    .outputMode("update") \
    .start()

的架构 parsed Dataframe是:

root
 |-- sourceTime: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- id: string (nullable = true)
 |-- avg: double (nullable = true)

我可以像这样成功地将这个流式df写入控制台:

query = parsed \
  .writeStream \
  .format("console")\
  .outputMode("complete")\
  .start()

控制台输出如下:

+--------------------+----+---+
|          sourceTime|  id|avg|
+--------------------+----+---+
|[2019-07-20 18:55...|Temp|2.0|
+--------------------+----+---+

所以,当写入控制台时,这没关系。但当我在 cqlsh 表中没有附加记录。
这是cassandra中的table create脚本:

CREATE TABLE poc.opc ( id text, avg float,sourceTime timestamp PRIMARY KEY );

那么,你能告诉我怎么了吗?

yftpprvb

yftpprvb1#

在研究这个问题之后,我找到了解决办法。
仔细查看终端日志,我发现有一个错误日志: com.datastax.spark.connector.types.TypeConversionException: Cannot convert object [2019-07-20 18:55:00.0,2019-07-20 18:55:04.0] of type class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema to java.util.Date. 这是因为,当你 window 在spark中,它向schema的timestamp列(在本例中是)添加一个结构 sourceTime . 的架构 sourceTime 看起来像这样:

sourceTime: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)

但我已经在Cassandra写了一篇专栏 sourceTime 但它只需要一个时间戳值。如果查找错误,它会尝试发送 start 以及 end cassandra表中不存在的timestamp参数。
所以,从 parsed dataframe解决了问题: cassandra_df = parsed.select("sourcetime.start", "avg", "sourcetime.end", "id") .

相关问题