Error with dynamic conditions inside a 'when' clause in a PySpark DataFrame aggregation


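I am working on a PySpark DataFrame aggregation in which I try to generate the conditions inside a 'when' clause dynamically from a list of columns. However, I am running into a syntax error. Here is my code snippet: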

from pyspark.sql import functions as F

group_by_columns = ["class1", "class2"]
compare_columns = ["outliers"]

aggregations = [
    F.count(F.when(F.col(f"latest_{compare_columns[0]}") == True, True)).alias("Count_outliers_latest_compare_true"),
    F.count(F.when(F.col(f"previous_{compare_columns[0]}") == True, True)).alias("Count_outliers_previous_compare_true"),
    F.count(F.when(
        (F.col(f"latest_{compare_columns[0]}") == True) &
        (F.col(f"previous_{compare_columns[0]}") == True) &
        *((F.col(f"previous_{col}") == F.col(f"latest_{col}")) for col in group_by_columns),
        True
    )).alias("Count_outliers_both_compare_true_and_group_values_unchanged"),
    F.count('*').alias('Total_count_per_class')
]

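I get a syntax error related to the use of * inside the when clause. I also tried [ ] instead of ( ), but I get the same problem. How can I fix this to avoid the syntax error?
Error:

SyntaxError: invalid syntax (, line 10)
File <->:10
    *((F.col(f"previous_{col}") == F.col(f"latest_{col}")) for col in group_by_columns),
    ^
SyntaxError: invalid syntax

Some more clarification on the intended use of the list comprehension: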

*((F.col(f"previous_{col}") == F.col(f"latest_{col}")) for col in group_by_columns)


In this code, I want to add conditions dynamically based on the items in group_by_columns. For example, if group_by_columns has only one item, it should produce:

(F.col(f"previous_{group_by_columns[0]}") == F.col(f"latest_{group_by_columns[0]}"))


If group_by_columns has two items:

(F.col(f"previous_{group_by_columns[0]}") == F.col(f"latest_{group_by_columns[0]}")) & (F.col(f"previous_{group_by_columns[1]}") == F.col(f"latest_{group_by_columns[1]}"))


If group_by_columns has three items:

(F.col(f"previous_{group_by_columns[0]}") == F.col(f"latest_{group_by_columns[0]}")) & (F.col(f"previous_{group_by_columns[1]}") == F.col(f"latest_{group_by_columns[1]}")) & (F.col(f"previous_{group_by_columns[2]}") == F.col(f"latest_{group_by_columns[2]}"))
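
The one-, two-, and three-item expansions above all have the same shape: a list of per-column equality conditions joined with &. Below is a minimal sketch of that fold in plain Python, so it runs without Spark. The Cond class and the unchanged helper are hypothetical names introduced only for illustration; Cond is a stand-in for a PySpark Column that records the expression as text.

```python
from functools import reduce
import operator


class Cond:
    """Hypothetical stand-in for a PySpark Column; stores the expression as text."""

    def __init__(self, text):
        self.text = text

    def __eq__(self, other):
        # Like Column.__eq__, this returns a new expression instead of a bool.
        return Cond(f"({self.text} == {other.text})")

    def __and__(self, other):
        # Like Column.__and__, this joins two expressions with &.
        return Cond(f"{self.text} & {other.text}")


def unchanged(columns, col=Cond):
    """AND together previous_<c> == latest_<c> for every c in columns."""
    pairs = [col(f"previous_{c}") == col(f"latest_{c}") for c in columns]
    # Without an initial value, reduce raises TypeError on an empty list.
    return reduce(operator.and_, pairs)


print(unchanged(["class1"]).text)
print(unchanged(["class1", "class2"]).text)
```

With PySpark available, passing col=F.col should yield a single Column of the same shape. If group_by_columns can be empty, an initial value such as F.lit(True) would have to be supplied as the third argument to reduce.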


b4qexyjb1#

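I was able to solve this by using functools.reduce, which folds the per-column equality conditions into a single Column joined with &. The F.lit(True) initial value is the starting point of the chain: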

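from functools import reduce  # reduce is not a builtin in Python 3
from pyspark.sql import functions as F

# Aggregations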
aggregations = [
    F.count(F.when(F.col(f"latest_{self.compare_columns[0]}") == True, True)).alias("Count_outliers_latest_compare_true"),
    F.count(F.when(F.col(f"previous_{self.compare_columns[0]}") == True, True)).alias("Count_outliers_previous_compare_true"),
    F.count(F.when(
        (F.col(f"latest_{self.compare_columns[0]}") == True) &
        (F.col(f"previous_{self.compare_columns[0]}") == True) &
        reduce(lambda acc, col: acc & (F.col(f"previous_{col}") == F.col(f"latest_{col}")), self.group_by_columns, F.lit(True)),
        True
    )).alias("Count_outliers_both_compare_true_and_group_values_unchanged"),
    F.count('*').alias('Total_count_per_class')
]
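
As a supplementary note on why the original attempt failed: Python only allows * unpacking in specific positions, such as a standalone call argument or a list/tuple display, and not as an operand of a binary operator like &. The sketch below checks this with the built-in compile, which only parses the text, so the names when, a, conds, reduce, and and_ are placeholders that never need to exist. The compiles helper is a hypothetical name used for illustration.

```python
def compiles(source):
    """Return True if `source` parses as a Python expression."""
    try:
        compile(source, "<demo>", "eval")
        return True
    except SyntaxError:
        return False


# Star-unpacking as the right operand of `&`: rejected by the parser.
print(compiles("when(a & *conds, True)"))

# Star-unpacking as its own call argument: accepted by the parser, but it
# would pass each condition as a separate argument instead of AND-ing them.
print(compiles("when(*conds, True)"))

# Folding first, then using the single result as an operand: accepted.
print(compiles("when(a & reduce(and_, conds), True)"))
```

This is why switching from ( ) to [ ] made no difference: the problem is the position of the *, not the kind of iterable being unpacked.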
