我有30个小于等于26mb的文件。
我想读入它们并将它们聚合到array[string](collect\u list)中。然后把它们写出来,不要拖沓。
我的groupby密钥是 spark_partition_id
但groupby会导致一次无序播放,并输出大于30mb的文件。
我尝试重新分区之前,我的聚合,但它仍然输出一些文件大于30mb。我希望经营一家 collect_list
不洗牌,因为我在每个分区内聚合。
示例输入json文件2行:
{"col1": "val1", "col2": "val2"}
{"col1": "val5", "col2": "val6"}
示例输出json文件1行:
[
{"col1": "val1", "col2": "val2"},
{"col1": "val5", "col2": "val6"}
]
我之所以要这样做是因为我发布到的api只接受<30mb数组[json]文件。
val sparkConf = new SparkConf()
.setAppName(s"app_name")
.set("spark.sql.files.maxPartitionByte","26843546") //make sure each partition is <30mb
val spark = SparkSession.builder
.config(sparkConf)
.enableHiveSupport
.getOrCreate()
val df = spark.read.json(path)
val cols = df.columns
df
.select(to_json(struct(cols.head, cols.tail:_*)).alias("json"))
.groupBy(spark_partition_id())
.agg(collect_list("json").alias("json_list"))
.select(col("json_list").cast("string"))
.write
.mode("overwrite")
.text(newPath)
暂无答案!
目前还没有任何答案,快来回答吧!