pyspark 在Azure Blob存储中重命名Spark输出CSV

vkc1a9a2  于 5个月前  发布在  Spark
关注(0)|答案(3)|浏览(60)

我有一个Databricks笔记本设置,其工作原理如下:

  • 到Blob存储帐户的pyspark连接详细信息
  • 通过spark框架读取文件
  • 转换为pandas Df
  • pandas Df数据建模
  • 转换为SparkDf
  • 在单个文件中写入blob存储

我的问题是,你不能命名的文件输出文件,我需要一个静态的CSV文件名。
有办法在pyspark中重命名吗?

## Blob Storage account information
storage_account_name = ""
storage_account_access_key = ""

## File location and File type
file_location = "path/.blob.core.windows.net/Databricks_Files/input"
file_location_new = "path/.blob.core.windows.net/Databricks_Files/out"
file_type = "csv"

## Connection string to connect to blob storage
spark.conf.set(
  "fs.azure.account.key."+storage_account_name+".blob.core.windows.net",
  storage_account_access_key)

字符串
数据转换后输出文件

dfspark.coalesce(1).write.format('com.databricks.spark.csv') \
  .mode('overwrite').option("header", "true").save(file_location_new)


然后将文件写入**“part-00000-tid-336943946930983.....csv”**
这里的目标是让**“Output.csv”**
我研究的另一种方法是在python中重新创建它,但在文档中还没有遇到如何将文件输出回blob存储。
我知道从Blob存储中检索的方法是 .get_blob_to_path 通过microsoft.docs
这里的任何帮助都非常感谢。

pw136qt2

pw136qt21#

Hadoop/Spark会将每个分区的计算结果并行输出到一个文件中,所以你会在一个HDFS输出路径中看到很多part-<number>-....文件,比如你命名的Output/
如果你想将所有计算结果输出到一个文件中,你可以通过hadoop fs -getmerge /output1/part* /output2/Output.csv命令将它们合并,或者像使用coalesce(1)函数一样使用1设置reduce进程的数量。
因此,在您的场景中,您只需要调整调用这些函数的顺序,使coalease函数在save函数的前面被调用,如下所示。

dfspark.write.format('com.databricks.spark.csv') \
  .mode('overwrite').option("header", "true").coalesce(1).save(file_location_new)

字符串

kx5bkwkv

kx5bkwkv2#

coalescerepartition不能帮助将加密框保存到一个正常命名的文件中。
我最终只是重命名了1 csv文件并删除了log文件夹:

def save_csv(df, location, filename):
  outputPath = os.path.join(location, filename + '_temp.csv')

  df.repartition(1).write.format("com.databricks.spark.csv").mode("overwrite").options(header="true", inferSchema="true").option("delimiter", "\t").save(outputPath)

  csv_files = os.listdir(os.path.join('/dbfs', outputPath))

  # moving the parquet-like temp csv file into normally named one
  for file in csv_files:
    if file[-4:] == '.csv':
      dbutils.fs.mv(os.path.join(outputPath,file) , os.path.join(location, filename))
      dbutils.fs.rm(outputPath, True)

# using save_csv
save_csv_location = 'mnt/.....'
save_csv(df, save_csv_location, 'name.csv')

字符串

nvbavucw

nvbavucw3#

可以使用mssparkutils重命名文件

old_name = f"path/.blob.core.windows.net/Databricks_Files/out"
new_name = f"path/.blob.core.windows.net/Databricks_Files/out/output.csv"
mssparkutils.fs.mv(old_name,new_name)

字符串
以防万一:如果你的文件是自动生成的,并且你得到的文件名是part-0000*文件,那么使用python单独获取部分文件名,然后使用上面的代码重命名它。

相关问题