avro中带有morphlines的子记录

oiopk7p5  于 2021-06-04  发布在  Flume
关注(0)|答案(1)|浏览(316)

我正在尝试使用kite-sdk-morphline模块将json转换成avro。玩过之后,我可以使用简单的模式(没有复杂的数据类型)将json转换成avro。
然后我进一步修改了avro模式,如下所示(subrec.avsc)。如您所见,模式由一个子记录组成。
当我尝试使用morphlines.conf和subrec.avsc将json转换为avro时,它就失败了。
不知何故,json路径 "/record_type[]/alert/action" 不是由toavro函数翻译的。
这个 morphlines.conf ```
morphlines : [
{
id : morphline1
importCommands : ["org.kitesdk.**"]

commands : [
# Read the JSON blob
{ readJson: {} }

  { logError { format : "record: {}", args : ["@{}"] } }

  # Extract JSON
  { extractJsonPaths { flatten: false, paths: {
          "/record_type[]/alert/action" : /alert/action,
          "/record_type[]/alert/signature_id" : /alert/signature_id,
          "/record_type[]/alert/signature" : /alert/signature,
          "/record_type[]/alert/category" : /alert/category,
          "/record_type[]/alert/severity" : /alert/severity
  } } }

  { logError { format : "EXTRACTED THIS : {}", args : ["@{}"] } }

  { extractJsonPaths { flatten: false, paths: {
          timestamp : /timestamp,
          event_type : /event_type,
          source_ip : /src_ip,
          source_port : /src_port,
          destination_ip : /dest_ip,
          destination_port : /dest_port,
          protocol : /proto,
  } } }

  # Create Avro according to schema
  { logError { format : "WE GO TO AVRO"} }

  { toAvro { schemaFile : /etc/flume/conf/conf.empty/subrec.avsc } }

  # Create Avro container
  { logError { format : "WE GO TO BINARY"} }
  { writeAvroToByteArray { format: containerlessBinary } }

  { logError { format : "DONE!!!"} }

]
}
]

以及 `subrec.avsc` ```
{
  "type" : "record",
  "name" : "Event",
  "fields" : [ {
    "name" : "timestamp",
    "type" : "string"
  }, {
    "name" : "event_type",
    "type" : "string"
  }, {
    "name" : "source_ip",
    "type" : "string"
  }, {
    "name" : "source_port",
    "type" : "int"
  }, {
    "name" : "destination_ip",
    "type" : "string"
  }, {
    "name" : "destination_port",
    "type" : "int"
  }, {
    "name" : "protocol",
    "type" : "string"
  }, {
    "name": "record_type",
    "type" : ["null", {
      "name" : "alert",
      "type" : "record",
      "fields" : [ {
            "name" : "action",
            "type" : "string"
        }, {
            "name" : "signature_id",
            "type" : "int"
        }, {
            "name" : "signature",
            "type" : "string"
        }, {
            "name" : "category",
            "type" : "string"
        }, {
            "name" : "severity",
            "type" : "int"
        }
      ] } ]
  } ]
}

输出打开 { logError { format : "EXTRACTED THIS : {}", args : ["@{}"] } } 我输出以下内容:

[{
    /record_type[]/alert / action = [allowed], 
    /record_type[]/alert / category = [],
    /record_type[]/alert / severity = [3],
    /record_type[]/alert / signature = [GeoIP from NL,
    Netherlands],
    /record_type[]/alert / signature_id = [88006],
    _attachment_body = [{
            "timestamp": "2015-03-23T07:42:01.303046",
            "event_type": "alert",
            "src_ip": "1.1.1.1",
            "src_port": 18192,
            "dest_ip": "46.231.41.166",
            "dest_port": 62004,
            "proto": "TCP",
            "alert": {
                "action": "allowed",
                "gid": "1",
                "signature_id": "88006",
                "rev": "1",
                "signature" : "GeoIP from NL, Netherlands ",
                "category" : ""
                "severity" : "3"
                }
            }], 
    _attachment_mimetype=[json/java + memory],
    basename = [simple_eve.json]
}]
q35jwt9p

q35jwt9p1#

更新2017-06-22
必须使用addvalues或setvalues填充结构中的数据才能使其工作

{
    addValues {
        micDefaultHeader : [
            {
                eventTimestampString : "2017-06-22 18:18:36"
            }
        ]
    }
}

在调试morphline toavro的源代码之后,不管您在mappings结构中放置了什么,记录似乎都是第一个要求值的对象。
这个解决方案非常简单,但不幸的是需要一些额外的时间,比如eclipse,在调试模式下运行flume代理,克隆源代码和大量的咖啡。
就这样。
我的架构:

{
  "type" : "record",
  "name" : "co_lowbalance_event",
  "namespace" : "co.tigo.billing.cboss.lowBalance",
  "fields" : [ {
    "name" : "dummyValue",
    "type" : "string",
    "default" : "dummy"
  }, {
    "name" : "micDefaultHeader",
    "type" : {
      "type" : "record",
      "name" : "mic_default_header_v_1_0",
      "namespace" : "com.millicom.schemas.root.struct",
      "doc" : "standard millicom header definition",
      "fields" : [ {
        "name" : "eventTimestampString",
        "type" : "string",
        "default" : "12345678910"
      } ]
    }
  } ]
}

变形线文件:

morphlines : [
        {
                id : convertJsonToAvro
                importCommands : ["org.kitesdk.**"]
                commands : [
                        {
                                readJson {
                                        outputClass : java.util.Map
                                }
                        }

                        {
                                addValues {
                                   micDefaultHeader : [{}]
                                }
                        }

                        {
                                logDebug { format : "my record: {}", args : ["@{}"] } 
                        }

                        {
                                toAvro {
                                        schemaFile : /home/asarubbi/Development/test/co_lowbalance_event.avsc
                                        mappings : {
                                                "micDefaultHeader" : micDefaultHeader
                                                "micDefaultHeader/eventTimestampString" : eventTimestampString
                                        }

                                }
                        }

                        {
                                writeAvroToByteArray {
                                        format : containerlessJSON
                                        codec : null
                                }
                        }
                ]
        }
]

神奇之处在于:

{
   addValues {
      micDefaultHeader : [{}]
   }
}

在Map中:

mappings : {
    "micDefaultHeader" : micDefaultHeader
    "micDefaultHeader/eventTimestampString" : eventTimestampString
}

说明:
在代码中,计算的第一个字段名是record类型的micdefaultheader。由于无法为记录指定默认值(逻辑正确),toavro代码会对此进行计算,不会在Map中获得任何配置的值,因此它会失败(错误地)检测到该记录在不应该为空时为空。
但是,看一下代码,您可能会发现它需要一个map对象,不包含任何值,以便让解析器满意并继续下一个元素。
因此,我们使用addvalues添加一个map对象,并用空map[{}]填充它。请注意,这必须与导致空值的记录的名称匹配。在我的例子中“micdefaultheader”
如果您有更好的解决方案,请随时发表评论,因为这看起来像一个“肮脏的修补程序”

相关问题