filter对pysparkDataframe模式进行过滤,以获取具有特定类型列的新Dataframe

p5cysglq  于 2021-07-12  发布在  Spark
关注(0)|答案(0)|浏览(345)

我想在pyspark中创建一个泛型函数,将dataframe和datatype作为参数,并过滤不满足条件的列。我对python不是很在行,我被困在一个点上,从那里我无法找到我可以如何做到这一点。我有一个代码的scala表示,它做同样的事情。

//sample data
val df = Seq(("587","mumbai",Some(5000),5.05),("786","chennai",Some(40000),7.055),("432","Gujarat",Some(20000),6.75),("2","Delhi",None,10.0)).toDF("Id","City","Salary","Increase").withColumn("RefID",$"Id")

import org.apache.spark.sql.functions.col
def selectByType(colType: DataType, df: DataFrame) = {
  val cols = df.schema.toList
    .filter(x => x.dataType == colType)
    .map(c => col(c.name))
  df.select(cols:_*)

}
val res = selectByType(IntegerType, df)

res是只包含整数列的Dataframe,在本例中是salary列,我们动态删除了所有其他具有不同类型的列。
我不想在pyspark有同样的行为,但我不能做到这一点。
这就是我尝试过的

//sample data
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, DoubleType
schema = StructType([ \
    StructField("firstname",StringType(),True), \
    StructField("middlename",StringType(),True), \
    StructField("lastname",StringType(),True), \
    StructField("id", StringType(), True), \
    StructField("gender", StringType(), True), \
    StructField("salary", IntegerType(), True), \
                     StructField("raise",DoubleType(),True) \
  ])
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
data2 = [("James","","Smith","36636","M",3000,2.5),
    ("Michael","Rose","","40288","M",4000,4.7),
    ("Robert","","Williams","42114","M",4000,8.9),
    ("Maria","Anne","Jones","39192","F",4000,0.0),
    ("Jen","Mary","Brown","","F",-1,-1.2)
  ]
df = spark.createDataFrame(data=data2,schema=schema)
//getting the column list from schema of the dataframe
pschema = df.schema.fields
datatypes = [IntegerType,DoubleType]  //column datatype that I want.
out = filter(lambda x: x.dataType.isin(datatypes), pschema)  //gives invalid syntax error.

有人能帮我解决我做错了什么事吗。scala代码只传递一个数据类型,但根据我的用例,我想处理这样一个场景:我们可以传递多个数据类型,然后我们得到具有指定数据类型的所需列的Dataframe。
一开始,如果有人能告诉我如何使它在单个数据类型上工作,那么我可以尝试一下,看看我是否能在多个数据类型上工作。
注意:scala和pyspark的样本数据是不同的,因为我从某处复制pyspark样本数据只是为了加快操作,因为我只关心最终的输出需求。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题