Structured Streaming outer join in Scala behaves like an inner join

kmynzznz · posted 2021-05-27 · in Spark
Follow (0) | Answers (1) | Views (295)

I am trying Spark Structured Streaming stream-stream joins, and my left outer join behaves exactly like an inner join.
Using Spark 2.4.2 and Scala 2.12.8 on the Eclipse OpenJ9 VM, Java 1.8.0.
Here is what I am trying to do:
Create a rate stream that generates 1 row per second.
Create an employee stream and a department stream from it.
The employee stream's departmentId field multiplies the rate value by 2; the department stream's Id field multiplies it by 3.
The goal is to have two streams that share some id values and not others.
Perform a leftOuter stream-stream join with a 30-second time constraint, with the department stream on the left side of the join.
Expectation: after the 30-second time constraint, I should see nulls on the right side of the join for the unmatched rows.
What happens instead:
I only see the rows whose ids match, never the unmatched rows.
Code (to run in spark-shell):

import java.sql.Timestamp
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

case class RateData(timestamp: Timestamp, value: Long)

// create a rate source producing 1 row per second
val rateSource = spark.readStream.format("rate")
  .option("rowsPerSecond", 1)
  .option("numPartitions", 1)
  .option("rampUpTime", 1)
  .load()

import spark.implicits._
val rateSourceData = rateSource.as[RateData]

// employee stream: departmentId is the rate value multiplied by 2
val employeeStreamDS = rateSourceData
  .withColumn("firstName", concat(lit("firstName"), rateSourceData.col("value") * 2))
  .withColumn("departmentId", lit(floor(rateSourceData.col("value") * 2)))
  .withColumnRenamed("timestamp", "empTimestamp")
  .withWatermark("empTimestamp", "10 seconds")

// department stream: Id is the rate value multiplied by 3
val departmentStreamDS = rateSourceData
  .withColumn("name", concat(lit("name"), floor(rateSourceData.col("value") * 3)))
  .withColumn("Id", lit(floor(rateSourceData.col("value") * 3)))
  .drop("value")
  .withColumnRenamed("timestamp", "depTimestamp")

// watermark: 10s (employee stream only); join time constraint: 30 seconds
val joinedDS = departmentStreamDS.join(
  employeeStreamDS,
  expr("""
    id = departmentId AND
    empTimestamp >= depTimestamp AND
    empTimestamp <= depTimestamp + interval 30 seconds
  """),
  "leftOuter")

val q = joinedDS.writeStream
  .format("parquet")
  .trigger(Trigger.ProcessingTime("60 seconds"))
  .option("checkpointLocation", "checkpoint")
  .option("path", "rate-output")
  .start()

I queried the output table after 10 minutes and found only 31 rows, all of them matched. Identical to the inner join output.

val df = spark.read.parquet("rate-output")
df.count
// res0: Long = 31
df.agg(min("departmentId"), max("departmentId")).show
+-----------------+-----------------+
|min(departmentId)|max(departmentId)|
+-----------------+-----------------+
|                0|              180|
+-----------------+-----------------+
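A quick way to check whether the outer join emitted any null-padded rows is to count output rows whose employee-side columns are null; a sketch against the same `rate-output` path (to run in the same spark-shell session):

```scala
import org.apache.spark.sql.functions.col

val df = spark.read.parquet("rate-output")
// leftOuter rows for unmatched department Ids would carry a null
// employee side, e.g. a null departmentId column.
df.filter(col("departmentId").isNull).count
// returns 0 here, since all 31 rows in the output were matched
```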

Explanation of the output: in employeeStreamDS, the departmentId field is 2 times the rate value, so always a multiple of 2.
In departmentStreamDS, the Id field is 3 times the rate value, so always a multiple of 3.
So there is a departmentId = Id match on every multiple of 6, since lcm(2, 3) = 6. That only happens until the streams drift 30 seconds apart (the join time constraint).
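The id overlap described above can be checked in plain Scala (the ranges are illustrative, not the actual stream contents):

```scala
// employee departmentId values are multiples of 2;
// department Id values are multiples of 3.
val empDeptIds = (0 to 90 by 2).toSet
val deptIds    = (0 to 90 by 3).toSet

// the two sets coincide exactly on multiples of lcm(2, 3) = 6
val matched = deptIds.intersect(empDeptIds)
assert(matched == (0 to 90 by 6).toSet)
```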
After 30 seconds I expect the unmatched department rows (Id = 3, 9, 15, ...) to appear with nulls on the employee side.
I hope I have explained it well enough.
So this boils down to a question about the left outer join behavior of Spark Structured Streaming.

3zwtqj6y 1#

From my understanding of https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#stream-stream-joins, for stream-stream joins you need to apply a watermark on the event-time column of both streams, for example:

val impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
val clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
...
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter"
)

You have a watermark defined on only one of the two streams.
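Applied to the code in the question, a minimal sketch of the fix is to give the department stream its own watermark before joining (the 10-second value mirrors the employee stream's watermark and is an assumption, not a requirement):

```scala
// assumes departmentStreamDS and employeeStreamDS as defined in the question;
// employeeStreamDS already has withWatermark("empTimestamp", "10 seconds")
val departmentWithWatermark =
  departmentStreamDS.withWatermark("depTimestamp", "10 seconds")

val joinedDS = departmentWithWatermark.join(
  employeeStreamDS,
  expr("""
    id = departmentId AND
    empTimestamp >= depTimestamp AND
    empTimestamp <= depTimestamp + interval 30 seconds
  """),
  "leftOuter")
```

With watermarks on both sides, Spark can bound the join state and determine when a department row can no longer find a match, which is the point at which it emits the null-padded outer row.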
