如何按spark\u partition\u id分组而不进行无序排列?

ztyzrc3y  于 2021-05-17  发布在  Spark
关注(0)|答案(0)|浏览(248)

我有30个小于等于26mb的文件。
我想读入它们并将它们聚合到array[string](collect\u list)中。然后把它们写出来,不要拖沓。
我的groupby密钥是 spark_partition_id 但groupby会导致一次无序播放,并输出大于30mb的文件。
我尝试重新分区之前,我的聚合,但它仍然输出一些文件大于30mb。我希望经营一家 collect_list 不洗牌,因为我在每个分区内聚合。
示例输入json文件2行:

{"col1": "val1", "col2": "val2"}
{"col1": "val5", "col2": "val6"}

示例输出json文件1行:

[
{"col1": "val1", "col2": "val2"},
{"col1": "val5", "col2": "val6"}
]

我之所以要这样做是因为我发布到的api只接受<30mb数组[json]文件。

val sparkConf = new SparkConf()
.setAppName(s"app_name")
.set("spark.sql.files.maxPartitionByte","26843546") //make sure each partition is <30mb

val spark = SparkSession.builder
.config(sparkConf)
.enableHiveSupport
.getOrCreate()

val df = spark.read.json(path)

val cols = df.columns

df
.select(to_json(struct(cols.head, cols.tail:_*)).alias("json"))
.groupBy(spark_partition_id())
.agg(collect_list("json").alias("json_list"))
.select(col("json_list").cast("string"))
.write
.mode("overwrite")
.text(newPath)

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题