apache beam+apache flink+aws s3

zy1mlcev  于 2021-06-21  发布在  Flink
关注(0)|答案(0)|浏览(328)

我试图在apachebeam中编写一个简单的单词计数示例,并使用direct runner、flink runner和spark runner运行它。我的输入和输出在awss3上,我的flink/spark集群使用docker在本地机器上运行。
对于direct runner和spark runner,代码运行良好,但我得到:
org.apache.beam.sdk.io.aws.s3.s3resourceid不能转换为org.apache.beam.sdk.io.localresourceid
运行flink runner时出错。
我绝对不需要对spark进行任何配置更改,只需在beam代码中配置awsoptions就可以顺利地工作。但是相同的代码给出了flink的上述错误。因此,我尝试使用来自的指令来配置flink从s3读/写https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/aws.html#shaded-hadooppresto-s3-file-systems-recommended,但没用。
查看错误消息很明显,flink没有为s3文件系统正确配置,但是我还没有找到任何方法来解决这个问题。
有什么提示吗???以下是我的主要方法:

public static void main( String[] args ) throws IOException
{
    S3WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(S3WordCountOptions.class);
    options.as(AwsOptions.class).setAwsCredentialsProvider(getAWSCredentialProvider(options.getAwsKey(), options.getAwsSecret()));
    options.as(AwsOptions.class).setAwsRegion(options.getAwsRegion());
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
    .apply(FlatMapElements
            .into(TypeDescriptors.strings())
            .via((String word) -> Arrays.asList(word.split("[^\\p{L}]+"))))
    .apply(Filter.by((String word) -> !word.isEmpty()))
    .apply(Count.perElement())
    .apply(MapElements
            .into(TypeDescriptors.strings())
            .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
     .apply("WriteCounts", TextIO.write().to(options.getOutput()));

    p.run().waitUntilFinish();
}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题