错误:join中缺少已解析的属性

sz81bmfz  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(333)

我用Pypark来表演 join 具有相对复杂联接条件(在联接条件中使用大于/小于)的两个表。这工作正常,但一旦我添加一个 fillna 连接前的命令。
代码如下所示:

join_cond = [
    df_a.col1 == df_b.colx,
    df_a.col2 == df_b.coly,
    df_a.col3 >= df_b.colz
]

df = (
    df_a
    .fillna('NA', subset=['col1'])
    .join(df_b, join_cond, 'left')
)

这会导致如下错误:
org.apache.spark.sql.analysisexception:运算符中的col1#6488、col2#4766、col3#4768、colx#4823、coly#4830、colz#4764缺少解析属性col1#4765!连接leftouter,((col1#4765=colx#4823)和&(col2#4766=coly#4830)和&(col3#4768>=colz#4764))。具有相同名称的属性出现在操作:col1中。请检查是否使用了正确的属性。
看来spark已经认不出来了 col1 在执行 fillna . (如果我把它注解掉,错误就不会出现。)问题是我确实需要那句话(总的来说,我把这个例子简化了很多。)
我已经看了这个问题,但这些答案对我不适用。具体来说,使用 .alias('a') 之后 fillna 因为spark无法识别 a 在连接条件中。
有人能:
具体解释为什么会发生这种情况,以及我今后如何避免这种情况?
告诉我解决问题的方法?
事先谢谢你的帮助。

cs7cruho

cs7cruho1#

发生了什么事?

为了“替换”空值,将创建一个包含新列的新Dataframe。这些新列的名称与旧列相同,但实际上是全新的spark对象。在scala代码中,您可以看到“changed”列是新创建的列,而原始列被删除。
查看此效果的一种方法是在替换空值之前和之后对Dataframe调用explain:

df_a.explain()

印刷品

== Physical Plan ==

* (1) Project [_1#0L AS col1#6L, _2#1L AS col2#7L, _3#2L AS col3#8L]

+- *(1) Scan ExistingRDD[_1#0L,_2#1L,_3#2L]

虽然

df_a.fillna(42, subset=['col1']).explain()

印刷品

== Physical Plan ==

* (1) Project [coalesce(_1#0L, 42) AS col1#27L, _2#1L AS col2#7L, _3#2L AS col3#8L]

+- *(1) Scan ExistingRDD[_1#0L,_2#1L,_3#2L]

两个计划都包含一个名为 col1 ,但在第一种情况下,内部表示称为 col1#6L 而第二个叫做 col1#27L .
当连接条件 df_a.col1 == df_b.colx 现在与列关联 col1#6L 如果只有列 col1#27L 是左表的一部分。

这个问题怎么解决?

显而易见的方法是将“fillna”操作移到连接条件的定义之前:

df_a = df_a.fillna('NA', subset=['col1'])
join_cond = [
    df_a.col1 == df_b.colx,
[...]

如果这是不可能的或想要你可以改变连接条件。而不是使用Dataframe中的列( df_a.col1 )通过使用col函数,可以使用与任何Dataframe都不关联的列。此列仅基于其名称工作,因此在Dataframe中替换该列时忽略:

from pyspark.sql import functions as F
join_cond = [
    F.col("col1") == df_b.colx,
    df_a.col2 == df_b.coly,
    df_a.col3 >= df_b.colz
]

第二种方法的缺点是两个表中的列名必须是唯一的。

相关问题