canal 可以增量添加,增量删除,无法增量更新

qlfbtfca  于 2022-10-24  发布在  其他
关注(0)|答案(8)|浏览(399)

canal版本:1.1.5,非alpha版本
数据库:mysql5.6
日志:
DEBUG c.a.o.canal.client.adapter.es.core.service.ESSyncService - DML: {"data":[{"code":"test","name":"测试111","icon_uri":""}],"database":"new_sdk_game","destination":"example","es":1618922249000,"groupId":"g1","isDdl":false,"old":null,"pkNames":[],"sql":"","table":"small_icon","ts":1618922249492,"type":"UPDATE"}
Affected indexes: small_icon
配置文件:

不知道是不是和改了这个有关 #3144

yqkkidmi

yqkkidmi1#

看了下,和binlog_row_image='MINIMAL'有关,希望后续可以支持这个,现在我只能改下源码了

wdebmtf2

wdebmtf22#

可以修改如下源码

第一处:
else if (eventType == CanalEntry.EventType.UPDATE) { columns = rowData.getAfterColumnsList(); if (rowData.getBeforeColumnsCount() == 1) { //当binlog的binlog_row_image为MINIMAL时,前镜像只记录主键,后镜像只记录修改的列,需要把主键加进来 CanalEntry.Column pkColumn = rowData.getBeforeColumns(0); List<CanalEntry.Column> list = new ArrayList<>(columns.size() + 1); list.add(pkColumn); list.addAll(columns); columns = list; } }

第二处:
List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); if (rowData.getBeforeColumnsCount() == 1) { beforeColumnsList = columns; } for (CanalEntry.Column column : beforeColumnsList) { if (updateSet.contains(column.getName())) { if (column.getIsNull()) { rowOld.put(column.getName(), null); } else { rowOld.put(column.getName(), JdbcTypeUtil.typeConvert(msg.getTable(), column.getName(), rowData.getBeforeColumnsCount() == 1 ? "unknown" : column.getValue(), column.getSqlType(), column.getMysqlType())); } } }

另外,如果mysql字段含有关键字的话,比如select group from test。这时候可以增量更新,但是不能全量迁移,全量迁移需要select group from test,但是这样子就不能增量更新了,只要起个别名就既可以增量更新,也可以全量迁移了-->select t.group from test

a9wyjsp7

a9wyjsp73#

看了下,和binlog_row_image='MINIMAL'有关,希望后续可以支持这个,现在我只能改下源码了
这个参数在canal.properties里面不是可以修改吗?

z8dt9xmd

z8dt9xmd4#

看了下,和binlog_row_image='MINIMAL'有关,希望后续可以支持这个,现在我只能改下源码了
这个参数在canal.properties里面不是可以修改吗?

canal.properties不用改,默认已经支持MINIMAL,只是canal_adapter接收的后置镜像只有修改的列,没有主键,所以找不到es文档

xfyts7mz

xfyts7mz5#

看了下,和binlog_row_image='MINIMAL'有关,希望后续可以支持这个,现在我只能改下源码了
这个参数在canal.properties里面不是可以修改吗?

canal.properties不用改,默认已经支持MINIMAL,只是canal_adapter接收的后置镜像只有修改的列,没有主键,所以找不到es文档
你们那边binlog_row_image的格式还是MINIMAL,然后通过修改源码解决的?
我这边遇到类似的问题,在写入目标MySQL时无法更新数据,追踪源码发现是我们的binlog日志中old参数为null,导致无法更新,排查了源库的binlog的相关设置均符合官方设置,可能存在其他的设置问题,最后的处理方案是把源码中的update函数调整为对所有列修改,而非修改了数据的那几列。

0tdrvxhp

0tdrvxhp6#

修改类com.alibaba.otter.canal.client.adapter.rdb.service.RdbSyncService的update方法。
`
/**

  • 更新操作
  • @param config 配置项
  • @param dml DML数据
    */
    private void update(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) throws SQLException {
    Map<String, Object> data = dml.getData();
    if (data == null || data.isEmpty()) {
    return;
    }
    //这里先设置了插入,根据需求,可以自行选择是否需要。
    insert(batchExecutor,config,dml);
    //将old的判断去除。
    // Map<String, Object> old = dml.getOld();
    // if (old == null || old.isEmpty()) {
    // return;
    // }
    //将下方代码中的old变量调整为传过来的data.核心思想就是根据data解析字段为目标字段,并对目标库中的所有字段进行更新。
    DbMapping dbMapping = config.getDbMapping();
Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);

    Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);

    StringBuilder updateSql = new StringBuilder();
    updateSql.append("UPDATE ").append(SyncUtil.getDbTableName(dbMapping)).append(" SET ");

// Set columnsSet = data.keySet();
List<Map<String, ?>> values = new ArrayList<>();
boolean hasMatched = false;
// for (String srcColumnName : old.keySet()) {
for (String srcColumnName : data.keySet()) {
List targetColumnNames = new ArrayList<>();
columnsMap.forEach((targetColumn, srcColumn) -> {
if (srcColumnName.equalsIgnoreCase(srcColumn)) {
targetColumnNames.add(targetColumn);
}
});
if (!targetColumnNames.isEmpty()) {
hasMatched = true;
for (String targetColumnName : targetColumnNames) {
updateSql.append(" ").append(targetColumnName).append(" ").append("=?, ");
Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
if (type == null) {
throw new RuntimeException("Target column: " + targetColumnName + " not matched");
}
BatchExecutor.setValue(values, type, data.get(srcColumnName));
}
}
}
if (!hasMatched) {
logger.warn("Did not matched any columns to update ");
return;
}
int len = updateSql.length();
updateSql.delete(len - 2, len).append(" WHERE ");

// 拼接主键

// appendCondition(dbMapping, updateSql, ctype, values, data, old);
appendCondition(dbMapping, updateSql, ctype, values, data, data);
batchExecutor.execute(updateSql.toString(), values);
if (logger.isTraceEnabled()) {
logger.trace("Update target table, sql: {}", updateSql);
}
}
`

yfwxisqw

yfwxisqw7#

看了下,和binlog_row_image='MINIMAL'有关,希望后续可以支持这个,现在我只能改下源码了
这个参数在canal.properties里面不是可以修改吗?

canal.properties不用改,默认已经支持MINIMAL,只是canal_adapter接收的后置镜像只有修改的列,没有主键,所以找不到es文档
你们那边binlog_row_image的格式还是MINIMAL,然后通过修改源码解决的?
我这边遇到类似的问题,在写入目标MySQL时无法更新数据,追踪源码发现是我们的binlog日志中old参数为null,导致无法更新,排查了源库的binlog的相关设置均符合官方设置,可能存在其他的设置问题,最后的处理方案是把源码中的update函数调整为对所有列修改,而非修改了数据的那几列。

嗯,同步到mysql的我还没试过,可能有些不同

qyyhg6bp

qyyhg6bp8#

遇到同样的问题了 就更新出错其他的没问题

相关问题