channel.basicPublish() 有多个重载方法,当有五个参数时,第三个参数 mandatory 为true 时,表示交换机无法根据类型和路由键找到队列时会将失败消息返回给RabbitMq,接收消息时需要使用channel.addReturnListener
方法; 当 mandatory 为false 时,出现发送失败的情况直接将消息丢弃;
代码示例
channel.basicPublish(exchangeName,routingKey,true,builder.build(),messageBodyBytes);
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
String msg = new String(bytes);
System.out.println("返回消息:"+msg);
}
});
过期时间可以对具体的消息进行设置过期时间,这种方式的优点是可以对每个消息进行自定义设置过期时间;其次也可以对队列进行设置过期时间;
对消息设置过期时间
主要是加了 x-message-ttl
参数, 单位为毫秒;并且当前队列为非持久化队列;
//获得信道
Channel channel = connection.createChannel();
String queueName = "test-undurable";
String exchangeName = "test-ex";
String routingKey = "test-router";
//声明交换器
channel.exchangeDeclare(exchangeName, "direct", true);
HashMap<String, Object> args = new HashMap<>();
// 设置消息
args.put("x-message-ttl",8000);
// 声明队列
boolean durable = false;
boolean exclusive = false;
boolean autodelete = false;
channel.queueDeclare(queueName, durable, exclusive, autodelete, args);
// 绑定队列
channel.queueBind(queueName,exchangeName,routingKey);
如果打开后台就能观测到队列 test-undurable
特色标志为TTL ,有信息ready值为1,然后经过8秒后ready为0;
对队列设置过期时间
设置参数 x-expires
单位为毫秒;如下示例代码片段演示的是3分钟;
String queueName = "test-expire";
String exchangeName = "test-ex";
String routingKey = "test-router";
//声明交换器
channel.exchangeDeclare(exchangeName, "direct", true);
HashMap<String, Object> args = new HashMap<>();
// 设置队列过期时间
args.put("x-expires",1000*60*3);
// 声明队列
boolean durable = false;
boolean exclusive = false;
boolean autodelete = false;
channel.queueDeclare(queueName, durable, exclusive, autodelete, args);
// 绑定队列
channel.queueBind(queueName,exchangeName,routingKey);
后台多了一个特色为Exp的队列 test-expire
死信队列全称为 死信交换器(dead-letter-exchange),当一个消息在一个队列中变成死信后,它就会被重新发送到另一个交换器,这个交换器就被成为死信交换器DLX, 绑定DLX的队列称为死信队列;DLX 与平常的交换器没什么不同,就多了一个接收死信队列消息的功能;dlx通常用于处理异常状态下消息不能被处理的问题;
//获得信道
Channel channel = connection.createChannel();
String queueName = "test-dlx";
String exchangeName = "test-ex-dlx";
String routingKey = "test-router-exlx";
//声明交换器
channel.exchangeDeclare(exchangeName, "direct", true);
HashMap<String, Object> args = new HashMap<>();
// 指定dlx
args.put("x-dead-letter-exchange",exchangeName);
// 声明队列
boolean durable = false;
boolean exclusive = false;
boolean autodelete = false;
channel.queueDeclare(queueName, durable, exclusive, autodelete, args);
// 绑定队列
channel.queueBind(queueName,exchangeName,routingKey);
指定参数 x-dead-letter-exchange
发送消息后 后台会出现一个 test-dlx 的队列 ,特色类型为DLX队列;
其次我们使用TTL过期消息绑定到DLX交换器,这样如果消息时间过期后会将过期的消息发送到死信队列;
//获得信道
Channel channel = connection.createChannel();
String queueName = "test-ttl";
String exchangeName = "test-ex-ttl";
String routingKey = "test-router-ttl";
//声明交换器
channel.exchangeDeclare(exchangeName, "direct", true);
HashMap<String, Object> args = new HashMap<>();
// 指定dlx
args.put("x-dead-letter-exchange","test-ex-dlx");
// 指定dlx 路由键
args.put("x-dead-letter-routing-key","test-router-exlx");
// 设置消息过期时间
args.put("x-message-ttl",8000);
// 声明队列
boolean durable = false;
boolean exclusive = false;
boolean autodelete = false;
channel.queueDeclare(queueName, durable, exclusive, autodelete, args);
// 绑定队列
channel.queueBind(queueName,exchangeName,routingKey);
刚刚启动ttl,dlx 2个服务,各自产生一个队列并且各自存储一个消息;
当ttl 消息过期后,会将过期的消息发送到死信队列;死信队列的的消息从1变为2;
延迟队列是生成者发送消息后经过一定的延时后才将消息发送到消费者手中;延迟队列的场景一般使用在订单系统,比如一个用户下单30分钟内订单支付失败,该订单就会通过异常处理,这时就可以使用延迟队列实现;rabbitMQ并未具体实现延迟队列,但我们可以使用上节的dlx队列和ttl队列结合实现;即首先发送ttl消息为30分钟,如果30分钟后消息未被处理则进入延迟队列;
优先队列是指在该队列中的消息被消费具有优先权,如果优先权的值越大,则被优先消费的机率也越大;
通过设置参数x-max-priority 设置队列为优先对俄,值越大,优先级越高,最大值为10;
对具体的消息也可以通过消息属性设置优先级, 示例代码中设置优先级为1,,范围为 0到5;
//获得信道
Channel channel = connection.createChannel();
String queueName = "test-pri";
String exchangeName = "test-pri";
String routingKey = "test-router-pri";
//声明交换器
channel.exchangeDeclare(exchangeName, "direct", true);
HashMap<String, Object> args = new HashMap<>();
// 设置消息
args.put("x-max-priority",10);
// 声明队列
boolean durable = false;
boolean exclusive = false;
boolean autodelete = false;
channel.queueDeclare(queueName, durable, exclusive, autodelete, args);
// 绑定队列
channel.queueBind(queueName,exchangeName,routingKey);
//发布消息
byte[] messageBodyBytes = "Hello Word !!!".getBytes();
// 设置 消息属性
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
// 设置消息的优先队列为 1
builder.contentType("text/plain")
.priority(1)
.deliveryMode(2);
channel.basicPublish(exchangeName,routingKey,builder.build(),messageBodyBytes);
rabbitmq 消息的持久化分为交换器的持久化,队列的持久化,以及消息的持久化;在持久化中需要使用三种一起使用才能保证消息真的持久化;
如果交换器,队列,消息只要其中一个未设置持久化,那么重启后就会造成对应的消息消失;还有消费者的应答机制也要设置为false,如果设置为true,消费者未来得及处理消息就应答了也会造成消息消失;
交换器持久化
boolean durable = true;
channel.exchangeDeclare(exchangeName, "direct", durable);
队列持久化
// 声明队列
boolean durable = true;
boolean exclusive = false;
boolean autodelete = false;
channel.queueDeclare(queueName, durable, exclusive, autodelete, args);
消息持久化
int durable = 2;
// 设置消息的优先队列为 1
builder.contentType("text/plain")
.priority(1)
.deliveryMode(durable);
仅仅保证消息的持久化并不能完全保证消息不会丢失,生产者在发送消息的时候也有可能造成消息丢失;生产者发送消息时,并不知道自己的消息是否真的发送到了rabbimq 的服务端;所以rabbitmq 增加了一个 生产者的消息应答机制,确保消息真的发送到了了rabbimq服务端; 其工作大体流程为,生产者发送消息之前需要开启信道确认模式,开启后生产者每发送一条消息到服务端,都会生成一个唯一的ID用于区分不同的消息,服务端接收到消息后会发送一个确认信息给生产者,如果接收失败,则生产者需要手动进行重新发送;
// 设置信道为确认应答模式
channel.confirmSelect();
// 发送消息
channel.basicPublish(exchangeName,routingKey,builder.build(),messageBodyBytes);
try {
if (!channel.waitForConfirms()){
System.out.println("发送消息失败");
}else{
System.out.println("发送消息成功");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
生产者还有一个事物机制,也可以实现应答机制类似效果,但事物的使用是阻塞形式,会造成很大的性能消耗;具体工作流程是发送消息之前开启事物,消息发送之后进行异常捕获,如果发生异常,说明消息发送失败,进行事物回滚,重新发送消息;
try {
// 设置信道为事物模式
channel.txSelect();
// 发送消息
channel.basicPublish(exchangeName,routingKey,builder.build(),messageBodyBytes);
// 提交事物
channel.txCommit();
}catch (Exception e){
e.printStackTrace();
// 回滚事物
channel.txRollback();
}