在DebeziumSQLServerSourceConnector中有没有一种方法可以将关键字字段值转换为小写?

6qftjkof  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(385)

关闭。这个问题需要细节或清晰。它目前不接受答案。
**想改进这个问题吗?**通过编辑这个帖子来添加细节并澄清问题。

9个月前关门了。
改进这个问题
我想将SQLServer列名转换为小写,同时将其存储在kafka主题中。我使用debezium作为我的源连接器

kdfy810k

kdfy810k1#

这可以通过使用jeremycustenborder的kafka-connect-common转换来完成
sql server表:

Id  Name Description  Weight Pro_Id
101 aaa  Sample_Test  3.14   2020-02-21 13:32:06.5900000
102 eee  testdata1    3.14   2020-02-21 13:32:06.5900000

步骤1:从这个链接下载jeremy custenborder在confluent hub中提供的kafka connect公共转换jar文件
步骤2:根据您的kafka环境,将jar文件放在/usr/share/java或/kafka/libs中
步骤3:创建debezium sql server源连接器

{
  "name": "sqlserver_src_connector",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "database.server.name": "sqlserver",
    "database.hostname": "*.*.*.*",
    "database.port": "1433",
    "database.user": "username",
    "database.password": "password",
    "database.dbname": "db_name",
    "table.whitelist": "dbo.tablename",
    "transforms": "unwrap,changeCase",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.changeCase.type" : "com.github.jcustenborder.kafka.connect.transform.common.ChangeCase$Value",
    "transforms.changeCase.from" : "UPPER_UNDERSCORE",
    "transforms.changeCase.to" : "LOWER_UNDERSCORE",
    "database.history.kafka.bootstrap.servers": "*.*.*.*",
    "database.history.kafka.topic": "schema-changes-tablename"
  }
}

第四步:Kafka主题数据

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "id"
      },
      {
        "type": "string",
        "optional": false,
        "field": "name"
      },
      {
        "type": "string",
        "optional": true,
        "field": "description"
      },
      {
        "type": "double",
        "optional": true,
        "field": "weight"
      },
      {
        "type": "int64",
        "optional": false,
        "name": "io.debezium.time.NanoTimestamp",
        "version": 1,
        "field": "pro_id"
      }
    ],
    "optional": true,
    "name": "sqlserver.dbo.tablename"
  },
  "payload": {
    "id": 101,
    "name": "aaa",
    "description": "Sample_Test",
    "weight": 3.14,
    "pro_id": 1582291926590000000
  }
}
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "id"
      },
      {
        "type": "string",
        "optional": false,
        "field": "name"
      },
      {
        "type": "string",
        "optional": true,
        "field": "description"
      },
      {
        "type": "double",
        "optional": true,
        "field": "weight"
      },
      {
        "type": "int64",
        "optional": false,
        "name": "io.debezium.time.NanoTimestamp",
        "version": 1,
        "field": "pro_id"
      }
    ],
    "optional": true,
    "name": "sqlserver.dbo.tablename"
  },
  "payload": {
    "id": 102,
    "name": "eee",
    "description": "testdata1",
    "weight": 3.14,
    "pro_id": 1582291926590000000
  }
}

感谢来自debezium社区的jiri pechanec和chris cranford@naros的帮助

相关问题