direct
常量: 常量配置 交换机名称, 队列名称,路由键名称
/**
* @author lsc
* <p> </p>
*/
public class MqConstant {
// 队列名称
public static final String QUEUE_NAME = "zs-q";
// 交换机名称
public static final String EXCHANGE_NAME = "zs-ex";
// 路由键名称
public static final String ROUTING_NAME = "zs-r";
}
配置文件
设置 账号 密码 和 ip ;
server:
port: 8096
spring:
#给项目来个名字
application:
name: rabbitmq-provider
#配置rabbitMq 服务器
rabbitmq:
host: 'ip'
port: 5672
username: '账号'
password: '密码'
#虚拟host 可以不设置,使用server默认host /
virtual-host: test
配置类
/**
* @author lsc
* <p> </p>
*/
@Configuration
public class DirectConfig {
@Bean
public Queue directQueue() {
return new Queue(MqConstant.QUEUE_NAME,true,false,false);
}
@Bean
DirectExchange directExchange() {
return new DirectExchange(MqConstant.EXCHANGE_NAME,true,false);
}
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(directQueue()).to(directExchange()).with(MqConstant.ROUTING_NAME);
}
}
测试类
/**
* @author lsc
* <p> </p>
*/
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProviderTest {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void testSend(){
String message = "zs hello word";
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(MqConstant.EXCHANGE_NAME,MqConstant.ROUTING_NAME,message,correlationId);
}
}
发送成功
topic
topic 只需要修改配置类,绑定多个队列即可;
@Configuration
public class TopicConfig {
@Bean
public Queue firstQueue() {
return new Queue("topic-queue-1");
}
@Bean
public Queue secondQueue() {
return new Queue("topic-queue-2");
}
@Bean
TopicExchange exchange() {
return new TopicExchange("topic-ex");
}
@Bean
Binding bindingExchangeMessage1() {
return BindingBuilder.bind(firstQueue()).to(exchange()).with("topic-routing-1");
}
@Bean
Binding bindingExchangeMessage2() {
return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic-routing-2");
}
}
在发送消息地时候修改对应地交换机名称,路由键即可发送至不同队列;
// 发送至队列 topic-queue-1
rabbitTemplate.convertAndSend("topic-ex","topic-routing-1",message,correlationId);
// 发送至队列 topic-queue-2
rabbitTemplate.convertAndSend("topic-ex","topic-routing-2",message,correlationId);
自动应答
在之前生产者地配置基础上,使用注解@RabbitListener 可以实现对队列消息地监听,此监听对应地消息应答机制为自动应答;
/**
* @author lsc
* <p>消费者--消息监听 </p>
*/
@Component
@RabbitListener(queues = MqConstant.QUEUE_NAME)//监听的队列名称
public class QueueListen {
@RabbitHandler
public void process(String message, Channel channel) {
System.out.println("消费者收到消息-----------" + message);
}
}
手动应答
手动应答需要修改配置文件,添加listener 配置
server:
port: 8096
spring:
#给项目来个名字
application:
name: rabbitmq-provider
#配置rabbitMq 服务器
rabbitmq:
host:
port: 5672
username:
password:
# 是否返回回调
publisher-returns: true
template:
#开启mandatory: true, basic.return方法将消息返还给生产者
mandatory: true
#虚拟host 可以不设置,使用server默认host /
virtual-host: test
listener:
simple:
# 手动应答
acknowledge-mode: manual
# 最少消费者数量
concurrency: 1
# 最多消费者数量
max-concurrency: 10
# 支持重试
retry:
enabled: true
配置手动应答后如果消费者没有应答接收到消息,重新启动工程后还会接收到之前地消息;
/**
* @author lsc
* <p> </p>
*/
@Component
public class AckQueueListen {
@RabbitListener(queues = MqConstant.QUEUE_NAME)
public void process1(Message message, Channel channel) {
System.out.println("消费者接收消息: " + new String(message.getBody()));
try {
// 手动应答成功
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
} catch (IOException e) {
e.printStackTrace();
}
}
}
如果是手动拒绝
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
配套源码 微信公众号: 知识追寻者 回复 rabbitmq