debezium+schema registry avro schema:为什么我要有“before”和“after”字段,如何将其用于hudideltastreamer?

yhxst69z  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(274)

我在postgresql中有一个表,其模式如下:

Table "public.kc_ds"
 Column |         Type          | Collation | Nullable |              Default              | Storage  | Stats target | Description
--------+-----------------------+-----------+----------+-----------------------------------+----------+--------------+-------------
 id     | integer               |           | not null | nextval('kc_ds_id_seq'::regclass) | plain    |              |
 num    | integer               |           | not null |                                   | plain    |              |
 text   | character varying(50) |           | not null |                                   | extended |              |
Indexes:
    "kc_ds_pkey" PRIMARY KEY, btree (id)
Publications:
    "dbz_publication"

当我为这个使用 io.confluent.connect.avro.AvroConverter 和schema registry,它创建一个schema registry schema,该schema如下所示(这里省略了一些字段):

"fields":[
      {
         "name":"before",
         "type":[
            "null",
            {
               "type":"record",
               "name":"Value",
               "fields":[
                  {
                     "name":"id",
                     "type":"int"
                  },
                  {
                     "name":"num",
                     "type":"int"
                  },
                  {
                     "name":"text",
                     "type":"string"
                  }
               ],
               "connect.name":"xxx.public.kc_ds.Value"
            }
         ],
         "default":null
      },
      {
         "name":"after",
         "type":[
            "null",
            "Value"
         ],
         "default":null
      },
]

debezium在我的kafka主题中生成的消息如下所示(省略了一些字段):

{
  "before": null,
  "after": {
    "xxx.public.kc_ds.Value": {
      "id": 2,
      "num": 2,
      "text": "text version 1"
    }
}

当我插入或更新时, "before" 总是 null ,和 "after" 包含我的数据;当我删除时,相反的情况成立: "after" 为空且 "before" 包含数据(尽管所有字段都设置为默认值)。
问题1:为什么Kafka会用 "before" 以及 "after" 领域?为什么这些领域的行为如此怪异?
问题#2:有没有一种内置的方法可以让kafka connect在仍然使用schema registry的情况下向我的主题发送平面消息?请注意,展平转换不是我所需要的:如果启用,我仍然会有 "before" 以及 "after" 领域。
问题#3(实际上并不希望有什么,但也许有人知道):平展我的信息的必要性来自于这样一个事实,即我需要使用hudideltastreamer从我的主题中读取数据,而且这个工具似乎需要平展的输入数据。这个 "before" 以及 "after" 字段最终成为结果.parquet文件中类似于列的独立对象。有人知道hudideltastreamer应该如何与kafka connect生成的消息集成吗?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题