如何为bucketingsink函数flink设置动态基路径?

bsxbgnwa  于 2021-05-27  发布在  Hadoop
关注(0)|答案(1)|浏览(381)

我正在从一个文件中获取一些json记录。我想解析json,然后基于json中的一个字段,更新bucketing函数基路径。
例如:json记录中有一个字段名'user id',基于此,我想将我的基本路径更新为bucketingsink(“/data/app/users/”+user id field value+“/”)
我该怎么做?
代码:datastream input=env.readtextfile(“/home/user/desktop/jsonfile”);

DataStream<String> parsedJson = input.map((inputMsg)->{

        String json="";
        try{

            json=jsonParser.parse(inputMsg).getAsString();

        }catch (Exception e){
            e.printStackTrace();
        }
        return json;

    });

   parsedJson.addSink(new BucketingSink<>(""));

}
oxalkeyp

oxalkeyp1#

使用bucketingsink.setbucketer()方法设置您创建的实现bucketer接口的类,并使用 user-id 字段值作为子存储桶路径。

相关问题