canal 当配置pk,不配置_id的时候,删除mysql数据,ES的这条数据没有被删除掉,只是除了id字段之外的数据被置为null了

xmjla07d  于 2022-10-19  发布在  Mysql
关注(0)|答案(1)|浏览(302)
  • I have searched the issues of this repository and believe that this is not a duplicate.
  • I have checked the FAQ of this repository and believe that this is not a duplicate.

environment

  • canal version 1.1.5-SNAPSHOT
  • mysql version 8.0.22
  • ES version 7.9.2

Issue Description

环境配置:
Mysql同步数据到ES当中,根据官方文档,配置pk,没有配置_id,当mysql删除一条语句的时候,es没有对应的删除这条数据,
看了下es当中,只是把除了pk配置的字段以外的字段的值设置为null

Steps to reproduce

1.配置一个xxx.yml的pk为主键字段,不配置_id
2.启动canal canal-adapter
3.删除mysql的一条数据

Expected behaviour

ES当中的这条数据也被删除

Actual behaviour

但是ES的这条数据并没有被删除,只是除了pk指定的主键字段之外的所有字段的值被设置为null

If there is an exception, please attach the exception trace:

Just put your stack trace here!
xghobddn

xghobddn1#

跟踪了一下代码,发现,是代码有点问题:

com.alibaba.otter.canal.client.adapter.es7x.support.ES7xTemplate类的delete方法:

    @Override
    public void delete(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
        //处理配置了_id
        if (mapping.get_id() != null) {
            ESDeleteRequest esDeleteRequest = this.esConnection.new ES7xDeleteRequest(mapping.get_index(),
                pkVal.toString());
            //构造的是一个Delete的对象
            getBulk().add(esDeleteRequest);
            commitBulk();
        } else {//处理没有配置_id,但是配置了pk为主键的配置
            ESSearchRequest esSearchRequest = this.esConnection.new ESSearchRequest(mapping.get_index())
                .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
                .size(10000);
            SearchResponse response = esSearchRequest.getResponse();
            for (SearchHit hit : response.getHits()) {
                ESUpdateRequest esUpdateRequest = this.esConnection.new ES7xUpdateRequest(mapping.get_index(),
                    hit.getId()).setDoc(esFieldData);
                //构造的是一个Update的对象
                getBulk().add(esUpdateRequest);
                commitBulk();
            }
        }
    }

我试验了下,只需要修改Es7xUpdateRequest为Es7xDeleteRequest就可以了!修改如下:

@Override
    public void delete(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
        if (mapping.get_id() != null) {
            ESDeleteRequest esDeleteRequest = this.esConnection.new ES7xDeleteRequest(mapping.get_index(),
                pkVal.toString());
            getBulk().add(esDeleteRequest);
            commitBulk();
        } else {
            ESSearchRequest esSearchRequest = this.esConnection.new ESSearchRequest(mapping.get_index())
                .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
                .size(10000);
            SearchResponse response = esSearchRequest.getResponse();
            for (SearchHit hit : response.getHits()) {
                ESDeleteRequest esDeleteRequest = this.esConnection.new ES7xDeleteRequest(mapping.get_index(), hit.getId());
                getBulk().add(esDeleteRequest);//修改为ES7xDeleteRequest类型就能删除了
                commitBulk();
            }
        }
    }

查看了下ES6也存在这个问题,一样修改就行了,对于配置_id的没有任何问题

相关问题