使用Spark Scala检查一个嵌套框列中的值是否存在于另一个嵌套框列中

uelo1irk  于 8个月前  发布在  Scala
关注(0)|答案(1)|浏览(97)

我有两个数组df1和df2,
df1的列名称具有值,如a、B、c等df2的列ID具有值,如a、B
如果df1中的Name列与df2中的Id列匹配,则我们需要将匹配状态设置为0。如果没有匹配,那么我们需要将匹配状态设置为1。我知道我可以使用collect将df2 ID列放入集合中,然后检查df1中的Name列是否有匹配的条目。

val df1 = Seq(“Rey”, “John”).toDF(“Name”)
val df2 = Seq(“Rey”).toDF(“Id”)

val collect = df2.select("Id").map(r => r.getString(0)).collect.toList

就像,

val df3 = 
    df1.withColumn("match_sts",when(df1("Name").isin(collect).then(0).else(1)))

Expected output
+ — — + — -+
|Name|match_sts|
+ — — + — -+
| Rey| 0  |
|John| 1  |
+ — — + — -+

但我不想在这里用对方付费。是否有其他可行的方法。

pvabu6sv

pvabu6sv1#

使用collect并不是你想要的,但这是DF col --> list转换的一个众所周知的问题.如果不是一个巨大的列表,那么你可以这样做-这实际上是可行的,你也可以广播inlist:

import org.apache.spark.sql.functions._

val df1 = Seq("Rey", "John", "Donald", "Trump").toDF("Name")
val df2 = Seq("Rey", "Donald").toDF("Id")

val inlist = df2.select("Id").map(r => r.getString(0)).collect.toList

val df3 = df1.withColumn("match_status", when(df1("Name").isin(inlist: _*),1).otherwise(0))
df3.show(false)

即使在使用文件中的停用词过滤输出的经典示例中,它们也是这样做的:

val stopWords = stopWordsInput.flatMap(x => x.split(" ")).map(_.trim).collect.toSet

如果太大就向工人们广播但我不知道10万是什么!!!
另一种方法是使用Spark SQL,在使用EXISTS时依靠Catalyst优化SQL:

import spark.implicits._ 
import org.apache.spark.sql.functions._

val df1 = Seq("Rey", "John", "Donald", "Trump").toDF("Name")
val df2 = Seq("Rey", "Donald").toDF("Id") // This can be read from file and split etc.

// Optimizer converts to better physical plan for performance in general
df1.createOrReplaceTempView("searchlist") 
df2.createOrReplaceTempView("inlist")    
val df3 = spark.sql("""SELECT Name, 1 
                     FROM searchlist A
                    WHERE EXISTS (select B.Id from inlist B WHERE B.Id = A.Name )
                                   UNION
                   SELECT Name, 0 
                     FROM searchlist A
                    WHERE NOT EXISTS (select B.Id from inlist B WHERE B.Id = A.Name )
                """)
df3.show(false)

相关问题