writedynamic在apachebeam的emr/spark上没有完成对s3的写入,遗漏了temp文件

njthzxwz  于 2021-07-14  发布在  Spark
关注(0)|答案(0)|浏览(262)

我有一个有边界的pcollection,并希望使用动态文件命名方案将输出持久化到s3 bucket。不幸的是,在emr(尝试过emr-6.2.0和emr-5.30.1)和spark runner上运行时,在管道中的所有步骤完成并且集群终止之后,s3上的输出在主输出目录中只包含一些预期的内容,尽管大部分内容放在.temp beam文件夹中,并且从未移动到主输出目录。其中大部分=90%的预期行未保留在正确命名的文件中,抽查表明预期行位于.temp beam文件夹中的文件中。以下是相关的管道声明部分:

PCollection<SomeObject> input; // is a bounded PCollection   

FileIO.Write<String, SomeObject> write = FileIO.<String, SomeObject>writeDynamic()
                    .by(SomeObject::key)
                    .withDestinationCoder(StringUtf8Coder.of())
                    .withCompression(Compression.GZIP)
                    .withNaming((SerializableFunction<String, FileIO.Write.FileNaming>) key
                            -> (FileIO.Write.FileNaming) (window, pane, numShards, shardIndex, compression)
                            -> String.format("some_object_%s_%d.csv.gz", key, shardIndex))
                    .via(Contextful.fn(SomeObject::toCsvLine), Contextful.fn(x -> TextIO.sink().withHeader(SomeObject.HEADER)))
                    .to("s3://some-bucket/some-output-path");

input.apply("write-a-pcollection", write);

通过这个代码,我得到了一个s3 bucket,它看起来像:
一些对象\u key1 \u 0.csv.gz
一些对象\u key1 \u 1.csv.gz
一些对象\u key2 \u 0.csv.gz
一些对象\u key3 \u 0.csv.gz
.temp beam-,其中90%的预期内容保留在使用随机uuid命名的对象中
但是,当我加上 .withIgnoreWindowing() 对于writedynamic配置,输出似乎是完全正确的,并且没有留下.temp beam目录。但是这个方法已经被弃用了,javadocs中也没有提供任何替代方法(至少我找不到任何替代方法)。
为什么本文作者需要忽略窗口才能在这种情况下正常工作,以及管道应该是什么样子才能不使用它 .withIgnoreWindowing() ?
更新:
尝试在应用writedynamic之前添加全局窗口步骤(如注解中所示),但没有成功:

input.apply("wnd", Window.<SomeObject>into(new GlobalWindows())
     .triggering(DefaultTrigger.of()))

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题