pyspark-join with或condition

eqqqjvef  于 2021-07-14  发布在  Spark
关注(0)|答案(1)|浏览(278)

如果两个条件中至少有一个满足,我想加入两个pysparkDataframe。
玩具数据:

df1 = spark.createDataFrame([
    (10, 1, 666),
    (20, 2, 777),
    (30, 1, 888),
    (40, 3, 999),
    (50, 1, 111),
    (60, 2, 222),
    (10, 4, 333),
    (50, None, 444),
    (10, 0, 555),
    (50, 0, 666)
    ],
    ['var1', 'var2', 'other_var'] 
)

df2 = spark.createDataFrame([
    (10, 1),
    (20, 2),
    (30, None),
    (30, 0)
    ],
    ['var1_', 'var2_'] 
)

我想保持所有这些行的 df1 哪里 var1 存在于 df2.var1_var2 存在于 df2.var2_ (但如果该值为0,则不适用)。
所以,预期的产出是

+----+----+---------+-----+-----+
|var1|var2|other_var|var1_|var2_|
+----+----+---------+-----+-----+
|  10|   1|      666|   10|    1|   # join on both var1 and var2
|  20|   2|      777|   20|    2|   # join on both var1 and var2
|  30|   1|      888|   10|    1|   # join on both var1 and var2
|  50|   1|      111|   10|    1|   # join on var2
|  60|   2|      222|   20|    2|   # join on var2
|  10|   4|      333|   10|    1|   # join on var1
|  10|   0|      555|   10|    1|   # join on var1
+----+----+---------+-----+-----+

在其他的尝试中,我尝试了

cond = [(df1.var1 == (df2.select('var1_').distinct()).var1_) | (df1.var2 == (df2.filter(F.col('var2_') != 0).select('var2_').distinct()).var2_)]
df1\
    .join(df2, how='inner', on=cond)\
    .show()

+----+----+---------+-----+-----+
|var1|var2|other_var|var1_|var2_|
+----+----+---------+-----+-----+
|  10|   1|      666|   10|    1|
|  20|   2|      777|   20|    2|
|  30|   1|      888|   10|    1|
|  50|   1|      111|   10|    1|
|  30|   1|      888|   30| null|
|  30|   1|      888|   30|    0|
|  60|   2|      222|   20|    2|
|  10|   4|      333|   10|    1|
|  10|   0|      555|   10|    1|
|  10|   0|      555|   30|    0|
|  50|   0|      666|   30|    0|
+----+----+---------+-----+-----+

但是我得到了比预期更多的行 var2 == 0 它们也被保存了下来。
我做错什么了?
注意:我没有使用 .isin 因为我的实际 df2 有大约20k行,我在这里读到,这个方法有大量的id,性能可能会很差。

ezykj2lf

ezykj2lf1#

尝试以下条件:

cond = (df2.var2_ != 0) & ((df1.var1 == df2.var1_) | (df1.var2 == df2.var2_))
df1\
    .join(df2, how='inner', on=cond)\
    .show()

+----+----+---------+-----+-----+
|var1|var2|other_var|var1_|var2_|
+----+----+---------+-----+-----+
|  10|   1|      666|   10|    1|
|  30|   1|      888|   10|    1|
|  20|   2|      777|   20|    2|
|  50|   1|      111|   10|    1|
|  60|   2|      222|   20|    2|
|  10|   4|      333|   10|    1|
|  10|   0|      555|   10|    1|
+----+----+---------+-----+-----+

条件应仅包括要联接的两个Dataframe中的列。如果你想移除 var2_ = 0 ,可以将它们作为联接条件,而不是筛选器。
也不需要具体说明 distinct ,因为它不影响相等条件,而且还添加了不必要的步骤。

相关问题