rabbitMQ 生产者与消费者

x33g5p2x  于2021-08-23 转载在 Java  
字(4.8k)|赞(0)|评价(0)|浏览(471)

客户端开发

连接rabbitMQ

连接客户端之前需要建立 vhost, 有三种方式,如果是spring集成则采用第一种方式;由于是原生开发,采用第二种方式,具体操作步骤看下图

  • 处理方式一(推荐)
    在 application.properties 文件中添加 spring.rabbitmq.virtual-host=test
  • 处理方式二(推荐)
    在 rabbitmq 管理后天中添加test 目录
  • 处理方式三
    在启动docker时指定vhost : RABBITMQ_DEFAULT_VHOST=test

引入客户端依赖

	 <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>4.1.0</version>
        </dependency>
    </dependencies>

获取连接方式,如果启动控制台没有报错,说明连接成功;

public static Connection getConnection() throws IOException {
        //定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //定义连接地址
        factory.setHost("ip");
        //定义端口
        factory.setPort(5672);
        //设置账号信息,用户名、密码、vhost
        factory.setVirtualHost("test");
        factory.setUsername("zszxz");
        factory.setPassword("密码");
        // 通过工厂获取连接
        Connection connection = null;
        try {
            connection = factory.newConnection();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        return connection;
    }

创建队列和交换器

  • 声明一个交换机 test-ex , 类型为 direct, 持久化,并且非自动删除;exchangeDeclare方法 有多个重载方法;第一个参数exchange 交换机名称,第二个参数type 交换机类型,第三个参数durable 是否持久化;第四个参数autodelete 是否自动删除;第五个是否内置internal;第六个参数argument结构化参数;
  • 声明一个队列 test-ch, 持久化,非排他,非自动删除;第一个参数exchange 队列名称,第二个参数durable 是否持久化;第三个参数是否排他;第四个参数autodelete 是否自动删除;第五个参数argument队列的其它参数;
  • queueBind 方法参数 第一个参数queue 队列名称, 第二个参数 exchage 交换机名称,第三个参数 routingKey 用来绑定交换机和队列的路由键;第四个参数argument 其它参数;
    public static void main(String[] args) {
        try {
            Connection connection = getConnection();
            //获得信道
            Channel channel = connection.createChannel();
            String queueName = "test-ch";
            String exchangeName = "test-ex";
            String routingKey = "test-router";
            //声明交换器
            channel.exchangeDeclare(exchangeName, "direct", true);
            // 声明队列
            channel.queueDeclare(queueName, true, false, false, null);
            // 绑定队列
            channel.queueBind(queueName,exchangeName,routingKey);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

注:rabbimq 中 交换器不会消耗性能,而队列会消耗性能;

创建成功后管理面板显示交换机

点击交换机text -ex 可以看到绑定的队列

发送消息

发送消息使用到 channel.basicPublis 方法;

  • 第一个参数exchange为交换机名称,
  • 第二参数routingKey为绑定的路由键;
  • 第三个参数props为消息属性;
  • 第四个参数body为消息内容;
  • 其中消息属性指定了消息内容格式为text/plain , 优先级为1 ,传递模式2为持久化模式;
public static void main(String[] args) {
        try {
            Connection connection = getConnection();
            //获得信道
            Channel channel = connection.createChannel();
            String queueName = "test-ch";
            String exchangeName = "test-ex";
            String routingKey = "test-router";
            //声明交换器
            channel.exchangeDeclare(exchangeName, "direct", true);
            // 声明队列
            channel.queueDeclare(queueName, true, false, false, null);
            // 绑定队列
            channel.queueBind(queueName,exchangeName,routingKey);
            //发布消息
            byte[] messageBodyBytes = "Hello Word !!!".getBytes();
            // 设置 消息属性
            AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
            builder.contentType("text/plain")
                    .priority(1)
                    .deliveryMode(2);

            channel.basicPublish(exchangeName,routingKey,builder.build(),messageBodyBytes);
            channel.close();
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        }catch (TimeoutException e) {
            e.printStackTrace();
        }
    }

管理面板点击队列 test-ch ,然后获取消息,可以看见刚刚发送的消息内容;

消费消息

消费消息分为推模式和拉取模式;推模式类似订阅,能够自动接收队列里面的消息;如果是拉模式则需要手动接收;在消费模式中有个消息应答确认机制ack,如果消费者接收到消息后提交应答,那么 rabbitMq 会将内存或者磁盘中的消息进行删除;所以为了确保做到真正的消费消息,一般情况下我们都会设置自动应答autoAck 为false ,然后通过方法进行手动提交;

推模式

推模式消费消息使用到basicConsume 方法;消费消息注意点为接收到消息后才进行应答,所以 autoAck 需要设置为false,不自动应答;

basicConsume 的参数如下

  • 第一个参数queue 队列名称
  • 第二个参数 autoAck 设置是否应答;
  • 第三个参数 consumerTag 消费者标签;
  • 第四个参数回调 callback 这边使用 DefaultConsumer 实现;
 public static void main(String[] args) {
        try {
            Connection connection = getConnection();
            boolean autoAck = false;
            String queueName = "test-ch";
            String tagName = "consume-tag";
            //获得信道
            Channel channel = connection.createChannel();
            channel.basicConsume(queueName,autoAck,tagName,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String routingKey = envelope.getRoutingKey();
                    System.out.println("键:"+routingKey+"-----"+new String(body));
                    long deliveryTag = envelope.getDeliveryTag();
                    // 应答
                    channel.basicAck(deliveryTag,false);
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

输出

键:test-router-----Hello Word !!!

再次去后台获取消息时,队列中的消息为空,因子消息被消费了;

拉模式

我们重新使用生产者发送一条消息;然后进行消费;

    public static void main(String[] args) {
        try {
            Connection connection = getConnection();
            //获得信道
            Channel channel = connection.createChannel();
            String queueName = "test-ch";
            // 获取响应
            GetResponse response = channel.basicGet(queueName, false);
            System.out.println(new String(response.getBody()));
            //
            channel.basicAck(response.getEnvelope().getDeliveryTag(),false);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

输出结果

Hello Word !!!

如果查看队列中有多少消息呢,可以通过后台进行查看,ready 表示 等待被消费的消息数量,特色D表示持久化durable队列

拒绝消息

消费者可以接收消息,也可以拒绝消息;

单条消息拒绝 channel.basicReject(long delieveryTag, boolean requere); 方法进行拒绝应答;

如果是多条消息channel.basicNack(long delieveryTag,boolean multiple, boolean requere); 参数 multiple 为false 功能与 channel.basicReject 方法功能一致,为true 表示拒绝 编号 delieveryTag 之前的所有消息;

关闭连接

关闭连接是为了释放系统资源,关闭连接需要连接关闭和通道关闭,

channel.close();
connection.close();

相关文章

微信公众号

最新文章

更多