I need to build a method that receives a pyspark.sql.Column 'c' and returns a new pyspark.sql.Column that holds true/false values indicating whether each value in the column is null/nan.
pyspark has the column method c.isNotNull(), which handles the null case. It also has pyspark.sql.functions.isnan, which receives a pyspark.sql.Column and handles nans (but it does not work on datetime/bool columns).
I am trying to build a function that looks like this:
from pyspark.sql import functions as F
def notnull(c):
    return c.isNotNull() & ~F.isnan(c)
I then want to use that function on any column type in my DataFrame to find out whether the column contains null/nan values. But this fails when the supplied column's type is bool or datetime:
import datetime
import numpy as np
import pandas as pd
from pyspark import SparkConf
from pyspark.sql import SparkSession
# Building SparkSession 'spark'
conf = (SparkConf().setAppName("example")
        .setMaster("local[*]")
        .set("spark.sql.execution.arrow.enabled", "true"))
spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()
# Data input and initializing pd_df
data = {
    'string_col': ['1', '1', '1', None],
    'bool_col': [True, True, True, False],
    'datetime_col': [
        datetime.datetime(2018, 12, 9),
        datetime.datetime(2018, 12, 9),
        datetime.datetime(2018, 12, 9),
        pd.NaT],
    'float_col': [1.0, 1.0, 1.0, np.nan]
}
pd_df = pd.DataFrame(data)
# Creating spark_df from pd_df
spark_df = spark.createDataFrame(pd_df)
# This should return a new dataframe with the column 'notnulls' added
# Note: This works fine with 'float_col' and 'string_col' but does not
# work with 'bool_col' or 'datetime_col'
spark_df.withColumn('notnulls', notnull(spark_df['datetime_col'])).collect()
Running this snippet (with 'datetime_col') raises the following exception:
pyspark.sql.utils.AnalysisException: "cannot resolve 'isnan(`datetime_col`)'
due to data type mismatch: argument 1 requires (double or float) type, however,
'`datetime_col`' is of timestamp type.;;\n'Project [category#217,
float_col#218, string_col#219, bool_col#220, CASE WHEN isnan(datetime_col#221)
THEN NOT isnan(datetime_col#221) ELSE isnotnull(datetime_col#221) END AS
datetime_col#231]\n+- LogicalRDD [category#217, float_col#218, string_col#219,
bool_col#220, datetime_col#221], false\n"
I understand this happens because the isnan function cannot be applied to 'datetime_col', since it is not of float/double type. Because 'c' is a pyspark.sql.Column object, I cannot access its data type from it in order to behave differently depending on the column's type. I would like to avoid using a UDF to solve this, but I cannot find any other way.
I am using the following dependencies:
numpy==1.19.1
pandas==1.0.4
pyarrow==1.0.0
pyspark==2.4.5