python3.x—将由json structtypes组成的arraytype转换为dataframe列

m0rkklqb  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(232)

我有一个Dataframe,它的列是一个名为 segmentationFieldValues . 此arraytype由30个不同的json StructType组成,每个StructType有三个字段:

|-- emailAddress: string (nullable = true)
 |-- subscriptionState: string (nullable = true)
 |-- subscribeDate: timestamp (nullable = true)
 |-- resubscribeDate: timestamp (nullable = true)
 |-- subscribeMethod: string (nullable = true)
 |-- unsubscribeDate: timestamp (nullable = true)
 |-- unsubscribeMethod: string (nullable = true)
 |-- segmentationFieldValues: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- segmentationFieldGroupId: string (nullable = true)
 |    |    |-- segmentationFieldId: string (nullable = true)
 |    |    |-- value: string (nullable = true)

因此,所有行都将有一个dictionary对象,该对象的值相同 segmentationFieldGroupID 以及 segmentationFieldID 不同的是存储在 value 与这些关联的键。我试图通过将这些json对象的值绑定到 segmentationFieldGroupID 以及 segmentationFieldID 并使其成为列的名称,列的值存储在 value 现场。
我尝试创建一个udf函数,根据 segmentationFieldGroupID 以及 segmentationFieldID 并归还 value :

def filterArray(segGroup, segField, array):
  val = list(filter(lambda x: (x['segmentationFieldGroupId'] == segGroup) & (x['segmentationFieldId'] == segField), array))
  return val[0]['value']

filterArrayPython = udf(filterArray, StringType())

df.select('segmentationFieldValues', filterArrayPython('264175', '2433004', 'segmentationFieldValues').alias('264175_2433004')).show(5)

但到目前为止我得到了一个错误:

org.apache.spark.sql.AnalysisException: cannot resolve '`264175`' given input columns: [segmentationFieldValues, unsubscribeMethod, subscribeMethod, subscriptionState, resubscribeDate, subscribeDate, emailAddress, unsubscribeDate];;

不知道为什么,函数可以工作,但在pyspark上下文中不行。如何修复此方法或以不同的方式进行?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题