中间件(八):RocketMQ代码解读2

x33g5p2x  于2021-10-27 转载在 其他  
字(6.9k)|赞(0)|评价(0)|浏览(200)

写在前面

当我们的NameServer启动之后,其实他就是有一个Netty服务器监听了9876端口号,此时Broker、客户端这些就可以跟NameServer建立长连接和进行网络通信了!
既然NameServer已经启动了,而且我们知道他已经有一个Netty服务器在监听端口号,等待接收别人的连接和请求了,接着我们就应该看看Broker是如何启动的了!

1、Broker启动的时候是如何初始化自己的核心配置的?

1、启动Broker

启动Broker的时候也是通过mqbroker这种脚本来实现的,最终脚本里一定会启动一个JVM进程,开始执行一个main class的代码。
        进入这个BrokerStartup类,在里面可以看到一个main()方法,如下所示:

public static void main(String[] args) {
    start(createBrokerController(args));
}

  **      先创建了一个Controller核心组件,然后用start()方法去启动这个Controller组件**!

public static BrokerController createBrokerController(String[] args) {
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
     if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
        NettySystemConfig.socketSndbufSize = 131072;
    }
    if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
        NettySystemConfig.socketRcvbufSize = 131072;
    }
  • System.setProperty(),他这个明显是在设置一个系统级的变量。
  • 如果某个系统级的变量没有设置,那么就在这里设置,而且明显发现,他设置的是Netty网络通信相关的变量,就是socket的发送缓冲大小。

2、Broker的核心配置类

Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
    new PosixParser());
if (null == commandLine) {
    System.exit(-1);
}

解析你通过命令 行给broker传递的一些参数的,这些参数在main()方法里通过上面的args传递进来,然后在这里他就是通过ServerUtil.parseCmdLine()方法,在解析这些命令行参数罢了!

broker启动的时候先搞了几个核心的配置组件,包括broker自己的配置、 broker作为一个netty服务器的配置、broker作为一个netty客户端的配置、broker的消息存储的配置。

为什么broker自己又是netty服务器,又是netty客户端呢?

  • 当你的客户端连接到broker上发送消息的时候,那么broker就是一个netty服务器,负责监听客户端的连接请求。
  • 但是当你的broker跟nameserver建立连接的时候,你的broker又是一个netty客户端,他要跟nameserver的netty服务器建立连接。
    几个核心配置组件:
     

3**、为核心配置类解析和填充信息**

接着我们看看他是如何为自己的核心配置类,解析和填充信息的

if (commandLine.hasOption('c')) {
    String file = commandLine.getOptionValue('c');
    if (file != null) {
        configFile = file;
        InputStream in = new BufferedInputStream(new FileInputStream(file));
        properties = new Properties();
        properties.load(in);

        properties2SystemEnv(properties);
        MixAll.properties2Object(properties, brokerConfig);
        MixAll.properties2Object(properties, nettyServerConfig);
        MixAll.properties2Object(properties, nettyClientConfig);
        MixAll.properties2Object(properties, messageStoreConfig);

        BrokerPathConfigHelper.setBrokerConfigPath(file);
        in.close();
    }
}

假设说你在启动broker的时候,用了-c选项带了一个配置文件的地址,此时他会读取配置文 件里的你自定义的一些配置的信息,然后读取出来覆盖到那4个核心配置类里去。

        大家都记得我们之前启动broker的时候,其实都是要自定义一个broker配置文件的,然后用mqbroker启动的时候,都是要用-c选项带上自己的配置文件地址的,就是在上面的代码中,他会读取我们自定义的配置文件,填充到他的配置类里去。

任何其他的开源项目,可能都有类似的代码,就是构建配置类,读取配置文件的配置,解析命令行的配置参数,然后做各种配置的校验和设置。

最终他就会在这里得到4个填充完整的配置类了!

2、BrokerController是如何构建出来的,以及包含了哪些组件?

本质上就是用默认的配置参数值以及我们配置文件里的配置参数值,包括命令行传递的配置参数值,去填充到这些配置组件中去。然后后续你Broker运行的过程中,各种行为自然都是根据这些配置组件里的配置参数值来走的,

final BrokerController controller = new BrokerController(
    brokerConfig,
    nettyServerConfig,
    nettyClientConfig,
    messageStoreConfig);
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);

创建了一个核心的BrokerController组件,这个BrokerController组件,你大致可以认为就是代表了Broker他自己好了

  • BrokerStartup类就是用来启动Broker的一个类,他里面包含的是把Broker给进行初始化和完成全部启动工作的逻辑。
  • BrokerController,中文叫做“Broker管理控制组件”。这个组件其实被创建出来以及初始化完毕之后,就是用来控制当前在运行的这个Broker的!

我们用mqbroker脚本启动的JVM进程,可以认为就是一个Broker,这里Broker实际上应该是代表了一个JVM进程的概念,而不是任何一个代码组件!
然后BrokerStartup作为一个main class,其实是属于一个代码组件,他的作用是准备好核心配置组件,然后就是创建、初始化以及启动BrokerController这个核心组件,也就是启动一个Broker管理控制组件,让BrokerController去控制和管理Broker这个JVM进程运行过程中的一切行为,包括接收网络请求、包括管理磁盘上的消息数据,以及一大堆的后台线程的运行。


3、BrokerController的构造函数

主要知道BrokerController内部是有一系列的功能性组件的,还有一大堆的后台线程池,知道这两点就可以了,如下图。

4、初始化BrokerController的时候,都干了哪些事情

现在BrokerController都创建好了,里面的一大堆核心功能组件和后台线程池都创建好了,接 下来他还要做一些初始化的工作,在createBrokerController()方法里:

下面就是BrokerController.initialize()方法的完整的源码分析: 

        BrokerController里也包含核心的Netty服务器,用来接收和处理Producer、Consumer请求。

上面一大堆的处理请求的线程池的初始化和启动后台定时调度任务的代码。

  • 后续Broker要处理一大堆的各种请求,那么不同的请求需要用不同的线程池里的线程来处理。
  • Broker要执行一大堆的后台定时调度执行的任务,后台定时任务要通过线程池来调度定时任务
    一种线程池是用来处理别人发送过来的请求的,一种线程池是执行后台定时调度任务的。

总结:

最核心的,BrokerController一旦初始化完成过后,就准备好了Netty服务器,可以用于接收网络请求,然后准备好了处理各种请求的线程池,准备好了各种执行后台定时调度任务的线程池。
这些都准备好后,接着BrokerController的启动,他的启动,必然会正式完成Netty服务器的启动,他于是可以接收请求了,同时Broker必然会在完成启动的过程中去向NameServer进行注册以及保持心跳的。只有这样,Producer才能从NameServer上找到你这个Broker,同时发送消息给你。

5、BrokerContorller在启动的时候,都干了哪些事儿?

最终要对BrokerContorller执行一下启动的逻辑,让他里面的一些功能组件完成启动时候需要执行的一些工作,同时最核心的就是完成Netty服务器的启动,让他去监听一个端口号,可以接收别人的请求。

public static BrokerController start(BrokerController controller) {
    try {
        controller.start();

        String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "
            + controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();

        if (null != controller.getBrokerConfig().getNamesrvAddr()) {
            tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr();
        }
        log.info(tip);
        System.out.printf("%s%n", tip);
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }
    return null;
}

Netty服务器启动了,可以接收网络请求了,然后还有一个BrokerOuterAPI组件是基于Netty客户端发送请求给别人的,同时还启动一个线程去向NameServer注册,知道这几点就可以了。

只能说在看到现在这个程度的时候,你大致脑子里有个印象,你知道Broker里有这么一些核心组件,都进行了初始化以及完成了启动,但是你应该最主要关注的事情是这么几个:

  • Broker启动了,必然要去注册自己到NameServer去,所以BrokerOuterAPI这个组件必须要画到自己的图里去,这是一个核心组件。
  • Broker启动之后,必然要有一个网络服务器去接收别人的请求,此时NettyServer这个组件是必须要知道的。
  • 当你的NettyServer接收到网络请求之后,需要有线程池来处理,你需要知道这里应该有一个处理各种请求的线程池。
  • 你处理请求的线程池在处理每个请求的时候,是不是需要各种核心功能组件的协调?比如写入消息到 commitlog,然后写入索引到indexfile和consumer queue文件里去,此时你是不是需要对应的一些MessageStore之类的组件来配合你?
  • 除此之外,你是不是需要一些后台定时调度运行的线程来工作?比如定时发送心跳到NameServer去,类似这种事情。
            接着再往后走,一定要从各种场景驱动,去理解RocketMQ的源码,包括Broker的注册和心跳,客户端Producer的启动和初始化,Producer从NameServer拉取路由信息,Producer根据负载均衡算法选择一个Broker机器,Producer跟Broker建立网络连接,Producer发送消息到Broker,Broker把消息存储到磁盘。

        上面我说的那些东西,每一个都是RocketMQ这个中间件运行的时候一个场景,一定要从这些场景出发,一点点去理解在每一个场景下,RocketMQ的各个源码中的组件是如何配合运行的。

6、第三个场景驱动:Broker是如何把自己注册到NameServer的

BrokerController启动的过程,其实他本质就是启动了Netty服务器去接收网络请求,然后启动了一堆核心功能组件,启动了一些处理请求的线程池,启动了一些执行定时调度任务的后台线程。

最为关键的一点,就是他执行了将自己注册到NameServer的一个过程是在BrokerController.start()方法中。
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());

Broker往NameServer进行注册的场景。 因为只有完成了注册,NameServer才能知道集群里有哪些Broker,然后Producer和Consumer才能找NameServer去拉取路由数据,他们才知道集群里有哪些Broker,才能去跟Broker进行通信!

真正的注册方法:

实际上就是通过 BrokerOuterAPI去发送网络请求给所有的NameServer,把这个Broker注册了上去。


深入到网络请求级别的Broker注册逻辑

上面这段代码里,大家最主要的,是要提取出来RequestHeader和RequestBody两个概念,就是通过请求头和请求体构成了一个请求,然后会通过底层的NettyClient把这个请求发送到NameServer去进行注册。

7、BrokerOuter API是如何发送注册请求的?

现在我们进入到真正的注册Broker的网络请求方法里去看看,其实入口就是下面这行代码:
RegisterBrokerResult result = registerBroker(

namesrvAddr,oneway, timeoutMills,requestHeader,body);

进入这个方法之后,会看到下面的一段代码:

看到这里,有没有发现最终的请求是基于NettyClient这个组件给发送出去的?

进入到NettyClient的网络请求方法中去看:

通过Channel这个概念,表示出了Broker和NameServer之间的一个网络连接的概念,然后通过这个Channel就可以发送实际的网络请求出去!

3、如何跟NameServer建立网络连接?        

接着我们进入上面的this.getAndCreateChannel(addr)这行代码看看,他是如何跟NameServer之间建立实际的网络连接的?

先从缓存里尝试获取连接,如果没有缓存的话,就创建一个连接。 

看下面的this.createChannel(addr)方法是如何实际通过一个NameServer的地址创建出来一个网络连接的。

        只要上面的Channel网络连接建立起来之后,我下面画红圈的地方,其实Broker和NameServer都会有 一个Channel用来进行网络通信。 

8**、如何通过Channel网络连接发送请求?**

核心入口就是下面的方法
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);

最终底层其实就是基于Netty的Channel API,把注册的请求给发送到了NameServer就可以了。

相关文章