我有10万的 tar.gz
包含json文件和csv文件的文件。我只想使用pyspark将csv文件加载到Dataframe中。出于明显的性能原因,它需要以分布式方式完成。我还希望避免在任何中间步骤中写入磁盘。
我的方法是将gzip文件作为二进制文件加载到Dataframe中;然后使用自定义项将csv文件提取到一个新的dataframe列中,如下面的代码所示(在spark3.0上运行)。
是否可以将csv文件列加载到一个新的Dataframe中,就像一个应用了模式的csv文件读取一样?从文件上我看不出这是怎么可能的。
import tarfile
from io import BytesIO
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
from pyspark.sql import types as t
spark = SparkSession \
.builder \
.appName("load gzipped csv files") \
.getOrCreate()
# UDF to extract CSV files from tar.gz files
def extract_files(file_bytes):
output = None
tar_file = tarfile.open(fileobj=BytesIO(file_bytes), mode="r:gz")
for file in tar_file:
if file.name.lower() == "data.csv":
file_stream = tar_file.extractfile(file)
file_stream.flush()
file_stream.seek(0)
output = io.TextIOWrapper(file_stream, encoding='utf-8').read()
return output
extract_files_udf = f.udf(lambda file_bytes: extract_files(file_bytes), t.StringType())
df = spark.read.format("binaryFile").load("/path/to/file.tar.gz")
df2 = df.withColumn("csv_file_contents", extract_files_udf(df.content)) \
.drop("content")
# extract the contents of the csv_file_contents column in df2 and create a new Dataframe with it
# something like...
df3 = spark.read \
.schema(my_schema) \
.option("header", True) \
.csv(df2.csv_file_contents)
暂无答案!
目前还没有任何答案,快来回答吧!