到NamesrvController这个类的初始化的方法里去,也就是NamesrvController.initialize()这个方法
到NettyServer是用于接收网络请求 的,那么接收到的网络请求给谁处理呢?
其实就是给DefaultRequestProcessor这个请求处理组件来进行处理的。
我们如果要知道Broker注册请求是如何处理的,直接就是看DefaultRequestProcessor中的代码就可以了.
接着我们进入这个类里的registerBroker()方法,去看看到底如何完成Broker注册的。
下面我们先在图里给大家体现一下RouteInfoManager这个路由数据管理组件,实际Broker注册就是通过他来做的:
到RouteInfoManager的注册Broker的方法里去看看,最终如何把一个Broker机器的数据放入RouteInfoManager中维护的路由数据表里去的。
无非就是用一些Map类的数据结构,去存放你的Broker的路由数据就可以了,包括了Broker的clusterName、brokerId、brokerName这些核心数据。 而且在更新的时候,一定会基于Java并发包下的ReadWriteLock进行读写锁加锁,因为在这里更新那么多的内存Map数据结构,必须要加一个写锁,此时只能有一个线程来更新他们才行!
NameServer核心是基于Netty服务器来接收Broker注册请求,然后交给DefaultRequestProcessor请求处理组件,来处理Broker注册请求。而真正的Broker注册的逻辑是放在RouteInfoManager这个路由数据管理组件里来进行实现的,最终Broker路由数据都会存放在RouteInfoManager内部的一些Map数据结构组成的路由数据表中。
Broker是如何定时发送心跳到NameServer,让NameServer感知到Broker一直都存活着,然后如果Broker一段时间没有发送心跳到NameServer,那么NameServer是如何感知到Broker已经挂掉了。
Broker中的发送注册请求给NameServer的一个源码入口,其实就是在BrokerController.start()方法中,在BrokerController启动的时候,他其实并不是仅仅发送一次注册请求,而是启动了一个定时任务,会每隔一段时间就发送一次注册请求。
启动了一个定时调度的任务,他默认是每隔30s就会执行一次Broker注册的过程,上面的
registerNameServerPeriod是一个配置,他默认的值就是30s一次。
默认情况下,第一次发送注册请求就是在进行注册,他会把Broker路由数据放入到NameServer的RouteInfoManager的路由数据表里去。但是后续每隔30s他都会发送一次注册请求,这些后续定时发送的注册请求,其实本质上就是Broker发送心跳给NameServer了。
那么后续每隔30s,Broker就发送一次注册请求,作为心跳来发送给NameServer的时候,NameServer对后续重复发 送过来的注册请求(也就是心跳),是如何进行处理的呢?
看一下RouteInfoManager的注册方法逻辑:
我们看下图,有一个红色圈圈示意了,每隔30s你发送注册请求作为心跳的时候RouteInfoManager里会进行心 跳时间刷新的处理:
接着我们来思考最后一个问题,那么假设Broker已经挂了,或者故障了,隔了很久都没有发送那个每隔30s一次的注册请求作为心跳,那么此时NameServer是如何感知到这个Broker已经挂掉的呢?
我们重新回到NamesrvController的initialize()方法里去,里面有一个代码是启动了RouteInfoManager中的一个定时扫描不活跃Broker的线程。
上面这段代码,就是启动一个定时调度线程,每隔10s扫描一次目前不活跃的Broker,使用的是RouteInfoManager中的scanNotActiveBroke()方法,我们去看看那个方法的逻辑,就知道他如何感知到一个Broker挂掉了。
NameServer启动之后,就会有一个核心的NamesrvController组件,他就是用于控制NameServer的所有行为的,包括内部启动一个Netty服务器去监听一个9876端口号,然后接收处理Broker和客户端发送过来的请求。
Broker启动之后,最核心的就是有一个BrokerController组件管控Broker的整体行为,包括初始化自己的Netty服务器用于接收客户端的网络请求,包括启动处理请求的线程池、执行定时任务的线程池,初始化核心功能组件,同时还会启动之后发送注册请求到NameServer去注册自己。
Broker启动之后进行注册以及定时发送注册请求作为心跳的机制。
NameServer有一个后台进程定时检查每个Broker的最近一次心跳时间,如果长时间没心跳就认为Broker已经故障。
在讲完这些内容过后,你可以认为在我们的RocketMQ集群里,已经启动好了NameServer,而且还启动了一批Broker,同时Broker都已经把自己注册到NameServer里去了,NameServer也会去检查这批Broker是否存活。
其实此时我们不需要去关注NameServer和Broker干了别的什么事情,这个时候我们只要知道已经有了一个可用的RocketMQ集群就可以了,然后此时我们是不是就可以让自己开发好的系统去发送消息到MQ里去了?
没错,所以此时我们就需要引入一个Producer组件了,实际上,大家要知道,我们开发好的系统,最终都是要构建一个Producer组件,然后通过Producer去发送消息到MQ的Broker上去的。
DefaultMQProducer producer = new DefaultMQProducer("order_producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
构造Producer很简单,就是创建一个DefaultMQProducer对象实例,在其中传入你所属的Producer分组,然后设置一下NameServer的地址,最后调用他的start()方法,启动这个Producer就可以了。其实创建DefaultMQProducer对象实例是一个非常简单的过程,无非就是创建这么一个对象出来,然后保存一下他的Producer分组。
设置NameServer地址也是一个很简单的过程,无非就是保存一下NameServer地址罢了。
其实最核心的还是调用了这个DefaultMQProducer的start()方法去启动了这个消息生产组件,那么这个start()都干了什么呢?
Producer组件在启动的时候是如何准备好相关资源的,因为他必须内部得有独立的线程资源,还有得跟Broker建立网络连接,这样才能把我们的消息发送出去。
其实我们在构造Producer的时候,他内部构造了一个真正用于执行消息发送逻辑的组件,就
是DefaultMQProducerImpl这个类的实例对象,真正的生产组件其实是这个组件。
那么这个组件在启动的时候都干了什么呢?
要站在Producer的核心行为的角度去看。
首先我们都知道一件事儿,假设我们后续要通过Producer发送消息,必然会指定我们要往哪个Topic里发送消息。所以我们也知道,Producer必然是知道Topic的一些路由数据的,比如Topic有哪些MessageQueue,每个MessageQueue在哪些Broker上。
那么现在问题来了,到底是Producer刚启动初始化的时候,就会去拉取每个Topic的路由数据呢?还是等你第一次往一个Topic发送消息的时候再拉取路由数据呢?
肯定不可能是刚初始化启动的时候就拉取Topic的路由数据,因为你刚开始启动的时候,不知
道要发送消息到哪个Topic去啊! 一定是在你第一次发送消息到Topic的时候,才会去拉取一个Topic的路由数据,包括这个Topic有几个MessageQueue,每个MessageQueue在哪个Broker上,然后从中选择一个MessageQueue,跟那台Broker建立网络连接,发送消息过去。
第二个问题,Producer发送消息必然要跟Broker建立网络,这个是在Producer刚启动的时候就立马跟所有的Broker建立网络连接吗?
那必然也不是的,因为此时你也不知道你要跟哪个Broker进行通信。所以其实很多核心的逻辑,包括Topic路由数据拉取,MessageQueue选择,以及跟Broker建立网络连接,通过网络
连接发送消息到Broker去,这些逻辑都是在Producer发送消息的时候才会有。
其实比如拉取Topic的路由数据,选择MessageQueue,跟Broker构建长连接,发送消息过去,这些核心的逻辑,都是封装在发送消息的方法中的。
因此我们今天就从发送消息的方法开始讲起,实际上当你调用Producer的send()方法发送消息的时候,这个方法调用会一直到比较底层的逻辑里去,最终会调用到DefaultMQProducerImpl类的sendDefaultImpl()方法里去,在这个方 法里,上来什么都没干,直接就有一行非常关键的代码,如下。TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
每次你发送消息的时候,他都会先去检查一下,这个你要发送消息的那个Topic的路由数据是否在你客户端本地。如果不在的话,必然会发送请求到NameServer那里去拉取一下的,然后缓存在客户端本地。
重点来看看,这个Producer客户端运行在你的业务系统里的时候,他如何从NameServer拉取到你的Topic的路由数据的?
其实当你进入了this.tryToFindTopicPublishInfo(msg.getTopic())这个方法逻辑之后,会发现他的逻辑非常的简单。其实简单来说,他就是先检查了一下自己本地是否有这个Topic的路由数据的缓存,如果没有的话就发送网络请求到NameServer去拉取,如果有的话,就直接返回本地Topic路由数据缓存了:
** Producer到底是如何发送网络请求到NameServer去拉取Topic路由数据的?**
其实这里就对应了tryToFindTopicPublishInfo()方法内的一行代码:
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
通过这行代码,他就可以去从NameServer拉取某个Topic的路由数据,然后更新到自己本地的缓存里去了。
具体的发送请求到NameServer的拉取过程,简单来说,就是封装一个Request请求对象, 然后通过底层的Netty客户端发送请求到NameServer,接收到一个Response响应对象。 然后他就会从Response响应对象里取出来自己需要的Topic路由数据,更新到自己本地缓存里去,更新的时候会做一 些判断,比如Topic路由数据是否有改变过等,然后把Topic路由数据放本地缓存就可以了:
当你拿到了一个Topic的路由数据之后,其实接下来就应该选择要发送消息到这个 Topic的哪一个MessageQueue上去了!
Topic是一个逻辑上的概念,一个Topic的数据往往是分布式存储在多台Broker机器上的,因此
Topic本质是由多个MessageQueue组成的。 每个MessageQueue都可以在不同的Broker机器上,当然也可能一个Topic的多个MessageQueue在一个Broker机器上:
你要发送的消息,到底应该发送到这个Topic的哪个MessageQueue上去呢?
只要你知道了要发送消息到哪个MessageQueue上去,然后就知道这个MessageQueue在哪台Broker机器上,接着就跟那台Broker机器建立连接,发送消息给他就可以了。
发送消息的核心源码是在DefaultMQProducerImpl.sendDefaultImpl()方法中的,在这个方法里,只要你获取到了Topic的路由数据,不管从本地缓存获取的,还是从NameServer拉取到的,接着就会执行下面的核心 代码。MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
这行代码其实就是在选择Topic中的一个MessageQueue,然后发送消息到这个MessageQueue去,在这行代码里面,实现了一些Broker故障自动回避机制,但是这个我们后续再讲,先看最基本的选择MessageQueue的算法:
他先获取到了一个自增长的index,大家注意到没有?
接着其实他核心的就是用这个index对Topic的MessageQueue列表进行了取模操作,获取到了一个MessageQueue 列表的位置,然后返回了这个位置的MessageQueue。
这种操作就是一种简单的负载均衡的算法,比如一个Topic有8个 MessageQueue,那么可能第一次发送消息到MessageQueue01,第二次就发送消息到MessageQueue02,以此类推,就是轮询把消息发送到各个MessageQueue而已!
这就是最基本的MessageQueue选择算法,但是肯定有人会说了,那万一某个Broker故障了呢?此时发送消息到哪里去呢?所以其实这个算法里有很多别的代码,都是实现Broker规避机制
Producer是如何把消息发送给Broker的呢?
其实这块代码就在DefaultMQProducerImpl.sendDefaultImpl()方法中,在这个方法里,先是获取到了 MessageQueue所在的broker名称,如下源码片段:
获取到了brokerName之后,接着其实就可以使用如下的代码把消息投递到那个Broker上去了
** 如何把消息投递出去的?**
是通过brokerName去本地缓存找他的实际的地址,如果找不到,就去找NameServer拉取Topic的路由数据,然后再次在本地缓存获取broker的实际地址,你有这个地址了,才能给人家进行网络通信。
接下来的源码就很繁琐细节了,他就是用自己的方式去封装了一个Request请求出来,这里涉
及到了各种信息的封装,包括了请求头,还有一大堆所有你需要的数据,都封装在Request里了。
他在这里做的事情,大体上包括了给消息分配全局唯一ID、对超过4KB的消息体进行压缩,在消息Request中包含了生产者组、Topic名称、Topic的MessageQueue数量、MessageQueue的ID、消息发送时间、消息的flag、消息扩展属性、消息重试次数、是否是批量发送的消息、如果是事务消息则带上prepared标记,等等。
总之,这里就是封装了很多很多的数据就对了,这些东西都封装到一个Request里去,然后在底层还是通过Netty把这个请求发送出去,发送到指定的Broker上去就可以了返回重新加载。
这里Producer和Broker之间都是通过Netty建立长连接,然后基于长连接进行持续的通信:
Broker通过Netty网络服务器获取到一条消息,接着就会把这条消息写入到一个CommitLog文件里去,一个Broker机器上就只有一个CommitLog文件,所有Topic的消息都会写入到一个文件里去:
然后同时还会以异步的方式把消息写入到ConsumeQueue文件里去,因为一个Topic有多个MessageQueue,任何一条消息都是写入一个MessageQueue的,那个MessageQueue其实就是对应了一个ConsumeQueue文件 。所以一条写入MessageQueue的消息,必然会异步进入对应的ConsumeQueue文件,
同时还会异步把消息写入一个IndexFile里,在里面主要就是把每条消息的key和消息在CommitLog中的offset偏移量做一个索引,这样后续如果要根据消息key从CommitLog文件里查询消息,就可以根据IndexFile的索引来了。
** 写入这几个文件的一个流程:**
首先Broker收到一个消息之后,必然是先写入CommitLog文件的,那么这个CommitLog文件在磁盘上的目录结构大致如何呢?看下面,
CommitLog文件的存储目录是在${ROCKETMQ_HOME}/store/commitlog下的,里面会有很多的CommitLog文件,每个文件默认是1GB大小,一个文件写满了就创建一个新的文件,文件名的话,就是文件中的第一个偏移量,如下面所示。文件名如果不足20位的话,就用0来补齐就可以了。打印
00000000000000000000
000000000003052631924
在把消息写入CommitLog文件的时候,会申请一个putMessageLock锁 。也就是说,在Broker上写入消息到CommitLog文件的时候,都是串行的,不会让你并发的写入,并发写入文件必然会
有数据错乱的问题,下面是源码片段。
接着其实会对消息做出一通处理,包括设置消息的存储时间、创建全局唯一的消息ID、计算消息的总长度,然后会走一段很关键的源码,把消息写入到MappedFile里去:
上面源码片段中,其实最关键的是cb.doAppend()这行代码,这行代码其实是把消息追加到MappedFile映射的一块内存里去,并没有直接刷入磁盘中,如下图所示。
至于具体什么时候才会把内存里的数据刷入磁盘,其实要看我们配置的刷盘策略,这个我们后续会讲解,另外就是不 管是同步刷盘还是异步刷盘,假设你配置了主从同步,一旦你写入完消息到CommitLog之后,接下来都会进行主从同步复制的。
Broker收到一条消息之后,其实就会直接把消息写入到CommitLog里去,但是他写入刚开始仅仅是写入到MappedFile映射的一块内存里去,后续是根据刷盘策略去决定是否立即把数据从内存刷入磁盘。
** 一个消息写入CommitLog之后,然后消息是如何进入ConsumeQueue和IndexFile的。**
实际上,Broker启动的时候会开启一个线程,ReputMessageService,他会把CommitLog更新事件转发出去,然后让任务处理器去更新ConsumeQueue和IndexFile:
在DefaultMessageStore的start()方法里,在里面就是启动了这个ReputMessageService线程。 这个DefaultMessageStore的start()方法就是在Broker启动的时候调用的,所以相当于是Broker启动就会启动这个线程。
也就是说,在这个线程里,每隔1毫秒,就会把最近写入CommitLog的消息进行一次转发,转发到ConsumeQueue和IndexFile里去,通过的是doReput()方法来实现的,我们再看doReput()方法里的实现逻辑:
这段代码意思非常的清晰明了,就是从commitLog中去获取到一个DispatchRequest,拿到了一份需要进行转发的消息,也就是从CommitLog中读取的,我们画在下面示意图里
接着他就会通过下面的代码,调用doDispatch()方法去把消息进行转发,一个是转发到ConsumeQueue里去,一个是转发到IndexFile里去。大家看下面的源码片段,里面走了CommitLogDispatcher的循环
实际上正常来说这个CommitLogDispatcher的实现类有两个,分别是CommitLogDispatcherBuildConsumeQueue 和CommitLogDispatcherBuildIndex,他们俩分别会负责把消息转发到ConsumeQueue和IndexFile:
ConsumeQueueDispatche的源码实现逻辑,就是找到当前Topic的messageQueueId对应的一个ConsumeQueue文件 。一个MessageQueue会对应多个ConsumeQueue文件,找到一个即可,然后消息写入其中。
看看IndexFile的写入逻辑,无非就是在IndexFile里去构建对应的索引罢了,如下面的源码片段。
当我们把消息写入到CommitLog之后,有一个后台线程每隔1毫秒就会去拉取CommitLog中最新更新的一批消息,然后分别转发到ConsumeQueue和IndexFile里去,这就是他底层的实现原理。
写入CommitLog的数据进入到MappedFile映射的一块内存里之后,后续会执行刷盘策略。
比如是同步刷盘还是异步刷盘。
大家应该还记得之前我们说过,往CommitLog里写数据的时候,是调用的CommitLog类的putMessage()这个方法:
大家会发现在末尾有两个方法调用,一个是handleDishFlush(),一个是handleHA()
顾名思义,一个就是用于决定如何进行刷盘的,一个是用于决定如何把消息同步给Slave Broker的。
重点进入到handleDiskFlush()方法里去,看看他是如何处理刷盘的:
上面代码我们就看的很清晰了,其实他里面是根据你配置的两种不同的刷盘策略分别处理的。
先看第一种,就是 同步刷盘的策略是如何处理的:
上面就是构建了一个GroupCommitRequest,然后提交给了GroupCommitService去进行处理,然后调用****request.waitForFlush()方法等待同步刷盘成功。
万一刷盘失败了,就打印日志。具体刷盘是由GroupCommitService执行的,他的doCommit()方法最终会执行同步刷盘的逻辑,里面有如下代码。
上面那行代码一层一层调用下去,最终刷盘其实是靠的MappedByteBuffer的force()方法,如下所示
这个MappedByteBuffer就是JDK NIO包下的API,他的force()方法就是强迫把你写入内存的数据刷入到磁盘文件里去,到此就是同步刷盘成功了。
那么如果是异步刷盘呢?我们先看CommitLog.handleDiskFlush()里的的代码片段。
其实这里就是唤醒了一个flushCommitLogService组件,那么他是什么呢?看下面的代码片段。 FlushCommitLogService其实是一个线程,他是个抽象父类,他的子类是CommitRealTimeService,所以真正唤醒的是他的子类代表的线程。
具体在子类线程的run()方法里就有定时刷新的逻辑:其实简单来说,就是每隔一定时间执行一次刷盘,最大间隔是10s,所以一旦执行异步刷盘,那么最多就是10秒就会执行一次刷盘。
broker是如何把磁盘上的数据给删掉的。
其实默认broker会启动后台线程,这个后台线程会自动去检查CommitLog、ConsumeQueue
文件,因为这些文件都是多个的,比如CommitLog会有多个,ConsumeQueue也会有多个。
上面就可以看到了,其实他是每隔10s,就会执行一个调度任务。这个调度任务里就会执行DefaultMessageStore.this.cleanFilesPeriodically()方法,其实就是会去周期性的清理掉磁盘上的数据文件,也就是超过72小时的CommitLog、ConsumeQueue文件,
体看看这里的清理逻辑,他其实里面包含了清理CommitLog和ConsumeQueue的清理逻辑,
在清理文件的时候,他会具体判断一下
如果当前时间是预先设置的凌晨4点,就会触发删除文件的逻辑,这个时间是默认的;
或者是如果磁盘空间不足了,就是超过了85%的使用率了,立马会触发删除文件逻辑。
上面两个条件,第一个是说如果磁盘没有满 ,那么每天就默认一次会删除磁盘文件,默认就是凌晨4点执行,那个时候必然是业务低峰期,因为凌晨4点大部分人都睡觉了,无论什么业务都不会有太高业务量的。
第二个是说,如果磁盘使用率超过85%了,那么此时可以允许继续写入数据,但是此时会立马触发删除文件的逻辑;如果磁盘使用率超过90%了,那么此时不允许在磁盘里写入新数据,立马删除文件。
这是因为,一旦磁盘满了,那么你写入磁盘会失败,此时你MQ就彻底故障了。
所以一旦磁盘满了,也会立马删除文件的。
在删除文件的时候,无非就是对文件进行遍历,如果一个文件超过72小时都没修改过了,此时就可以删除了,哪怕有的消息你可能还没消费过,但是此时也不会再让你消费了,就直接删除掉。
这就是RocketMQ的一整套文件删除的逻辑和机制。
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/mingyuli/article/details/120984702
内容来源于网络,如有侵权,请联系作者删除!