springboot:整合rabbitmq之快速入门

x33g5p2x  于2022-06-16 转载在 Spring  
字(6.9k)|赞(0)|评价(0)|浏览(323)

springboot:整合rabbitmq之快速入门

一、依赖

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

二、配置类

package com.yolo.springbootrabbitmqproducer.config;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

@Configuration
public class RabbitMQConfiguration {

    @Bean
    public ConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        String rabbitmqHost = "127.0.0.1";
        String rabbitmqPort = "5672";
        String rabbitmqUsername = "guest";
        String rabbitmqPassword = "guest";
        String rabbitmqVirtualHost = "/";
        connectionFactory.setHost(rabbitmqHost);
        connectionFactory.setPort(Integer.parseInt(rabbitmqPort));
        connectionFactory.setUsername(rabbitmqUsername);
        connectionFactory.setPassword(rabbitmqPassword);
        connectionFactory.setVirtualHost(rabbitmqVirtualHost);
//        connectionFactory.setPublisherReturns(true);//开启return模式
//        connectionFactory.setPublisherConfirms(true);//开启confirm模式
        return connectionFactory;
    }

    @Bean(name = "rabbitTemplate")
    //必须是prototype类型
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(rabbitConnectionFactory());
    }

    @Bean("customContainerFactory")
    public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConcurrentConsumers(1);  //设置线程数
        factory.setMaxConcurrentConsumers(1); //最大线程数
//        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//设置为手动确认
        configurer.configure(factory, connectionFactory);
        return factory;
    }
}

三、基本消息模型

消息发布和监听

@RestController
public class ProducerTestOneController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/send")
    public void send(@RequestParam(value = "message", required = false, defaultValue = "Hello World") String message) {
        for (int i = 1; i <= 10; i++) {
            String msg = message + " ..." + i;
            System.out.println(" [ 生产者 ] Sent ==> '" + msg + "'");
            rabbitTemplate.convertAndSend("helloWorldExchange","helloWorld", msg);
        }
    }

    @RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
            value = @Queue(value = "helloWorldQueue",declare = "true"//指定一下队列名,默认持久队列,不指定则为临时队列
            ),
            exchange = @Exchange(value = "helloWorldExchange",type = ExchangeTypes.DIRECT,durable = "true"),//Exchang的默认类型就是direct,所以type可以不写
            key = "helloWorld"
    )
    })
    public void receive(String message) {
        System.out.println(" [ 消费者 ] Received ==> '" + message + "'");
    }
}

启动项目查看rabbitmq可视化界面

Ready:表示待消费数量;队列中拥有可以被消费者消费的消息数量。
Unacked:表示待确认数量;队列分配消息给消费者时,给该条消息一个待确认状态,当消费者确认消息之后,队列才会移除该条消息。
Total:表示待消费数和待确认数的总和

发送消息测试

访问:http://localhost:8080/send

这里采用的是自动ack机制

新增一个接受者

@RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
            value = @Queue(value = "helloWorldQueue",declare = "true"//指定一下队列名,默认持久队列,不指定则为临时队列
            ),
            exchange = @Exchange(value = "helloWorldExchange",type = ExchangeTypes.DIRECT,durable = "true"),//Exchang的默认类型就是direct,所以type可以不写
            key = "helloWorld"
    )
    })
    public void receive2(Message message) {
        System.out.println(" [ 消费者@2 ] Received ==> '" + new String(message.getBody()) + "'");
    }

重新启动,访问:localhost:8080/send

可以看到消息被平均消费了

四、竞争消费者模式

队列的消息分配方式默认是平均分配,即第一条消息分配给一个消息者,第二条消息就分配给另一个消息者,以此类推…

上面示例有2个消费者监听,由于只是简单的打印语句,所以看不出有什么问题。
我进行修改一下,通过设置线程休眠时间来表示消费者处理消费的任务时间

@RestController
public class ProducerTestOneController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/send")
    public void send(@RequestParam(value = "message", required = false, defaultValue = "Hello World") String message) {
        for (int i = 1; i <= 10; i++) {
            String msg = message + " ..." + i;
            System.out.println(" [ 生产者 ] Sent ==> '" + msg + "'");
            rabbitTemplate.convertAndSend("helloWorldExchange","helloWorld", msg);
        }
    }

    private int count1=1;
    private int count2=1;
    @RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
            value = @Queue(value = "helloWorldQueue",declare = "true"//指定一下队列名,默认持久队列,不指定则为临时队列
            ),
            exchange = @Exchange(value = "helloWorldExchange",type = ExchangeTypes.DIRECT,durable = "true"),//Exchang的默认类型就是direct,所以type可以不写
            key = "helloWorld"
    )
    })
    public void receive(Message message) throws InterruptedException {
        Thread.sleep(200);
        System.out.println(" [ 消费者@1号 ] Received ==> '" + new String(message.getBody()) + "'");
        System.out.println(" [ 消费者@1号 ] 处理消息数:" + count1++);
    }

    @RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
            value = @Queue(value = "helloWorldQueue",declare = "true"//指定一下队列名,默认持久队列,不指定则为临时队列
            ),
            exchange = @Exchange(value = "helloWorldExchange",type = ExchangeTypes.DIRECT,durable = "true"),//Exchang的默认类型就是direct,所以type可以不写
            key = "helloWorld"
    )
    })
    public void receive2(Message message) throws InterruptedException {
        Thread.sleep(1000);
        System.out.println(" [ 消费者@2号 ] Received ==> '" + new String(message.getBody()) + "'");
        System.out.println(" [ 消费者@2号 ] 处理消息数:" + count2++);
    }
}

现在就能很明显的看出,消费者1号很快地处理完消息后就处于空闲状态;而消费者2号却一直很忙碌。当消息数量成千上万的时候,由消费者2号处理的消息会堆积很多,达不到时效性。

针对这种问题,rabbitmq提供了一种解决方案。
设置prefetch参数=1,实现原理是:队列只会分配一条消息给对应的监听消费者,收到消费者的确认回复之后才会重新分配另一条消息。

方式一(局部)

这里需要每一个接受者指定containerFactory

@Bean("customContainerFactory")
    public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConcurrentConsumers(1);  //设置线程数
        factory.setMaxConcurrentConsumers(1); //最大线程数
//        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//设置为手动确认
        // 设置prefetch
        factory.setPrefetchCount(1);
        configurer.configure(factory, connectionFactory);
        return factory;
    }

1号处理了8条消息,2号2条,工作效率提高了不少

方式二(全局)

这里接受者不需要指定containerFactory

spring:
  rabbitmq:
    port: 5672
    host: 127.0.0.1
    username: guest
    password: guest
    listener:
      simple:
        prefetch: 1

相关文章