Spring Cloud Alibaba - 消息队列(四)(RocketMQ源码解读 / NameServer | Broker | Producer | Consumer)(目前存在的不足之处)

x33g5p2x  于2021-09-18 转载在 Spring  
字(4.4k)|赞(0)|评价(0)|浏览(508)

前言

距离上次发布 RocketMQ 的博文已经有六个月的时间了,中间从有开始想写源码的念头,动手到现在断断续续的写完花了也有三个多月时间,具体为啥会写那么久,主要是因为,最终 RocketMQ 被我放弃,更改使用 RabbitMq了
在这里插入图片描述

为啥放弃 RocketMQ ? 我稍微吐槽一下好了 ╮(╯_╰)╭

最开始决定使用 RocketMQ 是基于以下几点考虑:

  • 这是阿里生态的一部分
  • 支撑过双十一的考验
  • 和 Kafka 与 RabbitMQ 相比,适用的场景更为全面,并且支持事务
  • 近期文章有许多于其相关的影子,证明了它的活跃度不断上升,做为技术人当然有必要站在技术的最前沿
  • 国产咱必须得支持

槽点一(文档)

为啥最终放弃了呢 ╮(╯▽╰)╭ 且不说它还有开源版和商业版之分,这边以开源版为例。

因为,项目都更新到 4.7.1 了(现在已经是4.8.0),然而官方的文档写的真的是…无力吐槽了…((/- -)/,就跟没人维护一样,撰写的版本还是 4.3.0的
在这里插入图片描述

我也是看着看着觉得不对劲,最终才发现这个问题!!!

槽点二(springboot版本)

还有一些 " 小 " 问题 …( _ _)ノ|壁

比如:

微服务都以 spring boot 为驱动,目的是为了简化配置快速开发,RocketMQ 自然也有 对应的 spring Boot 包,但是!里面对应依赖的springBoot的版本都偏低,所以,还需要自己排除相对应依赖。

<groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.1.1</version>

里面对应依赖springBoot版本是2.0.5,所以需要排除自己加上新的版本依赖。

<exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-core</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-webmvc</artifactId>
                </exclusion>
            </exclusions>

槽点三(对Spring-cloud-stream支持低)

再比如:

spring 社区对所有MQ进行了高度的抽象,推出了一个统一的消息驱动框架,目的是为了调用的时候不同太关注于技术,只需要关注于业务逻辑,那么MQ是使用哪一种就对业务没有任何影响,以下就是官方链接。

https://spring.io/projects/spring-cloud-stream

在这里插入图片描述

天啊,竟然有那么好的东西!何愁项目延期!!何愁大局不稳!!何愁江山不定!!!

在这里插入图片描述

刚想拿过来用,发现!!!!!

在这里插入图片描述

官方目前只封装了kafka、kafka Stream、RabbitMQ的具体依赖。其它是需要厂家自己去维护的,然而,这群阿里的大神,貌似目前看来,并不热衷于维护它,包括文档也是,都是十分陈旧的!!!!

目前 rocketMq stream 最新版本是 2.2.5

<groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
            <version>2.2.5.RELEASE</version>

但是它依赖的rocketmq-client版本还是4.4.0

在这里插入图片描述

在这里插入图片描述

槽点四(向IE看起???版本不稳定/源代码注释少的可怜)

不得不说,版本迭代还是很快的,但是你也不能改动那么大啊,新版本对比旧版本源代码变了许多,有点 看IE的感觉!!!

由于官方文档写的不好,版本又旧,社区又不算太活跃,所以只能自己去看源代码,但是!!!!天啊 ,注释呢!!!不写注释你不是耍流氓吗,瞧瞧spring源码虽然也没怎么看懂,但是人家注释还是有的!!!

在这里插入图片描述

由于以上总总原因,开发成本略高,项目又紧,迫不得已,改用 RabbitMQ进行开发。

在这里插入图片描述

源码下载

当初刚开始编写RocketMQ系列的时候,最新的代码是4.71,但是最近新版本是4.8.0,由于博主之前都以4.7.1作为研究,所以本篇博文以4.7.1进行讲解。

官方网址以及下载安装前面的博文已经进行了详细的描述,所以这边仅仅附上4.7.1的源码下载连接。

https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.7.1/rocketmq-all-4.7.1-source-release.zip

正文

以下对于源码的理解是基于官方文档案例以一些参考资料,若有理解错的地方欢迎大家指出来。

NameServer

先附上一张大致讲解图(感觉不是很好看,可是我的美术造诣也就这水平了 ( ̄_, ̄ ) )

在这里插入图片描述

代码详解

在这里插入图片描述

NameServer 的启动是通过 NamesrvStartup 的 main 方法进行启动,而 main 方法主要做了两件事

  1. 创建NamesrvController
  2. 启动

那么接下来就进行跟踪分析。
首先看一下创建NamesrvController做了什么事?

在这里插入图片描述

在这里插入图片描述

其中缓存信息registerConfig具体操作如下:

在这里插入图片描述

Controller中的configuration是在new的时候,在构造器中生成,其中的allConfigs也是创建的时候生成。

在这里插入图片描述

在这里插入图片描述

创建 NamesrvController 就如上所示,那么接下来看start操作。

在这里插入图片描述

总共做了两件事,第一件初始化参数,第二件声明关闭事件,关闭的时候会关闭定时任务和线程池等,所以关闭的时候用它的 showdown 会比 kil l-9 优雅

在这里插入图片描述

接下来关注初始化事件。

在这里插入图片描述

初始化事件做的事情不多,其中移除不存活的Broker还有点意思,其它就没什么了(那个博主没看懂的除外)

在这里插入图片描述

至此,NameServer 的 启动编写完毕

Broker

继续附上一张大大大图

在这里插入图片描述

代码详解:

启动

在这里插入图片描述

Broker 的启动也是根据 BrokerStartup 的 main 方法进行,主要做了两件事:

  1. 创建 BrokerController
  2. 启动

先看一下 BrokerController 创建的时候做了什么事

在这里插入图片描述

这边和 NameServer 有一点最大的差别,它多了一种配置 NettyClientConfig ,之所以会多了一个配置,是因为在使用事务消息模式的时候,他事可以当做客户端的。

在这里插入图片描述

在这里插入图片描述

这边倒是没有什么好关注的,主要看看初始化时候做了什么事

在这里插入图片描述

看一下加载磁盘数据做了什么?

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

我们接着往下看

在这里插入图片描述

但是还是有一个定时器是可以研究一下的,就是那个可以写sql的过滤 tag

在这里插入图片描述

既然这边有个定时器,持久化filter,那么就代表 filter 是在Broker中进行的,所以它的效率才会那么高!!!

注册

注册是在创建完Controller后的启动中进行的

在这里插入图片描述

整个start就做了这一件核心事

在这里插入图片描述

在这里插入图片描述

先看找寻Broker的代码

在这里插入图片描述

当循环完所有的Broker,逐一进行判断后,在调用后面 doRegisterBrokerAll 进行注册(中间找的代码看不懂!!!)

在这里插入图片描述

到此,关于 Broker 启动,博主能看懂的都描述完毕。

Producer

生产者的代码主要包含两个部分,一个是启动,一个是发送消息

老规矩,先上图:

在这里插入图片描述

启动

先说说启动干了啥:

在这里插入图片描述

皮一下,很开心 <( ̄︶ ̄)>

在这里插入图片描述

在这里插入图片描述

这里是各种组件的初始化,底层运用到相当多的netty的相关知识,有兴趣的可以自行查看。

发送消息

在这里插入图片描述

在这里插入图片描述

核心的关键步骤就以上三点。

先看如何获取到消息队列:

在这里插入图片描述

接着再看如何发送消息:

在这里插入图片描述

从NameServer 找寻 Broker地址,是更新缓存再从缓存中获取

在这里插入图片描述

在发送Netty请求时,实际上是指定的MessageQueue,而不是Topic。Topic只是用来找 MessageQueue 。

Consumer

消费者也是两部分,一部分启动,然后启动时运行组件进行数据的消费,散乱东西有点多,图不好画,那我就简单画??ヾ(≧∇≦/*)ゝ

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

客户端是否顺序消费就在于此实例化。

在这里插入图片描述

启动的核心代码中,初始化了许多的组件,但是与我们使用关联性比较高的是负载均衡和具体的消息获取。

在这里插入图片描述

消息获取

先看一下消息获取的源码
在这里插入图片描述

在这里插入图片描述

这里面有两点可以了解的

第一点是选择消费实例,这里的实例实在之前 注册本地消费者的时候 ,写入与缓存

在这里插入图片描述

在这里插入图片描述

缓存已写入,所以启动的时候就可以获取得到:

在这里插入图片描述

第二点就是,虽然我们使用的是推的模式进行数据的获取和消费,但是最终还是用拉的模式进行处理。

接下来继续看拉取的流程

在这里插入图片描述

在这里插入图片描述

这边也有两点可以讲解

第一是 拉取消息的回调方法
第二是拉取消息的具体方式

但是由于拉取消息后才会进入回调,一些参数配也有影响,比如,defaultMQPushConsumer.getPullBatchSize(),就是拉取的数量,回调会用到,所以先看第二种。

在这里插入图片描述

在这里插入图片描述

以上就是拉取消息,那么最后再观察下 如何处理消费 pullCallback !

PullCallback 最终是调用 onSuccess方法,而里面我认为最核心的是

在这里插入图片描述

在这里插入图片描述

消息的处理有两种,一种是并发,一种是顺序:

并发

在这里插入图片描述

这里有一点需要注意ps(这里是半懵半猜的,有看懂的可以提醒一下博主)

defaultMQPushConsumer.getConsumeMessageBatchMaxSize()

是判断批量的数量大小,但是默认 是一条,网络上却有的资料是说 32,跟进去却没有发现哪里有进行数值的修改,只在上一步拉取的时候有设置32的拉取数量,姑且只能猜测是这个数值在其它封装的地方有进行默认数值的更改。

顺序:

这个还是有点意思的。毕竟多个 MessageQueue 获取消息怎么才能顺序

ConsumeRequest 中

在这里插入图片描述

然而他是拿到一个锁一个 ㄟ( ▔, ▔ )ㄏ。

负载均衡

在这里插入图片描述

在这里插入图片描述

至于负载均衡模式是哪里来的,在我们最开始创建实例的时候们就有一个默认模式的设置

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

END

怎么说呢,一把辛酸泪吧,哪个项目不是资源少时间紧,如果用一个组件还要花相当的精力去研究,除非甲方仁慈公司宽容,不然还是放弃吧。

真的要用 RocketMQ 的话,那就用商业版吧(我觉得这个开源版目的就是引导我们用商业版!!!)

相关文章