java.lang.AssertionError: assertion failed when joining on streams

snz8szmq · asked on 2021-06-08 · in Kafka

I have this code:

// Windowed count per value; the 5-minute watermark bounds state and lets
// Append mode emit a window once it is finalized
val counter = event_stream
  .withWatermark("timestamp", "5 minutes")
  .groupBy(
    window($"timestamp", "10 minutes", "5 minutes"),
    $"value")
  .agg(count("value") as "kafka.count", collect_set("topic") as "kafka.topic")
  .drop("window")
  .withColumnRenamed("value", "join_id")

counter.printSchema()

// Unwindowed variant of the same count, kept for comparison
val counter1 = event_stream
  .groupBy("value")
  .count()
//  .agg(count("value") as "kafka.count", collect_set("topic") as "kafka.topic")
  .withColumnRenamed("value", "join_id")

counter1.printSchema()

// Join the raw event stream back onto the aggregated counts
val result_stream = event_stream.join(counter, $"value" === $"join_id")
  .drop("key")
  .drop("value")
  .drop("partition")
  .drop("timestamp")
  .drop("join_id")
  .drop("timestampType")
  .drop("offset")
//  .withColumnRenamed("count(value)", "kafka.count")
  .withColumnRenamed("topic", "kafka.topic")

result_stream.printSchema()

KafkaSink.write(counter, topic_produce)
// KafkaSink.writeToConsole(result_stream, topic_produce)

If I send it to the console with OutputMode.Complete() it works fine, but when I use OutputMode.Append(), each of the streaming queries above fails with a different error.
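For reference, the console variant that works in Complete mode looks roughly like this (a sketch of the commented-out KafkaSink.writeToConsole above; its exact body is my assumption):

// Assumed body of KafkaSink.writeToConsole. Complete mode re-emits the whole
// aggregation result on every trigger, so it sidesteps Append's requirement
// that a watermark finalize rows before they can be output.
private def writeToConsole(df: DataFrame, topic: String): StreamingQuery = {
  df
    .writeStream
    .format("console")           // topic is unused for the console sink
    .option("truncate", "false") // print full column values
    .outputMode(OutputMode.Complete())
    .start()
}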
Here is my write function:

private def writeStream(df:DataFrame, topic:String): StreamingQuery = {
    df
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", KafkaUtils.kafkaServers)
      .option("topic", topic)
      .option("checkpointLocation", KafkaUtils.checkPointDir)
      .outputMode(OutputMode.Append())
      .start()
  }

I get this error:

java.lang.IllegalArgumentException: Expected e.g. {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}, got 1
{"path":"file:///home/ukaleem/Documents/freenet/Proto2/src/main/resource/events-identification-carrier-a.txt","timestamp":1530198790000,"batchId":0}

Why does this error occur?
Part two: if, starting from the code above, I do

val result_stream = event_stream.join(counter, $"value" === $"join_id")
KafkaSink.write(result_stream, topic_produce)

then I get this error:

java.lang.AssertionError: assertion failed
    at scala.Predef$.assert(Predef.scala:156)
    at org.apache.spark.sql.execution.streaming.OffsetSeq.toStreamProgress(OffsetSeq.scala:42)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets(MicroBatchExecution.scala:185)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:124)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: assertion failed

Either of these two approaches would work for me, but I am getting errors in both.
Edit: I solved the first part, but I still need a solution for the second.

ergxz8rk

The assertion fails because of the checkpoint: an offset is recorded for each topic, so when writeStream writes to a new topic it tries to assert on the offsets but cannot find any for it in the checkpoint, hence the assertion error. Try this:

.option("failOnDataLoss", false) //whether to fail the query when it's possible that data is lost
