springboot集成Rabbitmq

x33g5p2x  于2021-08-23 转载在 Java  
字(3.3k)|赞(0)|评价(0)|浏览(382)

生产者

direct

常量: 常量配置 交换机名称, 队列名称,路由键名称

/**
 * @author lsc
 * <p> </p>
 */
public class MqConstant {

    // 队列名称
    public static final String QUEUE_NAME = "zs-q";
    // 交换机名称
    public static final String EXCHANGE_NAME = "zs-ex";
    // 路由键名称
    public static final String ROUTING_NAME = "zs-r";
}

配置文件

设置 账号 密码 和 ip ;

server:
  port: 8096
spring:
  #给项目来个名字
  application:
    name: rabbitmq-provider
  #配置rabbitMq 服务器
  rabbitmq:
    host: 'ip'
    port: 5672
    username: '账号'
    password: '密码'
    #虚拟host 可以不设置,使用server默认host  /
    virtual-host: test

配置类

/**
 * @author lsc
 * <p> </p>
 */
@Configuration
public class DirectConfig {
    @Bean
    public Queue directQueue() {
        return new Queue(MqConstant.QUEUE_NAME,true,false,false);
    }

    @Bean
    DirectExchange directExchange() {
        return new DirectExchange(MqConstant.EXCHANGE_NAME,true,false);
    }

    @Bean
    Binding bindingDirect() {
        return BindingBuilder.bind(directQueue()).to(directExchange()).with(MqConstant.ROUTING_NAME);
    }

}

测试类

/**
 * @author lsc
 * <p> </p>
 */
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProviderTest {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    public void testSend(){
        String message = "zs hello word";
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(MqConstant.EXCHANGE_NAME,MqConstant.ROUTING_NAME,message,correlationId);
    }
}

发送成功

topic

topic 只需要修改配置类,绑定多个队列即可;

@Configuration
public class TopicConfig {

    @Bean
    public Queue firstQueue() {
        return new Queue("topic-queue-1");
    }

    @Bean
    public Queue secondQueue() {
        return new Queue("topic-queue-2");
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange("topic-ex");
    }
    
    @Bean
    Binding bindingExchangeMessage1() {
        return BindingBuilder.bind(firstQueue()).to(exchange()).with("topic-routing-1");
    }

    @Bean
    Binding bindingExchangeMessage2() {
        return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic-routing-2");
    }
}

在发送消息地时候修改对应地交换机名称,路由键即可发送至不同队列;

// 发送至队列 topic-queue-1
rabbitTemplate.convertAndSend("topic-ex","topic-routing-1",message,correlationId);
// 发送至队列 topic-queue-2
rabbitTemplate.convertAndSend("topic-ex","topic-routing-2",message,correlationId);

消费者

自动应答

在之前生产者地配置基础上,使用注解@RabbitListener 可以实现对队列消息地监听,此监听对应地消息应答机制为自动应答;

/**
 * @author lsc
 * <p>消费者--消息监听 </p>
 */
@Component
@RabbitListener(queues = MqConstant.QUEUE_NAME)//监听的队列名称
public class QueueListen {


    @RabbitHandler
    public void process(String message, Channel channel) {
        System.out.println("消费者收到消息-----------" + message);
    }
}

手动应答

手动应答需要修改配置文件,添加listener 配置

server:
  port: 8096
spring:
  #给项目来个名字
  application:
    name: rabbitmq-provider
  #配置rabbitMq 服务器
  rabbitmq:
    host: 
    port: 5672
    username: 
    password: 
    # 是否返回回调
    publisher-returns: true
    template:
    #开启mandatory: true, basic.return方法将消息返还给生产者
      mandatory: true
    #虚拟host 可以不设置,使用server默认host  /  
    virtual-host: test
    listener:
      simple:
        # 手动应答
        acknowledge-mode: manual
        # 最少消费者数量
        concurrency: 1
        # 最多消费者数量
        max-concurrency: 10
        # 支持重试
        retry:
          enabled: true

配置手动应答后如果消费者没有应答接收到消息,重新启动工程后还会接收到之前地消息;

/**
 * @author lsc
 * <p> </p>
 */
@Component
public class AckQueueListen {

    @RabbitListener(queues = MqConstant.QUEUE_NAME)
    public void process1(Message message, Channel channel) {
        System.out.println("消费者接收消息: " + new String(message.getBody()));
        try {
            // 手动应答成功
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

如果是手动拒绝

 channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);

配套源码 微信公众号: 知识追寻者 回复 rabbitmq

相关文章

微信公众号

最新文章

更多