当我们的NameServer启动之后,其实他就是有一个Netty服务器监听了9876端口号,此时Broker、客户端这些就可以跟NameServer建立长连接和进行网络通信了!
既然NameServer已经启动了,而且我们知道他已经有一个Netty服务器在监听端口号,等待接收别人的连接和请求了,接着我们就应该看看Broker是如何启动的了!
启动Broker的时候也是通过mqbroker这种脚本来实现的,最终脚本里一定会启动一个JVM进程,开始执行一个main class的代码。
进入这个BrokerStartup类,在里面可以看到一个main()方法,如下所示:
public static void main(String[] args) {
start(createBrokerController(args));
}
** 先创建了一个Controller核心组件,然后用start()方法去启动这个Controller组件**!
public static BrokerController createBrokerController(String[] args) {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
NettySystemConfig.socketSndbufSize = 131072;
}
if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
NettySystemConfig.socketRcvbufSize = 131072;
}
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
new PosixParser());
if (null == commandLine) {
System.exit(-1);
}
解析你通过命令 行给broker传递的一些参数的,这些参数在main()方法里通过上面的args传递进来,然后在这里他就是通过ServerUtil.parseCmdLine()方法,在解析这些命令行参数罢了!
broker启动的时候先搞了几个核心的配置组件,包括broker自己的配置、 broker作为一个netty服务器的配置、broker作为一个netty客户端的配置、broker的消息存储的配置。
为什么broker自己又是netty服务器,又是netty客户端呢?
接着我们看看他是如何为自己的核心配置类,解析和填充信息的
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
configFile = file;
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
properties2SystemEnv(properties);
MixAll.properties2Object(properties, brokerConfig);
MixAll.properties2Object(properties, nettyServerConfig);
MixAll.properties2Object(properties, nettyClientConfig);
MixAll.properties2Object(properties, messageStoreConfig);
BrokerPathConfigHelper.setBrokerConfigPath(file);
in.close();
}
}
假设说你在启动broker的时候,用了-c选项带了一个配置文件的地址,此时他会读取配置文 件里的你自定义的一些配置的信息,然后读取出来覆盖到那4个核心配置类里去。
大家都记得我们之前启动broker的时候,其实都是要自定义一个broker配置文件的,然后用mqbroker启动的时候,都是要用-c选项带上自己的配置文件地址的,就是在上面的代码中,他会读取我们自定义的配置文件,填充到他的配置类里去。
任何其他的开源项目,可能都有类似的代码,就是构建配置类,读取配置文件的配置,解析命令行的配置参数,然后做各种配置的校验和设置。
最终他就会在这里得到4个填充完整的配置类了!
本质上就是用默认的配置参数值以及我们配置文件里的配置参数值,包括命令行传递的配置参数值,去填充到这些配置组件中去。然后后续你Broker运行的过程中,各种行为自然都是根据这些配置组件里的配置参数值来走的,
final BrokerController controller = new BrokerController(
brokerConfig,
nettyServerConfig,
nettyClientConfig,
messageStoreConfig);
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
创建了一个核心的BrokerController组件,这个BrokerController组件,你大致可以认为就是代表了Broker他自己好了
我们用mqbroker脚本启动的JVM进程,可以认为就是一个Broker,这里Broker实际上应该是代表了一个JVM进程的概念,而不是任何一个代码组件!
然后BrokerStartup作为一个main class,其实是属于一个代码组件,他的作用是准备好核心配置组件,然后就是创建、初始化以及启动BrokerController这个核心组件,也就是启动一个Broker管理控制组件,让BrokerController去控制和管理Broker这个JVM进程运行过程中的一切行为,包括接收网络请求、包括管理磁盘上的消息数据,以及一大堆的后台线程的运行。
3、BrokerController的构造函数
主要知道BrokerController内部是有一系列的功能性组件的,还有一大堆的后台线程池,知道这两点就可以了,如下图。
现在BrokerController都创建好了,里面的一大堆核心功能组件和后台线程池都创建好了,接 下来他还要做一些初始化的工作,在createBrokerController()方法里:
下面就是BrokerController.initialize()方法的完整的源码分析:
BrokerController里也包含核心的Netty服务器,用来接收和处理Producer、Consumer请求。
上面一大堆的处理请求的线程池的初始化和启动后台定时调度任务的代码。
总结:
最核心的,BrokerController一旦初始化完成过后,就准备好了Netty服务器,可以用于接收网络请求,然后准备好了处理各种请求的线程池,准备好了各种执行后台定时调度任务的线程池。
这些都准备好后,接着BrokerController的启动,他的启动,必然会正式完成Netty服务器的启动,他于是可以接收请求了,同时Broker必然会在完成启动的过程中去向NameServer进行注册以及保持心跳的。只有这样,Producer才能从NameServer上找到你这个Broker,同时发送消息给你。
最终要对BrokerContorller执行一下启动的逻辑,让他里面的一些功能组件完成启动时候需要执行的一些工作,同时最核心的就是完成Netty服务器的启动,让他去监听一个端口号,可以接收别人的请求。
public static BrokerController start(BrokerController controller) {
try {
controller.start();
String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "
+ controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
if (null != controller.getBrokerConfig().getNamesrvAddr()) {
tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr();
}
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
Netty服务器启动了,可以接收网络请求了,然后还有一个BrokerOuterAPI组件是基于Netty客户端发送请求给别人的,同时还启动一个线程去向NameServer注册,知道这几点就可以了。
只能说在看到现在这个程度的时候,你大致脑子里有个印象,你知道Broker里有这么一些核心组件,都进行了初始化以及完成了启动,但是你应该最主要关注的事情是这么几个:
上面我说的那些东西,每一个都是RocketMQ这个中间件运行的时候一个场景,一定要从这些场景出发,一点点去理解在每一个场景下,RocketMQ的各个源码中的组件是如何配合运行的。
BrokerController启动的过程,其实他本质就是启动了Netty服务器去接收网络请求,然后启动了一堆核心功能组件,启动了一些处理请求的线程池,启动了一些执行定时调度任务的后台线程。
最为关键的一点,就是他执行了将自己注册到NameServer的一个过程是在BrokerController.start()方法中。
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
Broker往NameServer进行注册的场景。 因为只有完成了注册,NameServer才能知道集群里有哪些Broker,然后Producer和Consumer才能找NameServer去拉取路由数据,他们才知道集群里有哪些Broker,才能去跟Broker进行通信!
真正的注册方法:
实际上就是通过 BrokerOuterAPI去发送网络请求给所有的NameServer,把这个Broker注册了上去。
深入到网络请求级别的Broker注册逻辑
上面这段代码里,大家最主要的,是要提取出来RequestHeader和RequestBody两个概念,就是通过请求头和请求体构成了一个请求,然后会通过底层的NettyClient把这个请求发送到NameServer去进行注册。
现在我们进入到真正的注册Broker的网络请求方法里去看看,其实入口就是下面这行代码:
RegisterBrokerResult result = registerBroker(
namesrvAddr,oneway, timeoutMills,requestHeader,body);
进入这个方法之后,会看到下面的一段代码:
看到这里,有没有发现最终的请求是基于NettyClient这个组件给发送出去的?
进入到NettyClient的网络请求方法中去看:
通过Channel这个概念,表示出了Broker和NameServer之间的一个网络连接的概念,然后通过这个Channel就可以发送实际的网络请求出去!
3、如何跟NameServer建立网络连接?
接着我们进入上面的this.getAndCreateChannel(addr)这行代码看看,他是如何跟NameServer之间建立实际的网络连接的?
先从缓存里尝试获取连接,如果没有缓存的话,就创建一个连接。
看下面的this.createChannel(addr)方法是如何实际通过一个NameServer的地址创建出来一个网络连接的。
只要上面的Channel网络连接建立起来之后,我下面画红圈的地方,其实Broker和NameServer都会有 一个Channel用来进行网络通信。
核心入口就是下面的方法
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
最终底层其实就是基于Netty的Channel API,把注册的请求给发送到了NameServer就可以了。
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/mingyuli/article/details/120981480
内容来源于网络,如有侵权,请联系作者删除!