我正在通过spark结构化流读取kafka主题中的日志行,分离日志行的字段,对字段执行一些操作,并将其存储在dataframe中,每个字段都有单独的列。我想把这个Dataframe写给Kafka
下面是我的示例dataframe和writestream,用于将其写入kafka
val dfStructuredWrite = dfProcessedLogs.select(
dfProcessedLogs("result").getItem("_1").as("col1"),
dfProcessedLogs("result").getItem("_2").as("col2"),
dfProcessedLogs("result").getItem("_17").as("col3"))
dfStructuredWrite
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.start()
上面的代码给我下面的错误
Required attribute 'value' not found
我相信这是因为我没有键/值格式的Dataframe。如何以最有效的方式将现有的Dataframe写入kafka?
1条答案
按热度按时间gajydyqb1#
正在写入kafka的Dataframe在架构中应具有以下列:
键(可选)(类型:字符串或二进制)
值(必需)(类型:字符串或二进制)
主题(可选)(类型:string)
在你的情况下没有
value
列并引发异常。必须对其进行修改,以至少添加value列,例如:
有关更多详细信息,请查看:https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-Kafka的资料