连接客户端之前需要建立 vhost, 有三种方式,如果是spring集成则采用第一种方式;由于是原生开发,采用第二种方式,具体操作步骤看下图
引入客户端依赖
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.1.0</version>
</dependency>
</dependencies>
获取连接方式,如果启动控制台没有报错,说明连接成功;
public static Connection getConnection() throws IOException {
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//定义连接地址
factory.setHost("ip");
//定义端口
factory.setPort(5672);
//设置账号信息,用户名、密码、vhost
factory.setVirtualHost("test");
factory.setUsername("zszxz");
factory.setPassword("密码");
// 通过工厂获取连接
Connection connection = null;
try {
connection = factory.newConnection();
} catch (TimeoutException e) {
e.printStackTrace();
}
return connection;
}
public static void main(String[] args) {
try {
Connection connection = getConnection();
//获得信道
Channel channel = connection.createChannel();
String queueName = "test-ch";
String exchangeName = "test-ex";
String routingKey = "test-router";
//声明交换器
channel.exchangeDeclare(exchangeName, "direct", true);
// 声明队列
channel.queueDeclare(queueName, true, false, false, null);
// 绑定队列
channel.queueBind(queueName,exchangeName,routingKey);
} catch (IOException e) {
e.printStackTrace();
}
}
注:rabbimq 中 交换器不会消耗性能,而队列会消耗性能;
创建成功后管理面板显示交换机
点击交换机text -ex 可以看到绑定的队列
发送消息使用到 channel.basicPublis 方法;
public static void main(String[] args) {
try {
Connection connection = getConnection();
//获得信道
Channel channel = connection.createChannel();
String queueName = "test-ch";
String exchangeName = "test-ex";
String routingKey = "test-router";
//声明交换器
channel.exchangeDeclare(exchangeName, "direct", true);
// 声明队列
channel.queueDeclare(queueName, true, false, false, null);
// 绑定队列
channel.queueBind(queueName,exchangeName,routingKey);
//发布消息
byte[] messageBodyBytes = "Hello Word !!!".getBytes();
// 设置 消息属性
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.contentType("text/plain")
.priority(1)
.deliveryMode(2);
channel.basicPublish(exchangeName,routingKey,builder.build(),messageBodyBytes);
channel.close();
connection.close();
} catch (IOException e) {
e.printStackTrace();
}catch (TimeoutException e) {
e.printStackTrace();
}
}
管理面板点击队列 test-ch ,然后获取消息,可以看见刚刚发送的消息内容;
消费消息分为推模式和拉取模式;推模式类似订阅,能够自动接收队列里面的消息;如果是拉模式则需要手动接收;在消费模式中有个消息应答确认机制ack,如果消费者接收到消息后提交应答,那么 rabbitMq 会将内存或者磁盘中的消息进行删除;所以为了确保做到真正的消费消息,一般情况下我们都会设置自动应答autoAck 为false ,然后通过方法进行手动提交;
推模式
推模式消费消息使用到basicConsume 方法;消费消息注意点为接收到消息后才进行应答,所以 autoAck 需要设置为false,不自动应答;
basicConsume 的参数如下
public static void main(String[] args) {
try {
Connection connection = getConnection();
boolean autoAck = false;
String queueName = "test-ch";
String tagName = "consume-tag";
//获得信道
Channel channel = connection.createChannel();
channel.basicConsume(queueName,autoAck,tagName,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
System.out.println("键:"+routingKey+"-----"+new String(body));
long deliveryTag = envelope.getDeliveryTag();
// 应答
channel.basicAck(deliveryTag,false);
}
});
} catch (IOException e) {
e.printStackTrace();
}
}
输出
键:test-router-----Hello Word !!!
再次去后台获取消息时,队列中的消息为空,因子消息被消费了;
拉模式
我们重新使用生产者发送一条消息;然后进行消费;
public static void main(String[] args) {
try {
Connection connection = getConnection();
//获得信道
Channel channel = connection.createChannel();
String queueName = "test-ch";
// 获取响应
GetResponse response = channel.basicGet(queueName, false);
System.out.println(new String(response.getBody()));
//
channel.basicAck(response.getEnvelope().getDeliveryTag(),false);
} catch (IOException e) {
e.printStackTrace();
}
}
输出结果
Hello Word !!!
如果查看队列中有多少消息呢,可以通过后台进行查看,ready 表示 等待被消费的消息数量,特色D表示持久化durable队列
消费者可以接收消息,也可以拒绝消息;
单条消息拒绝 channel.basicReject(long delieveryTag, boolean requere); 方法进行拒绝应答;
如果是多条消息channel.basicNack(long delieveryTag,boolean multiple, boolean requere); 参数 multiple 为false 功能与 channel.basicReject 方法功能一致,为true 表示拒绝 编号 delieveryTag 之前的所有消息;
关闭连接是为了释放系统资源,关闭连接需要连接关闭和通道关闭,
channel.close();
connection.close();