I have this code:
val counter = event_stream
  .withWatermark("timestamp", "5 minutes")
  .groupBy(
    window($"timestamp", "10 minutes", "5 minutes"),
    $"value")
  .agg(count("value") as "kafka.count", collect_set("topic") as "kafka.topic")
  .drop("window")
  .withColumnRenamed("value", "join_id")
counter.printSchema()
val counter1 = event_stream
  .groupBy("value")
  .count()
  // .agg(count("value") as "kafka.count", collect_set("topic") as "kafka.topic")
  .withColumnRenamed("value", "join_id")
counter1.printSchema()
val result_stream = event_stream.join(counter, $"value" === $"join_id")
  .drop("key")
  .drop("value")
  .drop("partition")
  .drop("timestamp")
  .drop("join_id")
  .drop("timestampType")
  .drop("offset")
  // .withColumnRenamed("count(value)", "kafka.count")
  .withColumnRenamed("topic", "kafka.topic")
result_stream.printSchema()
KafkaSink.write(counter, topic_produce)
// KafkaSink.writeToConsole(result_stream, topic_produce)
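It works fine when I send it to the console with OutputMode.Complete, but not when I use OutputMode.Append. With Append, each of the streaming queries above fails with a different error.
This is my write function: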
private def writeStream(df: DataFrame, topic: String): StreamingQuery = {
  df
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KafkaUtils.kafkaServers)
    .option("topic", topic)
    .option("checkpointLocation", KafkaUtils.checkPointDir)
    .outputMode(OutputMode.Append())
    .start()
}
I get this error:
java.lang.IllegalArgumentException: Expected e.g. {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}, got 1
{"path":"file:///home/ukaleem/Documents/freenet/Proto2/src/main/resource/events-identification-carrier-a.txt","timestamp":1530198790000,"batchId":0}
Why does this error occur?
Second part: if I take the code above and do
val result_stream = event_stream.join(counter,$"value" === $"join_id")
KafkaSink.write(result_stream, topic_produce)
I get this error:
java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:156)
at org.apache.spark.sql.execution.streaming.OffsetSeq.toStreamProgress(OffsetSeq.scala:42)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets(MicroBatchExecution.scala:185)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:124)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: assertion failed
Both cases should work for me, but I get errors in both.
Edit: I solved the first part, but I still need help with the second.
1 Answer
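The assertion fails because of the checkpoint. Offsets are recorded in the checkpoint for each topic, so when writeStream writes to a new topic it asserts against the offsets in the checkpoint, finds none that match, and throws the assertion error. Try this: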
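A minimal sketch of one way to act on this, not necessarily the answerer's exact code: give every streaming query its own checkpoint directory, so a query never starts from offsets that a different query wrote. The object `CheckpointedKafkaSink`, the helper `checkpointFor`, and the parameters `bootstrapServers`, `baseCheckpointDir` and `queryName` are hypothetical names introduced for illustration. They stand in for `KafkaUtils.kafkaServers` and `KafkaUtils.checkPointDir` from the question.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}

object CheckpointedKafkaSink {

  // Hypothetical helper: build a checkpoint path that is unique per query,
  // so offsets written by one query are never read back by another.
  def checkpointFor(baseDir: String, queryName: String): String =
    s"${baseDir.stripSuffix("/")}/$queryName"

  // Same writer as in the question, except that the checkpoint location
  // depends on the query name instead of being shared by all queries.
  def writeStream(
      df: DataFrame,
      topic: String,
      bootstrapServers: String,   // assumption: replaces KafkaUtils.kafkaServers
      baseCheckpointDir: String,  // assumption: replaces KafkaUtils.checkPointDir
      queryName: String): StreamingQuery =
    df.writeStream
      .format("kafka")
      .queryName(queryName)
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("topic", topic)
      .option("checkpointLocation", checkpointFor(baseCheckpointDir, queryName))
      .outputMode(OutputMode.Append())
      .start()
}
```

With this, `counter` and `result_stream` would be written with different `queryName` values and would therefore checkpoint separately. If a checkpoint directory already holds state from an older, different query, it likely has to be deleted or replaced with a fresh path before the new query can start cleanly.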