Combine and count a JSON column in a Spark DataFrame

gwo2fgha  posted on 2021-07-24  in Java

I want to aggregate the values of a JSON column, both in a Spark DataFrame and in a Hive table.
For example:

year,   month,    val (json)
 2010    01        [{"a_id":"caes"},{"a_id":"rgvtsa"},{"a_id":"btbsdv"}]
 2010    01        [{"a_id":"caes"},{"a_id":"uktf"},{"a_id":"ohcwa"}]
 2008    10        [{"a_id":"rfve"},{"a_id":"yjndf"},{"a_id":"onbds"}]
 2008    10        [{"a_id":"fvds"},{"a_id":"yjndf"},{"a_id":"yesva"}]

I need:

year,   month,    val (json),                                                          num (int)
 2010    01        [{"a_id":"caes"},{"a_id":"rgvtsa"},{"a_id":"btbsdv"},{"a_id":"uktf"},{"a_id":"ohcwa"}]     5

 2008    10        [{"a_id":"rfve"},{"a_id":"yjndf"},{"a_id":"onbds"},{"a_id":"yesva"}]      4

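I need to remove the duplicates and find the size of the resulting JSON array (the number of "a_id" entries).
The data is saved as a Hive table, so would it be better to process it with PySpark SQL?
I would also like to know how to do this if the data is held in a Spark DataFrame.
I tried: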

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType(
    [
        StructField('a_id', StringType(), True)
    ]
)

df.withColumn("val", from_json("val", schema))\
    .select(col('year'), col('month'), col('val.*'))\
    .show()

However, all the values in "val1" are null.
Thanks.
Update: my Hive version:

%sh
 ls /databricks/hive | grep "hive"
 spark--maven-trees--spark_1.4_hive_0.13

My DDL:

from functools import reduce  # reduce is not a builtin in Python 3

import pyspark.sql.functions as F
import pyspark.sql.types as T

def concate_elements(val):
    # Each struct arrives as a tuple-like Row, so "+" concatenates them
    return reduce(lambda x, y: x + y, val)

flatten_array = F.udf(concate_elements, T.ArrayType(T.StringType()))

remove_duplicates = F.udf(lambda row: list(set(row)),
                          T.ArrayType(T.StringType()))

# final results

(df.select("year", "month", flatten_array("val").alias("flattenvalues"))
   .withColumn("uniquevalues", remove_duplicates("flattenvalues"))
   .withColumn("size", F.size("uniquevalues"))
   .show())

erhoui1w  1#

Consider the input data in a JSON file, json-input.json:
{"year":"2010","month":"01","value":[{"a_id":"caes"},{"a_id":"uktf"},{"a_id":"ohcwa"}]}
{"year":"2011","month":"01","value":[{"a_id":"caes"},{"a_id":"uktf"},{"a_id":"uktf"},{"a_id":"sathya"}]}

Method 1. Reading the data from Hive
1. Insert the data into Hive

ADD JAR /home/sathya/Downloads/json-serde-1.3.7-jar-with-dependencies.jar;
CREATE EXTERNAL TABLE json_table (
year string,
month string,
value array<struct<a_id:string>>)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe';
load data local inpath '/home/sathya/json-input.json' into table json_table;

select * from json_table;
OK
2010 01 [{"a_id":"caes"},{"a_id":"uktf"},{"a_id":"ohcwa"}]
2011 01 [{"a_id":"caes"},{"a_id":"uktf"},{"a_id":"uktf"},{"a_id":"sathya"}]

2. Read the data from Spark:

pyspark --jars /home/sathya/Downloads/json-serde-1.3.7-jar-with-dependencies.jar --driver-class-path /home/sathya/Downloads/json-serde-1.3.7-jar-with-dependencies.jar

df=spark.sql("select * from default.json_table")
df.show(truncate=False)
'''
+----+-----+----------------------------------+
|year|month|value |
+----+-----+----------------------------------+
|2010|01 |[[caes], [uktf], [ohcwa]] |
|2011|01 |[[caes], [uktf], [uktf], [sathya]]|
+----+-----+----------------------------------+
'''

# UDFs for concatenating the array elements & removing duplicates in an array
from functools import reduce  # reduce is not a builtin in Python 3
import pyspark.sql.functions as F
import pyspark.sql.types as T

def concate_elements(val):
    return reduce(lambda x, y: x + y, val)

flatten_array = F.udf(concate_elements, T.ArrayType(T.StringType()))

remove_duplicates = F.udf(lambda row: list(set(row)), T.ArrayType(T.StringType()))

# final results

df.select("year", "month", flatten_array("value").alias("flattenvalues")).withColumn("uniquevalues", remove_duplicates("flattenvalues")).withColumn("size", F.size("uniquevalues")).show()

'''
+----+-----+--------------------------+--------------------+----+
|year|month|flattenvalues |uniquevalues |size|
+----+-----+--------------------------+--------------------+----+
|2010|01 |[caes, uktf, ohcwa] |[caes, uktf, ohcwa] |3 |
|2011|01 |[caes, uktf, uktf, sathya]|[caes, sathya, uktf]|3 |
+----+-----+--------------------------+--------------------+----+

'''
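
To see what the two UDFs compute for a single row, here is a plain-Python sketch that needs no Spark session. It assumes each struct reaches the UDF as a tuple-like `Row`, modeled here by a hypothetical namedtuple called `Item`; adding tuples concatenates them, which is what flattens the array.

```python
from collections import namedtuple
from functools import reduce

# Hypothetical stand-in for the Row object a UDF receives for each struct.
Item = namedtuple("Item", ["a_id"])

def concate_elements(val):
    # Tuple addition concatenates, so the one-field structs collapse
    # into a single flat tuple of a_id strings.
    return reduce(lambda x, y: x + y, val)

def remove_duplicates(row):
    # set() drops repeats but does not keep the original order.
    return list(set(row))

def remove_duplicates_ordered(row):
    # Alternative: dict keys keep first-seen order.
    return list(dict.fromkeys(row))

value = [Item("caes"), Item("uktf"), Item("uktf"), Item("sathya")]
flat = concate_elements(value)
unique = remove_duplicates(flat)
ordered = remove_duplicates_ordered(flat)

print(flat)         # ('caes', 'uktf', 'uktf', 'sathya')
print(len(unique))  # 3
print(ordered)      # ['caes', 'uktf', 'sathya']
```

Two caveats follow from this: `reduce` without an initial value raises a `TypeError` on an empty array, and both functions work on one row at a time, so rows that share a year and month are not merged.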

Method 2. Reading the input JSON file `json-input.json` directly:
{"year":"2010","month":"01","value":[{"a_id":"caes"},{"a_id":"uktf"},{"a_id":"ohcwa"}]}
{"year":"2011","month":"01","value":[{"a_id":"caes"},{"a_id":"uktf"},{"a_id":"uktf"},{"a_id":"sathya"}]}

The code for this scenario is:

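import os
import logging
from functools import reduce  # reduce is not a builtin in Python 3
from pyspark.sql import SQLContext, SparkSession
from pyspark import SparkContext
from pyspark.sql.types import *
from pyspark.sql import functions as F
import pyspark.sql.types as T

# Reuses the active session (e.g. in the pyspark shell) or creates one
spark = SparkSession.builder.getOrCreate()

df = spark.read.json("file:///home/sathya/json-input.json")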

df.show(truncate=False)
'''
+-----+----------------------------------+----+
|month|value                             |year|
+-----+----------------------------------+----+
|01   |[[caes], [uktf], [ohcwa]]         |2010|
|01   |[[caes], [uktf], [uktf], [sathya]]|2011|
+-----+----------------------------------+----+
'''

# UDFs for concatenating the array elements & removing duplicates in an array

def concate_elements(val):
    return reduce(lambda x, y: x + y, val)

flatten_array = F.udf(concate_elements, T.ArrayType(T.StringType()))

remove_duplicates = F.udf(lambda row: list(set(row)), T.ArrayType(T.StringType()))

# final results

df.select("year", "month", flatten_array("value").alias("flattenvalues")).withColumn("uniquevalues", remove_duplicates("flattenvalues")).withColumn("size", F.size("uniquevalues")).show()

'''
+----+-----+--------------------------+--------------------+----+
|year|month|flattenvalues             |uniquevalues        |size|
+----+-----+--------------------------+--------------------+----+
|2010|01   |[caes, uktf, ohcwa]       |[caes, uktf, ohcwa] |3   |
|2011|01   |[caes, uktf, uktf, sathya]|[caes, sathya, uktf]|3   |
+----+-----+--------------------------+--------------------+----+

'''
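
The result above deduplicates within each row; it does not merge rows that share a year and month, which is what the question asks for. As a small reference for checking Spark output on sample data, the intended merge can be sketched in plain Python. The function name `merge_groups` and the second 2010/01 line are made up for illustration.

```python
import json

# JSON-lines sample in the same shape as json-input.json. The second
# 2010/01 line is invented so that there is something to merge.
lines = [
    '{"year":"2010","month":"01","value":[{"a_id":"caes"},{"a_id":"uktf"},{"a_id":"ohcwa"}]}',
    '{"year":"2010","month":"01","value":[{"a_id":"caes"},{"a_id":"rgvtsa"}]}',
    '{"year":"2011","month":"01","value":[{"a_id":"caes"},{"a_id":"uktf"},{"a_id":"uktf"},{"a_id":"sathya"}]}',
]

def merge_groups(json_lines):
    """Union the a_id values per (year, month), keeping first-seen order."""
    groups = {}
    for line in json_lines:
        rec = json.loads(line)
        ids = groups.setdefault((rec["year"], rec["month"]), {})
        for item in rec["value"]:
            ids[item["a_id"]] = None  # dict keys act as an ordered set
    return {key: list(ids) for key, ids in groups.items()}

merged = merge_groups(lines)
for (year, month), ids in merged.items():
    print(year, month, ids, len(ids))
# 2010 01 ['caes', 'uktf', 'ohcwa', 'rgvtsa'] 4
# 2011 01 ['caes', 'uktf', 'sathya'] 3
```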

zqdjd7g9  2#

Here is a solution that works on Databricks:


# Import libraries

from pyspark.sql.functions import *
from pyspark.sql.types import *

# Define schema

schema1=StructType([
  StructField('year',IntegerType(),True),
  StructField('month',StringType(),True),
  StructField('val',ArrayType(StructType([
    StructField('a_id',StringType(),True)
  ])))
])

# Test data

rowsArr=[
  [2010,'01',[{"a_id":"caes"},{"a_id":"rgvtsa"},{"a_id":"btbsdv"}]],
  [2010,'01',[{"a_id":"caes"},{"a_id":"uktf"},{"a_id":"ohcwa"}]],
  [2008,'10',[{"a_id":"rfve"},{"a_id":"yjndf"},{"a_id":"onbds"}]],
  [2008,'10',[{"a_id":"fvds"},{"a_id":"yjndf"},{"a_id":"yesva"}]]
]

# Create dataframe

df1=(spark
     .createDataFrame(rowsArr,schema=schema1)
    )

# Create database

spark.sql('CREATE DATABASE IF NOT EXISTS testdb')

# Dump it into hive table

(df1
 .write
 .mode('overwrite')
 .options(schema=schema1)
 .saveAsTable('testdb.testtable')
)

# read from hive table

df_ht=(spark
       .sql('select * from testdb.testtable')
      )

# Perform transformation

df2=(df_ht
     .groupBy('year','month')
     .agg(array_distinct(flatten(collect_list('val'))).alias('val'))
     .withColumn('num',size('val'))
    )

Input df:

Output df:
