spark流附加输出模式

1u4esq0p  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(302)

我尝试使用spark streaming(2.2.0)和kafka输入以及这样的配置:
水印与聚合
开窗
输出模式:outputmode.append()
触发模式:trigger.once()
但是我没有输出(Kafka和控制台都没有)。多次启动应用程序也不起作用。
当我使用outputmode.complete()或trigger.processingtime(0l)时,它可以工作。但不幸的是,这不是我的需要。
一次触发器是否支持追加模式?
这里有一个最小的应用程序来重现这个问题。主旨

case class Model(valueForGroupBy: Int, time: Timestamp)

object Application {

  val appName = "sample"

//  val outputMode: OutputMode = OutputMode.Complete() // OK
  val outputMode: OutputMode = OutputMode.Append() // KO with trigger once

  val triggerMode: Trigger = Trigger.Once()

  val delayThreshold: FiniteDuration = 1.minute // watermarking wait for late

  val duration : FiniteDuration = 1.minute // window duration and slide

  val topic = "SAMPLE_TOPIC"
  val bootstrapServers = "127.0.0.1:9092"

  type KafkaKV = (String, String)

  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession
      .builder
      .appName(appName)
      .getOrCreate()

    import org.apache.spark.sql._
    import org.apache.spark.sql.functions._
    import spark.implicits._

    val streamReader: DataStreamReader = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topic)
      .option("startingOffsets", "earliest")
      .option("failOnDataLoss", "false")

    val df: DataFrame = streamReader.load()

    val ds: Dataset[Model] = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[KafkaKV]
      .select(from_json($"value", Encoders.product[Model].schema).as("json"))
      .select($"json.*")
      .as[Model]

    val groupByColumns = Seq(
      window(new Column("time"), windowDuration = duration.toString, slideDuration = duration.toString),
      new Column("valueForGroupBy")
    )

    val agg = ds
      .withWatermark("time", delayThreshold.toString)
      .groupBy(groupByColumns: _*)
      .count()

    val streamWriter = agg
      .selectExpr(s"CAST(valueForGroupBy AS STRING) AS key", "to_json(struct(*)) AS value")
      .writeStream
      .trigger(triggerMode)
      .outputMode(outputMode)
      .format("console")
      .option("truncate", value = false)

    val streamingQuery = streamWriter.start()

    streamingQuery.awaitTermination()

  }
}
ndasle7k

ndasle7k1#

这是已知的Spark虫。
在实际实现中,水印将被保存在下一批中。由于触发器中没有后续批处理,所以watermak永远不会持久化。
spark bugtracker中存在票证:https://issues.apache.org/jira/browse/spark-24699

vojdkbi0

vojdkbi02#

不支持带触发器一次的追加模式。
这是一个已知的错误:https://issues.apache.org/jira/browse/spark-24699

相关问题