如何在apache beam java sdk中对多个列使用aggregatefield()?

mlmc2os5  于 2021-07-06  发布在  Java
关注(0)|答案(1)|浏览(356)

在apache beam python sdk中,可以执行以下操作:

input
| GroupBy(account=lambda s: s["account"])
.aggregate_field(lambda x: x["wordsAddup"] - x["wordsSubtract"], sum, 'wordsRead')

我们如何在javasdk中执行类似的操作?奇怪的是,《编程指南》中只有python中用于此转换的示例。
下面是我尝试用java生成的等价物:

input.apply(
Group.byFieldNames("account")
.aggregateField(<INSERT EQUIVALENT HERE>, Sum.ofIntegers(), "wordsRead"));
idfiyjo8

idfiyjo81#

下面是一些java示例https://beam.apache.org/documentation/programming-guide/#using-架构(注意:您可能需要选择 java 选项卡上同时具有java和python的选择器。)
在java中,我认为aggregatefield的第一个参数不能采用任意表达式;必须是字段名。可以使用为所需表达式添加新字段的投影来继续分组操作。例如

input
    .apply(SqlTransform.query(
        "SELECT *, wordsAddup - wordsSubtract AS wordsDiff from PCOLLECTION")
    .apply(Group.byFieldNames("account")
        .aggregateField("wordsDiff", Sum.ofIntegers(), "wordsRead"));

相关问题