我有一个Dataframe。
df = spark.createDataFrame(
[
['3', '2', '3', '30', '0040'],
['2', '5', '7', '6', '0012'],
['5', '8', '1', '73', '0062'],
['4', '2', '5', '2', '0005'],
['5', '2', '4', '12', '0002'],
['8', '3', '2', '23', '0025'],
['2', '2', '8', '23', '0004'],
['5', '5', '4', '12', '0002'],
['8', '2', '2', '23', '0042'],
['2', '2', '8', '23', '0004']
],
['col1', 'col2', 'col3', 'col4', 'col5']
)
df.show()
我想根据下面的条件和不同的值添加一个新列。
cond = F.substring(F.col('col5'), 3, 1) == '0'
df1 = df.where(cond)
d_list = df1.select('col2').rdd.map(lambda x: x[0]).distinct().collect()
df2 = df.withColumn('new_col', F.when(F.col('col2').isin(d_list), F.lit('1')).otherwise('0'))
df2.show()
结果:
+----+----+----+----+----+-------+
|col1|col2|col3|col4|col5|new_col|
+----+----+----+----+----+-------+
| 3| 2| 3| 30|0040| 1|
| 2| 5| 7| 6|0012| 1|
| 5| 8| 1| 73|0062| 0|
| 4| 2| 5| 2|0005| 1|
| 5| 2| 4| 12|0002| 1|
| 8| 3| 2| 23|0025| 0|
| 2| 2| 8| 23|0004| 1|
| 5| 5| 4| 12|0002| 1|
| 8| 2| 2| 23|0042| 1|
| 2| 2| 8| 23|0004| 1|
+----+----+----+----+----+-------+
我认为这种方法不适合大数据集。由于出现警告,正在寻找不使用“collect()”方法的改进或替代方法: use of collect() can lead to poor spark performance
3条答案
按热度按时间3zwtqj6y1#
您可以添加
d_list
列使用collect_set
,并使用array_contains
检查是否col2
在该列中:bweufnob2#
还有一种方法:
您没有说明它们可能包含多少不同的值
col2
,但如果该数字足够小,则可以使用广播连接来提高性能。uelo1irk3#
您也可以尝试将条件设置为true时设置为1,然后在col2上分区以获得max:
如果顺序很重要,请先指定id,然后再指定orderby: