带字符串值的pyspark cumcount

sz81bmfz  于 2021-05-18  发布在  Spark
关注(0)|答案(1)|浏览(428)

我的输入Dataframe是;

ID   Amount   result
45   1        default
46   2        default
47   3        default
48   5        default
48   10       default
49   12       default
50   5        default
50   7        default
50   9        default

期望输出为;

ID   Amount   result
45   1        default1
46   2        default1
47   3        default1
48   5        default1
48   10       default2
49   12       default1
50   5        default1
50   7        default2
50   9        default3

如果id没有改变,默认字符串应该像“default1”,“default2”这样增加。首先是应该始终为“default1”。
对于python,我可以用下面的代码来实现这一点;

df['result'] += df.groupby('ID').cumcount().add(1).astype(str)

你能帮我查一下pyspark代码吗?

q1qsirdb

q1qsirdb1#

是的,您可以使用spark窗口功能。
ps:如果您的Dataframe中没有可以作为排序依据的索引的列,请使用 "ID" 列本身。我用过 "index" 用于示例目的的列。

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

from pyspark.sql.functions import row_number
from pyspark.sql.window import Window
from pyspark.sql.functions import concat, col, lit

df = spark.createDataFrame(
    [
        (1, 45, 1, 'default'),
        (2, 46, 2, 'default'),
        (3, 47, 3, 'default'),
        (4, 48, 5, 'default'),
        (5, 48, 10, 'default'),
        (6, 49, 12, 'default'),
        (7, 50, 5, 'default'),
        (8, 50, 7, 'default'),
        (9, 50, 9, 'default')
    ],
    ['index', 'ID', 'Amount', 'result'] # add your columns label here
)

df = df.withColumn("occurence", row_number().over(Window.partitionBy("ID").orderBy("index")))
df = df.withColumn("result", concat(col("result"), col("occurence"))).drop("occurence")
df.orderBy("index").show()

结果:

+-----+---+------+--------+
|index| ID|Amount|  result|
+-----+---+------+--------+
|    1| 45|     1|default1|
|    2| 46|     2|default1|
|    3| 47|     3|default1|
|    4| 48|     5|default1|
|    5| 48|    10|default2|
|    6| 49|    12|default1|
|    7| 50|     5|default1|
|    8| 50|     7|default2|
|    9| 50|     9|default3|
+-----+---+------+--------+

相关问题