【RocketMq实战第六篇】-Offset

x33g5p2x  于2021-12-19 转载在 其他  
字(1.7k)|赞(0)|评价(0)|浏览(293)

前言

实际运行 中的系统,难免 会遇到重新消费某条消息、 跳过 一段 时间内的消息等情况 。 这些异常情况的处理,都和 Offset 有关。

正文

首先来明确一下 Offset 的含义, RocketMQ 中, 一 种类型的消息会放到 一 个 Topic 里,为了能够并行, 一般一个 Topic 会有多个 Message Queue (也可以 设置成一个), Offset是指某个 Topic下的一条消息在某个 Message Queue里的 位置,通过 Offset的值可以定位到这条消息,或者指示 Consumer从这条消息 开始向后继续处理 。

Offset主要分为本地文件类型和 Broker代存 的类型两种 。

Rocketmq集群有两种消费模式

  • 默认是 CLUSTERING 模 式,也就是同一个 Consumer group 里的多个消费者每人消费一部分,各自收到 的消息内容不一样 。 这种情况下,由 Broker 端存储和控制 Offset 的值,使用 RemoteBrokerOffsetStore 结构 。
  • BROADCASTING模式下,每个 Consumer 都收到这个 Topic 的全部消息,各个 Consumer 间相互没有干扰, RocketMQ 使 用 LocalfileOffsetStore,把 Offset存到本地 。

使用DefaultMQPushConsumer的时候,我们不用关心OffsetStore的 事,使用PullConsumer,我们就要自己处理 OffsetStore。

DefaultMQPushConsumer类里有个函数用来设置从哪儿开始消费 消 息:比如 setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_ FIRST_OFFSET),这个语句设置从最小的 Offset开始读取。

如果从队列开始到 感兴趣的消息之间有很大的范围,用 CONSUME_FROM_FIRST_OFFSET参数 就不合适了,可以设置从某个时间开始消 费消息 , 比如 Consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP), Consumer. setConsumeTimestamp("20131223171201”), 时间戳格式是精确到秒的 。

/**
 * Consumer从哪里开始消费<br>
 * 
 * @author shijia.wxr<vintage.wang@gmail.com>
 */
public enum ConsumeFromWhere {
    /**
     * 一个新的订阅组第一次启动从队列的最后位置开始消费<br>
     * 后续再启动接着上次消费的进度开始消费
     */
    CONSUME_FROM_LAST_OFFSET,

    @Deprecated
    CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST,
    @Deprecated
    CONSUME_FROM_MIN_OFFSET,
    @Deprecated
    CONSUME_FROM_MAX_OFFSET,
    /**
     * 一个新的订阅组第一次启动从队列的最前位置开始消费<br>
     * 后续再启动接着上次消费的进度开始消费
     */
    CONSUME_FROM_FIRST_OFFSET,
    /**
     * 一个新的订阅组第一次启动从指定时间点开始消费<br>
     * 后续再启动接着上次消费的进度开始消费<br>
     * 时间点设置参见DefaultMQPushConsumer.consumeTimestamp参数
     */
    CONSUME_FROM_TIMESTAMP,
}

注意设置读取位置不是每次都有效,它的优先级默认在 Offset Store后面 , 比如 在 DefaultMQPushConsumer 的 BROADCASTING 方式 下 ,默 认 是 从 Broker 里读取某个 Topic 对 应 ConsumerGroup 的 Offset, 当读 取不到 Offset 的时候, ConsumeFromWhere 的设置才生效 。 大部分情况下这个设置在 Consumer Group初次启动时有效。 如果 Consumer正常运行后被停止, 然后再启动, 会 接着上次的 Offset开始消费, ConsumeFromWhere 的设置元效。

相关文章