如何将rdd保存到单个Parquet文件?

cetgtptt  于 2021-06-02  发布在  Hadoop
关注(0)|答案(4)|浏览(486)

我使用pyspark2.0和hadoop2.7.2。这是我的密码:

def func(df):
    new_df = pd.DataFrame(df['id'])
    new_df['num'] = new_df['num'] * 12
    return new_df

set = sqlContext.read.parquet("data_set.parquet")
columns = set.columns
map_res = set.rdd.mapPartitions(lambda iter_: func(pd.DataFrame(list(iter_), 
                                                   columns=columns)))

现在,我需要将map\u res rdd保存为parquet文件new.parquet。有没有办法在保存之前不创建一个大的Dataframe?或者是否有可能单独保存rdd的每个分区,然后合并所有保存的文件?
p、 我想在不创建Dataframe的情况下进行管理,因为它确实很大。

xmjla07d

xmjla07d1#

我建议:

dataframes = []

# creating index

map_res = map_res.zipWithIndex()

# setting index as key

map_res = map_res.map(lambda x: (x[1],x[0]))

# creating one spark df per element

for i in range(0, map_res.count()):
    partial_dataframe_pd  = map_res.lookup(i)
    partial_dataframe = sqlContext.createDataFrame(partial_dataframe_pd)
    dataframes.append(partial_dataframe)

# concatination

result_df = dataframes.pop()
for df in dataframes:
    result_df.union(df)   

# saving

result_df.write.parquet("...")

如果您有少量的分区(2-100个),那么它应该工作得相当快。

vmpqdwk3

vmpqdwk32#

您可以使用:

set.coalesce(1).write.parquet("myFile.parquet")
wgxvkvu9

wgxvkvu93#

只有两种方法可以做到这一点:
一是使用 "coalesce(1)" 这将确保所有数据都保存到一个文件中,而不是使用多个文件(默认分区数为200) dataframe.write.save("/this/is/path") .
另一个选项是将输出写入配置单元表,然后使用 hive -e "select * from table" > data.tsv 将用制表符分隔。

4szc88ey

4szc88ey4#

要以parquet格式保存文件,您需要将rdd转换为dataframe,因为parquet文件总是需要一个模式进行处理。

相关问题