我想把spark结构化流数据写入cassandra。我的spark版本是2.4.0。
我研究了一些post和datasax企业平台的一些用法。我没用过,找到了方法 foreachBatch
这有助于将流数据写入接收器。
我审查了一个基于databricks网站的文档。自己试试。
这是我写的代码:
parsed = parsed_opc \
.withWatermark("sourceTimeStamp", "10 minutes") \
.dropDuplicates(["id", "sourceTimeStamp"]) \
.groupBy(
window(parsed_opc.sourceTimeStamp, "4 seconds"),
parsed_opc.id
) \
.agg({"value": "avg"}) \
.withColumnRenamed("avg(value)", "avg")\
.withColumnRenamed("window", "sourceTime")
def writeToCassandra(writeDF, epochId):
writeDF.write \
.format("org.apache.spark.sql.cassandra")\
.mode('append')\
.options(table="opc", keyspace="poc")\
.save()
parsed.writeStream \
.foreachBatch(writeToCassandra) \
.outputMode("update") \
.start()
的架构 parsed
Dataframe是:
root
|-- sourceTime: struct (nullable = false)
| |-- start: timestamp (nullable = true)
| |-- end: timestamp (nullable = true)
|-- id: string (nullable = true)
|-- avg: double (nullable = true)
我可以像这样成功地将这个流式df写入控制台:
query = parsed \
.writeStream \
.format("console")\
.outputMode("complete")\
.start()
控制台输出如下:
+--------------------+----+---+
| sourceTime| id|avg|
+--------------------+----+---+
|[2019-07-20 18:55...|Temp|2.0|
+--------------------+----+---+
所以,当写入控制台时,这没关系。但当我在 cqlsh
表中没有附加记录。
这是cassandra中的table create脚本:
CREATE TABLE poc.opc ( id text, avg float,sourceTime timestamp PRIMARY KEY );
那么,你能告诉我怎么了吗?
1条答案
按热度按时间yftpprvb1#
在研究这个问题之后,我找到了解决办法。
仔细查看终端日志,我发现有一个错误日志:
com.datastax.spark.connector.types.TypeConversionException: Cannot convert object [2019-07-20 18:55:00.0,2019-07-20 18:55:04.0] of type class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema to java.util.Date.
这是因为,当你window
在spark中,它向schema的timestamp列(在本例中是)添加一个结构sourceTime
. 的架构sourceTime
看起来像这样:但我已经在Cassandra写了一篇专栏
sourceTime
但它只需要一个时间戳值。如果查找错误,它会尝试发送start
以及end
cassandra表中不存在的timestamp参数。所以,从
parsed
dataframe解决了问题:cassandra_df = parsed.select("sourcetime.start", "avg", "sourcetime.end", "id")
.