用hcfs读取json换行文件

n6lpvg4x  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(382)

我怎样才能让flink hcfs连接器以这样的模式从google云存储中读取 **/*S0.json ,其中文件包含换行分隔的json数据?
这些文件包含如下内容

{"message": "Hello world", "timestamp": 1556655155}
{"message": "Goodbye world", "timestamp": 1556655170}

在gcs ui中,它如下所示:

根据Flink的模式使用gcs文件进行跟踪

wribegjk

wribegjk1#

以纯文本形式从hcfs读取json文件后,可以将其Map到 JSONObject 使用自定义Map器:

import org.apache.flink.api.java.DataSet;
import org.apache.sling.commons.json.JSONObject;

DataSet<JSONObject> jsonInput = 
    input
        .map(record -> record.f1.toString())
        .map(StringToJsonObject::new);
``` `JSONObject` 基于上面链接的示例的Map器代码:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.sling.commons.json.JSONObject;

public class StringToJsonObject implements MapFunction<String, JSONObject> {
private static final long serialVersionUID = 4573928723585302447L;

public JSONObject map(String content) throws Exception {
    return new JSONObject(content);
}

}

如果需要,你可以绘制Map `String` 改为pojo而不是generic `JSONObject` 使用与此类似的Map器。

相关问题