kafkaconnect:如何从struct获取嵌套字段

bfrts1fy  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(471)

我正在使用kafka connect实现kafka elasticsearch连接器。
生产者发送了一个复杂的json到一个kafka主题,我的连接器代码将使用它来持久化ElasticSearch。连接器以struct的形式获取数据(https://kafka.apache.org/0100/javadoc/org/apache/kafka/connect/data/struct.html).
我能够在顶级json上获取struct的字段值,但不能从嵌套的json获取。

{
    "after": {
        "test.test.employee.Value": {
            "id": 5671111,
            "name": {
                "string": "abc"
            }
        }
    },
    "op": "u",
    "ts_ms": {
        "long": 1474892835943
    }
}

我可以解析“op”,但不能解析“test.test.employee.value”。

Struct afterStruct = struct.getStruct("after"); // giving me proper value.
String opValue = struct.getString("op"); // giving me proper value of "u". 

Struct valueStruct = afterStruct .getStruct("test.test.employee.Value"); // org.apache.kafka.connect.errors.DataException: test.test.employee.Value is not a valid field name
cnwbcb6i

cnwbcb6i1#

Struct.getStruct 本机不支持使用点表示法的嵌套。
似乎您的模式可能来自debezium,在这种情况下,它们有自己的“展开”消息转换器。
一种选择是,如果您控制着这个提取器代码,您可能会发现我为confluent kafka connect存储项目编写的代码很有用。它需要一个结构或Map对象(见下文)
否则,您可能需要尝试将landoop提供的kql插件添加到connect类路径中。

public static Object getNestedFieldValue(Object structOrMap, String fieldName) {
    // validate(structOrMap, fieldName); // can ignore this

    try {
      Object innermost = structOrMap;
      // Iterate down to final struct
      for (String name : fieldName.split("\\.")) {
        innermost = getField(innermost, name);
      }
      return innermost;
    } catch (DataException e) {
      throw new DataException(
            String.format("The field '%s' does not exist in %s.", fieldName, structOrMap),
            e
      );
    }
  }

  public static Object getField(Object structOrMap, String fieldName) {
    // validate(structOrMap, fieldName);

    Object field;
    if (structOrMap instanceof Struct) {
      field = ((Struct) structOrMap).get(fieldName);
    } else if (structOrMap instanceof Map) {
      field = ((Map<?, ?>) structOrMap).get(fieldName);
      if (field == null) {
        throw new DataException(String.format("Unable to find nested field '%s'", fieldName));
      }
      return field;
    } else {
      throw new DataException(String.format(
            "Argument not a Struct or Map. Cannot get field '%s' from %s.",
            fieldName,
            structOrMap
      ));
    }
    if (field == null) {
      throw new DataException(
            String.format("The field '%s' does not exist in %s.", fieldName, structOrMap));
    }
    return field;
  }

相关问题