Scala: check whether a join with a stream succeeded

uujelgoq · posted 2021-07-13 in Spark
Follow (0) | Answers (1) | Views (281)

I'm new to Apache Spark, using Scala. I can join a table to a stream with the following:

val Updated_DF = Inbound_DF.join(colToAdd, colToAdd("key") <=> Inbound_DF("key"), "left")
    .withColumnRenamed("Data_DF", "site")
    .drop("Id", "key")

Now I want to check whether colToAdd("key") matched Inbound_DF("key") and the join succeeded. For example, colToAdd:

Id   key  Data_DF
S31  S3   {"name":"nick","region":"IN"}
S21  S2   {"name":"john","region":"CA"}
S11  S1   {"name":"ashley","region":"CA"}
S51  S5   {"name":"bella","region":"UK"}
S41  S4   {"name":"kumar","region":"In"}
S6   S6   {"name":"ben","region":"US"}
P11  P1   {"name":"MKD","region":"UAE"}
P21  P2   {"name":"ahmad","region":"UAE"}

A message arriving from the stream looks like this:

cusId  key  item  price
1897   S2   book   54

After the join, the updated message should look like this:

cusId  key  item  price    site
1897   S2   book   54    {"name":"john","region":"CA"}

But if I receive a stream message with key = S9, no match will occur, and I then want to log a message:

------- join failed, key not found ---------

As far as I know, this can be done with the filter method, but I don't know how to implement it. Please help me with how to do this, or suggest a better way to achieve the same thing.


xkrw2x1b · Answer #1

There are multiple ways to do this. I'm just giving you an idea of how it can be done; you can adapt it to your use case.
First, the direction of your left join is incorrect: the DataFrames need to be swapped. The streaming DataFrame should be on the left side of the join so that every incoming message is kept, matched or not.

// Required for toDF and the $ column syntax
import spark.implicits._

// Source data: the static lookup table and a small batch standing in for the stream
val df = Seq(
  ("S31", "S3", """{"name":"nick","region":"IN"}"""),
  ("S21", "S2", """{"name":"john","region":"CA"}"""),
  ("S11", "S1", """{"name":"ashley","region":"CA"}""")
).toDF("Id", "Key", "Data_DF")
val df1 = Seq(
  (1897, "S2", "book", 54),
  (1920, "S9", "movie", 200)
).toDF("custId", "Key", "item", "price")

// Left join with the (streaming) messages on the left, and count the joined rows
val df2 = df1.join(df, Seq("Key"), "left").drop("Id").withColumnRenamed("Data_DF", "site")
val initialJoinCount = df2.count()

// Keep only the rows where the lookup succeeded, and count again
val filteredDF = df2.filter($"site".isNotNull)
val filteredDFCount = filteredDF.count()

// If the counts differ, at least one key had no match in the lookup table
if (filteredDFCount == initialJoinCount) {
  println("Join succeeded for all keys")
} else {
  println("------- join failed, key not found ---------")
}
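The "left join, then check for misses" idea above can be illustrated with plain Scala collections, independent of Spark. The names below (`Message`, `lookup`, the sample rows) are hypothetical stand-ins for the question's data, not part of any API:

```scala
// Hypothetical lookup table: key -> site payload
val lookup = Map(
  "S2" -> """{"name":"john","region":"CA"}""",
  "S3" -> """{"name":"nick","region":"IN"}"""
)

// Hypothetical incoming messages
case class Message(cusId: Int, key: String, item: String, price: Int)
val messages = Seq(Message(1897, "S2", "book", 54), Message(1920, "S9", "movie", 200))

// A left join keeps every message; a missing key yields None (in Spark, a null `site`)
val joined = messages.map(m => (m, lookup.get(m.key)))

// Collect the keys whose lookup failed and log each of them
val unmatchedKeys = joined.collect { case (m, None) => m.key }
unmatchedKeys.foreach(k => println(s"------- join failed, key not found: $k ---------"))
```

In Spark itself, a `"left_anti"` join (e.g. `df1.join(df, Seq("Key"), "left_anti")`) returns exactly the unmatched rows, so counting or logging them directly is an alternative to the two-count comparison. Note also that `count()` is not allowed on a true streaming DataFrame; in a streaming job this kind of per-batch check would typically run inside `foreachBatch`.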
