如何在pyspark中检测空列

gdx19jrr  于 4个月前  发布在  Spark
关注(0)|答案(4)|浏览(55)

我有一个用一些空值定义的框架。一些列是完全空值。

>> df.show()
+---+---+---+----+
|  A|  B|  C|   D|
+---+---+---+----+
|1.0|4.0|7.0|null|
|2.0|5.0|7.0|null|
|3.0|6.0|5.0|null|
+---+---+---+----+

字符串
在我的例子中,我想返回一个由空值填充的列名列表。我的想法是检测常量列(因为整个列包含相同的空值)。
我是这么做的

nullCoulumns = [c for c, const in df.select([(min(c) == max(c)).alias(c) for c in df.columns]).first().asDict().items() if const]


但这并不把空列当作常量,它只对值起作用。那么我应该怎么做呢?

li9yvcax

li9yvcax1#

将条件扩展到

from pyspark.sql.functions import min, max

((min(c).isNull() & max(c).isNull()) | (min(c) == max(c))).alias(c)

字符串
或者使用eqNullSafe(PySpark 2.3):

(min(c).eqNullSafe(max(c))).alias(c)

nsc4cvqm

nsc4cvqm2#

一种方法是隐式地执行:选择每一列,计算其NULL值,然后将其与总数或行数进行比较。对于您的数据,这将是:

spark.version
# u'2.2.0'

from pyspark.sql.functions import col

nullColumns = []
numRows = df.count()
for k in df.columns:
  nullRows = df.where(col(k).isNull()).count()
  if nullRows ==  numRows: # i.e. if ALL values are NULL
    nullColumns.append(k)

nullColumns
# ['D']

字符串
但是有一个更简单的方法:事实证明,函数countDistinct,当应用于所有NULL值的列时,返回零(0):

from pyspark.sql.functions import countDistinct

df.agg(countDistinct(df.D).alias('distinct')).collect()
# [Row(distinct=0)]


所以for循环现在可以是:

nullColumns = []
for k in df.columns:
  if df.agg(countDistinct(df[k])).collect()[0][0] == 0:
    nullColumns.append(k)

nullColumns
# ['D']

更新(注解后):在第二个解决方案中似乎可以避免collect;由于df.agg返回的是一个只有一行的字符串,因此将collect替换为take(1)将安全地完成这项工作:

nullColumns = []
for k in df.columns:
  if df.agg(countDistinct(df[k])).take(1)[0][0] == 0:
    nullColumns.append(k)

nullColumns
# ['D']

bwntbbo3

bwntbbo33#

为了保证列是 all null,必须满足两个属性:
(1)最小值等于最大值
(2)min * 或 * max为null
或等效地
(1)最小值和最大值都等于None
注意,如果不满足性质(2),则列值为[null, 1, null, 1]的情况将被错误地报告,因为最小值和最大值将为1

import pyspark.sql.functions as F

def get_null_column_names(df):
    column_names = []

    for col_name in df.columns:

        min_ = df.select(F.min(col_name)).first()[0]
        max_ = df.select(F.max(col_name)).first()[0]

        if min_ is None and max_ is None:
            column_names.append(col_name)

    return column_names

字符串
这里有一个实践中的例子:

>>> rows = [(None, 18, None, None),
            (1, None, None, None),
            (1, 9, 4.0, None),
            (None, 0, 0., None)]

>>> schema = "a: int, b: int, c: float, d:int"

>>> df = spark.createDataFrame(data=rows, schema=schema)

>>> df.show()

+----+----+----+----+
|   a|   b|   c|   d|
+----+----+----+----+
|null|  18|null|null|
|   1|null|null|null|
|   1|   9| 4.0|null|
|null|   0| 0.0|null|
+----+----+----+----+

>>> get_null_column_names(df)
['d']

qlzsbp2j

qlzsbp2j4#

时间有效:Spark> 3.1如果你的Not列数比No列数多

通常,检查整个col是否为非列是一个耗时的操作,但是通常调用isNotNull()比调用Notarrow列的isNull()要快。如果您希望有更多的Notarrow列,那么下面的limit()技巧可以加快很多。

# col_a is a Null col
st = time.time()
print("Result:", sdf.where(sdf.col_a.isNotNull()).limit(1).count())
print("Time Taken:", time.time() - st)
# Output
# Result: 0
# Time Taken: 215.386

# col_b is a Not Null col
st = time.time()
print("Result:", sdf.where(sdf.col_b.isNotNull()).limit(1).count())
print("Time Taken:", time.time() - st)
# Output
# Result: 1
# Time Taken: 7.857

字符串
结果为0意味着它是一个null col,否则为1。现在你可以在spark框架中循环所有列。

相关问题