pyspark:将tar.gz文件中的特定文件添加到dataframe

3lxsmp7m  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(460)

我有10万的 tar.gz 包含json文件和csv文件的文件。我只想使用pyspark将csv文件加载到Dataframe中。出于明显的性能原因,它需要以分布式方式完成。我还希望避免在任何中间步骤中写入磁盘。
我的方法是将gzip文件作为二进制文件加载到Dataframe中;然后使用自定义项将csv文件提取到一个新的dataframe列中,如下面的代码所示(在spark3.0上运行)。
是否可以将csv文件列加载到一个新的Dataframe中,就像一个应用了模式的csv文件读取一样?从文件上我看不出这是怎么可能的。

import tarfile

from io import BytesIO
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
from pyspark.sql import types as t

spark = SparkSession \
    .builder \
    .appName("load gzipped csv files") \
    .getOrCreate()

# UDF to extract CSV files from tar.gz files

def extract_files(file_bytes):
    output = None
    tar_file = tarfile.open(fileobj=BytesIO(file_bytes), mode="r:gz")

    for file in tar_file:
        if file.name.lower() == "data.csv":
            file_stream = tar_file.extractfile(file)
            file_stream.flush()
            file_stream.seek(0)
            output = io.TextIOWrapper(file_stream, encoding='utf-8').read()

    return output

extract_files_udf = f.udf(lambda file_bytes: extract_files(file_bytes), t.StringType())

df = spark.read.format("binaryFile").load("/path/to/file.tar.gz")

df2 = df.withColumn("csv_file_contents", extract_files_udf(df.content)) \
    .drop("content")

# extract the contents of the csv_file_contents column in df2 and create a new Dataframe with it

# something like...

df3 = spark.read \
    .schema(my_schema) \
    .option("header", True) \
    .csv(df2.csv_file_contents)

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题