RabbitMQ其他知识点 -- 幂等性、优先级队列和惰性队列

x33g5p2x  于2022-01-04 转载在 RabbitMQ  
字(4.0k)|赞(0)|评价(0)|浏览(314)

1、RabbitMQ其他知识点

1.1、幂等性

1.1.1、概念

  • 用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用

1.1.2、消费者重复消费

消费者在消费 MQ 中的消息时,MQ 已把消息发送给消费者,消费者在给MQ 返回 ack 时网络中断,故 MQ 未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。

1.1.3、解决思路

MQ 消费者的幂等性的解决一般使用全局 ID 或者写个唯一标识比如时间戳 或者 UUID 或者订单消费者消费 MQ 中的消息也可利用 MQ 的该 id 来判断,或者可按自己的规则生成一个全局唯一 id每次消费消息时用该 id 先判断该消息是否已消费过

1.1.4、消费端的幂等性保障

在海量订单生成的业务高峰期,生产端有可能就会重复发生了消息,这时候消费端就要实现幂等性,这就意味着我们的消息永远不会被消费多次,即使我们收到了一样的消息。业界主流的幂等性有两种操作:

  1. 唯一 ID+指纹码机制,利用数据库主键去重:我们的一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基本都是由我们的业务规则拼接而来,但是一定要保证唯一性,然后就利用查询语句进行判断这个 id 是否存在数据库中,优势就是实现简单就一个拼接,然后查询判断是否重复;劣势就是在高并发时,如果是单个数据库就会有写入性能瓶颈当然也可以采用分库分表提升性能,但也不是我们最推荐的方式。
  2. 利用 redis 的原子性:利用 redis 执行 setnx 命令,天然具有幂等性。从而实现不重复消费。

1.2、优先级队列

1.2.1、使用场景

在我们系统中有一个订单催付的场景,我们的客户在天猫下的订单,淘宝会及时将订单推送给我们,如果在用户设定的时间内未付款那么就会给用户推送一条短信提醒,很简单的一个功能对吧,但是,tmall商家对我们来说,肯定是要分大客户和小客户的对吧,比如像苹果,小米这样大商家一年起码能给我们创造很大的利润,所以理应当然,他们的订单必须得到优先处理,而曾经我们的后端系统是使用 redis 来存放的定时轮询,大家都知道 redis 只能用 List 做一个简简单单的消息队列,并不能实现一个优先级的场景,所以订单量大了后采用 RabbitMQ 进行改造和优化,如果发现是大客户的订单给一个相对比较高的优先级,否则就是默认优先级

1.2.2、图片说明

  • /左边的是编号,右边的是优先级
  • 所以优先级队列会先按照优先级大小进行重新排队,越大的越先被处理。传统的队列是先进先出

1.2.3、如何添加

1. 页面中如何添加

2. 代码形式如何添加

1. 生产者代码

package com.xiao.helloworld;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置IP地址
        factory.setHost("192.168.123.129");
        // 用户名
        factory.setUsername("admin");
        // 密码
        factory.setPassword("123");

        // 创建连接
        Connection connection = factory.newConnection();
        // 获取信道
        Channel channel = connection.createChannel();
        /** * 声明队列 * 1.队列名称 * 2.队列里面的消息是否持久化(磁盘)默认情况下消息存储在内存中 * 3.该队列是否提供一个消费者进行消费,就是是否进行消息共享 * 4.就是当最后一个消费者断开连接之后,该队列是否自动删除消息 * 5.其他参数 */
        Map<String,Object> map = new HashMap<>();
        map.put("x-max-priority",10);
        channel.queueDeclare(QUEUE_NAME,true,false,false,map);
        // 发消息
        for (int i = 0; i < 10; i++) {
            String message = "info" + i;
            if(message.equals("info5")){
                AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().priority(5).build();
                channel.basicPublish("",QUEUE_NAME,basicProperties,message.getBytes(StandardCharsets.UTF_8));
            }else{
                channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
            }
        }

        /** * 发送一个消息 * 1.发送到哪个交换机 * 2.路由的Key值,也就是本次队列的名称 * 3.其他参数信息 * 4.发送消息的消息内容 */

        System.out.println("消息发送完毕");
    }
}

2. 消费者代码

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    public static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.123.129");
        factory.setUsername("admin");
        factory.setPassword("123");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        DeliverCallback deliverCallback = (var1, var2)->{
            System.out.println(new String(var2.getBody()));
        };

        CancelCallback cancelCallback = var1->{
            System.out.println("消费消息被中断");
        };
        /** * 接收消息 * 1.消费哪个队列 * 2.消费成功之后是否要自动应答 * 3.消费者未成功消费的回调 * 4.消费者取消消费的回调 */
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

3. 测试结果

  • 消息内容为info5的消息最先被消费。
  • 注意事项队列需要设置为优先级队列,消息需要设置消息的优先级,消费者需要等待消息已经发送到队列中才能去消费,因为这样才有机会对消息进行排序

1.3、惰性队列

1.3.1、概念和使用场景

  • 惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。
  • 当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了

1.3.2、两种模式

  • defaultlazy

lazy模式的网页形式配置

lazy模式的代码形式配置

Map<String,Object> map = new HashMap<>();
map.put("x-queue-mode","lazy");
channel.queueDeclare(QUEUE_NAME,true,false,false,map);

1.3.3、内存开销对比

  • 在发送 1 百万条消息,每条消息大概占 1KB的情况下,普通队列占用内存是 1.2GB,而惰性队列仅仅占用 1.5MB

相关文章