下面的代码展示了我如何测试Redis流函数。
而且我发现,同一个消费者名称的不同进程都在竞争消费同一个流中的消息,在我的理解中,如果这种表现是正常的,Redis不应该设计一个函数来指定消费者名称。
是我的理解有问题,还是我用错了方法?
import asyncio
import aioredis
# consumer with name "a", subscribing two streams
async def consume_a():
try:
while True:
redis = aioredis.from_url("redis://localhost:36379")
res = await redis.xreadgroup(
"test_consumer_group",
"consumer_a",
streams={"test_stream": ">", "test_stream_1": ">"},
count=1,
block=10000,
noack=True,
)
print(res)
finally:
await redis.close()
字符串
名为“B”的消费者,订阅两个流
async def consume_b():
try:
while True:
redis = aioredis.from_url("redis://localhost:36379")
res = await redis.xreadgroup(
"test_consumer_group",
"consumer_b",
streams={"test_stream": ">", "test_stream_1": ">"},
count=1,
block=10000,
noack=True,
)
print(res)
finally:
await redis.close()
型
运行脚本前创建组
async def config_group_0():
try:
redis = aioredis.from_url("redis://localhost:36379")
res = await redis.xgroup_create("test_stream", "test_consumer_group")
print(res)
finally:
await redis.close()
async def config_group_1():
try:
redis = aioredis.from_url("redis://localhost:36379")
res = await redis.xgroup_create("test_stream_1", "test_consumer_group")
print(res)
finally:
await redis.close()
型
生产者
async def produce_0():
try:
redis = aioredis.from_url("redis://localhost:36379")
i = 0
while i < 100:
res = await redis.xadd(
"test_stream",
{"domain_name": "test_domain_name_0", "sid": 0},
maxlen=5,
)
print(res)
i += 1
finally:
await redis.close()
async def produce_1():
try:
redis = aioredis.from_url("redis://localhost:36379")
i = 0
while i < 100:
res = await redis.xadd(
"test_stream_1",
{"domain_name": "test_domain_name_1", "sid": 1},
maxlen=2,
)
print(res)
i += 1
finally:
await redis.close()
型
测试代码
if __name__ == "__main__":
# two coroutines consume messages from two streams with the same consumer name
asyncio.run(asyncio.gather(consume_a(), consume_a(), produce_0(), produce_1()))
型
2条答案
按热度按时间iyfamqjs1#
消费者可以获得自己的PEL,或者竞争和重复消费PEL
hgncfbus2#
基于Redis文档:
消费者组的保证之一是,给定的消费者只能看到传递给它的消息的历史,因此消息只有一个所有者。
阅读这些文档以了解更多信息: