使用spark datasource api v2 hive streaming sink提交的偏移量无序

of1yzvn4  于 2021-06-26  发布在  Hive
关注(0)|答案(1)|浏览(295)

我正在使用sink将spark(2.3)结构化流式Dataframe保存到具有自定义sink实现的配置单元表中。
代码如下。

val df = spark.readStream.format("socket").option("host", "localhost").option("port", 19191).load().as[String]

val query = df.map { s => val records = s.split(",") assert(records.length >= 4)
        (records(0).toInt, records(1), records(2), records(3))
     }

query.selectExpr("_1 as eid", "_2 as name", "_3 as salary", "_4 as designation").
      writeStream.
      format("hive-streaming").
      option("metastore", ".....").
      option("db", "test").
      option("table", "test_employee").
      option("checkpointLocation", "/checkpoints/employee/checkpoint").
      queryName("socket-hive-streaming").
      start()

这将导致以下运行时错误。

ERROR streaming.MicroBatchExecution: Query socket-hive-streaming [id =  ......, runId = ......] terminated with error
java.lang.RuntimeException: Offsets committed out of order: 1 followed by 0
        at scala.sys.package$.error(package.scala:27)
        at org.apache.spark.sql.execution.streaming.TextSocketSource.commit(socket.scala:146)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$4.apply(MicroBatchExecution.scala:356)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$4.apply(MicroBatchExecution.scala:355)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcV$sp(MicroBatchExecution.scala:355)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:338)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:338)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:338)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:128)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
2fjabf4q

2fjabf4q1#

有两种方法可以解决您的问题:
删除/清除检查点: /checkpoints/employee/checkpoint 在你的机器上
使用另一个保持偏移的源,比如Kafka
遇到此问题的原因是套接字不维护偏移量信息。
重新启动从接收输入数据的作业时 socket 9999 ,您的工作要做的第一件事是尝试从 /checkpoints/employee/checkpoint ,它发现已记录的当前偏移量为 1 . 然后输入其他消息到 socket 9999 ,您的作业发现 socket 99990 . 所以它抛出了这个异常。

相关问题