我正在使用kafka connect实现kafka elasticsearch连接器。
生产者发送了一个复杂的json到一个kafka主题,我的连接器代码将使用它来持久化ElasticSearch。连接器以struct的形式获取数据(https://kafka.apache.org/0100/javadoc/org/apache/kafka/connect/data/struct.html).
我能够在顶级json上获取struct的字段值,但不能从嵌套的json获取。
{
"after": {
"test.test.employee.Value": {
"id": 5671111,
"name": {
"string": "abc"
}
}
},
"op": "u",
"ts_ms": {
"long": 1474892835943
}
}
我可以解析“op”,但不能解析“test.test.employee.value”。
Struct afterStruct = struct.getStruct("after"); // giving me proper value.
String opValue = struct.getString("op"); // giving me proper value of "u".
Struct valueStruct = afterStruct .getStruct("test.test.employee.Value"); // org.apache.kafka.connect.errors.DataException: test.test.employee.Value is not a valid field name
1条答案
按热度按时间cnwbcb6i1#
Struct.getStruct
本机不支持使用点表示法的嵌套。似乎您的模式可能来自debezium,在这种情况下,它们有自己的“展开”消息转换器。
一种选择是,如果您控制着这个提取器代码,您可能会发现我为confluent kafka connect存储项目编写的代码很有用。它需要一个结构或Map对象(见下文)
否则,您可能需要尝试将landoop提供的kql插件添加到connect类路径中。