Seata解析-TC处理全局事务和分支事务原理详解之全局事务开启和分支事务注册

x33g5p2x  于2021-12-21 转载在 其他  
字(17.1k)|赞(0)|评价(0)|浏览(671)

本文基于seata 1.3.0版本

ServerOnRequestProcessor是TC用于处理全局或者分支事务请求的处理器,它主要处理八种类型的请求:

  1. 分支事务注册请求
  2. 分支状态报告请求
  3. 全局事务开启请求
  4. 全局事务提交请求
  5. 全局事务锁状态查询请求
  6. 全局事务报告请求
  7. 全局事务回滚请求
  8. 全局事务状态查询请求

因为涉及内容较多,本文只介绍全局事务开启和分支事务注册的处理逻辑,其他的内容在以后的文章说明。

一、准备–请求对象是如何转发的

本小节介绍的是在真正处理开始前请求的转发流程,处理每个请求时,转发流程都是一样的,后面在介绍请求处理时,将不再介绍转发内容。
在文章《Seata解析-seata核心类NettyRemotingServer详解》里面已经介绍了请求消息最终是转发到ServerOnRequestProcessor的onRequestMessage方法。下面再来看一下这个方法:

//代码有删减
	private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
        Object message = rpcMessage.getBody();
        //获取RpcContext上下文对象
        RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
        //MergedWarpMessage表示当前的消息是一个复合消息,里面包含了多个消息,需要对每个消息进行遍历
        if (message instanceof MergedWarpMessage) {
            AbstractResultMessage[] results = new AbstractResultMessage[((MergedWarpMessage) message).msgs.size()];
            for (int i = 0; i < results.length; i++) {
                final AbstractMessage subMessage = ((MergedWarpMessage) message).msgs.get(i);
                //调用协调器DefaultCoordinator处理
                results[i] = transactionMessageHandler.onRequest(subMessage, rpcContext);
            }
            MergeResultMessage resultMessage = new MergeResultMessage();
            resultMessage.setMsgs(results);
            //异步发送响应消息
            remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resultMessage);
        } else {
            final AbstractMessage msg = (AbstractMessage) message;
            //调用协调器DefaultCoordinator处理
            AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);
            //异步发送响应消息
            remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);
        }
    }

从onRequestMessage方法中可以看到,ServerOnRequestProcessor又将这八种类型的消息转发给了transactionMessageHandler。transactionMessageHandler其实在TC启动的时候,在Server类中设置的:

DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);
        coordinator.init();
        //在下面这行代码中设置transactionMessageHandler=coordinator 
        nettyRemotingServer.setHandler(coordinator);

从上面代码可以看到ServerOnRequestProcessor又将请求转发给了协调器DefaultCoordinator对象,下面是协调器的onRequest方法:

public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
        if (!(request instanceof AbstractTransactionRequestToTC)) {
            throw new IllegalArgumentException();
        }
        AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request;
        //设置处理器,设置每个请求对象的hadler属性为协调器对象
        transactionRequest.setTCInboundHandler(this);
        //调用请求对象的handle方法
        return transactionRequest.handle(context);
    }

在onRequest方法里面,又去调用了每个事务请求对象的handle方法,而每个请求对象的handle方法都是相同的:

public AbstractTransactionResponse handle(RpcContext rpcContext) {
		//handler对象其实是协调器DefaultCoordinator,这里又去调用了协调器的handle方法
		//DefaultCoordinator对handler方法做了重载,根据事务请求对象的不同调用的方法不同
        return handler.handle(this, rpcContext);
    }

TC的请求转发相当绕,我觉得这个代码结构需要优化,在协调器中直接调用对应的协调器的handle方法即可,没有必要经请求对象做一次转发,而且将处理方法写在请求对象中,也使得请求对象承担了更多的职责,违反了单一职责原理。
下面使用图来展示请求的对象的转发流程:

对于每个请求,最终是由协调器的handle方法处理,协调器提供了8个重载的handle方法,分别对应上面的八种请求。

二、全局事务开启请求

全局事务开启请求的消息类型是MessageType.TYPE_GLOBAL_BEGIN,请求对象为GlobalBeginRequest,该请求由TM发出。全局事务是由@GlobalTransactional注解的方法开启的,也就是当执行@GlobalTransactional注解的方法时,TM先发送GlobalBeginRequest请求到TC来开启全局事务。
全局事务开启请求的处理,总的来说分为两部分内容:一是创建GlobalSession对象,该对象的创建意味着全局事务的开启,GlobalSession记录了应用名,XID,事务id,全局事务状态等内容,二是生成XID,并设置GlobalSession在事务状态为开启,将XID返回至TM。
下面来看具体实现,首先是协调器中处理该请求的handle方法:

public GlobalBeginResponse handle(GlobalBeginRequest request, final RpcContext rpcContext) {
        GlobalBeginResponse response = new GlobalBeginResponse();
        //使用模板方法
        exceptionHandleTemplate(new AbstractCallback<GlobalBeginRequest, GlobalBeginResponse>() {
            @Override
            public void execute(GlobalBeginRequest request, GlobalBeginResponse response) throws TransactionException {
                try {
                    doGlobalBegin(request, response, rpcContext);
                } catch (StoreException e) {
                    throw new TransactionException(TransactionExceptionCode.FailedStore,
                        String.format("begin global request failed. xid=%s, msg=%s", response.getXid(), e.getMessage()),
                        e);
                }
            }
        }, request, response);
        return response;
    }
    public <T extends AbstractTransactionRequest, S extends AbstractTransactionResponse> void exceptionHandleTemplate(Callback<T, S> callback, T request, S response) {
        try {
            callback.execute(request, response);
            callback.onSuccess(request, response);
        } catch (TransactionException tex) {
            LOGGER.error("Catch TransactionException while do RPC, request: {}", request, tex);
            callback.onTransactionException(request, response, tex);
        } catch (RuntimeException rex) {
            LOGGER.error("Catch RuntimeException while do RPC, request: {}", request, rex);
            callback.onException(request, response, rex);
        }
    }

在handle()方法里面使用了模板方法,这里我们只需要关注模板方法里面的execute的处理逻辑,其他的方法都是非常简单,不再做介绍。
我们看到execute又调用了doGlobalBegin方法:

protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
        throws TransactionException {
        //这里的core是DefaultCore对象,DefaultCore的begin方法返回事务XID
        //之后response对象携带XID返回给TM,TM收到XID后表示全局事务开启成功
        response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
            request.getTransactionName(), request.getTimeout()));
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}",
                rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());
        }
    }

下面是DefaultCore对象的begin方法:

//name:默认是应用发起全局事务的方法名,可以通过@GlobalTransactional(name="beginTransaction")修改该值
    //applicationId:发起全局事务的应用名
    //transactionServiceGroup:发起全局事务的事务分组名
    //timeout:事务超时时间,默认300s,可以通过@GlobalTransactional(timeoutMills = 3000)设置
    public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        throws TransactionException {
		//创建全局会话,在seata中事务称为会话
        GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,
            timeout);
        //设置监听器,监听器的作用主要是写事务日志,关于监听器后续单独文章介绍
        session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
        //开启事务
        session.begin();
        // transaction start event
        // 发布全局事务开启事件,这里用到了Guava里面的EventBus工具类,这个后面单独文章介绍
        eventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
            session.getTransactionName(), session.getBeginTime(), null, session.getStatus()));
        //返回XID
        return session.getXid();
    }

上面代码用到了GlobalSession里面的两个方法:createGlobalSession和begin,下面分别来看一下:

public static GlobalSession createGlobalSession(String applicationId, String txServiceGroup, String txName,
                                                    int timeout) {
        //GlobalSession的构造方法在下面
        GlobalSession session = new GlobalSession(applicationId, txServiceGroup, txName, timeout);
        return session;
    }
    public GlobalSession(String applicationId, String transactionServiceGroup, String transactionName, int timeout) {
        this.transactionId = UUIDGenerator.generateUUID();
        this.status = GlobalStatus.Begin;
        this.applicationId = applicationId;
        this.transactionServiceGroup = transactionServiceGroup;
        this.transactionName = transactionName;
        this.timeout = timeout;
        //XID的组成=IP:port:transactionId
        this.xid = XID.generateXID(transactionId);
    }
    public void begin() throws TransactionException {
        this.status = GlobalStatus.Begin;
        //设置事务开启时间
        this.beginTime = System.currentTimeMillis();
        //事务为激活状态,直到GlobalSession对象关闭
        this.active = true;
        //调用监听器,后续单独文章分析
        for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
            lifecycleListener.onBegin(this);
        }
    }

GlobalSession的这几个方法还是比较简单的,直接进行的赋值操作。
在GlobalSession的构造方法中可以看到,XID的组成:TC的IP:TC的监听端口:UUID,begin方法表示事务开启,可以看到事务开启是设置了会话的状态为GlobalStatus.Begin,并且设置了会话开始时间。
创建好GlobalSession对象,生成XID后,便将XID返回到TM,到此全局事务开启请求的处理结束。

三、分支事务注册请求

分支事务注册请求的消息类型是MessageType.TYPE_BRANCH_REGISTER,请求对象为BranchRegisterRequest,该请求由RM发出。该请求用于开启分支事务,数据库的增删改就是在分支事务中进行的。
该请求的处理相对来说比较复杂一些,先用下面的图让大家有个了解:

下面介绍一下代码实现。
处理该请求也是在模板方法中进行的,在模板方法里面直接调用了DefaultCoordinator.doBranchRegister方法,下面直接来看doBranchRegister方法方法:

protected void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response,
                                    RpcContext rpcContext) throws TransactionException {
        //分支事务注册,TC生成一个分支ID,并返回到RM
        //下面要调用DefaultCore的branchRegister方法
        response.setBranchId(
            core.branchRegister(request.getBranchType(), request.getResourceId(), rpcContext.getClientId(),
                request.getXid(), request.getApplicationData(), request.getLockKey()));
    }
    //下面是DefaultCore的branchRegister方法
    /** * @param branchType 分支类型,我们现在使用的是AT模式,branchType为AT * @param resourceId RM连接数据库的URL * @param clientId 客户端ID,组成是应用名:IP:端口 * @param xid the xid * @param applicationData 未知 * @param lockKeys 需要锁定记录的主键值和表,这个非常关键。RM在操作数据库前会解析SQL语句 * 分析出SQL语句要操作的表和主键值,并将这两个值拼成字符串发送到TC */
    @Override
    public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
                               String applicationData, String lockKeys) throws TransactionException {
       	//AT模式下branchType为AT,getCore方法返回的是ATCore对象
        return getCore(branchType).branchRegister(branchType, resourceId, clientId, xid,
            applicationData, lockKeys);
    }

branchRegister方法调用了ATCore的branchRegister方法去注册分支事务:

public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
                               String applicationData, String lockKeys) throws TransactionException {
        //根据XID找到GlobalSession对象,这个对象是全局事务开启时创建的
        GlobalSession globalSession = assertGlobalSessionNotNull(xid, false);
        //1、lockAndExecute方法中要对GlobalSession对象上锁
        return SessionHolder.lockAndExecute(globalSession, () -> {
            //检查事务状态是否为开启和激活状态
            globalSessionStatusCheck(globalSession);
            //添加监听器,后续专门文章解析
            globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
            //创建分支会话BranchSession对象
            BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,
                    applicationData, lockKeys, clientId);
            //2、对记录进行上锁
            branchSessionLock(globalSession, branchSession);
            try {
                //设置分支事务状态为BranchStatus.Registered,并且将该分支事务添加到GlobalSession中
                globalSession.addBranch(branchSession);
            } catch (RuntimeException ex) {
                branchSessionUnlock(branchSession);
                throw new BranchTransactionException(FailedToAddBranch, String
                        .format("Failed to store branch xid = %s branchId = %s", globalSession.getXid(),
                                branchSession.getBranchId()), ex);
            }
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Register branch successfully, xid = {}, branchId = {}, resourceId = {} ,lockKeys = {}",
                    globalSession.getXid(), branchSession.getBranchId(), resourceId, lockKeys);
            }
            //返回分支事务ID
            return branchSession.getBranchId();
        });
    }

这里涉及到两个地方上锁,首先来看第一个SessionHolder.lockAndExecute:

public static <T> T lockAndExecute(GlobalSession globalSession, GlobalSession.LockCallable<T> lockCallable)
            throws TransactionException {
        //这里根会话管理器使用的是FileSessionManager,其lockAndExecute方法见下面
        return getRootSessionManager().lockAndExecute(globalSession, lockCallable);
    }
    public <T> T lockAndExecute(GlobalSession globalSession, GlobalSession.LockCallable<T> lockCallable)
            throws TransactionException {
        //全局事务上锁,这个锁定的是事务自身,如果在全局事务中存在并发分支事务,这里会被拦截
        globalSession.lock();
        try {
        	//调用回调方法
            return lockCallable.call();
        } finally {
            globalSession.unlock();
        }
    }

第一个加锁相对比较简单,就是对全局会话GlobalSession直接加锁,底层使用的是ReentrantLock,如果当前已经被加锁,最长锁等待时间是2000ms。代码如下:

private Lock globalSessionLock = new ReentrantLock();
    private static final int GLOBAL_SESSION_LOCK_TIME_OUT_MILLS = 2 * 1000;
    public void lock() throws TransactionException {
        try {
           if (globalSessionLock.tryLock(GLOBAL_SESSION_LOCK_TIME_OUT_MILLS, TimeUnit.MILLISECONDS)) {
               return;
            }
        } catch (InterruptedException e) {
            LOGGER.error("Interrupted error", e);
    	}
    	throw new GlobalTransactionException(TransactionExceptionCode.FailedLockGlobalTranscation, "Lock global session failed");
	}

下面来看第二个加锁的地方,也就是方法branchSessionLock:

protected void branchSessionLock(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {
		//调用分支会话加锁,如果加锁失败,则抛出异常
        if (!branchSession.lock()) {
            throw new BranchTransactionException(LockKeyConflict, String
                    .format("Global lock acquire failed xid = %s branchId = %s", globalSession.getXid(),
                            branchSession.getBranchId()));
        }
    }
    //下面是BranchSession的lock方法,从这里可以看到首先获取锁管理器,然后使用锁管理器上锁
    public boolean lock() throws TransactionException {
        if (this.getBranchType().equals(BranchType.AT)) {
            return LockerManagerFactory.getLockManager().acquireLock(this);
        }
        return true;
    }

seata锁管理器与存储模式有关,存储模式可以是db、file、redis,这里使用的是file,对应的类是FileLockManager。下面是FileLockManager的acquireLock方法:

public boolean acquireLock(BranchSession branchSession) throws TransactionException {
        if (branchSession == null) {
            throw new IllegalArgumentException("branchSession can't be null for memory/file locker.");
        }
        //得到本分支事务需要加锁的记录和表名,RM将要加锁的记录主键值和表名组装成字符串发送到TC
        String lockKey = branchSession.getLockKey();
        if (StringUtils.isNullOrEmpty(lockKey)) {
            // no lock
            return true;
        }
        // 创建RowLock集合,RowLock对象表示要加锁的表及主键中字段的值,一条加锁记录一个RowLock对象
        List<RowLock> locks = collectRowLocks(branchSession);
        if (CollectionUtils.isEmpty(locks)) {
            // no lock
            return true;
        }
        //当前设置存储模式是file,getLocker方法返回的是FileLocker
        return getLocker(branchSession).acquireLock(locks);
    }
    protected List<RowLock> collectRowLocks(BranchSession branchSession) {
        List<RowLock> locks = new ArrayList<>();
        if (branchSession == null || StringUtils.isBlank(branchSession.getLockKey())) {
            return locks;
        }
        String xid = branchSession.getXid();
        //得到资源id,也就是数据库连接URL
        String resourceId = branchSession.getResourceId();
        //得到事务ID,分支事务的transactionId与GlobalSession的transactionId是相同的
        long transactionId = branchSession.getTransactionId();

        String lockKey = branchSession.getLockKey();
        //遍历主键中的每个字段值,对每个字段值创建RowLock对象
        return collectRowLocks(lockKey, resourceId, xid, transactionId, branchSession.getBranchId());
    }
    protected List<RowLock> collectRowLocks(String lockKey, String resourceId, String xid, Long transactionId,
                                            Long branchID) {
        List<RowLock> locks = new ArrayList<RowLock>();
        //当要对多个记录加锁时,中间使用";"分隔
        String[] tableGroupedLockKeys = lockKey.split(";");
        for (String tableGroupedLockKey : tableGroupedLockKeys) {
            //表名和记录主键值之间使用":"分隔
            int idx = tableGroupedLockKey.indexOf(":");
            if (idx < 0) {
                return locks;
            }
            //tableName为要加锁的表名
            String tableName = tableGroupedLockKey.substring(0, idx);
            //mergedPKs为要加锁的记录主键值,如果主键有多个字段,则使用"_"分隔
            //如果需要一次加锁多条记录,则记录之间使用","分隔
            String mergedPKs = tableGroupedLockKey.substring(idx + 1);
            if (StringUtils.isBlank(mergedPKs)) {
                return locks;
            }
            String[] pks = mergedPKs.split(",");
            if (pks == null || pks.length == 0) {
                return locks;
            }
            //遍历主键中的每个字段,创建RowLock对象,每个主键值都使用字符串表示
            for (String pk : pks) {
                if (StringUtils.isNotBlank(pk)) {
                    RowLock rowLock = new RowLock();
                    rowLock.setXid(xid);
                    rowLock.setTransactionId(transactionId);
                    rowLock.setBranchId(branchID);
                    rowLock.setTableName(tableName);
                    rowLock.setPk(pk);
                    rowLock.setResourceId(resourceId);
                    locks.add(rowLock);
                }
            }
        }
        return locks;
    }

上面的collectRowLocks方法主要是遍历需要加锁的记录,加锁的记录存储在BranchSession的lockKey属性中,如果需要对一个表的两条记录加锁,而且该表主键有两个字段,则lockKey的组成是:

表名:主键值_主键值;表名:主键值_主键值

collectRowLocks方法遍历lockKey的每条记录,并且一条记录生成一个RowLock对象。然后调用FileLocker.acquireLock方法对RowLock对象加锁。
acquireLock方法遍历所有的RowLock对象,对记录加锁时,首先查看当前记录是否已经加锁,如果没有加过,则直接记录主键值和事务id,表示对该记录上锁,如果已经加过,则比较本次加锁的事务id与上次的事务id是否一致,如果一致,则直接跳过,如果不一致,意味着加锁失败,接下来会对该分支事务已经加过的锁解锁,然后返回RM加锁失败。
acquireLock里面创建BucketLockMap对象记录了加锁的主键值和加锁的事务id的对应关系,表示该记录是由该事务加锁,BucketLockMap可以简单认为是Map对象,然后BucketLockMap会被存储到两个地方:BranchSession的lockHolder属性和LOCK_MAP。
lockHolder属性是一个Map,key是BucketLockMap对象,value是一个集合,存储了数据库的主键值,凡是记录到该集合中就表示本分支事务对这些记录加了锁,释放锁的时候就根据value集合从BucketLockMap对象中删除记录。
下面重点介绍LOCK_MAP。
LOCK_MAP是一个全局的锁集合,记录了目前所有事务正在加的锁,判断记录是否已经加锁以及加锁的事务id就是通过LOCK_MAP完成的,它的类型如下:

ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer, BucketLockMap>>>

结构是:数据库资源->表名->桶->BucketLockMap。
前面两个比较好理解,BucketLockMap也已经介绍过。
LOCK_MAP将每个数据库表下面分了128个桶,每个桶对应一个BucketLockMap,一条加锁记录分在哪个桶,是根据主键值的hash值模128得到的,如果分到某个桶下,就将主键值和事务id添加到桶对应的BucketLockMap中,所以BucketLockMap里面其实存储了不同事务的加锁记录,下面来看一下具体实现:

public boolean acquireLock(List<RowLock> rowLocks) {
        //如果集合为空,表示没有要加锁的记录
        if (CollectionUtils.isEmpty(rowLocks)) {
            //no lock
            return true;
        }
        String resourceId = branchSession.getResourceId();
        long transactionId = branchSession.getTransactionId();
        //bucketHolder:BucketLockMap->主键值集合,表示当前分支事务有哪些记录已经加锁了
        //在branchSession中记录lockHolder是为了在释放锁的时候使用
        //BucketLockMap是一个Map对象,是记录主键值与transactionId的对应关系
        ConcurrentMap<BucketLockMap, Set<String>> bucketHolder = branchSession.getLockHolder();
        //dbLockMap:表名->桶->BucketLockMap,
        //LOCK_MAP:数据库资源->表名->桶->BucketLockMap,
        //LOCK_MAP记录了全局所有事务的所有加锁记录
        ConcurrentMap<String, ConcurrentMap<Integer, BucketLockMap>> dbLockMap = LOCK_MAP.get(resourceId);
        if (dbLockMap == null) {
            LOCK_MAP.putIfAbsent(resourceId, new ConcurrentHashMap<>());
            dbLockMap = LOCK_MAP.get(resourceId);
        }
        //遍历每个需要加锁的记录
        for (RowLock lock : rowLocks) {
            String tableName = lock.getTableName();
            //主键值
            String pk = lock.getPk();
            ConcurrentMap<Integer, BucketLockMap> tableLockMap = dbLockMap.get(tableName);
            if (tableLockMap == null) {
                dbLockMap.putIfAbsent(tableName, new ConcurrentHashMap<>());
                tableLockMap = dbLockMap.get(tableName);
            }
            //得到桶的值,桶的个数是128个
            int bucketId = pk.hashCode() % BUCKET_PER_TABLE;
            BucketLockMap bucketLockMap = tableLockMap.get(bucketId);
            if (bucketLockMap == null) {
                tableLockMap.putIfAbsent(bucketId, new BucketLockMap());
                bucketLockMap = tableLockMap.get(bucketId);
            }
            //previousLockTransactionId为之前的事务id
            Long previousLockTransactionId = bucketLockMap.get().putIfAbsent(pk, transactionId);
            //下面分为三种情况,
            // 一是当前加锁的事务与之前已经加过锁的事务是同一个,
            // 二是两个事务不是同一个,
            // 三是之前还没有加过锁
            if (previousLockTransactionId == null) {
                //No existing lock, and now locked by myself
                Set<String> keysInHolder = bucketHolder.get(bucketLockMap);
                if (keysInHolder == null) {
                    bucketHolder.putIfAbsent(bucketLockMap, new ConcurrentSet<>());
                    keysInHolder = bucketHolder.get(bucketLockMap);
                }
                //如果该记录没有加过锁,则直接将记录保存到集合中
                keysInHolder.add(pk);
            } else if (previousLockTransactionId == transactionId) {
                //如果加锁的事务是同一个,则跳过加锁
                // Locked by me before
                continue;
            } else {
                LOGGER.info("Global lock on [" + tableName + ":" + pk + "] is holding by " + previousLockTransactionId);
                try {
                    //如果要加锁的记录已经被其他事务加过锁,则释放当前分支事务已经加过的锁,并返回加锁失败
                    // Release all acquired locks.
                    branchSession.unlock();
                } catch (TransactionException e) {
                    throw new FrameworkException(e);
                }
                return false;
            }
        }
        return true;
    }

相关文章