从emr覆盖到s3 bucket需要很长时间

ncecgwcz  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(349)

我使用emr(5.30)和spark以及hadoop作为所选的应用程序。条件如下-
历史数据位于s3位置(700 gb)
每日增量数据(500 mb)
我需要根据一些条件创建一个合并框架
将结果写回s3并终止集群
我在做什么-
使用lambda提供的1 m5.8xlagle(主)和5 m5.8xlagle(核心)以及所有配置来旋转集群
配置-29个执行器示例,5个执行器内核,18 gb内存/执行器。3 gb开销内存,默认并行度290
在pyspark脚本中:
在一些操作之后复制历史数据,比如转换数据类型,然后复制到临时位置(s3)-覆盖
向临时位置(相同的临时位置)添加增量数据-追加
执行所有必需的转换
将结果放回另一个s3位置-覆盖(相同的bucket但不同的文件夹),第二天相同的循环将继续
观察:
最后的覆盖过程需要一个多小时,但第一个过程只需要14-15分钟
我尝试过重新划分和合并,没有任何改善
当尝试将s3n.multipart.uploads.enabled设置为false时,会出现启用它的错误
根据亚马逊文档https://docs.aws.amazon.com/amazons3/latest/dev/qfacts.html ,一个列表中只有1000个零件的限制。因此,每个文件的平均大小是600-700MB,因为总输出大小大约是700GB,所以每当我试图重写no时,通过重新分区,它实际上是不可能发生的
如果我在重写时在代码中使用partionby逻辑,那就没有意义了,因为这需要更多的时间(超过2小时)。第二天我还要改写结果。
另一个观察结果是,重写时,它将首先删除文件夹的内容,然后删除文件夹并重新创建,以放置emr的结果集。在这种情况下,整个过程都失败了。
我的问题是:
为什么在代码相同的情况下,两个覆盖过程所用的时间间隔不同 df.write.mode('overwrite').parquet(target_location) 如何提高上次覆盖的性能

2izufjch

2izufjch1#

iiuc,在第一个场景中,您正在处理700 gb数据,在第二个场景中,您正在500 mb数据和700 gb数据之间执行一些连接,以启用upserts并将其保存回s3。
如果是这种情况,那么问题不在于write语句,而是在第二种情况下执行的转换,因为必须将增量数据与完整数据连接起来,以标识插入和更新,然后相应地修改结果。
如果这种情况持续下去,您的数据将不断增长,处理数据的时间和内存将不断耗尽。
如果您要在s3上构建一个增量数据湖,您应该考虑使用hudi,它将加快您的工作,并正确地利用您的集群来处理上游数据。
下面是实现hudi的链接。
https://github.com/apache/hudi

相关问题