PySpark: create a Spark DataFrame from a nested dictionary

r6vfmomb posted on 2021-05-27 in Spark
Follow (0) | Answers (2) | Views (335)

How can I create a Spark DataFrame from a nested dictionary? I'm new to Spark, and I don't want to use a pandas DataFrame.
My dictionary looks like this:

{'prathameshsalap@gmail.com': {'Date': datetime.date(2019, 10, 21), 'idle_time': datetime.datetime(2019, 10, 21, 1, 50)},
 'vaishusawant143@gmail.com': {'Date': datetime.date(2019, 10, 21), 'idle_time': datetime.datetime(2019, 10, 21, 1, 35)},
 'you@example.com': {'Date': datetime.date(2019, 10, 21), 'idle_time': datetime.datetime(2019, 10, 21, 1, 55)}}

I want to convert this dict into a Spark DataFrame using PySpark.
My expected output:

                           Date        idle_time
user_name
prathameshsalap@gmail.com  2019-10-21  2019-10-21 01:50:00
vaishusawant143@gmail.com  2019-10-21  2019-10-21 01:35:00
you@example.com            2019-10-21  2019-10-21 01:55:00

2eafrhcq 1#

You need to reshape the dictionary and build Row objects so that the schema can be inferred correctly.

import datetime
from pyspark.sql import Row

data_dict = {
    'prathameshsalap@gmail.com': {
        'Date': datetime.date(2019, 10, 21),
        'idle_time': datetime.datetime(2019, 10, 21, 1, 50)
    },
    'vaishusawant143@gmail.com': {
        'Date': datetime.date(2019, 10, 21),
        'idle_time': datetime.datetime(2019, 10, 21, 1, 35)
    },
    'you@example.com': {
        'Date': datetime.date(2019, 10, 21),
        'idle_time': datetime.datetime(2019, 10, 21, 1, 55)
    }
}

# Promote each top-level key to a user_name field and flatten the inner dict into a Row
data_as_rows = [Row(**{'user_name': k, **v}) for k, v in data_dict.items()]
data_df = spark.createDataFrame(data_as_rows).select('user_name', 'Date', 'idle_time')

data_df.show(truncate=False)

>>>
+-------------------------+----------+-------------------+
|user_name                |Date      |idle_time          |
+-------------------------+----------+-------------------+
|prathameshsalap@gmail.com|2019-10-21|2019-10-21 01:50:00|
|vaishusawant143@gmail.com|2019-10-21|2019-10-21 01:35:00|
|you@example.com          |2019-10-21|2019-10-21 01:55:00|
+-------------------------+----------+-------------------+
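
As a quick sanity check on the inferred schema (assuming the data_df built above): the datetime.date values should come through as DateType and the datetime.datetime values as TimestampType.

data_df.printSchema()

>>>
root
 |-- user_name: string (nullable = true)
 |-- Date: date (nullable = true)
 |-- idle_time: timestamp (nullable = true)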

Note: if you already have the schema and don't need inference, simply pass it to the createDataFrame function:

import pyspark.sql.types as T

schema = T.StructType([
    T.StructField('user_name', T.StringType(), False),
    T.StructField('Date', T.DateType(), False),
    T.StructField('idle_time', T.TimestampType(), False)
])
# Build (user_name, Date, idle_time) tuples in the same order as the schema fields
data_as_tuples = [(k, v['Date'], v['idle_time']) for k, v in data_dict.items()]

data_df = spark.createDataFrame(data_as_tuples, schema=schema)

data_df.show(truncate=False)

>>>
+-------------------------+----------+-------------------+
|user_name                |Date      |idle_time          |
+-------------------------+----------+-------------------+
|prathameshsalap@gmail.com|2019-10-21|2019-10-21 01:50:00|
|vaishusawant143@gmail.com|2019-10-21|2019-10-21 01:35:00|
|you@example.com          |2019-10-21|2019-10-21 01:55:00|
+-------------------------+----------+-------------------+
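
As a variation, createDataFrame also accepts a DDL-formatted schema string instead of a StructType object, so the same conversion can be sketched more compactly (the output is identical to the above):

# Flatten the nested dict and pass the schema as a DDL string
data_df = spark.createDataFrame(
    [(k, v['Date'], v['idle_time']) for k, v in data_dict.items()],
    schema='user_name string, Date date, idle_time timestamp'
)
data_df.show(truncate=False)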

vd8tlhqk 2#

Convert the dictionary into a list of tuples; each tuple then becomes one row of the Spark DataFrame:

rows = []
for key, value in data.items():  # data is the nested dictionary from the question
    row = (key, value['Date'], value['idle_time'])
    rows.append(row)

Define a schema for the data:

from pyspark.sql.types import *

sch = StructType([
    StructField('user_name', StringType()),
    StructField('date', DateType()),
    StructField('idle_time', TimestampType())
])

Create the Spark DataFrame:

df = spark.createDataFrame(rows, sch)

df.show()
+--------------------+----------+-------------------+
|           user_name|      date|          idle_time|
+--------------------+----------+-------------------+
|prathameshsalap@g...|2019-10-21|2019-10-21 01:50:00|
|vaishusawant143@g...|2019-10-21|2019-10-21 01:35:00|
|     you@example.com|2019-10-21|2019-10-21 01:55:00|
+--------------------+----------+-------------------+
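
Note: show() truncates long strings to 20 characters by default, which is why the e-mail addresses appear cut off above; pass truncate=False to print them in full:

df.show(truncate=False)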
