中间件(九):RocketMQ代码解读3

x33g5p2x  于2021-10-27 转载在 其他  
字(11.0k)|赞(0)|评价(0)|浏览(309)

1、NameServer是如何处理Broker的注册请求的?

到NamesrvController这个类的初始化的方法里去,也就是NamesrvController.initialize()这个方法

到NettyServer是用于接收网络请求 的,那么接收到的网络请求给谁处理呢?

        其实就是给DefaultRequestProcessor这个请求处理组件来进行处理的。

 

我们如果要知道Broker注册请求是如何处理的,直接就是看DefaultRequestProcessor中的代码就可以了.

        接着我们进入这个类里的registerBroker()方法,去看看到底如何完成Broker注册的。

下面我们先在图里给大家体现一下RouteInfoManager这个路由数据管理组件,实际Broker注册就是通过他来做的:

到RouteInfoManager的注册Broker的方法里去看看,最终如何把一个Broker机器的数据放入RouteInfoManager中维护的路由数据表里去的。

        无非就是用一些Map类的数据结构,去存放你的Broker的路由数据就可以了,包括了Broker的clusterName、brokerId、brokerName这些核心数据。 而且在更新的时候,一定会基于Java并发包下的ReadWriteLock进行读写锁加锁,因为在这里更新那么多的内存Map数据结构,必须要加一个写锁,此时只能有一个线程来更新他们才行!

2、Broker是如何发送定时心跳的,以及如何进行故障感知?

   NameServer核心是基于Netty服务器来接收Broker注册请求,然后交给DefaultRequestProcessor请求处理组件,来处理Broker注册请求。而真正的Broker注册的逻辑是放在RouteInfoManager这个路由数据管理组件里来进行实现的,最终Broker路由数据都会存放在RouteInfoManager内部的一些Map数据结构组成的路由数据表中。

 

        Broker是如何定时发送心跳到NameServer,让NameServer感知到Broker一直都存活着,然后如果Broker一段时间没有发送心跳到NameServer,那么NameServer是如何感知到Broker已经挂掉了。
        Broker中的发送注册请求给NameServer的一个源码入口,其实就是在BrokerController.start()方法中,在BrokerController启动的时候,他其实并不是仅仅发送一次注册请求,而是启动了一个定时任务,会每隔一段时间就发送一次注册请求。

        启动了一个定时调度的任务,他默认是每隔30s就会执行一次Broker注册的过程,上面的

registerNameServerPeriod是一个配置,他默认的值就是30s一次。

        默认情况下,第一次发送注册请求就是在进行注册,他会把Broker路由数据放入到NameServer的RouteInfoManager的路由数据表里去。但是后续每隔30s他都会发送一次注册请求,这些后续定时发送的注册请求,其实本质上就是Broker发送心跳给NameServer了。

那么后续每隔30s,Broker就发送一次注册请求,作为心跳来发送给NameServer的时候,NameServer对后续重复发 送过来的注册请求(也就是心跳),是如何进行处理的呢?

        
        看一下RouteInfoManager的注册方法逻辑:

我们看下图,有一个红色圈圈示意了,每隔30s你发送注册请求作为心跳的时候RouteInfoManager里会进行心 跳时间刷新的处理:

接着我们来思考最后一个问题,那么假设Broker已经挂了,或者故障了,隔了很久都没有发送那个每隔30s一次的注册请求作为心跳,那么此时NameServer是如何感知到这个Broker已经挂掉的呢?

        我们重新回到NamesrvController的initialize()方法里去,里面有一个代码是启动了RouteInfoManager中的一个定时扫描不活跃Broker的线程。

上面这段代码,就是启动一个定时调度线程,每隔10s扫描一次目前不活跃的Broker,使用的是RouteInfoManager中的scanNotActiveBroke()方法,我们去看看那个方法的逻辑,就知道他如何感知到一个Broker挂掉了。

3、我们系统中使用的Producer是如何创建出来的?

3.1、NameServer启动之后的核心架构:

NameServer启动之后,就会有一个核心的NamesrvController组件,他就是用于控制NameServer的所有行为的,包括内部启动一个Netty服务器去监听一个9876端口号,然后接收处理Broker和客户端发送过来的请求。

3.2、Broker启动之后的核心架构

Broker启动之后,最核心的就是有一个BrokerController组件管控Broker的整体行为,包括初始化自己的Netty服务器用于接收客户端的网络请求,包括启动处理请求的线程池、执行定时任务的线程池,初始化核心功能组件,同时还会启动之后发送注册请求到NameServer去注册自己。
        Broker启动之后进行注册以及定时发送注册请求作为心跳的机制。

        NameServer有一个后台进程定时检查每个Broker的最近一次心跳时间,如果长时间没心跳就认为Broker已经故障。

在讲完这些内容过后,你可以认为在我们的RocketMQ集群里,已经启动好了NameServer,而且还启动了一批Broker,同时Broker都已经把自己注册到NameServer里去了,NameServer也会去检查这批Broker是否存活。
        其实此时我们不需要去关注NameServer和Broker干了别的什么事情,这个时候我们只要知道已经有了一个可用的RocketMQ集群就可以了,然后此时我们是不是就可以让自己开发好的系统去发送消息到MQ里去了?

        没错,所以此时我们就需要引入一个Producer组件了,实际上,大家要知道,我们开发好的系统,最终都是要构建一个Producer组件,然后通过Producer去发送消息到MQ的Broker上去的。

3.3、Producer是如何构造出来的?

DefaultMQProducer producer = new DefaultMQProducer("order_producer_group");

producer.setNamesrvAddr("localhost:9876");
producer.start();

构造Producer很简单,就是创建一个DefaultMQProducer对象实例,在其中传入你所属的Producer分组,然后设置一下NameServer的地址,最后调用他的start()方法,启动这个Producer就可以了。其实创建DefaultMQProducer对象实例是一个非常简单的过程,无非就是创建这么一个对象出来,然后保存一下他的Producer分组。

        设置NameServer地址也是一个很简单的过程,无非就是保存一下NameServer地址罢了。

其实最核心的还是调用了这个DefaultMQProducer的start()方法去启动了这个消息生产组件,那么这个start()都干了什么呢?

4、构建好的Producer是如何启动准备好相关资源的?

Producer组件在启动的时候是如何准备好相关资源的,因为他必须内部得有独立的线程资源,还有得跟Broker建立网络连接,这样才能把我们的消息发送出去。

其实我们在构造Producer的时候,他内部构造了一个真正用于执行消息发送逻辑的组件,就
是DefaultMQProducerImpl这个类的实例对象,真正的生产组件其实是这个组件。

那么这个组件在启动的时候都干了什么呢?
        要站在Producer的核心行为的角度去看。

        首先我们都知道一件事儿,假设我们后续要通过Producer发送消息,必然会指定我们要往哪个Topic里发送消息。所以我们也知道,Producer必然是知道Topic的一些路由数据的,比如Topic有哪些MessageQueue,每个MessageQueue在哪些Broker上。

那么现在问题来了,到底是Producer刚启动初始化的时候,就会去拉取每个Topic的路由数据呢?还是等你第一次往一个Topic发送消息的时候再拉取路由数据呢
        肯定不可能是刚初始化启动的时候就拉取Topic的路由数据,因为你刚开始启动的时候,不知

道要发送消息到哪个Topic去啊! 一定是在你第一次发送消息到Topic的时候,才会去拉取一个Topic的路由数据,包括这个Topic有几个MessageQueue,每个MessageQueue在哪个Broker上,然后从中选择一个MessageQueue,跟那台Broker建立网络连接,发送消息过去。
第二个问题,Producer发送消息必然要跟Broker建立网络,这个是在Producer刚启动的时候就立马跟所有的Broker建立网络连接吗?

        
        那必然也不是的,因为此时你也不知道你要跟哪个Broker进行通信。所以其实很多核心的逻辑,包括Topic路由数据拉取,MessageQueue选择,以及跟Broker建立网络连接,通过网络

连接发送消息到Broker去,这些逻辑都是在Producer发送消息的时候才会有。

5、当发送消息时,是如何从NameServer拉取Topic元数据的?

其实比如拉取Topic的路由数据,选择MessageQueue,跟Broker构建长连接,发送消息过去,这些核心的逻辑,都是封装在发送消息的方法中的。
        因此我们今天就从发送消息的方法开始讲起,实际上当你调用Producer的send()方法发送消息的时候,这个方法调用会一直到比较底层的逻辑里去,最终会调用到DefaultMQProducerImpl类的sendDefaultImpl()方法里去,在这个方 法里,上来什么都没干,直接就有一行非常关键的代码,如下。TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

每次你发送消息的时候,他都会先去检查一下,这个你要发送消息的那个Topic的路由数据是否在你客户端本地。如果不在的话,必然会发送请求到NameServer那里去拉取一下的,然后缓存在客户端本地。

重点来看看,这个Producer客户端运行在你的业务系统里的时候,他如何从NameServer拉取到你的Topic的路由数据的?

其实当你进入了this.tryToFindTopicPublishInfo(msg.getTopic())这个方法逻辑之后,会发现他的逻辑非常的简单。其实简单来说,他就是先检查了一下自己本地是否有这个Topic的路由数据的缓存,如果没有的话就发送网络请求到NameServer去拉取,如果有的话,就直接返回本地Topic路由数据缓存了:

** Producer到底是如何发送网络请求到NameServer去拉取Topic路由数据的?**

其实这里就对应了tryToFindTopicPublishInfo()方法内的一行代码:
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);

        通过这行代码,他就可以去从NameServer拉取某个Topic的路由数据,然后更新到自己本地的缓存里去了。

具体的发送请求到NameServer的拉取过程,简单来说,就是封装一个Request请求对象, 然后通过底层的Netty客户端发送请求到NameServer,接收到一个Response响应对象。 然后他就会从Response响应对象里取出来自己需要的Topic路由数据,更新到自己本地缓存里去,更新的时候会做一 些判断,比如Topic路由数据是否有改变过等,然后把Topic路由数据放本地缓存就可以了:

6、对于一条消息,Producer是如何选择MessageQueue发送的

当你拿到了一个Topic的路由数据之后,其实接下来就应该选择要发送消息到这个 Topic的哪一个MessageQueue上去了!
        Topic是一个逻辑上的概念,一个Topic的数据往往是分布式存储在多台Broker机器上的,因此

Topic本质是由多个MessageQueue组成的。 每个MessageQueue都可以在不同的Broker机器上,当然也可能一个Topic的多个MessageQueue在一个Broker机器上:

你要发送的消息,到底应该发送到这个Topic的哪个MessageQueue上去呢?

        只要你知道了要发送消息到哪个MessageQueue上去,然后就知道这个MessageQueue在哪台Broker机器上,接着就跟那台Broker机器建立连接,发送消息给他就可以了。
        发送消息的核心源码是在DefaultMQProducerImpl.sendDefaultImpl()方法中的,在这个方法里,只要你获取到了Topic的路由数据,不管从本地缓存获取的,还是从NameServer拉取到的,接着就会执行下面的核心 代码。MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

这行代码其实就是在选择Topic中的一个MessageQueue,然后发送消息到这个MessageQueue去,在这行代码里面,实现了一些Broker故障自动回避机制,但是这个我们后续再讲,先看最基本的选择MessageQueue的算法:

他先获取到了一个自增长的index,大家注意到没有?
        接着其实他核心的就是用这个index对Topic的MessageQueue列表进行了取模操作,获取到了一个MessageQueue 列表的位置,然后返回了这个位置的MessageQueue。

        这种操作就是一种简单的负载均衡的算法,比如一个Topic有8个 MessageQueue,那么可能第一次发送消息到MessageQueue01,第二次就发送消息到MessageQueue02,以此类推,就是轮询把消息发送到各个MessageQueue而已!
        这就是最基本的MessageQueue选择算法,但是肯定有人会说了,那万一某个Broker故障了呢?此时发送消息到哪里去呢?所以其实这个算法里有很多别的代码,都是实现Broker规避机制

7、我们的系统与RocketMQ Broker之间是如何进行网络通信?

Producer是如何把消息发送给Broker的呢?
其实这块代码就在DefaultMQProducerImpl.sendDefaultImpl()方法中,在这个方法里,先是获取到了 MessageQueue所在的broker名称,如下源码片段:

获取到了brokerName之后,接着其实就可以使用如下的代码把消息投递到那个Broker上去了

** 如何把消息投递出去的?**

        是通过brokerName去本地缓存找他的实际的地址,如果找不到,就去找NameServer拉取Topic的路由数据,然后再次在本地缓存获取broker的实际地址,你有这个地址了,才能给人家进行网络通信。

        接下来的源码就很繁琐细节了,他就是用自己的方式去封装了一个Request请求出来,这里涉
及到了各种信息的封装,包括了请求头,还有一大堆所有你需要的数据,都封装在Request里了。

他在这里做的事情,大体上包括了给消息分配全局唯一ID、对超过4KB的消息体进行压缩,在消息Request中包含了生产者组、Topic名称、Topic的MessageQueue数量、MessageQueue的ID、消息发送时间、消息的flag、消息扩展属性、消息重试次数、是否是批量发送的消息、如果是事务消息则带上prepared标记,等等。
        总之,这里就是封装了很多很多的数据就对了,这些东西都封装到一个Request里去,然后在底层还是通过Netty把这个请求发送出去,发送到指定的Broker上去就可以了返回重新加载。

        这里Producer和Broker之间都是通过Netty建立长连接,然后基于长连接进行持续的通信:

8、当Broker获取到一条消息之后,他是如何存储这条消息的?

Broker通过Netty网络服务器获取到一条消息,接着就会把这条消息写入到一个CommitLog文件里去,一个Broker机器上就只有一个CommitLog文件,所有Topic的消息都会写入到一个文件里去:

然后同时还会以异步的方式把消息写入到ConsumeQueue文件里去,因为一个Topic有多个MessageQueue,任何一条消息都是写入一个MessageQueue的,那个MessageQueue其实就是对应了一个ConsumeQueue文件 。所以一条写入MessageQueue的消息,必然会异步进入对应的ConsumeQueue文件,

同时还会异步把消息写入一个IndexFile里,在里面主要就是把每条消息的key和消息在CommitLog中的offset偏移量做一个索引,这样后续如果要根据消息key从CommitLog文件里查询消息,就可以根据IndexFile的索引来了。

** 写入这几个文件的一个流程:**

  •         首先Broker收到一个消息之后,必然是先写入CommitLog文件的,那么这个CommitLog文件在磁盘上的目录结构大致如何呢?看下面,

  • CommitLog文件的存储目录是在${ROCKETMQ_HOME}/store/commitlog下的,里面会有很多的CommitLog文件,每个文件默认是1GB大小,一个文件写满了就创建一个新的文件,文件名的话,就是文件中的第一个偏移量,如下面所示。文件名如果不足20位的话,就用0来补齐就可以了。打印
    00000000000000000000

000000000003052631924

        在把消息写入CommitLog文件的时候,会申请一个putMessageLock锁 。也就是说,在Broker上写入消息到CommitLog文件的时候,都是串行的,不会让你并发的写入,并发写入文件必然会
有数据错乱的问题,下面是源码片段。

接着其实会对消息做出一通处理,包括设置消息的存储时间、创建全局唯一的消息ID、计算消息的总长度,然后会走一段很关键的源码,把消息写入到MappedFile里去:

上面源码片段中,其实最关键的是cb.doAppend()这行代码,这行代码其实是把消息追加到MappedFile映射的一块内存里去,并没有直接刷入磁盘中,如下图所示。

至于具体什么时候才会把内存里的数据刷入磁盘,其实要看我们配置的刷盘策略,这个我们后续会讲解,另外就是不 管是同步刷盘还是异步刷盘,假设你配置了主从同步,一旦你写入完消息到CommitLog之后,接下来都会进行主从同步复制的。

9、一条消息写入CommitLog文件后,如何实时更新索引文件?

Broker收到一条消息之后,其实就会直接把消息写入到CommitLog里去,但是他写入刚开始仅仅是写入到MappedFile映射的一块内存里去,后续是根据刷盘策略去决定是否立即把数据从内存刷入磁盘。

**   一个消息写入CommitLog之后,然后消息是如何进入ConsumeQueue和IndexFile的。**
实际上,Broker启动的时候会开启一个线程,ReputMessageService,他会把CommitLog更新事件转发出去,然后让任务处理器去更新ConsumeQueue和IndexFile:

在DefaultMessageStore的start()方法里,在里面就是启动了这个ReputMessageService线程。 这个DefaultMessageStore的start()方法就是在Broker启动的时候调用的,所以相当于是Broker启动就会启动这个线程。

        也就是说,在这个线程里,每隔1毫秒,就会把最近写入CommitLog的消息进行一次转发,转发到ConsumeQueue和IndexFile里去,通过的是doReput()方法来实现的,我们再看doReput()方法里的实现逻辑:

        这段代码意思非常的清晰明了,就是从commitLog中去获取到一个DispatchRequest,拿到了一份需要进行转发的消息,也就是从CommitLog中读取的,我们画在下面示意图里

接着他就会通过下面的代码,调用doDispatch()方法去把消息进行转发,一个是转发到ConsumeQueue里去,一个是转发到IndexFile里去。大家看下面的源码片段,里面走了CommitLogDispatcher的循环

​​​​​​​        ​​​​​​​实际上正常来说这个CommitLogDispatcher的实现类有两个,分别是CommitLogDispatcherBuildConsumeQueue 和CommitLogDispatcherBuildIndex,他们俩分别会负责把消息转发到ConsumeQueue和IndexFile:

ConsumeQueueDispatche的源码实现逻辑,就是找到当前Topic的messageQueueId对应的一个ConsumeQueue文件 。一个MessageQueue会对应多个ConsumeQueue文件,找到一个即可,然后消息写入其中。

看看IndexFile的写入逻辑,无非就是在IndexFile里去构建对应的索引罢了,如下面的源码片段。

当我们把消息写入到CommitLog之后,有一个后台线程每隔1毫秒就会去拉取CommitLog中最新更新的一批消息,然后分别转发到ConsumeQueue和IndexFile里去,这就是他底层的实现原理。
 

10、RocketMQ是如何实现同步刷盘以及异步刷盘两种策略的?

写入CommitLog的数据进入到MappedFile映射的一块内存里之后,后续会执行刷盘策略。

比如是同步刷盘还是异步刷盘。

  • 如果是同步刷盘,那么此时就会直接把内存里的数据写入磁盘文件。
  • 如果是异步刷盘,那么就是过一段时间之后,再把数据刷入磁盘文件里去。
    底层到底是如何执行不同的刷盘策略的?

        大家应该还记得之前我们说过,往CommitLog里写数据的时候,是调用的CommitLog类的putMessage()这个方法:

大家会发现在末尾有两个方法调用,一个是handleDishFlush(),一个是handleHA()

顾名思义,一个就是用于决定如何进行刷盘的,一个是用于决定如何把消息同步给Slave Broker的。
重点进入到handleDiskFlush()方法里去,看看他是如何处理刷盘的:

上面代码我们就看的很清晰了,其实他里面是根据你配置的两种不同的刷盘策略分别处理的。

先看第一种,就是 同步刷盘的策略是如何处理的:

上面就是构建了一个GroupCommitRequest,然后提交给了GroupCommitService去进行处理,然后调用****request.waitForFlush()方法等待同步刷盘成功。
        万一刷盘失败了,就打印日志。具体刷盘是由GroupCommitService执行的,他的doCommit()方法最终会执行同步刷盘的逻辑,里面有如下代码。

上面那行代码一层一层调用下去,最终刷盘其实是靠的MappedByteBuffer的force()方法,如下所示

        这个MappedByteBuffer就是JDK NIO包下的API,他的force()方法就是强迫把你写入内存的数据刷入到磁盘文件里去,到此就是同步刷盘成功了。

那么如果是异步刷盘呢?我们先看CommitLog.handleDiskFlush()里的的代码片段。

        其实这里就是唤醒了一个flushCommitLogService组件,那么他是什么呢?看下面的代码片段。 FlushCommitLogService其实是一个线程,他是个抽象父类,他的子类是CommitRealTimeService,所以真正唤醒的是他的子类代表的线程。

具体在子类线程的run()方法里就有定时刷新的逻辑:其实简单来说,就是每隔一定时间执行一次刷盘,最大间隔是10s,所以一旦执行异步刷盘,那么最多就是10秒就会执行一次刷盘。

11、当Broker上的数据存储超过一定时间之后,磁盘数据是如何清理的?

broker是如何把磁盘上的数据给删掉的。
        其实默认broker会启动后台线程,这个后台线程会自动去检查CommitLog、ConsumeQueue

文件,因为这些文件都是多个的,比如CommitLog会有多个,ConsumeQueue也会有多个。

  •   然后如果是那种比较旧的超过72小时的文件,就会被删除掉,也就是说,默认来说,broker只会给你把数据保留3天而已,当然你也可以自己通过fileReservedTime来配置这个时间,要保留几天的时间。
  • 这个定时检查过期数据文件的线程代码,在DefaultMessageStore这个类里,他的start()方法中会调用一个 addScheduleTask()方法,里面会每隔10s定时调度执行一个后台检查任务,

上面就可以看到了,其实他是每隔10s,就会执行一个调度任务。这个调度任务里就会执行DefaultMessageStore.this.cleanFilesPeriodically()方法,其实就是会去周期性的清理掉磁盘上的数据文件,也就是超过72小时的CommitLog、ConsumeQueue文件,

体看看这里的清理逻辑,他其实里面包含了清理CommitLog和ConsumeQueue的清理逻辑,

在清理文件的时候,他会具体判断一下

  • 如果当前时间是预先设置的凌晨4点,就会触发删除文件的逻辑,这个时间是默认的;

  • 或者是如果磁盘空间不足了,就是超过了85%的使用率了,立马会触发删除文件逻辑。

  • 上面两个条件,第一个是说如果磁盘没有满 ,那么每天就默认一次会删除磁盘文件,默认就是凌晨4点执行,那个时候必然是业务低峰期,因为凌晨4点大部分人都睡觉了,无论什么业务都不会有太高业务量的。

  • 第二个是说,如果磁盘使用率超过85%了,那么此时可以允许继续写入数据,但是此时会立马触发删除文件的逻辑;如果磁盘使用率超过90%了,那么此时不允许在磁盘里写入新数据,立马删除文件。

  • 这是因为,一旦磁盘满了,那么你写入磁盘会失败,此时你MQ就彻底故障了。

  • 所以一旦磁盘满了,也会立马删除文件的。
    在删除文件的时候,无非就是对文件进行遍历,如果一个文件超过72小时都没修改过了,此时就可以删除了,哪怕有的消息你可能还没消费过,但是此时也不会再让你消费了,就直接删除掉。

这就是RocketMQ的一整套文件删除的逻辑和机制。

相关文章