我有一个通过entity-framework-core(efcore 7)对postgresql数据库执行的命令。该命令旨在“声明”某些记录。每个记录应该声明一次且仅声明一次。
UPDATE command AS c
SET execution_heartbeat = @__GetTimeUtc_2,
claim_id = @__claimId_1
FROM (
SELECT c0.command_id, c0.active, c0.claim_id, c0.execution_heartbeat, c0.sid, c0.started_on, c0.status, c0.tenant_id
FROM command AS c0
WHERE c0.status = 0 AND (c0.claim_id IS NULL)
ORDER BY c0.sid
LIMIT @__p_0 --1 in this case
) AS t
WHERE c.command_id = t.command_id
字符串
上述语句通过以下代码块执行
Guid claimId = Guid.NewGuid();
int affected = 0;
using (var context = DbContextFactory.CreateDbContext())
{
affected = await context.MyTable
.Where(c => c.Status == CommandStatus.QUEUED)
.Where(c => c.ClaimId == null)
.OrderBy(c => c.Sid)
.Take(maxClaims)
.ExecuteUpdateAsync(
setPropertyCalls: calls => calls
.SetProperty(c => c.ClaimId, claimId)
.SetProperty(c => c.ExecutionHeartbeat, TimeAccessor.GetTimeUtc()),
cancellationToken: cancellationToken);
var claimedIds = await context.MyTable
.Where(c => c.ClaimId == claimId)
.Select(c => c.Id)
.ToListAsync(cancellationToken);
this.Logger.LogDebug("Claiming {Count} records: {Ids} with Claim: {ClaimId}", affected, string.Join(", ", claimedIds), claimId);
return claimedIds;
}
型
然而,当有多个线程并行运行此,一些记录最终得到'声称'多次. IE,有两个不同的线程.
- 声明唯一的claimId
- 将某些记录(其claimId应为NULL)更新为新的claimId
- 选择现在具有新claimId的记录
这似乎不太可能,因为我假设update语句是事务性运行的,并且只更新以前没有更新过的记录。
我相信我可以找到一种方法来“围绕这一点编写代码”。但希望有人能阐明这里可能发生的事情,以及我可能做出的错误假设。
1条答案
按热度按时间hgtggwj01#
一个显式的锁可以解决这个问题。不幸的是,EFCore issue for explicit locking仍然是打开的,所以你必须切换到一个原始查询来解决这个问题:
字符串
下面是在9个后台工作人员上复制的案例,他们试图并发地发出您的原始查询:demo1 at db<>fiddle。
下面是另一个演示,展示了如何通过添加显式锁:demo2 at db<>fiddle来修复它。每个人都有一个不同的锁,因为锁定使他们在获取要声明的行时跳过锁定的行。
默认情况下,您使用
read committed
transaction isolation level。这意味着所有并发事务只知道在它们的语句开始之前**声明了 * 什么。您可能对explicit locking感兴趣,或者更改结构,以便有一个表列出谁声明了什么,以及一个unique
约束,防止任何两个声明对同一事物调用dibs。级别
repeatable read
实际上更差,因为事务中的每个语句都可以看到整个事务之前的数据库快照,而read committed
在整个事务中的每个语句之后“刷新”可见性。最后一个,
serializable
将导致任何两个并发冲突的事务失败,你必须在应用程序级别不断捕获并重试。它还带来了db的性能损失-如果你同时运行A和B,PostgreSQL必须确保在B之前运行A与在A之前运行B的结果相同。