Pyspark:根据某个条件,使用另一行的列中的值来更新对象框架列值,其中这些行具有相似的列值

1cklez4t  于 5个月前  发布在  Spark
关注(0)|答案(1)|浏览(89)

我需要用另一行中的值更新一行中的列值。只有在发生特定条件时才能执行此操作。通过匹配具有相同值的列,可以找到另一行。
考虑这些数据:
| ID|姓氏|名字|SUBS_NO|账单编号|地址|市|状态|ZIP|关系|
| --|--|--|--|--|--|--|--|--|--|
| 1 |史密斯|吉姆| 123 | 123 |123 Main ST|某处|Wi| 12345 | 1 |
| 2 |DOE|莎莉| 456 | 456 |456 ELM AVE|任何地方|AZ| 54321 | 1 |
| 3 |史密斯|简| 123 | 789 |888 3RD ST|某处|Wi| 12345 | 2 |
条件:每当我们遇到包含RELATION <>“1”的行时,我们需要从具有当前行相同SUBS_NO的行中获取列值。在这种情况下,我们将替换LAST_NAME,FIRST_NAME,BILL_NO和ADDRESS。
最终结果应该是:
| ID|姓氏|名字|SUBS_NO|账单编号|地址|市|状态|ZIP|关系|
| --|--|--|--|--|--|--|--|--|--|
| 1 |史密斯|吉姆| 123 | 123 |123 Main ST|某处|Wi| 12345 | 1 |
| 2 |DOE|莎莉| 456 | 456 |456 ELM AVE|任何地方|AZ| 54321 | 1 |
| 3 |史密斯|吉姆| 123 | 123 |123 Main ST|某处|Wi| 12345 | 2 |
我该如何在Pyspark中实现这一点?谢谢!

cld4siwp

cld4siwp1#

试试下面的代码:

from pyspark.sql.functions import *
from pyspark.sql import Window
w0=Window.partitionBy(col("SUBS_NO")).orderBy(col("RELATION"))
df_new=df.withColumn("FIRST_NAME",when(col("RELATION")!=1,lag(col("FIRST_NAME")).over(w0)).otherwise(col("FIRST_NAME")))
df_new=df_new.withColumn("LAST_NAME",when(col("RELATION")!=1,lag(col("LAST_NAME")).over(w0)).otherwise(col("LAST_NAME")))
df_new=df_new.withColumn("ADDRESS",when(col("RELATION")!=1,lag(col("ADDRESS")).over(w0)).otherwise(col("ADDRESS")))
df_new.show()

字符串

相关问题