我需要在dataframe pyspark中用条件序号对一列进行反编码。例如
输入Dataframe
期望输出Dataframe
您可以看到,当一行的c1=1时,该行会将c4列的内容拆分为新行(因为长度超出了限制)。否则当c1=0时,则c4包含全部内容,无需换行。c4列可以将其拆分为多行
pyspark中的这个pyspark.sql.functions.explode(col),我需要取消编码,但是我有一个conditional is c1列(它并不简单,比如group by then collect list df.groupby().agg(f.collect\u list()),因为c1是sequence conditional)
我尝试通过这个主题使用窗口函数flow pyspark-将上一行和下一行附加到当前行。但下一步如何解决c4 col中断多行
样本代码
from pyspark.sql import SparkSession
spark_session = SparkSession.builder.getOrCreate()
df_in = spark_session.createDataFrame(
[
(1, 'a', 'b', 'c1', 'd'),
(0, 'a', 'b', 'c2', 'd'),
(0, 'e', 'f', 'g', 'h'),
(0, '1', '2', '3', '4'),
(1, 'x', 'y', 'z1', 'k'),
(1, 'x', 'y', 'z2', 'k'),
(1, 'x', 'y', 'z3', 'k'),
(0, 'x', 'y', 'z4', 'k'),
(1, '6', '7', '81', '9'),
(0, '6', '7', '82', '9'),
],
['c1', 'c2', 'c3', 'c4', 'c5']
)
df_out = spark_session.createDataFrame(
[
('a', 'b', 'c1-c2', 'd'),
('e', 'f', 'g', 'h'),
('1', '2', '3', '4'),
('x', 'y', 'z1-z2-z3-z4', 'k'),
('6', '7', '81-82', '9')
],
['c2', 'c3', 'c4', 'c5']
)
df_in.show()
df_out.show()
我怎样才能解决这个问题。谢谢您
更新的输入
df_in = spark_session.createDataFrame(
[
('0', 1, 'a', 'b', 'c1', 'd'),
('0', 0, 'a', 'b', 'c2', 'd'),
('0', 0, 'e', 'f', 'g', 'h'),
('0', 0, '1', '2', '3', '4'),
('0', 1, 'x', 'y', 'sele', 'k'),
('0', 1, 'x', 'y', 'ct ', 'k'),
('0', 1, 'x', 'y', 'from', 'k'),
('0', 0, 'x', 'y', 'a', 'k'),
('0', 1, '6', '7', '81', '9'),
('0', 0, '6', '7', '82', '9'),
],
['c0', 'c1', 'c2', 'c3', 'c4', 'c5']
)
输出
期望输出
x | y |选择-从-a | k
1条答案
按热度按时间ee7vknir1#
即使您的数据集位于多个分区中且未排序,此解决方案也可以工作。
设置
在我跑步时提供(每次跑步可能都提供)