合流kafka connect支持复杂或嵌套的json/schema吗

yhqotfr8  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(336)

只能使用confluent kafka connect将简单对象插入数据库。不知道如何使其支持复杂的json/schema结构。我不确定这个功能是否可用。这里有一个类似的问题,大约一年前问过,但到现在还没有回答。请帮忙。

zphenhs4

zphenhs41#

Kafka连接确实支持复杂的结构,包括 Struct , Map ,和 Array . 通常只有源连接器需要这样做,因为接收器连接器被传递值,只需要使用它们。本文档描述了构建 Schema 描述 Struct ,然后创建 Struct 符合该模式的示例。在本例中,示例结构只是一个平面结构。
但是,您可以轻松地添加类型为 Struct 用另一个定义的 Schema 示例。实际上,它只是将这个简单的模式分层到结构中的多个级别:

Schema addressSchema = SchemaBuilder.struct().name(ADDRESS)
    .field("number", Schema.INT16_SCHEMA)
    .field("street", Schema.STRING_SCHEMA)
    .field("city", Schema.STRING_SCHEMA)
    .build();
Schema personSchema = SchemaBuilder.struct().name(NAME)
    .field("name", Schema.STRING_SCHEMA)
    .field("age", Schema.INT8_SCHEMA)
    .field("admin", new SchemaBuilder.boolean().defaultValue(false).build())
    .field("address", addressSchema)
    .build();

Struct addressStruct = new Struct(addressSchema)
    .put("number", 100)
    .put("street", "Main Street")
    .put("city", "Springfield")
    .build();
Struct personStruct = new Struct(personSchema)
    .put("name", "Barbara Liskov")
    .put("age", 75)
    .put("address", addressStruct)
    .build();

因为 SchemaBuilder 是一个fluent的api,你可以像定制的那样嵌入它 admin 布尔架构生成器。但这有点难,因为你需要参考 Schema 创建 addressStruct .
一般来说,在编写源连接器时,您只需担心如何做到这一点。如果您试图使用现有的源连接器,那么您可能对键和值的结构几乎没有控制权。例如,confluent的jdbc源连接器用一个单独的 Schema 表中的每一行都是单独的 Struct 使用该模式的。但由于行是平的,所以 Schema 以及 Struct 将仅包含具有基元类型的字段。
debezium的用于mysql和postgresql的cdc连接器还使用 Schema 和对应 Struct 对象,但cdc捕获有关行的更多信息,例如更改前后的行状态。因此,这些连接器使用更复杂的 Schema 对于每个包含嵌套 Struct 物体。
请注意,虽然每个源连接器都有自己独特的消息结构,但kafka connect的单一消息转换(smt)使得在将源连接器生成的消息写入kafka之前对其进行过滤、重命名和稍加修改变得非常容易,或者是从Kafka读取的消息,然后再发送到接收器连接器。

相关问题