pyspark代码优化-以更好的方式处理它

xn1cxnb4  于 2021-05-17  发布在  Spark
关注(0)|答案(1)|浏览(349)

我有两个pysparkDataframe,分别是'a'和'b'。我从dataframe'a'中直接选择了几个字段,而对于其他字段,我正在检查条件(如果'a'的字段为空,那么选择dataframe'b'字段)。下面的代码是绝对工作良好,我得到了要求的结果。

df_final = df1.alias('a').join(df2.alias('b'), on=['name_id_forwarded'], how='left')\
    .select(
        'a.name_id','a.SUM','a.full_name',
        f.when(f.isnull(f.col('a.first_name')),f.col('b.first_name')).otherwise(f.col('a.first_name')).alias('first_name'),
        f.when(f.isnull(f.col('a.last_name')),f.col('b.last_name')).otherwise(f.col('a.last_name')).alias('last_name'),
        f.when(f.isnull(f.col('a.email')),f.col('b.email')).otherwise(f.col('a.email')).alias('email'),
        f.when(f.isnull(f.col('a.phone_number')),f.col('b.phone_number')).otherwise(f.col('a.phone_number')).alias('phone_number'),
        f.when(f.isnull(f.col('a.address')),f.col('b.address')).otherwise(f.col('a.address')).alias('address'),
        f.when(f.isnull(f.col('a.address_2')),f.col('b.address_2')).otherwise(f.col('a.address_2')).alias('address_2'),
        f.when(f.isnull(f.col('a.city')),f.col('b.city')).otherwise(f.col('a.city')).alias('city'),
 f.when(f.isnull(f.col('a.email_alt')),f.col('b.email_alt')).otherwise(f.col('a.email_alt')).alias('email_alt'),
       'a.updated','a.date','a.client_reference_code','a.reservation_status',\
       'a.total_cancellations','a.total_covers','a.total_noshows','a.total_spend',\
       'a.total_spend_per_cover','a.total_spend_per_visit','a.total_visits','a.id')

我想知道如果字段的数量随着时间的推移而增加,那么我将如何使用loop来处理这些代码,以便使其自动化。
我尝试了下面的代码,但得到错误,有人能帮忙吗?
col\u list=[所有必填字段]

df_final = df1.alias('a').join(df2.alias('b'), on=['name_id_forwarded'], how='left')\
    .select('a.name_id','a.SUM','a.full_name',\

for x in col_list:
    f.when(f.isnull(f.col('a.x')),f.col('b.x')).otherwise(f.col('a.x')).alias('x'),
)

我想在选择我不能使用循环,请建议我其他方式。强文本

uelo1irk

uelo1irk1#

在列表中添加所需的列或列表达式,然后将该列表传递给 select .
检查以下代码。

col_list = [all required fields]

使用 when 功能

colExpr = ['a.name_id','a.SUM','a.full_name'] + list(map(lambda x: f.when(f.isnull(f.col('a.x')),f.col('b.x')).otherwise(f.col('a.x')).alias('x'),col_list))
df_final = df1.alias('a').join(df2.alias('b'), on=['name_id_forwarded'], how='left').select(*colExpr) # select

使用 nvl 功能

colExpr = ['a.name_id','a.SUM','a.full_name'] + list(map(lambda x: "nvl(a.{},b.{}) as {}".format(x,x,x),col_list))
df_final = df1.alias('a').join(df2.alias('b'), on=['name_id_forwarded'], how='left').selectExpr(*colExpr) # selectExpr

相关问题