中间件(七):RocketMQ代码解读

x33g5p2x  于2021-10-26 转载在 其他  
字(16.6k)|赞(0)|评价(0)|浏览(167)

1、RocketMQ的源码目录结构

  • broker:顾名思义,这个里面存放的就是RocketMQ的Broker相关的代码,这里的代码可以用来启动Broker进程
  • client:顾名思义,这个里面就是RocketMQ的Producer、Consumer这些客户端的代码,生产消息、消费消息的代码都在里面
  • common:这里放的是一些公共的代码
  • dev:这里放的是开发相关的一些信息
  • distribution:这里放的就是用来部署RocketMQ的一些东西,比如bin目录 ,conf目录,等等
  • example:这里放的是RocketMQ的一些例子
  • filter:这里放的是RocketMQ的一些过滤器的东西
  • logappender和logging:这里放的是RocketMQ的日志打印相关的东西
  • namesvr:这里放的就是NameServer的源码
  • openmessaging:这是开放消息标准,这个可以先忽略
  • remoting:这个很重要,这里放的是RocketMQ的远程网络通信模块的代码,基于netty实现
  • srvutil:这里放的是一些工具类
  • store:这个也很重要,这里放的是消息在Broker上进行存储相关的一些源码
  • style、test、tools:这里放的是checkstyle代码检查的东西,一些测试相关的类,还有就是tools里放的一些命令行监控工具类

2、源码阅读方法

        应该是尝试在Intellij IDEA中去启动RocketMQ,然后你就可以在源码中打一些断点,去观察RocketMQ源码的运行过程,而且在这个过程中,还需要从RocketMQ实际运行和使用的角度,去观察他的源码运行的流程。

        什么意思呢?

  • 比如RocketMQ的使用的时候,刚开始肯定是先启动NameServer,在NameServer的源码中打入断点,然后在Intellij IDEA中启动NameServer,接着观察他启动时候的源码运行流程?
  • 接着下一步肯定是启动Broker,在Broker源码中打入断点,然后在Intellij IDEA中启动Broker,去观察他启动时候的源码运行流程?
  • 包括我们的客户端发送消息到BrokerBroker的主从同步,实际上我们都可以用这种方式在源码里打断点,然后在Intellij IDEA中启动和运行RocketMQ,来观察各种场景下的源码运行流程。
            所以我们首先肯定要先能在Intellij IDEA中启动和调试RocketMQ的源码,接着才能进一步继续去分析他的源码运行流程。

3、对NameServer启动类进行配置

首先,我们需要在Intellij IDEA中启动NameServer,所以我们先在RocketMQ源码目录中找到namesvr这个工程,然后展开他的目录,找到NamesvrStartup.java这个类,我们看下面的图示。

启动NameServer

  • 上面的东西都搞定之后,接着就可以右击NamesvrStartup类,选择Debug NamesvrStartup.main()了,就可以用debug模式去启动
  • NameServer了,他会自动找到ROCKETMQ_HOME环境变量,这个目录就是你的运行目录,里面有conf、logs、store几个目录。
  • 他会读取conf里的配置文件,所有的日志都会打印在logs目录里,然后数据都会写在store目录里,启动成功之后,在Intellij IDEA的命令行里就会看到下面的提示。
            Connected to the target VM, address: '127.0.0.1:54473', transport: 'socket'

The Name Server boot success. serializeType=JSON

4、对broker启动类进行配置

                找到broker模块,然后展开他的目录,就可以找到一个BrokerStartup类,这个类是用来启动Broker 进程的。

使用debug模式启动Broker
        接着我们就可以使用debug模式启动BrokerStartup类了,右击他点击Debug BrokerStartup.main(),就可以启动他。

        接着我们会看到如下的一段提示,就说明broker启动成功了:
Connected to the target VM, address: '127.0.0.1:55275', transport: 'socket'

The broker[broker-a, 192.168.3.9:10911] boot success. serializeType=JSON

5、使用RocketMQ自带的例子程序测试消息的发送和消费

在example模块下面就有,我们看下面的图。
        在图中我们可以在quickstart包下找到Producer和Consumer两个例子类,而且大家可以看到,在这里包括事务消息、顺序消息,等等,很多高阶功能的例子在这里都有。

5、RocketMQ的大优势:可视化的管理界面

整个RocketMQ集群的元数据都集中在了NameServer里,包括有多少Broker,有哪些Topic,有哪些 Producer,有哪些Consumer,目前集群里有多少消息,等等。
        所以如果我们能想办法跑到NameServer里去,自然就可以知道很多东西 但是那不行,因为NameServer并没有对我们打开一扇门让我们进去知道这些东西。 但是RocketMQ里既然有大量的信息可以让我们进行监控和查看,他自然会提供一些办法来让我们看到,这就是他最大的优势之一,一 个可视化的管理界面。

这个可视化的工作台可以说是非常强大的,他几乎满足了我们大部分对RocketMQ集群监控的需求,我们一步一步看看他都有哪些功能。

使用RocketMQ-dashboard项目。

1、界面

首先刚进入界面,会看到类似下面的东西:

2、集群界面

        在这个界面里可以让你看到Broker的大体消息负载,还有各个Topic的消息负载,另外还可以选择日期要看哪一天的监控数据,都可以看到。

        接着大家点击上边导航栏里的“集群”,就会进入集群的一个监控界面。

        在这个图里可以看到非常有用的一些信息,你可以看到各个Broker的分组,哪些是Master,哪些是Slave,他们各自的机器地址和端口号,还有版本号。包括最重要的,就是他们每台机器的生产消息TPS和消费消息TPS,还有消息总数。这是非常重要的,通过这个TPS统计,就是每秒写入或者被消费的消息数量,就可以看出RocketMQ集群的TPS和并发访问量。

3、Topic管理

        点击上边导航栏的“主题”,可以看到下面的界面,通过这个界面就可以对Topic进行管理了,比如你可以在这里创建、删除和管理 Topic,查看Topic的一些装填、配置,等等,可以对Topic做各种管理。

4、消费者/生产者

        接着点击上边导航栏里的“消费者”和“生产者”,就可以看到访问MQ集群的消费者和生产者了,还可以做对应的一些管理。

5、消息/消息轨迹

着点击导航栏里的“消息”和“消息轨迹”,又可以对消息进行查询和管理。

大体上这个工作台的监控和管理功能就是这些了,所以大家可以在这里看到,我们这个工作台,就可以对集群整体的消息数量以及消息 TPS,还有各个Broker的消息数量和消息TPS进行监控。 同时我们还可以对Broker、Topic、消费者、生产者、消息这些东西进行对应的查询和管理,非常的便捷。

新版本RocketMQ配置Web管理界面:解决找不到rocketmq-console目录问题_沮丧的南瓜-CSDN博客

6、使用RocketMQ自带的例子程序测试消息的发送和消费

1、在example模块下面就有,我们看下面的图。
        在图中我们可以在quickstart包下找到Producer和Consumer两个例子类,而且大家可以看到在这里包括事务消息、顺序消息,等等,很多高阶功能的例子在这里都有。

2、首先我们需要启动rocketmq-console工程,接着我们就进入Topic菜单,新建一个名称为TopicTest的Topic即可,新建完之后在Topic列表就可以看到下面的内容了。

3**、修改和运行RocketMQ自带的Producer示例程序**

        接着我们执行运行上面的程序就可以了,他会发送1条消息到Broker里去,我们观察一下控制台的日志打印,可以看到下面的内容,就 说明我们已经成功的把消息发送到Broker里去了。SendResult [ sendStatus=SEND_OK, msgId=240882076C741D108933A7A47083FBA958FA18B4AAC284718C870000, offsetMsgId=C0A8010200002A9F000000000002ED92, messageQueue=MessageQueue [ topic=TopicTest, brokerName=broker-a, queueId=12], queueOffset=63]

 

4、 修改和运行RocketMQ自带的Consume示例程序

接着修改RocketMQ自带的Consumer示例程序:

        接着运行上述程序,可以看到消费到了1条消息,如下所示:

Consumer Started.
ConsumeMessageThread_1 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=12, storeSize=214, queueOffset=63, sysFlag=0, bornTimestamp=1635239634058, bornHost=/192.168.1.2:62904, storeTimestamp=1635239634332, storeHost=/192.168.1.2:10911, msgId=C0A8010200002A9F000000000002ED92, commitLogOffset=191890, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=64, CONSUME_START_TIME=1635239680059, UNIQ_KEY=240882076C741D108933A7A47083FBA958FA18B4AAC284718C870000, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}]] 
 

我们的RocketMQ的源码环境彻底搭建完毕了,而且可以在本地启动以及收发消息,其实到这里为止,我们就可以去调试和分析RocketMQ的源码了。

7、源码分析的起点:从NameServer的启动脚本开始讲起

会用场景来驱动源码的分析,也就是说,RocketMQ使用的时候。

  • 第一个步骤一定是先启动NameServer,那么我们就先来分析NameServer启动这块的源码。
  • 第二个步骤一定是启动Broker,那么我们再来分析Broker启动的流程。
  • 接着Broker启动之后,必然会把自己注册到NameServer上去,那我们接着分析Broker注册到NameServer这部分源码。
  • 然后 Broker必然会跟NameServer保持一个心跳,那我们继续分析Broker的心跳的源码。

7.1**、从NameServer的启动开始说起**

那就是RocketMQ要玩儿起来的话,必须是先启动他的NameServer,因为后续Broker启动的时候,都是要向NameServer注册的,然后Producer发送消息的时候,需要从NameServer获取Broker机器信息,才能发送消息到Broker去。

  • 所以我们第一个源码分析的场景,就是NameServer启动这个场景。
  • 那么NameServer启动的时候,是通过哪个脚本来启动的呢?

实际是基于rocketmq-master源码中的distribution/bin目录中的mqnamesrv这个脚本来启动

mqnamesrv脚本文件如下:最重要的是最后一行:

sh ${ROCKETMQ_HOME}/bin/runserver.sh org.apache.rocketmq.namesrv.NamesrvStartup $@

大家都看到,上面那行命令中用sh命令执行了runserver.sh这个脚本,然后通过这个脚本去启动了NamesrvStartup这个Java类,那么runserver.sh这个脚本中最为关键的启动NamesrvStartup类的命令是什么呢,如下:

#!/bin/sh

if [ -z "$ROCKETMQ_HOME" ] ; then
  ## resolve links - $0 may be a link to maven's home
  PRG="$0"

  # need this for relative symlinks
  while [ -h "$PRG" ] ; do
    ls=`ls -ld "$PRG"`
    link=`expr "$ls" : '.*-> \(.*\)$'`
    if expr "$link" : '/.*' > /dev/null; then
      PRG="$link"
    else
      PRG="`dirname "$PRG"`/$link"
    fi
  done

  saveddir=`pwd`

  ROCKETMQ_HOME=`dirname "$PRG"`/..

  # make it fully qualified
  ROCKETMQ_HOME=`cd "$ROCKETMQ_HOME" && pwd`

  cd "$saveddir"
fi

export ROCKETMQ_HOME

sh ${ROCKETMQ_HOME}/bin/runserver.sh org.apache.rocketmq.namesrv.NamesrvStartup $@

#!/bin/sh

error_exit ()
{
    echo "ERROR: $1 !!"
    exit 1
}

[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
[ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"

export JAVA_HOME
export JAVA="$JAVA_HOME/bin/java"
export BASE_DIR=$(dirname $0)/..
export CLASSPATH=.:${BASE_DIR}/conf:${CLASSPATH}

DIR_SIZE_IN_MB=600

choose_gc_log_directory()
{
    case "`uname`" in
        Darwin)
            if [ ! -d "/Volumes/RAMDisk" ]; then
                # create ram disk on Darwin systems as gc-log directory
                DEV=`hdiutil attach -nomount ram://$((2 * 1024 * DIR_SIZE_IN_MB))` > /dev/null
                diskutil eraseVolume HFS+ RAMDisk ${DEV} > /dev/null
                echo "Create RAMDisk /Volumes/RAMDisk for gc logging on Darwin OS."
            fi
            GC_LOG_DIR="/Volumes/RAMDisk"
        ;;
        *)
            # check if /dev/shm exists on other systems
            if [ -d "/dev/shm" ]; then
                GC_LOG_DIR="/dev/shm"
            else
                GC_LOG_DIR=${BASE_DIR}
            fi
        ;;
    esac
}

choose_gc_options()
{
    # Example of JAVA_MAJOR_VERSION value : '1', '9', '10', '11', ...
    # '1' means releases befor Java 9
    JAVA_MAJOR_VERSION=$("$JAVA" -version 2>&1 | sed -r -n 's/.* version "([0-9]*).*$/\1/p')
    if [ -z "$JAVA_MAJOR_VERSION" ] || [ "$JAVA_MAJOR_VERSION" -lt "9" ] ; then
      JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
      JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC"
      JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps"
      JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
    else
      JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
      JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"
      JAVA_OPT="${JAVA_OPT} -Xlog:gc*:file=${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log:time,tags:filecount=5,filesize=30M"
    fi
}

choose_gc_log_directory
choose_gc_options
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages"
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib:${JAVA_HOME}/lib/ext"
#JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"

$JAVA ${JAVA_OPT} $@

大家可以看到,说白了,上述命令大致简化一下就是类似这样的一行命令:

java -server -Xms4g -Xmx4g -Xmn2g org.apache.rocketmq.namesrv.NamesrvStartup

这行命令只要是学习过Java基础的人应该都能理解。
通过java命令 + 一个有main()方法的类,就是会启动一个JVM进程,通过这个JVM进程来执行NamesrvStartup类中的main() 方法,这个main()方法里就完成了NameServer启动的所有流程和工作,那么既然NameServer是一个JVM进程,肯定可以设 置JVM参数了,所以上面你看到的一大堆-Xms4g之类的东西,都是JVM的参数。

启动过程总结:
        使用mqnamesrv脚本启动NameServer的时候,本质就是基于java命令启动了一个JVM进程,执行 NamesrvStartup类中的main()方法,完成NameServer启动的全部流程和逻辑,同时启动NameServer这个JVM进程的时 候,有一大堆的默认JVM参数,你当然可以在这里修改这些JVM参数,甚至进行优化。 

注意:自己仔细分析一下mqnamesrv脚本中的每一行逻辑,以及 runserver.sh中的每一行shell脚本代码的逻辑,透彻的理解一个中间件系统的进程是如何启动起来的。

7.2、初步看一眼NamesrvStartup的main()方法

8、NameServer在启动的时候都会解析哪些配置信息?

1、猜猜NamesrvController到底是个什么东西?

NamesrvController controller = createNamesrvController(args);

        这行代码很明显,就是在创建一个NamesrvController类,这个类似乎是**NameServer中的一个核心组件。**我们可以大胆的推测一下

  • NameServer启动之后,是不是需要接受Broker的请求?因为Broker都要把自己注册到NameServer上去。
  • 然后Producer这些客户端是不是也要从NameServer拉取元数据?因为他们需要知道一个Topic的MessageQueue都在哪些Broker上。
    所以我们完全可以猜想一下,NamesrvController这个组件,很可能就是NameServer中专门用来接受Broker和客户端的网络请求的一个组件!因为平时我们写Java Web系统的时候,大家都喜欢用Spring MVC框架,在Spring MVC框架中,用于接受HTTP请求的,就是Controlller组件!

NamesrvController组件,实际上就是NameServer中的核心组件,用来负责接受网络请求的!

2、NamesrvController是如何被创建出来的?

 

public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
        //PackageConflictDetect.detectFastjson();

        Options options = ServerUtil.buildCommandlineOptions(new Options());
        commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
        if (null == commandLine) {
            System.exit(-1);
            return null;
        }

        final NamesrvConfig namesrvConfig = new NamesrvConfig();
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        nettyServerConfig.setListenPort(9876);
        if (commandLine.hasOption('c')) {
            String file = commandLine.getOptionValue('c');
            if (file != null) {
                InputStream in = new BufferedInputStream(new FileInputStream(file));
                properties = new Properties();
                properties.load(in);
                MixAll.properties2Object(properties, namesrvConfig);
                MixAll.properties2Object(properties, nettyServerConfig);

                namesrvConfig.setConfigStorePath(file);

                System.out.printf("load config properties file OK, %s%n", file);
                in.close();
            }
        }

        if (commandLine.hasOption('p')) {
            InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
            MixAll.printObjectProperties(console, namesrvConfig);
            MixAll.printObjectProperties(console, nettyServerConfig);
            System.exit(0);
        }

        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

        if (null == namesrvConfig.getRocketmqHome()) {
            System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
            System.exit(-2);
        }

        LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
        JoranConfigurator configurator = new JoranConfigurator();
        configurator.setContext(lc);
        lc.reset();
        configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

        log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

        MixAll.printObjectProperties(log, namesrvConfig);
        MixAll.printObjectProperties(log, nettyServerConfig);

        final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

        // remember all configs to prevent discard
        controller.getConfiguration().registerConfig(properties);

        return controller;
    }

1、命令行相关

System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
//PackageConflictDetect.detectFastjson();

Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
    System.exit(-1);
    return null;
}

因为很明显上面的代码并不存在什么核心逻辑,你从他的代码的字面意思就可以大致猜测出来,他里面包含了很多 CommandLine相关的字眼,那么顾名思义,这就是一段跟命令行参数相关的代码! 你其实大致推测一下都知道,我们在启动NameServer的时候,是使用mqnamesrv命令来启动的,启动的时候可能会 在命令行里给他带入一些参数,所以很可能就是在这个地方,上面那块代码,就是解析一下我们传递进去的一些命令行参数而已!

2、配置组件

final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(9876);

        这里 会看到他创建了NamesrvConfig和NettyServerConfig两个关键的配置类!

从他的类名,我们就可以推测出来,NamesrvConfig包含的是NameServer自身运行的一些配置参数,NettyServerConfig包含的是用于接收网络请求的Netty服务器的配置参数。
        在这里也能明确感觉到,NameServer对外接收Broker和客户端的网络请求的时候,底层应该是基于Netty实现的网络服务器!

NameServer他默认固定的监听请求的端口号就是9876,因为他直接在代码里写死了这个端口号了,所以NettyServer应该就是监听了9876这个端口号,来接收Broker和客户端的请求的!

基于Netty实现的服务器用于接收网络请求:

3、看看NameServer的两个核心配置类里都包含了什么?

上面的NettyServerConfig一看就很明确了,那里的参数就是用来配置NettyServer的,配置好NettyServer之后,就可以监听9876端口号,然后Broker和客户端有请求过来,他就可以处理了。
 

4、NameServer的核心配置到底是如何进行解析的?

比如说你在启动NameServer的时候,用-c选项带上了一个配置文件的地址,然后此时他启动的时候,运行到上面的代码,就会把你配置文件里的配置,放入两个核心配置类里去。

比如你有一个配置文件是:nameserver.properties,里面有一个配置是serverWorkerThreads=16,那么上面的代码 就会读取出来这个配置,然后覆盖到NettyServerConfig里去!


        

NameServer启动的时候后,刚开始就是在初始化和解析 NameServerConfig、NettyServerConfig相关的配置信息,但是一般情况下,我们其实不会特意设置什么配置,所以 他这里一般都是用默认配置的!

 5、跟NameServer启动日志配合起来看

 

其实我们知道NameServer刚启动就会初始化和解析一些核心配置信息,尤其是NettyServer的一些网络配置信息,然后初始化完毕配置信息之后,他就会打印这些配置信息,我们此时可以看一下之前讲解源码环境搭建的时候,不是指定了NameServer的启动日志么?
实际上翻看一下NameServer的启动日志,会看到如下的内容: 

2020-02-05 15:10:05 INFO main - rocketmqHome=rocketmq-nameserver
2020-02-05 15:10:05 INFO main - kvConfigPath=namesrv/kvConfig.json

2020-02-05 15:10:05 INFO main - configStorePath=namesrv/namesrv.properties
2020-02-05 15:10:05 INFO main - productEnvName=center

2020-02-05 15:10:05 INFO main - clusterTest=false
2020-02-05 15:10:05 INFO main - orderMessageEnable=false

2020-02-05 15:10:05 INFO main - listenPort=9876
2020-02-05 15:10:05 INFO main - serverWorkerThreads=8

2020-02-05 15:10:05 INFO main - serverCallbackExecutorThreads=0
2020-02-05 15:10:05 INFO main - serverSelectorThreads=3

2020-02-05 15:10:05 INFO main - serverOnewaySemaphoreValue=256
2020-02-05 15:10:05 INFO main - serverAsyncSemaphoreValue=64

2020-02-05 15:10:05 INFO main - serverChannelMaxIdleTimeSeconds=120
2020-02-05 15:10:05 INFO main - serverSocketSndBufSize=65535

2020-02-05 15:10:05 INFO main - serverSocketRcvBufSize=65535
2020-02-05 15:10:05 INFO main - serverPooledByteBufAllocatorEnable=true

2020-02-05 15:10:05 INFO main - useEpollNativeSelector=false

6**、完成NamesrvController组件的创建**

         直接构造了NamesrvController这个组件,同时传递了NamesrvConfig和NettyServerConfig两

个核心配置类给他。

我们可以看到箭头的指向,两个核心配置类在初始化完毕之后,都是交给了NamesrvController这个核心的组件的

9、NameServer是如何初始化基于Netty的网络通信架构的?

        NamesrvController被创建之后,我们最关心的其实就是他里面的Netty服务器得启动,这样

NameServer才能在默认的9876这个端口上接收Broker和客户端的网络请求,比如Broker注册自己,客户端拉取 Broker路由数据,等等。

1、NamesrvController是如何被启动的?

start(controller)这个代码,他就是启动了NamesrvController这个核心的组 件!

2、NamesrvController在启动时会干什么?

public static NamesrvController start(final NamesrvController controller) throws Exception {
    if (null == controller) {
        throw new IllegalArgumentException("NamesrvController is null");
    }
    boolean initResult = controller.initialize();
    if (!initResult) {
        controller.shutdown();
        System.exit(-3);
    }
   ...
}

        最为关键的一行代码就是boolean initResult = controller.initialize()这个地方,他其实就是对
NamesrvController执行了initialize初始化的操作。 既然是初始化,那么我们可以大胆的推测下,NamesrvController里我们最为关注的,不就是Netty服务器么,那么这个初始化的地方,是不是就是把他内部的Netty服务器给初始化构造出来了呢?

 

3、Netty服务器是如何初始化的?

public boolean initialize() {
    this.kvConfigManager.load();
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
        ...
}

**kvConfigManager.load():**我们大致推测可能就是在里面有一些kv配置数据,是这个组件管理的,然后这里可能就是从磁盘上加载了kv配置。
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService)

 

构造了一个NettyRemotingServer,也就是Netty网络服务器。
在NamesrvController组件被构造好之后,接着进行初始化的时候,首先就是把核心的NettyRemotingServer网络服务器组件给构造了出来。

4、NettyRemotingServer是如何初始化的?

public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
    final ChannelEventListener channelEventListener) {
    super(nettyServerConfig.getServerOnewaySemaphoreValue(),   nettyServerConfig.getServerAsyncSemaphoreValue());
this.serverBootstrap = new ServerBootstrap();
    this.nettyServerConfig = nettyServerConfig;
    ...

}

这个ServerBootstrap,就是Netty里的一个核心的类,他就是代表了一个Netty网络服务器,通过这个东西,最终可以让Netty监听一个端口号上的网络请求
        NettyRemotingServer是一个RocketMQ自己开发的网络服务器组件,但是其实底层就是基于Netty的原始API实现的一个ServerBootstrap,是用作真正的网络服务器的。

10、NameServer最终是如何启动Netty网络通信服务器的?

 

阅读源码的一个小技巧:

就是在阅读源码的时候,有些源码是要细看的,但是有些源码你可以大致猜测一下他的作用,就直接略过去了,抓住真正的重点去看!

相关文章

微信公众号

最新文章

更多