我正在从一个文件中获取一些json记录。我想解析json,然后基于json中的一个字段,更新bucketing函数基路径。
例如:json记录中有一个字段名'user id',基于此,我想将我的基本路径更新为bucketingsink(“/data/app/users/”+user id field value+“/”)
我该怎么做?
代码:datastream input=env.readtextfile(“/home/user/desktop/jsonfile”);
DataStream<String> parsedJson = input.map((inputMsg)->{
String json="";
try{
json=jsonParser.parse(inputMsg).getAsString();
}catch (Exception e){
e.printStackTrace();
}
return json;
});
parsedJson.addSink(new BucketingSink<>(""));
}
1条答案
按热度按时间oxalkeyp1#
使用bucketingsink.setbucketer()方法设置您创建的实现bucketer接口的类,并使用
user-id
字段值作为子存储桶路径。