为什么redis流消费者应该在同一个消费者组中指定一个单独的名称?

l2osamch  于 7个月前  发布在  Redis
关注(0)|答案(2)|浏览(90)

下面的代码展示了我如何测试Redis流函数。
而且我发现,同一个消费者名称的不同进程都在竞争消费同一个流中的消息,在我的理解中,如果这种表现是正常的,Redis不应该设计一个函数来指定消费者名称。
是我的理解有问题,还是我用错了方法?

import asyncio
import aioredis

# consumer with name "a", subscribing two streams
async def consume_a():
    try:
        while True:
            redis = aioredis.from_url("redis://localhost:36379")
            res = await redis.xreadgroup(
                "test_consumer_group",
                "consumer_a",
                streams={"test_stream": ">", "test_stream_1": ">"},
                count=1,
                block=10000,
                noack=True,
            )
            print(res)
    finally:
        await redis.close()

字符串
名为“B”的消费者,订阅两个流

async def consume_b():
    try:
        while True:
            redis = aioredis.from_url("redis://localhost:36379")
            res = await redis.xreadgroup(
                "test_consumer_group",
                "consumer_b",
                streams={"test_stream": ">", "test_stream_1": ">"},
                count=1,
                block=10000,
                noack=True,
            )
            print(res)
    finally:
        await redis.close()


运行脚本前创建组

async def config_group_0():
    try:
        redis = aioredis.from_url("redis://localhost:36379")
        res = await redis.xgroup_create("test_stream", "test_consumer_group")
        print(res)
    finally:
        await redis.close()

async def config_group_1():
    try:
        redis = aioredis.from_url("redis://localhost:36379")
        res = await redis.xgroup_create("test_stream_1", "test_consumer_group")
        print(res)
    finally:
        await redis.close()


生产者

async def produce_0():
    try:
        redis = aioredis.from_url("redis://localhost:36379")
        i = 0
        while i < 100:
            res = await redis.xadd(
                "test_stream",
                {"domain_name": "test_domain_name_0", "sid": 0},
                maxlen=5,
            )
            print(res)
            i += 1
    finally:
        await redis.close()

async def produce_1():
    try:
        redis = aioredis.from_url("redis://localhost:36379")
        i = 0
        while i < 100:
            res = await redis.xadd(
                "test_stream_1",
                {"domain_name": "test_domain_name_1", "sid": 1},
                maxlen=2,
            )
            print(res)
            i += 1
    finally:
        await redis.close()


测试代码

if __name__ == "__main__":
    # two coroutines consume messages from two streams with the same consumer name
    asyncio.run(asyncio.gather(consume_a(), consume_a(), produce_0(), produce_1()))

iyfamqjs

iyfamqjs1#

消费者可以获得自己的PEL,或者竞争和重复消费PEL

hgncfbus

hgncfbus2#

基于Redis文档:
消费者组的保证之一是,给定的消费者只能看到传递给它的消息的历史,因此消息只有一个所有者。
阅读这些文档以了解更多信息:

相关问题