springboot:整合rabbitmq之死信队列

x33g5p2x  于2022-06-16 转载在 Spring  
字(6.1k)|赞(0)|评价(0)|浏览(248)

springboot:整合rabbitmq之死信队列

一、项目准备

依赖

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

配置类

@Configuration
public class RabbitMQConfiguration {

    @Bean
    public ConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        String rabbitmqHost = "127.0.0.1";
        String rabbitmqPort = "5672";
        String rabbitmqUsername = "guest";
        String rabbitmqPassword = "guest";
        String rabbitmqVirtualHost = "/";
        connectionFactory.setHost(rabbitmqHost);
        connectionFactory.setPort(Integer.parseInt(rabbitmqPort));
        connectionFactory.setUsername(rabbitmqUsername);
        connectionFactory.setPassword(rabbitmqPassword);
        connectionFactory.setVirtualHost(rabbitmqVirtualHost);
//        connectionFactory.setPublisherReturns(true);//开启return模式
//        connectionFactory.setPublisherConfirms(true);//开启confirm模式
        return connectionFactory;
    }

    @Bean(name = "rabbitTemplate")
    //必须是prototype类型
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(rabbitConnectionFactory());
    }

    @Bean("customContainerFactory")
    public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        //设置线程数
        factory.setConcurrentConsumers(1);
        //最大线程数
        factory.setMaxConcurrentConsumers(1);

//        //设置为手动确认MANUAL(手动),AUTO(自动);
//        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // 设置prefetch
        factory.setPrefetchCount(1);
        configurer.configure(factory, connectionFactory);
        return factory;
    }
}

yaml配置文件

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true
          max-attempts: 5 # 重试次数
          max-interval: 10000   # 重试最大间隔时间
          initial-interval: 2000  # 重试初始间隔时间
          multiplier: 2 # 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间

二、死信队列介绍

创建一个普通队列时,通过添加配置绑定另一个交换机(死信交换机),在普通队列发生异常时,消息就通过死信交换机转发到绑定它的队列里,这个绑定死信交换机的队列就是死信队列

三、案例

创建生产者和消费者

@Slf4j
@RestController
public class DLController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    private int count = 1;

    @GetMapping("/dl")
    public void dl() {
        String message = "Hello World!";
        log.info(" [ 生产者 ] Sent ==> '" + message + "'");
        rabbitTemplate.convertAndSend("normal_exchange", "normal_key", message);
    }


    // 监听 normal_queue 正常队列
    @RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
            value = @Queue(value = "normal_queue",declare = "true"//指定一下队列名,默认持久队列,不指定则为临时队列
                    ,arguments = {
                    @Argument(name = "x-dead-letter-exchange",value = "dlx_exchange"), //指定一下死信交换机
                    @Argument(name = "x-dead-letter-routing-key",value = "dead_key"),  //指定死信交换机的路由key
                    //@Argument(name = "x-message-ttl",value = "3000",type = "java.lang.Long") //指定队列的过期时间,type需要指定为Long,否则会抛异常
                    //,@Argument(name = "x-max-length",value = "3") //指定队列最大长度,超过会被投入死信,至于type是否需要指定为Long,本人没试过
            }
            ),
            exchange = @Exchange(value = "normal_exchange",type = ExchangeTypes.DIRECT,durable = "true"),//Exchang的默认类型就是direct,所以type可以不写
            key = "normal_key"
    )
    })
    public void normal(Message message)  {
        log.info(" [ 消费者@A号 ] 接收到消息 ==> '" + new String(message.getBody()));
        log.info("当前执行次数:{}", count++);
        int i = 1 / 0;
        log.info(" [ 消费者@A号 ] 消费了消息 ==> '" + new String(message.getBody()));
    }

    // 监听死信队列
    @RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
                    value = @Queue(value = "dlx_queue"),
                    exchange = @Exchange(value = "dlx_exchange"),//Exchang的默认类型就是direct,所以type可以不写
                    key = "dead_key"
            )
    })
    public void dl(Message message) {
        log.info(" [ 消费者@死信号 ] 接收到消息 ==> '" + new String(message.getBody()));
    }
}

测试结果

服务器上normal-queue有DLX、DLK标识,说明该队列绑定了死信交换机和路由键;
重试5次之后,就将消息转发给死信队列

修改消费者、手动确认

@Slf4j
@RestController
public class DLController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    private int count = 1;

    @GetMapping("/dl")
    public void dl() {
        String message = "Hello World!";
        log.info(" [ 生产者 ] Sent ==> '" + message + "'");
        rabbitTemplate.convertAndSend("normal_exchange", "normal_key", message);
    }
    
    // 监听 normal_queue 正常队列
    @RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
            value = @Queue(value = "normal_queue",declare = "true"//指定一下队列名,默认持久队列,不指定则为临时队列
                    ,arguments = {
                    @Argument(name = "x-dead-letter-exchange",value = "dlx_exchange"), //指定一下死信交换机
                    @Argument(name = "x-dead-letter-routing-key",value = "dead_key"),  //指定死信交换机的路由key
                    //@Argument(name = "x-message-ttl",value = "3000",type = "java.lang.Long") //指定队列的过期时间,type需要指定为Long,否则会抛异常
                    //,@Argument(name = "x-max-length",value = "3") //指定队列最大长度,超过会被投入死信,至于type是否需要指定为Long,本人没试过
            }
            ),
            exchange = @Exchange(value = "normal_exchange",type = ExchangeTypes.DIRECT,durable = "true"),//Exchang的默认类型就是direct,所以type可以不写
            key = "normal_key"
    )
    })
    public void normal(Message message, Channel channel) throws IOException {
        log.info(" [ 消费者@A号 ] 接收到消息 ==> '" + new String(message.getBody()));
        log.info("当前执行次数:{}", count++);
        try {
            // 制造异常
            int i = 1 / 0;
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            log.info(" [ 消费者@A号 ] 消费了消息 ==> '" + new String(message.getBody()));
        } catch (Exception e) {
            log.info("捕获异常,不会启动重试机制,异常消息直接转发到死信队列");
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
        }
    }

    // 监听死信队列
    @RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
                    value = @Queue(value = "dlx_queue"),
                    exchange = @Exchange(value = "dlx_exchange"),//Exchang的默认类型就是direct,所以type可以不写
                    key = "dead_key"
            )
    })
    public void dl(Message message) {
        log.info(" [ 消费者@死信号 ] 接收到消息 ==> '" + new String(message.getBody()));
    }
}

修改配置类

或者修改yaml文件,二选一

重启测试

四、总结

  • 手动确认并且主动捕获了异常是不会触发重试机制,异常消息直接转发到死信队列
  • 死信队列是针对某个队列发生异常时进行处理
  • 重试机制中的RepublishMessageRecoverer是对所有队列发生异常时进行处理,并且优先于死信队列

相关文章