springboot:整合rabbitmq之交换机

x33g5p2x  于2022-06-16 转载在 Spring  
字(13.2k)|赞(0)|评价(0)|浏览(231)

springboot:整合rabbitmq之交换机

环境准备

依赖

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

配置

@Configuration
public class RabbitMQConfiguration {

    @Bean
    public ConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        String rabbitmqHost = "127.0.0.1";
        String rabbitmqPort = "5672";
        String rabbitmqUsername = "guest";
        String rabbitmqPassword = "guest";
        String rabbitmqVirtualHost = "/";
        connectionFactory.setHost(rabbitmqHost);
        connectionFactory.setPort(Integer.parseInt(rabbitmqPort));
        connectionFactory.setUsername(rabbitmqUsername);
        connectionFactory.setPassword(rabbitmqPassword);
        connectionFactory.setVirtualHost(rabbitmqVirtualHost);
//        connectionFactory.setPublisherReturns(true);//开启return模式
//        connectionFactory.setPublisherConfirms(true);//开启confirm模式
        return connectionFactory;
    }

    @Bean(name = "rabbitTemplate")
    //必须是prototype类型
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(rabbitConnectionFactory());
    }

    @Bean("customContainerFactory")
    public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        //设置线程数
        factory.setConcurrentConsumers(1);
        //最大线程数
        factory.setMaxConcurrentConsumers(1);
//        //设置为手动确认MANUAL(手动),AUTO(自动);
//        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // 设置prefetch
        factory.setPrefetchCount(1);
        configurer.configure(factory, connectionFactory);
        return factory;
    }
}

一、交换机

生产者发送消息给交换机,由交换机转发消息给队列,交换机可以转发给所有绑定它的队列,也可以转发给符合路由规则的队列,交换机本身不会存储消息,如果没有绑定任何队列,消息就会丢失

  • 发布订阅模型
    发布订阅使用的交换机是Fanout交换机,也叫广播式交换机
    广播式交换机:fanout交换器中没有路由键的概念,他会把消息发送到所有绑定在此交换器上面的队列
  • 路由模型
    路由式交换机:direct交换器相对来说比较简单,匹配规则为:路由键完全匹配,消息就被投送到相关的队列
  • 主题模型
    主题式交换机:topic交换器采用模糊匹配路由键的原则进行转发消息到队列中
  • 头部订阅
    头部订阅(headers 交换机):headers没有路由键,是根据消息头部header的键值对进行匹配,可以完全匹配也可以匹配任意一对键值对

二、交换机模式

发布订阅

生产者生产10条消息,发送给FanoutExchange,空字符串""表示不需要指定路由键

消费者A监听队列A,消费者B、B2监听队列B

@RestController
public class FanoutExchangeController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/fanout")
    public void fanout() {
        for (int i = 1; i <= 10; i++) {
            String message = "Hello World ..." + i;
            rabbitTemplate.convertAndSend("fanout_exchange", "", message);
            System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
        }
    }

    private int count1 = 1;
    private int count2 = 1;
    private int count3 = 1;

    @RabbitListener(containerFactory = "customContainerFactory", bindings = {@QueueBinding(
            value = @Queue(value = "fanout_a", declare = "true"),
            exchange = @Exchange(value = "fanout_exchange", type = ExchangeTypes.FANOUT, durable = "true")
    )
    })
    public void receiveA(String message) throws InterruptedException {
        System.out.println(" [ 消费者@A号 ] Received ==> '" + message + "'");
        Thread.sleep(200);
        System.out.println(" [ 消费者@A号 ] Dealt with:" + count1++);
    }

    @RabbitListener(containerFactory = "customContainerFactory", bindings = {@QueueBinding(
            value = @Queue(value = "fanout_b", declare = "true"),
            exchange = @Exchange(value = "fanout_exchange", type = ExchangeTypes.FANOUT, durable = "true")
    )
    })
    public void receiveB(String message) throws InterruptedException {
        System.out.println(" [ 消费者@B号 ] Received ==> '" + message + "'");
        Thread.sleep(200);
        System.out.println(" [ 消费者@B号 ] Dealt with:" + count2++);
    }

    @RabbitListener(containerFactory = "customContainerFactory", bindings = {@QueueBinding(
            value = @Queue(value = "fanout_b", declare = "true"),
            exchange = @Exchange(value = "fanout_exchange", type = ExchangeTypes.FANOUT, durable = "true")
    )
    })
    public void receiveB2(String message) throws InterruptedException {
        System.out.println(" [ 消费者@B2号 ] Received ==> '" + message + "'");
        Thread.sleep(500);
        System.out.println(" [ 消费者@B2号 ] Dealt with:" + count3++);
    }
}

运行结果

生产者生产了10条消息发送给交换机,fanout交换机将消息分别转发给绑定它的队列A、B;
队列A、B拿到内容一样的10条消息,队列A只有一个消费者A,所以由消费者A处理所有消息
队列B有两个消费者B、B2,分配模式是按能力分配,所以消费者B处理的多

路由模式

@RestController
public class DirectController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    private int count1 = 1;
    private int count2 = 1;
    private int count3 = 1;

    @GetMapping("/direct")
    public void direct() {
        for (int i = 1; i <= 30; i++) {
            String message = "Hello World ..." + i;
            System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
            if (i % 3 == 0) {
                rabbitTemplate.convertAndSend("direct_exchange", "direct_a", message);
            } else if (i % 3 == 1) {
                rabbitTemplate.convertAndSend("direct_exchange", "direct_b", message);
            } else {
                rabbitTemplate.convertAndSend("direct_exchange", "direct_b2", message);
            }
        }
    }

    /**
     * name:交换机名称
     * durable:设置是否持久,设置为true则将Exchange存盘,即使服务器重启数据也不会丢失
     * autoDelete:设置是否自动删除,当最后一个绑定到Exchange上的队列删除后,自动删除该Exchange,简单来说也就是如果该Exchange没有和任何队列Queue绑定则删除
     * internal:设置是否是RabbitMQ内部使用,默认false。如果设置为 true ,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式。
     * @param message
     * @throws InterruptedException
     */
    @RabbitListener(containerFactory = "customContainerFactory", bindings = {@QueueBinding(
            value = @Queue(value = "direct_a", declare = "true"),
            exchange = @Exchange(value = "direct_exchange", type = ExchangeTypes.DIRECT, durable = "true"),
            key = "direct_a"
    )
    })
    public void receiveA(String message) throws InterruptedException {
        System.out.println(" [ 消费者@A号 ] Received ==> '" + message + "'");
        Thread.sleep(200);
        System.out.println(" [ 消费者@A号 ] Dealt with:" + count1++);
    }

    @RabbitListener(containerFactory = "customContainerFactory", bindings = {@QueueBinding(
            value = @Queue(value = "direct_b", declare = "true"),
            exchange = @Exchange(value = "direct_exchange", type = ExchangeTypes.DIRECT, durable = "true"),
            key = "direct_b"
    )
    })
    public void receiveB(String message) throws InterruptedException {
        System.out.println(" [ 消费者@B号 ] Received ==> '" + message + "'");
        Thread.sleep(200);
        System.out.println(" [ 消费者@B号 ] Dealt with:" + count2++);
    }

    @RabbitListener(containerFactory = "customContainerFactory", bindings = {@QueueBinding(
            value = @Queue(value = "direct_b", declare = "true"),
            exchange = @Exchange(value = "direct_exchange", type = ExchangeTypes.DIRECT, durable = "true"),
            key = "direct_b2"
    )
    })
    public void receiveB2(String message) throws InterruptedException {
        System.out.println(" [ 消费者@B2号 ] Received ==> '" + message + "'");
        Thread.sleep(500);
        System.out.println(" [ 消费者@B2号 ] Dealt with:" + count3++);
    }
}

运行结果

两个队列A、B,A有一个路由键,而B有两个路由键;
生产者生产30条消息发送给交换机,交换机根据路由键转发给队列,队列A分配10条消息,队列B分配20条消息;
同时队列D被两个消费者监听

主题模式(通配者模式)

@RestController
public class TopicController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    private int count1 = 1;
    private int count2 = 1;
    private int count3 = 1;

    @GetMapping("/topic")
    public void topic() {
        for (int i = 1; i <= 30; i++) {
            String message = "Hello World ..." + i;
            System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
            if (i % 3 == 0) {
                // topic.yzm.key,可以匹配 topic.yzm.* 和 topic.#
                rabbitTemplate.convertAndSend("topic_exchange", "topic.yzm.key", message);
            } else if (i % 3 == 1) {
                // topic.yzm.yzm,可以匹配 topic.yzm.* 、 topic.# 和 topic.*.yzm
                rabbitTemplate.convertAndSend("topic_exchange", "topic.yzm.yzm", message);
            } else {
                // topic.key.yzm,可以匹配 topic.# 和 topic.*.yzm
                rabbitTemplate.convertAndSend("topic_exchange", "topic.key.yzm", message);
            }
        }
    }

    @RabbitListener(containerFactory = "customContainerFactory", bindings = {@QueueBinding(
            value = @Queue(value = "topic_e", declare = "true"),
            exchange = @Exchange(value = "topic_exchange", type = ExchangeTypes.TOPIC, durable = "true"),
            key = "topic.yzm.*"
    )
    })
    public void receiveE(String message) throws InterruptedException {
        System.out.println(" [ 消费者@E号 ] Received ==> '" + message + "'");
        Thread.sleep(200);
        System.out.println(" [ 消费者@E号 ] Dealt with:" + count1++);
    }

    @RabbitListener(containerFactory = "customContainerFactory", bindings = {@QueueBinding(
            value = @Queue(value = "topic_f", declare = "true"),
            exchange = @Exchange(value = "topic_exchange", type = ExchangeTypes.TOPIC, durable = "true"),
            key = "topic.#"
    )
    })
    public void receiveF(String message) throws InterruptedException {
        System.out.println(" [ 消费者@F号 ] Received ==> '" + message + "'");
        Thread.sleep(200);
        System.out.println(" [ 消费者@F号 ] Dealt with:" + count2++);
    }

    @RabbitListener(containerFactory = "customContainerFactory", bindings = {@QueueBinding(
            value = @Queue(value = "topic_g", declare = "true"),
            exchange = @Exchange(value = "topic_exchange", type = ExchangeTypes.TOPIC, durable = "true"),
            key = "topic.*.yzm"
    )
    })
    public void receiveG(String message) throws InterruptedException {
        System.out.println(" [ 消费者@G号 ] Received ==> '" + message + "'");
        Thread.sleep(200);
        System.out.println(" [ 消费者@G号 ] Dealt with:" + count3++);
    }
}

分析结果:

E、G二十条,F三十条

头部模式

配置类

@Configuration
public class RabbitMQConfiguration {

    @Bean
    public ConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        String rabbitmqHost = "127.0.0.1";
        String rabbitmqPort = "5672";
        String rabbitmqUsername = "guest";
        String rabbitmqPassword = "guest";
        String rabbitmqVirtualHost = "/";
        connectionFactory.setHost(rabbitmqHost);
        connectionFactory.setPort(Integer.parseInt(rabbitmqPort));
        connectionFactory.setUsername(rabbitmqUsername);
        connectionFactory.setPassword(rabbitmqPassword);
        connectionFactory.setVirtualHost(rabbitmqVirtualHost);
//        connectionFactory.setPublisherReturns(true);//开启return模式
//        connectionFactory.setPublisherConfirms(true);//开启confirm模式
        return connectionFactory;
    }

    @Bean(name = "rabbitTemplate")
    //必须是prototype类型
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(rabbitConnectionFactory());
    }

    @Bean("customContainerFactory")
    public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        //设置线程数
        factory.setConcurrentConsumers(1);
        //最大线程数
        factory.setMaxConcurrentConsumers(1);
//        //设置为手动确认MANUAL(手动),AUTO(自动);
//        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // 设置prefetch
        factory.setPrefetchCount(1);
        configurer.configure(factory, connectionFactory);
        return factory;
    }

    public static final String QUEUE_X = "headers_x";
    public static final String QUEUE_Y = "headers_y";
    public static final String HEADER_EXCHANGE = "headers_exchange";

    @Bean
    public Queue queueX() {
        return new Queue(QUEUE_X);
    }

    @Bean
    public Queue queueY() {
        return new Queue(QUEUE_Y);
    }

    @Bean
    public HeadersExchange headersExchange() {
        return ExchangeBuilder.headersExchange(HEADER_EXCHANGE).build();
    }

    @Bean
    public Binding bindingX() {
        Map<String, Object> map = new HashMap<>();
        map.put("key1", "value1");
        map.put("name", "yzm");
        // whereAll:表示完全匹配
        return BindingBuilder.bind(queueX()).to(headersExchange()).whereAll(map).match();
    }

    @Bean
    public Binding bindingY() {
        Map<String, Object> map = new HashMap<>();
        map.put("key2", "value2");
        map.put("name", "yzm");
        // whereAny:表示只要有一对键值对能匹配就可以
        return BindingBuilder.bind(queueY()).to(headersExchange()).whereAny(map).match();
    }
}

消息发送和接受

@RestController
public class HeadersController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/headers")
    public void headers() {
        String s = "Hello World";
        System.out.println(" [ 生产者 ] Sent ==> '" + s + "'");
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setHeader("key1", "value1");
        messageProperties.setHeader("name", "yzm");
        Message message = new Message(s.getBytes(StandardCharsets.UTF_8), messageProperties);
        rabbitTemplate.convertAndSend("headers_exchange", "", message);
    }

    @GetMapping("/headers2")
    public void headers2() {
        String s = "Hello World";
        System.out.println(" [ 生产者 ] Sent ==> '" + s + "'");
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setHeader("key3", "value3");
        messageProperties.setHeader("name", "yzm");
        Message message = new Message(s.getBytes(StandardCharsets.UTF_8), messageProperties);
        rabbitTemplate.convertAndSend("headers_exchange", "", message);
    }

    @RabbitListener(queues = RabbitMQConfiguration.QUEUE_X)
    public void receiveX(String message) {
        System.out.println(" [ 消费者@X号 ] Received ==> '" + message + "'");
    }

    @RabbitListener(queues = RabbitMQConfiguration.QUEUE_Y)
    public void receiveY(String message) {
        System.out.println(" [ 消费者@Y号 ] Received ==> '" + message + "'");
    }
}

测试结果

发送请求:http://localhost:8080/headers

生产消息时,头部键值对有:“key1”=“value1"和"name”=“yzm”,跟X队列能完全匹配上,跟Y队列能匹配上其中一个,所以两个消费者都能消费到消息

发送请求:http://localhost:8080/headers2

只有Y队列能匹配上一个键值对

相关文章