I am trying to write data to a Kafka topic using Spark Structured Streaming and get the following error.
aggregatedDataset
    .select(to_json(struct("*")).as("value"))
    .writeStream()
    .outputMode(OutputMode.Append())
    .option("kafka.bootstrap.servers", kafkaBootstrapServersString)
    .option("topic", topic)
    .option("checkpointLocation", checkpointLocation)
    .start();
Stack trace:
Exception in thread "main" java.lang.IllegalArgumentException: 'path' is not specified
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$11.apply(DataSource.scala:276)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$11.apply(DataSource.scala:276)
    at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
    at org.apache.spark.sql.catalyst.util.CaseInsensitiveMap.getOrElse(CaseInsensitiveMap.scala:28)
    at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:275)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:286)
1 Answer
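The format is missing from your writeStream call; in your case it should be "kafka". Without an explicit format, Spark falls back to its default file-based sink, which requires a path, hence the "'path' is not specified" error.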
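A minimal sketch of the corrected call, assuming the Kafka connector (spark-sql-kafka-0-10) matching your Spark version is on the classpath. The variable names are the ones from the question; this is a fragment of that chain, not a complete program:

```java
// Same chain as in the question, with the sink format set explicitly.
aggregatedDataset
    .select(to_json(struct("*")).as("value"))   // the Kafka sink expects a "value" column
    .writeStream()
    .format("kafka")                             // the missing piece: select the Kafka sink
    .outputMode(OutputMode.Append())
    .option("kafka.bootstrap.servers", kafkaBootstrapServersString)
    .option("topic", topic)
    .option("checkpointLocation", checkpointLocation)
    .start();
```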
Hope this helps!