如何使用java在ApacheBeam中按key进行reduce,并得到这个输出key=value

c9x0cxw0  于 2021-07-06  发布在  Java
关注(0)|答案(1)|浏览(312)

这里没有!我正在努力学习java中的ApacheBeam,但是没有任何进展!假设我有一个这样格式的文件:

957149WC,Kyle,10,Accounts,1-01-2019
241316NX,Kumiko,10,Accounts,1-01-2019
796656IE,Kyle,10,Accounts,1-01-2019
331593PS,Beryl,20,HR,1-01-2019
560447WH,Olga,20,HR,1-01-2019

我想知道如何计算包含“accounts”的行中的人数并获得以下输出:

Kyle=2
Kumiko=1
pqwbnv8z

pqwbnv8z1#

如果要在java中使用apache beam,请首先在pom.xml文件中添加以下两个依赖项:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>2.25.0</version>
</dependency>

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-direct-java</artifactId>
    <version>2.25.0</version>
</dependency>

然后,为了达到你想要的输出,你可以这样做(这不是完美的,但给你的想法)

PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
p
        .apply(TextIO.read().from("input.txt"))
        .apply(MapElements.into(TypeDescriptors.lists(TypeDescriptors.strings()))
                .via(l -> Arrays.asList(l.split(","))))
        .apply(Filter.by(element -> element.get(3).equals("Accounts")))
        .apply(MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
                .via(element -> KV.of(element.get(1), 1)))
        .apply(Combine.perKey((a, b) -> a + b))
        .apply(MapElements.into(TypeDescriptors.strings()).via(element -> element.getKey() + "=" + element.getValue()))
        .apply(TextIO.write().to("data/output").withNumShards(1).withSuffix(".txt"));

p.run().waitUntilFinish();

结果是:

Kyle=2
Kumiko=1

相关问题