我正在尝试使用kite-sdk-morphline模块将json转换成avro。玩过之后,我可以使用简单的模式(没有复杂的数据类型)将json转换成avro。
然后我进一步修改了avro模式,如下所示(subrec.avsc)。如您所见,模式由一个子记录组成。
当我尝试使用morphlines.conf和subrec.avsc将json转换为avro时,它就失败了。
不知何故,json路径 "/record_type[]/alert/action"
不是由toavro函数翻译的。
这个 morphlines.conf
```
morphlines : [
{
id : morphline1
importCommands : ["org.kitesdk.**"]
commands : [
# Read the JSON blob
{ readJson: {} }
{ logError { format : "record: {}", args : ["@{}"] } }
# Extract JSON
{ extractJsonPaths { flatten: false, paths: {
"/record_type[]/alert/action" : /alert/action,
"/record_type[]/alert/signature_id" : /alert/signature_id,
"/record_type[]/alert/signature" : /alert/signature,
"/record_type[]/alert/category" : /alert/category,
"/record_type[]/alert/severity" : /alert/severity
} } }
{ logError { format : "EXTRACTED THIS : {}", args : ["@{}"] } }
{ extractJsonPaths { flatten: false, paths: {
timestamp : /timestamp,
event_type : /event_type,
source_ip : /src_ip,
source_port : /src_port,
destination_ip : /dest_ip,
destination_port : /dest_port,
protocol : /proto,
} } }
# Create Avro according to schema
{ logError { format : "WE GO TO AVRO"} }
{ toAvro { schemaFile : /etc/flume/conf/conf.empty/subrec.avsc } }
# Create Avro container
{ logError { format : "WE GO TO BINARY"} }
{ writeAvroToByteArray { format: containerlessBinary } }
{ logError { format : "DONE!!!"} }
]
}
]
以及 `subrec.avsc` ```
{
"type" : "record",
"name" : "Event",
"fields" : [ {
"name" : "timestamp",
"type" : "string"
}, {
"name" : "event_type",
"type" : "string"
}, {
"name" : "source_ip",
"type" : "string"
}, {
"name" : "source_port",
"type" : "int"
}, {
"name" : "destination_ip",
"type" : "string"
}, {
"name" : "destination_port",
"type" : "int"
}, {
"name" : "protocol",
"type" : "string"
}, {
"name": "record_type",
"type" : ["null", {
"name" : "alert",
"type" : "record",
"fields" : [ {
"name" : "action",
"type" : "string"
}, {
"name" : "signature_id",
"type" : "int"
}, {
"name" : "signature",
"type" : "string"
}, {
"name" : "category",
"type" : "string"
}, {
"name" : "severity",
"type" : "int"
}
] } ]
} ]
}
输出打开 { logError { format : "EXTRACTED THIS : {}", args : ["@{}"] } }
我输出以下内容:
[{
/record_type[]/alert / action = [allowed],
/record_type[]/alert / category = [],
/record_type[]/alert / severity = [3],
/record_type[]/alert / signature = [GeoIP from NL,
Netherlands],
/record_type[]/alert / signature_id = [88006],
_attachment_body = [{
"timestamp": "2015-03-23T07:42:01.303046",
"event_type": "alert",
"src_ip": "1.1.1.1",
"src_port": 18192,
"dest_ip": "46.231.41.166",
"dest_port": 62004,
"proto": "TCP",
"alert": {
"action": "allowed",
"gid": "1",
"signature_id": "88006",
"rev": "1",
"signature" : "GeoIP from NL, Netherlands ",
"category" : ""
"severity" : "3"
}
}],
_attachment_mimetype=[json/java + memory],
basename = [simple_eve.json]
}]
1条答案
按热度按时间q35jwt9p1#
更新2017-06-22
必须使用addvalues或setvalues填充结构中的数据才能使其工作
在调试morphline toavro的源代码之后,不管您在mappings结构中放置了什么,记录似乎都是第一个要求值的对象。
这个解决方案非常简单,但不幸的是需要一些额外的时间,比如eclipse,在调试模式下运行flume代理,克隆源代码和大量的咖啡。
就这样。
我的架构:
变形线文件:
神奇之处在于:
在Map中:
说明:
在代码中,计算的第一个字段名是record类型的micdefaultheader。由于无法为记录指定默认值(逻辑正确),toavro代码会对此进行计算,不会在Map中获得任何配置的值,因此它会失败(错误地)检测到该记录在不应该为空时为空。
但是,看一下代码,您可能会发现它需要一个map对象,不包含任何值,以便让解析器满意并继续下一个元素。
因此,我们使用addvalues添加一个map对象,并用空map[{}]填充它。请注意,这必须与导致空值的记录的名称匹配。在我的例子中“micdefaultheader”
如果您有更好的解决方案,请随时发表评论,因为这看起来像一个“肮脏的修补程序”