【RocketMq实战第四篇】-不同类型消费者(DefaultMQPushConsumer&DefaultMQPullConsumer)

x33g5p2x  于2021-12-19 转载在 其他  
字(5.6k)|赞(0)|评价(0)|浏览(206)

前言

生产者和消费者是消息队列的两个重要角色,生产者向消息队列写人数据, 消费者从消息队列里读取数据。本篇讲解两种类型的消费者,一个是 DefaultMQPushConsumer,由系统控制读取操作,收到消息后自动调用传人的 处理方法来处理;另 一个是 DefaultMQPullConsumer,读取操作中的大部分功 能由使用者自主控制 。

正文

DefaultMQPushConsumer 

使用 DefaultMQPushConsumer 主要是设置好各种参数和传人处理消息的函数 。 系统收到消息后自动调用处理函数来处理消息,自动保存 Offset,而且加入新的 DefaultMQPushConsumer后会自动做负载均衡。

我们在安装目录下找到example项目的  package org.apache.rocketmq.example.quickstart;目录下源码介绍。

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
      
        DefaultMQPushConsumer consumer = new  DefaultMQPushConsumer("please_rename_unique_group_name_4");

        consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");    
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
 
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

讲解源码

DefaultMQPushConsumer需要设置三 个 参数 : 一 是这个 Consumer 的 GroupName,二是 NameServer 的地址和端 口号,三是 Topic 的名称 ,下面将分 别进行详细介绍 。

(1) Consumer的 GroupName用于把多个 Consumer组织到一起, 提高并发 处理能力, GroupName需要和消息模式 (MessageModel)配合使用。

    RocketMQ支持两种消息模式: ClusteringBroadcasting

  • 在 Clustering模式下,同一个 ConsumerGroup(GroupName相同) 里的每 个 Consumer 只消费所订阅消 息的一部分 内容, 同一个 ConsumerGroup 里所有的 Consumer消费的内容合起来才是所订阅 Topic 内容的整体, 从而达到负载均衡的目的 。
  • 在 Broadcasting模式下,同一个 ConsumerGroup里的每个 Consumer都 能消费到所订阅 Topic 的全部消息,也就是一个消息会被多次分发,被 多个 Consumer消费。

(2)NameServer 的地址和端口 号,可以填写多个 ,用分号隔开,达到消除单点故障的目的,比如 “ip1:port;ip2:port;ip3 :port” 。

(3)Topic名称用来标识消息类型, 需要提前创建。如果不需要消费某 个 Topic 下的所有消息,可以通过指定消息的 Tag 进行消息过滤,比如: Consumer.subscribe (”TopicTest”,’tag1 || tag2 || tag3”), 表示这个 Consumer要 消费“ TopicTest”下带有 tag1 或 tag2 或 tag3 的消息。在填写 Tag 参数的位置,用 null 或者“ *“ 表示要消费这个 Topic 的所有消息 。

处理流程

网上有很多文章讲处理流程的,个人觉得这个比较详细,https://blog.csdn.net/panxj856856/article/details/80776032把defaultMQPushConsumerImpl.start()方法按顺序都分析了。

不过我要说的重点是消息处理逻辑是在pullMessage这个函数的PullCallBack中。PullCallBack函数里有个 switch 语句,根据从 Broker 返回的消息类型做相应的 处理

switch (pullResult.getPullStatus ()) {
    case FOUND:
       ……
      break;
    case NO_NEW_MSG
       ……
       break;
     case OFFSET_ILLEGAL :
       ……
       break;
     default:
       break;
    }

pullMessage函数的参数是 final PullRequest pullRequest ,这是通过“长轮询”方式达到 Push效果的方法,长轮询方式既有 Pull 的优点,又兼具 Push方式的实时性。

Push的方式是 Server端接收到消息后,主动把消息推送给 Client端,主动权在Server端,实时性高。用 Push方式主动推送有很多弊 端:首先是加大 Server 端的 工作量,进而影响 Server 的性能;其次,Client 的处理能力各不相同, Client 的状态不受 Server 控制,

Pull方式是 Client端循环地从 Server端拉取消息,主动权在 Client手里, 自己拉取到一定量消息后,处理妥当了再接着取。Pull 方式的问题是循环拉取 消息的间隔不好设定,间隔太短就处在一个 “忙等”的状态,浪费资源;  Pull 的时间间隔太长 Server 端有消息到来时 有可能没有被及时处理。

下面就来介绍**“长轮询“**

“长轮询”方式通过 Client端和 Server端的配合,达到既拥有 Pull 的优 点,又能达到保证实时性的目的 。

大家只需记住长轮询就是在Broker在没有新消息的时候才阻塞,阻塞时间默认设置是 15秒,有消息会立刻返回

有兴趣的可以看一下长轮询源码在package org.apache.rocketmq.broker.longpolling.PullRequestHoldService.java的 run方法,里面是有三次Check,每次5s。

“长轮询”的 主动权还是 掌 握在 Consumer 手中, Broker 即使有大 量 消息积 压 ,也不会主动推 送给 Consumer 。

DefaultMQPushConsumer 的流量控制

上面我们分析得知PushConsumer的核心还是 Pull 方式。PushConsumer有个线程池 , 消息处理逻辑在各个线程里同时执行。

多线程处理业务是很麻烦的,所以RocketMQ定义了一个快照类 ProcessQueue来解决 堆积的数量 ?如何重复处理某些消息? 如何延迟处理某些消息? 等问题。每个 Message Queue 都会有个对应的 ProcessQueue 对象,保存了这个 Message Queue 消息处理状态的快照 。

ProcessQueue对象里主要的内容是一个 TreeMap 和一个读写锁。 TreeMap 里以 Message Queue 的 Offset作为 Key,以消息内容的引用为 Value,保存了 所有从 MessageQueue 获取到,但是还未被处理的消息; 读写 锁控制着多个线程对 TreeMap 对象的并发访 问 。

有 了 ProcessQueue 对象,流量控 制 就方便和灵活多了 ,客 户 端在每次 Pull请求前会做几个判断,分别取但还未处理的消息个数、消 息总大小、 Offset 的跨度,任何一个值超过设定的大小就隔一段时间再拉取消 息,从而达到流量控制的目的 。 此外 ProcessQueue 还可以辅助实现顺序消费的 逻辑。

有兴趣的可以翻看源码位置在org.apache.rocketmq.client.impl.consumer.pullMessage()

 

DefaultMQPullConsumer

我们来看一下包中示例代码,位置在package org.apache.rocketmq.example.simple.PullConsumer;

public class PullConsumer {
    private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();

    public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");

        consumer.start();

        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1");
        for (MessageQueue mq : mqs) {
            System.out.printf("Consume from the queue: %s%n", mq);
            SINGLE_MQ:
            while (true) {
                try {
                    PullResult pullResult =
                        consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.printf("%s%n", pullResult);
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                            break SINGLE_MQ;
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        consumer.shutdown();
    }

    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = OFFSE_TABLE.get(mq);
        if (offset != null)
            return offset;

        return 0;
    }

    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        OFFSE_TABLE.put(mq, offset);
    }

}

讲解源码

 处理 逻辑是逐个 读 取某 Topic 下所有 Message Queue 的内容, 读完一遍后退出, 主要处理额外的三件事情:

( 1 )获取 Message Queue 并遍历

一 个 Topic 包括多个 Message Queue,如果这个 Consumer 需要获取 Topic 下所有的消息,就 要遍历多有的 Message Queue。 如果有特殊情况,也可以选 择某些特定的 Message Queue 来读取消息 。

( 2 )维护 Offsetstore

从一个 Message Queue 里拉取消息的时候,要传人 Offset参数( long类型 的值),随着不断读取消息 , Offset会不断增长 。 这个时候由用户负责把 Offset 存储下来,根据具体情况可以存到内存里、写到磁盘或者数据库里等 。

( 3 )根据不同的消息状态做不同的处理

拉取消息的请求发出后,会返回: FOUND、 NO_MATCHED_MSG、 NO_NEW_MSG、 OFFSET_ILLEGAL 四种状态,需要根据每个状态做不同的处理 。比较重要的两个状态是 FOUNT 和 NO NEW MSG ,分别表示获取到消息和没 有新的消息 。

 

Consumer 的启动、关闭流程

对于 PullConsumer来说,使用者主动权很高,可以根据实际需要暂停、停止、启动消费过程 。 需要注意的是 Offset 的保存,要在程序的异常处理部分增加把 Offset 写人磁盘方 面的处理,记准了每个 Message Queue 的 Offset,才能保证消息消 费 的准确性 。

DefaultMQPushConsumer 的退出, 要调用 shutdown() 函数, 以便 释放资 源、保存 Offset 等 。 这个调用要加到 Consumer 所在应用的退出逻辑中 。

启动 DefaultMQPushConsumer 时, NameServer 地址填错,程序仍然 可以正常启动,但是不会收到消息 。

解决启动时NameServer填写错误报错:

可以在 Consumer.start()语句后调用: Consumer.fetchSubscribeMessageQueues(”TopicName”),这 时如果配 置信息写得不准确,或者当 前服务不可 用,这个语句会报 MQClientException 异 常 。

相关文章