Kafka Elasticsearch connector throws version conflict warnings and loses data

Asked by l2osamch on 2021-06-04 in Kafka

So I set up zookeeper, kafka, postgres, elasticsearch, debezium, and kafka connect in Docker. The problem is that one table is not being fully exported to Elasticsearch. I'm using kafka-connect-elasticsearch as the sink connector. Watching the Kafka Connect logs, the whole table is exported from the source database to Kafka, but almost a third of the rows never arrive in Elasticsearch. The log shows:

connect       | 2020-08-07 17:01:16,520 WARN   ||  Ignoring version conflicts for items: [Key{dbserver1.public.api_product/api_product/313019}]   [io.confluent.connect.elasticsearch.jest.JestElasticsearchClient]

Messages like this keep repeating for various product rows, and they don't stop even after an hour.
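For context on why the sink drops these records: with `key.ignore=false` and the default `write.method` (insert), the Confluent Elasticsearch sink indexes each document with the Kafka record offset as an external version, and Elasticsearch rejects any write whose version is not strictly higher than the stored one. The connector then logs "Ignoring version conflicts" and moves on. A hypothetical in-memory sketch of that rule (not the connector's actual code):

```python
# Sketch of the external-versioning rule the Elasticsearch sink relies on
# when key.ignore=false: each document carries the Kafka record offset as
# its version, and a write is applied only if its version is strictly
# higher than the stored one. Hypothetical in-memory model for illustration.

store = {}  # doc_id -> (version, body)

def index_with_external_version(doc_id, version, body):
    """Return True if the write is applied, False on a version conflict."""
    current = store.get(doc_id)
    if current is not None and version <= current[0]:
        # Elasticsearch would answer 409 version_conflict_engine_exception;
        # the sink connector logs "Ignoring version conflicts" and drops it.
        return False
    store[doc_id] = (version, body)
    return True

# A record at offset 313019 indexes fine the first time...
print(index_with_external_version("313019", 313019, {"name": "product"}))
# ...but replaying a record with the same (or a lower) offset is dropped.
print(index_with_external_version("313019", 313019, {"name": "product"}))
```

So if records for the same key are replayed with non-increasing offsets (for example after a re-snapshot or a topic reset), those writes are silently discarded, which would explain rows missing from the index.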
Below is my Docker Compose file:

version: '3.3'
services:
    zookeeper:
        container_name: zookeeper
        ports:
            - '2181:2181'
            - '2888:2888'
            - '3888:3888'
        image: 'debezium/zookeeper:1.2'

    kafka:
        container_name: kafka
        ports:
            - '9092:9092'
        links:
            - 'zookeeper:zookeeper'
        environment:
            - ZOOKEEPER_CONNECT=zookeeper:2181
        image: 'debezium/kafka:1.2'

    postgres:
        container_name: postgres
        ports:
            - '5432:5432'
        environment:
            - POSTGRES_USER=pgUser
            - POSTGRES_PASSWORD=pgPassword
            - POSTGRES_DB=pgDB
        image: debezium/postgres:11

    elasticdbz:
        container_name: elasticdbz
        ports:
            - '8881:8881'
            - '9300:9300'
        environment:
            - http.host=0.0.0.0
            - transport.host=127.0.0.1
            - xpack.security.enabled=false
            - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
        image: 'docker.elastic.co/elasticsearch/elasticsearch:7.3.0'

    connect:
      container_name: connect
      ports:
          - '8083:8083'
      environment:
          - BOOTSTRAP_SERVERS=kafka:9092
          - GROUP_ID=1
          - CONFIG_STORAGE_TOPIC=my_connect_configs
          - OFFSET_STORAGE_TOPIC=my_connect_offsets
          - STATUS_STORAGE_TOPIC=my_connect_statuses
      links:
          - 'zookeeper:zookeeper'
          - 'kafka:kafka'
          - 'postgres:postgres'
          - 'elasticdbz:elasticdbz'
      image: 'debezium/connect:1.2'

My source connector configuration:

{
    "name": "my-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "amazon-rds-host*****",
        "database.port": "5432",
        "database.user": "my_user",
        "database.password": "*************",
        "database.dbname" : "dbname",
        "database.server.name": "dbserver1",
        "schema.whitelist": "public",
        "table.whitelist": "public.api_product",
        "plugin.name": "pgoutput"
    }
}

The Elasticsearch sink connector:

{
    "name": "elastic-sink-product",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "tasks.max": "1",
        "topics": "dbserver1.public.api_product",
        "connection.url": "http://remote-server-ip",
        "connection.username": "username",
        "connection.password": "**********",
        "behavior.on.null.values": "delete",

        "transforms": "unwrap,key",

        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",

        "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.key.field": "id",
        "key.ignore": "false",
        "type.name": "api_product"
    }
}

Meanwhile, another table in my Postgres database exports to Elasticsearch without any problems.
Any help would be greatly appreciated, as this is my first time working with these services.
Note: setting "write.method": "upsert" in the Elasticsearch sink connector configuration seems to fix the issue, but as mentioned in the documentation, this method is slower.
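For anyone hitting the same problem, the workaround from the note amounts to adding one property to the sink configuration posted above (abbreviated here; the transforms and remaining properties stay as before). As far as I understand, with `write.method` set to `upsert` the connector sends update-or-insert requests instead of version-stamped index requests, so replayed records overwrite the document rather than being dropped as version conflicts:

```json
{
    "name": "elastic-sink-product",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "topics": "dbserver1.public.api_product",
        "connection.url": "http://remote-server-ip",
        "write.method": "upsert",
        "behavior.on.null.values": "delete",
        "key.ignore": "false",
        "type.name": "api_product"
    }
}
```

The trade-off, as the documentation notes, is that upserts are slower than plain indexing because each write is an update request rather than a bulk index operation.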
