scala — Append output mode not supported when there are streaming aggregations on streaming DataFrames/Datasets without watermark

fnx2tebb · posted 2021-05-17 in Spark

In Spark Structured Streaming 2.4.6, I get the above error when I try to query the output DataFrame of this query:

pageviewsDF
        // stream-stream inner join on userid with an event-time range condition
        .join(usersDF,
          expr(
            s"""
              |pv.userid = u.userid AND
              |pv.${eventTimePV} >= u.${eventTimeU} AND
              |pv.${eventTimePV} <= u.${eventTimeU} + interval ${leftDFDelay} seconds
              |""".stripMargin)
        )
        .select(
          col("pv.userId").as("userid"),
          col("u.gender").as("gender"),
          col("pv.viewtime").as("viewtime"),
          col("pv.pageid").as("pageid"),
          // only the pageviews-side event-time column is carried forward
          col(s"pv.${eventTimePV}").as(s"${eventTimePV}")
        )
        // sliding-window aggregation on the pageviews event-time column
        .groupBy(
          window(
            col(s"${eventTimePV}"),
            s"${this.windowDuration} seconds",
            s"${this.slideInterval} seconds"
          ),
          col("gender"),
          col("pageid")
        )
        .agg(sum(col("viewtime")).as("total_viewtime"))
        // rank pages within each window partition and keep the top ones
        .withColumn("page_pos", row_number().over(windowSpecPageId))
        .where(col("page_pos") <= lit(this.topPagesLimit))
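
The query is then started in append output mode, which is what triggers the check in the error (a sketch of the sink side; the variable name, sink format and options are my assumptions, not taken from the actual code):

resultDF.writeStream
  .outputMode("append")        // the mode the AnalysisException below refers to
  .format("console")           // assumed sink, for illustration only
  .option("truncate", "false")
  .start()
  .awaitTermination()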

pageviewsDF and usersDF are both defined with:

.withWatermark(watermarkCol, s"${maxDelay} seconds")
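
Based on the logical plan in the full output below, each source is built roughly like this (a reconstruction sketch; the schemas, paths and names such as pageviewsPath and maxDelay are assumptions read off the plan, not the actual code):

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// Envelope of a file-simulated Kafka message, e.g.:
// {"topic":"pageviews","value":"{\"userid\":\"u1\",\"viewtime\":10,\"pageid\":\"p1\"}","timestamp":1621234567}
val rawSchema = StructType(Seq(
  StructField("topic", StringType, nullable = false),
  StructField("value", StringType, nullable = false),
  StructField("timestamp", LongType, nullable = false)))

// Payload schema of the pageviews topic (per the plan below)
val pvSchema = StructType(Seq(
  StructField("userid", StringType, nullable = false),
  StructField("viewtime", LongType, nullable = false),
  StructField("pageid", StringType, nullable = false)))

val pageviewsDF = spark.readStream
  .schema(rawSchema)
  .json(pageviewsPath)                                // file source standing in for Kafka
  .select(from_json(col("value"), pvSchema).as("data"), col("timestamp"))
  .select(
    col("data.userid").as("userid"),
    col("data.viewtime").as("viewtime"),
    col("data.pageid").as("pageid"),
    from_unixtime(col("timestamp")).cast("timestamp").as("timestamp"))
  .withWatermark("timestamp", s"${maxDelay} seconds") // 20 seconds in the plan
  .as("pv")                                           // usersDF is built analogously, aliased "u"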

In both DataFrames, the watermark column is a column named "timestamp", taken from Kafka.
Why does the engine still throw the exception from the title, even though each source has a watermark defined and the aggregation has a window with the column, duration and slide interval all defined?
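
For reference, the single-stream pattern that append mode does accept, where the window is built directly on the watermarked column, looks like this (a minimal sketch with hypothetical names, following the standard documentation pattern):

val counts = eventsDF                          // some streaming DataFrame
  .withWatermark("timestamp", "20 seconds")    // watermark on the event-time column
  .groupBy(
    window(col("timestamp"), "60 seconds", "30 seconds"),
    col("pageid"))
  .count()

counts.writeStream
  .outputMode("append")                        // accepted: the watermark bounds the state
  .format("console")
  .start()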
Full output:

org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;
[info] Project [gender#64, pageid#66, total_viewtime#80L]
[info] +- Aggregate [window#81, gender#64, pageid#66], [window#81 AS window#73, gender#64, pageid#66, sum(viewtime#65L) AS total_viewtime#80L]
[info]    +- Filter isnotnull(timestamp#67)
[info]       +- Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#67, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) as double) = (cast((precisetimestampconversion(timestamp#67, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) THEN (CEIL((cast((precisetimestampconversion(timestamp#67, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(timestamp#67, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 10000000) + 0), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#67, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) as double) = (cast((precisetimestampconversion(timestamp#67, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) THEN (CEIL((cast((precisetimestampconversion(timestamp#67, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(timestamp#67, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 10000000) + 0) + 10000000), LongType, TimestampType)) AS window#81, userid#63, gender#64, viewtime#65L, pageid#66, timestamp#67]
[info]          +- Project [userId#10 AS userid#63, gender#28 AS gender#64, viewtime#11L AS viewtime#65L, pageid#12 AS pageid#66, timestamp#9-T20000ms AS timestamp#67]
[info]             +- Join Inner, (((userid#10 = userid#27) && (timestamp#9-T20000ms >= timestamp#26-T20000ms)) && (timestamp#9-T20000ms <= cast(timestamp#26-T20000ms + interval 20 seconds as timestamp)))
[info]                :- SubqueryAlias `pv`
[info]                :  +- EventTimeWatermark timestamp#9: timestamp, interval 20 seconds
[info]                :     +- Project [data#6.userid AS userid#10, data#6.viewtime AS viewtime#11L, data#6.pageid AS pageid#12, cast(from_unixtime(timestamp#2L, yyyy-MM-dd HH:mm:ss, Some(Europe/Berlin)) as timestamp) AS timestamp#9]
[info]                :        +- Project [jsontostructs(StructField(userid,StringType,false), StructField(viewtime,LongType,false), StructField(pageid,StringType,false), cast(value#1 as string), Some(Europe/Berlin)) AS data#6, timestamp#2L]
[info]                :           +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3725ee,json,List(),Some(StructType(StructField(topic,StringType,false), StructField(value,StringType,false), StructField(timestamp,LongType,false))),List(),None,Map(path -> file:///mnt/c/joyn_challenge/joyn-challenge/target/scala-2.12/test-classes/kafka/streaming/pageviews/),None), FileSource[file:///mnt/c/project/target/scala-2.12/test-classes/kafka/streaming/pageviews/], [topic#0, value#1, timestamp#2L]
[info]                +- SubqueryAlias `u`
[info]                   +- EventTimeWatermark timestamp#26: timestamp, interval 20 seconds
[info]                      +- Project [data#23.userid AS userid#27, data#23.gender AS gender#28, data#23.registertime AS registertime#29L, data#23.regionid AS regionid#30, cast(from_unixtime(timestamp#19L, yyyy-MM-dd HH:mm:ss, Some(Europe/Berlin)) as timestamp) AS timestamp#26]
[info]                         +- Project [jsontostructs(StructField(userid,StringType,false), StructField(gender,StringType,false), StructField(registertime,LongType,false), StructField(regionid,StringType,false), cast(value#18 as string), Some(Europe/Berlin)) AS data#23, timestamp#19L]
[info]                            +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3725ee,json,List(),Some(StructType(StructField(topic,StringType,false), StructField(value,StringType,false), StructField(timestamp,LongType,false))),List(),None,Map(path -> file:///mnt/c/joyn_challenge/joyn-challenge/target/scala-2.12/test-classes/kafka/streaming/users/),None), FileSource[file:///mnt/c/project/target/scala-2.12/test-classes/kafka/streaming/users/], [topic#17, value#18, timestamp#19L]

I simulate the Kafka topics with files, hence the file paths in the output.
I suppose another question is: which watermark should the window in the groupBy use? pageviewsDF has one column set as its watermark and usersDF has another, so which one gets picked? (See the sketch of the two options below.)
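
In code, assuming the join carried both watermarked event-time columns forward (hypothetical names pvTime and uTime; joinExpr stands for the join condition from the query above), the two options would look like this:

val joined = pageviewsDF.join(usersDF, joinExpr)
  .select(
    col(s"pv.${eventTimePV}").as("pvTime"),
    col(s"u.${eventTimeU}").as("uTime"),
    col("u.gender").as("gender"),
    col("pv.pageid").as("pageid"),
    col("pv.viewtime").as("viewtime"))

// Option A: window on the pageviews-side event time (what the query above does)
val optionA = joined
  .groupBy(window(col("pvTime"), "60 seconds", "30 seconds"), col("gender"), col("pageid"))
  .agg(sum("viewtime").as("total_viewtime"))

// Option B: window on the users-side event time instead
val optionB = joined
  .groupBy(window(col("uTime"), "60 seconds", "30 seconds"), col("gender"), col("pageid"))
  .agg(sum("viewtime").as("total_viewtime"))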
Thanks in advance for your help.

No answers yet.
