Rabbitmq集群,当节点离线时,如何保证离线队列接收到来自扇出交换的消息?

3htmauhk  于 8个月前  发布在  RabbitMQ
关注(0)|答案(1)|浏览(91)

我有三个节点集群,两个队列绑定到一个扇出交换机。我的要求是,所有发送到交换机的消息都必须保存到这两个队列中,所有消息都不能丢失,必须进行处理。
当其中一个节点离线时,该节点上的队列将丢失交换机收到的消息。我可以使用Quorum Quarum,但这只能让一个节点脱机。如果两个节点都离线,也会出现同样的问题。有什么解决办法吗?

更新时间:(2023-10-03)

我想详细描述一下我的问题,MyFanoutExchange绑定了MyFanoutExchange。现在,所有发送到MyFanoutExchange的消息都将发送到邮箱A和邮箱B,这很好。
| 交换|地位|节点|队列|
| --|--|--|--|
| MyFanoutExchange|在线|rabbitmq-0||
| | 兔MQ-1|队列a| queue-A |
| | 兔MQ-2|队列b| queue-B |
当节点rabbitmq-1离线时,发送到MyFanoutExchange的所有消息都将转到邮箱B,而不是邮箱A,即使rabbitmq-1稍后上线。
| 交换|地位|节点|队列|
| --|--|--|--|
| MyFanoutExchange|在线|rabbitmq-0||
| | 兔MQ-1|队列a| queue-A |
| | 兔MQ-2|队列b| queue-B |
我需要确保客户端A和客户端B可以从MyFanoutExchange接收所有消息。

kognpnkq

kognpnkq1#

您可以只使用一个直接队列。然后,您的3个节点将逐个处理消息,这样会更快。此外,您还可以使用以下标头为错误消息创建接收队列:

async function initBaseQueues(
  ch: amqplib.Channel,
  hash: Record<string, string>,
) {
  const queues = Object.keys(hash);

  await ch.assertExchange(FACTORY_GATEWAY_EXCHANGE, 'direct');

  for (let i = 0; i < queues.length; i++) {
    const queue = queues[i];

    const routingKey = hash[queue];

    await ch.assertQueue(queue, {
      autoDelete: false,
      durable: true,
      arguments: {
        [ERabbitQueueArguments.deadLetterExchange]:
          FACTORY_GATEWAY_EXCHANGE + RETRY_CONST,
        [ERabbitQueueArguments.deadLetterRoutingKey]: routingKey + RETRY_CONST,
      },
    });

    await ch.bindQueue(queue, FACTORY_GATEWAY_EXCHANGE, routingKey);

    Logger.log(`Queue ${queue} initialized`);
  }
}

async function initRetryQueues(
  ch: amqplib.Channel,
  hash: Record<string, string>,
) {
  const queues = Object.keys(hash);

  await ch.assertExchange(FACTORY_GATEWAY_EXCHANGE + RETRY_CONST, 'direct');

  for (let i = 0; i < queues.length; i++) {
    const queue = queues[i];

    const routingKey = hash[queue];

    await ch.assertQueue(queue + RETRY_CONST, {
      autoDelete: false,
      durable: true,
      arguments: {
        [ERabbitQueueArguments.deadLetterExchange]: FACTORY_GATEWAY_EXCHANGE,
        [ERabbitQueueArguments.deadLetterRoutingKey]: routingKey,
        [ERabbitQueueArguments.messageTTL]: REQUEUE_DEALAY_CONST,
      },
    });

    await ch.bindQueue(
      queue + RETRY_CONST,
      FACTORY_GATEWAY_EXCHANGE + RETRY_CONST,
      routingKey + RETRY_CONST,
    );

    Logger.log(`Queue ${queue + RETRY_CONST} initialized`);
  }
}

export default async function initRabbitQueues() {
  const conn = await amqplib.connect(
    process.env.RABBIT_HOST ?? 'amqp://localhost:5672',
  );

  const ch1 = await conn.createChannel();

  const hash = queueNames();

  await initBaseQueues(ch1, hash);

  await initRetryQueues(ch1, hash);

  await conn.close();
}

标题为:

export enum ERabbitQueueArguments {
  deadLetterExchange = 'x-dead-letter-exchange',
  deadLetterRoutingKey = 'x-dead-letter-routing-key',
  messageTTL = 'x-message-ttl',
}

相关问题