RabbitMQ03_work模型的消息生产和消费

x33g5p2x  于2021-12-19 转载在 其他  
字(4.9k)|赞(0)|评价(0)|浏览(251)

RabbitMQ03_work模型的消息生产和消费

多个消费者共同消费队列中的信息:

  • 消息生产者:
public static void main(String[] args) throws IOException {
    Connection connection = RabbitMQUtils.getConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare("work",true,false,false,null);
    for(int i=1;i<=20;i++) {
        channel.basicPublish("", "work", null, (i+"hello work queue").getBytes());
    }
    RabbitMQUtils.closeConnectionAndChannel(channel, connection);
}
  • 消息消费者1:
public static void main(String[] args) throws IOException {
    Connection connection = RabbitMQUtils.getConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare("work",true,false,false,null);
    //自动确认设为true
    channel.basicConsume("work", true,new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
            throws IOException {
            System.out.println("消费者1--"+new String(body));
        }
    });
}
  • 消息消费者2:
public static void main(String[] args) throws IOException {
    Connection connection = RabbitMQUtils.getConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare("work",true,false,false,null);
    //自动确认设为true
    channel.basicConsume("work", true,new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
            throws IOException {
            System.out.println("消费者2--"+new String(body));
        }
    });
}
消费者1--2hello work queue
消费者1--4hello work queue
消费者1--6hello work queue
消费者1--8hello work queue
消费者1--10hello work queue
消费者1--12hello work queue
消费者1--14hello work queue
消费者1--16hello work queue
消费者1--18hello work queue
消费者1--20hello work queue
消费者2--1hello work queue
消费者2--3hello work queue
消费者2--5hello work queue
消费者2--7hello work queue
消费者2--9hello work queue
消费者2--11hello work queue
消费者2--13hello work queue
消费者2--15hello work queue
消费者2--17hello work queue
消费者2--19hello work queue

可以观察发现,它默认是一种轮询的平均分配的效果

  • 修改消费者1,使其消费速度变慢:
public static void main(String[] args) throws IOException {
    Connection connection = RabbitMQUtils.getConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare("work",true,false,false,null);
    //自动确认设为true
    channel.basicConsume("work", true,new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
            throws IOException {
            System.out.println("消费者1--"+new String(body));
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
}

再次测试,发现消息仍然是平均分配的,但消费者2很快完成,消费者1却花了20s才完成

  • 修改消费者1,使其出错无法执行消费:
public static void main(String[] args) throws IOException {
    Connection connection = RabbitMQUtils.getConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare("work",true,false,false,null);
    //自动确认设为true
    channel.basicConsume("work", true,new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
            throws IOException {
            int i = 1/0;
            System.out.println("消费者1--"+new String(body));
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
}

结果发现,消费者2消费了10条消息,但消费者1出错,1条消息都没有消费
从管理界面中发现,待处理消息为0,未确认消息为0,说明丢失了10条消息!!

  • 修改消费者1和消费者2,关闭自动确认机制,选择手动确认,并设定每次只允许消费一条消息:

消费者1:

public static void main(String[] args) throws IOException {
    Connection connection = RabbitMQUtils.getConnection();
    Channel channel = connection.createChannel();
    //设定每次只能从队列中取1个消息来消费
    channel.basicQos(1);
    channel.queueDeclare("work",true,false,false,null);
    //自动确认设为false
    channel.basicConsume("work", false,new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
            throws IOException {
            int i = 1/0;
            System.out.println("消费者1--"+new String(body));
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //手动确认消息,参数1为消息的标识,参数2表示每次只允许确认1个消息
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    });
}

消费者2:

public static void main(String[] args) throws IOException {
    Connection connection = RabbitMQUtils.getConnection();
    Channel channel = connection.createChannel();
    //设定每次只能从队列中取1个消息来消费
    channel.basicQos(1);
    channel.queueDeclare("work",true,false,false,null);
    //自动确认设为false
    channel.basicConsume("work", false,new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
            throws IOException {
            System.out.println("消费者2--"+new String(body));
            //手动确认消息,参数1为消息的标识,参数2表示每次只允许确认1个消息
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    });
}

结果发现,消费者1出错无法执行,消费者2消费了所有消息,实现了能者多劳,且无消息丢失:

消费者2--2hello work queue
消费者2--1hello work queue
消费者2--3hello work queue
消费者2--4hello work queue
消费者2--5hello work queue
消费者2--6hello work queue
消费者2--7hello work queue
消费者2--8hello work queue
消费者2--9hello work queue
消费者2--10hello work queue
消费者2--11hello work queue
消费者2--12hello work queue
消费者2--13hello work queue
消费者2--14hello work queue
消费者2--15hello work queue
消费者2--16hello work queue
消费者2--17hello work queue
消费者2--18hello work queue
消费者2--19hello work queue
消费者2--20hello work queue

相关文章