我尝试使用spark streaming(2.2.0)和kafka输入以及这样的配置:
水印与聚合
开窗
输出模式:outputmode.append()
触发模式:trigger.once()
但是我没有输出(Kafka和控制台都没有)。多次启动应用程序也不起作用。
当我使用outputmode.complete()或trigger.processingtime(0l)时,它可以工作。但不幸的是,这不是我的需要。
一次触发器是否支持追加模式?
这里有一个最小的应用程序来重现这个问题。主旨
case class Model(valueForGroupBy: Int, time: Timestamp)
object Application {
val appName = "sample"
// val outputMode: OutputMode = OutputMode.Complete() // OK
val outputMode: OutputMode = OutputMode.Append() // KO with trigger once
val triggerMode: Trigger = Trigger.Once()
val delayThreshold: FiniteDuration = 1.minute // watermarking wait for late
val duration : FiniteDuration = 1.minute // window duration and slide
val topic = "SAMPLE_TOPIC"
val bootstrapServers = "127.0.0.1:9092"
type KafkaKV = (String, String)
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder
.appName(appName)
.getOrCreate()
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import spark.implicits._
val streamReader: DataStreamReader = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", topic)
.option("startingOffsets", "earliest")
.option("failOnDataLoss", "false")
val df: DataFrame = streamReader.load()
val ds: Dataset[Model] = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[KafkaKV]
.select(from_json($"value", Encoders.product[Model].schema).as("json"))
.select($"json.*")
.as[Model]
val groupByColumns = Seq(
window(new Column("time"), windowDuration = duration.toString, slideDuration = duration.toString),
new Column("valueForGroupBy")
)
val agg = ds
.withWatermark("time", delayThreshold.toString)
.groupBy(groupByColumns: _*)
.count()
val streamWriter = agg
.selectExpr(s"CAST(valueForGroupBy AS STRING) AS key", "to_json(struct(*)) AS value")
.writeStream
.trigger(triggerMode)
.outputMode(outputMode)
.format("console")
.option("truncate", value = false)
val streamingQuery = streamWriter.start()
streamingQuery.awaitTermination()
}
}
2条答案
按热度按时间ndasle7k1#
这是已知的Spark虫。
在实际实现中,水印将被保存在下一批中。由于触发器中没有后续批处理,所以watermak永远不会持久化。
spark bugtracker中存在票证:https://issues.apache.org/jira/browse/spark-24699
vojdkbi02#
不支持带触发器一次的追加模式。
这是一个已知的错误:https://issues.apache.org/jira/browse/spark-24699