apacheflink:如何使用javaMap流(或包含dto的Map)?

0s0u357o  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(273)

我使用的是flink,系统中有一个json字符串流,其中包含动态变化的字段和嵌套字段。因此,我无法将这个传入的json模拟并转换为静态pojo,而只能依赖于Map。
我的第一个转换是使用gson解析将json字符串流转换为map对象流,然后将map Package 到一个名为data的dto中。

(inside the first map transformation)
LinkedTreeMap map = gson.fromJson(input, LinkedTreeMap.class);

Data data = new Data(map); // Data has getters, setters for the map and implements Serializable

当这个转换处理之后,我试图将结果流输入到我的自定义flink接收器时,问题就出现了。调用函数不会在接收器中被调用。但是,如果我从这个包含dto的Map更改为一个基本体或一个没有Map的常规dto,那么sink可以工作。
我的dto看起来像这样:

public class FakeDTO {
    private String id;
    private LinkedTreeMap map; // com.google.gson.internal

    // getters and setters
    // constructors, empty and with fields

我尝试了以下两种解决方案:

env.getConfig().addDefaultKryoSerializer(LinkedTreeMap.class,MapSerializer.class; 
env.getConfig().disableGenericTypes();

在这种情况下,有什么Maven建议我可以用吗?

x9ybnkn6

x9ybnkn61#

我能解决这个问题。在我的flink日志中,我看到一个名为reflectionserializerfactory类的kryo文件没有找到。我在maven中更新了kryo版本,并为我的Map使用了flink文档中说flink支持的Map类型。
只需确保在代码中指定泛型类型,并在pojo for maps中添加getter和setter。
我还使用.returns(xyz.class)类型删除来避免类型擦除的影响。

相关问题