在pyspark中使用条件序列进行未编码

holgip5t  于 2021-07-14  发布在  Spark
关注(0)|答案(1)|浏览(282)

我需要在dataframe pyspark中用条件序号对一列进行反编码。例如
输入Dataframe

期望输出Dataframe

您可以看到,当一行的c1=1时,该行会将c4列的内容拆分为新行(因为长度超出了限制)。否则当c1=0时,则c4包含全部内容,无需换行。c4列可以将其拆分为多行
pyspark中的这个pyspark.sql.functions.explode(col),我需要取消编码,但是我有一个conditional is c1列(它并不简单,比如group by then collect list df.groupby().agg(f.collect\u list()),因为c1是sequence conditional)
我尝试通过这个主题使用窗口函数flow pyspark-将上一行和下一行附加到当前行。但下一步如何解决c4 col中断多行
样本代码

from pyspark.sql import SparkSession
spark_session = SparkSession.builder.getOrCreate()

df_in = spark_session.createDataFrame(
    [
      (1, 'a', 'b', 'c1', 'd'),
      (0, 'a', 'b', 'c2', 'd'),
      (0, 'e', 'f', 'g', 'h'),
      (0, '1', '2', '3', '4'),
      (1, 'x', 'y', 'z1', 'k'),
      (1, 'x', 'y', 'z2', 'k'),
      (1, 'x', 'y', 'z3', 'k'),
      (0, 'x', 'y', 'z4', 'k'),
      (1, '6', '7', '81', '9'),
      (0, '6', '7', '82', '9'),
    ],
    ['c1', 'c2', 'c3', 'c4', 'c5']
)

df_out = spark_session.createDataFrame(
    [
      ('a', 'b', 'c1-c2', 'd'),
      ('e', 'f', 'g', 'h'),
      ('1', '2', '3', '4'),
      ('x', 'y', 'z1-z2-z3-z4', 'k'), 
      ('6', '7', '81-82', '9')
    ],
    ['c2', 'c3', 'c4', 'c5']
)

df_in.show()
df_out.show()

我怎样才能解决这个问题。谢谢您
更新的输入

df_in = spark_session.createDataFrame(
    [
      ('0', 1, 'a', 'b', 'c1', 'd'),
      ('0', 0, 'a', 'b', 'c2', 'd'),
      ('0', 0, 'e', 'f', 'g', 'h'),
      ('0', 0, '1', '2', '3', '4'),
      ('0', 1, 'x', 'y', 'sele', 'k'),
      ('0', 1, 'x', 'y', 'ct ', 'k'),
      ('0', 1, 'x', 'y', 'from', 'k'),
      ('0', 0, 'x', 'y', 'a', 'k'),
      ('0', 1, '6', '7', '81', '9'),
      ('0', 0, '6', '7', '82', '9'),
    ],
    ['c0', 'c1', 'c2', 'c3', 'c4', 'c5']
)

输出

期望输出
x | y |选择-从-a | k

ee7vknir

ee7vknir1#

即使您的数据集位于多个分区中且未排序,此解决方案也可以工作。

from pyspark.sql.window import Window
from pyspark.sql import functions as F
orderByColumns = [F.col('c4'),F.col('c1').cast('int').desc()]
partitionColumns =[ F.col(column) for column in ['c2','c3','c5']]
df_in.orderBy(orderByColumns)\
     .withColumn('ranked',F.dense_rank().over(Window.partitionBy(partitionColumns).orderBy(orderByColumns)))\
     .withColumn('c4-ranked',F.concat(F.col('ranked'),F.lit('='),F.col('c4')))\
     .groupBy(partitionColumns)\
     .agg(F.collect_list('c4-ranked').alias('c4'))\
     .select(
         F.col('c2'),
         F.col('c3'),
         F.regexp_replace(F.array_join(F.col('c4'),"-"),"\d+=","").alias('c4'),
         F.col('c5')
     )\
     .show()
+---+---+-----------+---+
| c2| c3|         c4| c5|
+---+---+-----------+---+
|  1|  2|          3|  4|
|  x|  y|z1-z2-z3-z4|  k|
|  e|  f|          g|  h|
|  6|  7|      81-82|  9|
|  a|  b|      c1-c2|  d|
+---+---+-----------+---+

设置

df_in = sparkSession.createDataFrame(
    [
      (1, 'a', 'b', 'c1', 'd'),
      (0, 'a', 'b', 'c2', 'd'),
      (0, 'e', 'f', 'g', 'h'),
      (0, '1', '2', '3', '4'),
      (1, 'x', 'y', 'z1', 'k'),
      (1, 'x', 'y', 'z2', 'k'),
      (1, 'x', 'y', 'z3', 'k'),
      (0, 'x', 'y', 'z4', 'k'),
      (1, '6', '7', '81', '9'),
      (0, '6', '7', '82', '9'),
    ],
    ['c1', 'c2', 'c3', 'c4', 'c5']
).repartition(5) 

df_in.show()

在我跑步时提供(每次跑步可能都提供)

+---+---+---+---+---+
| c1| c2| c3| c4| c5|
+---+---+---+---+---+
|  1|  x|  y| z2|  k|
|  0|  x|  y| z4|  k|
|  1|  a|  b| c1|  d|
|  0|  1|  2|  3|  4|
|  0|  6|  7| 82|  9|
|  0|  a|  b| c2|  d|
|  0|  e|  f|  g|  h|
|  1|  6|  7| 81|  9|
|  1|  x|  y| z3|  k|
|  1|  x|  y| z1|  k|
+---+---+---+---+---+

相关问题