应该是尝试在Intellij IDEA中去启动RocketMQ,然后你就可以在源码中打一些断点,去观察RocketMQ源码的运行过程,而且在这个过程中,还需要从RocketMQ实际运行和使用的角度,去观察他的源码运行的流程。
什么意思呢?
首先,我们需要在Intellij IDEA中启动NameServer,所以我们先在RocketMQ源码目录中找到namesvr这个工程,然后展开他的目录,找到NamesvrStartup.java这个类,我们看下面的图示。
启动NameServer
The Name Server boot success. serializeType=JSON
找到broker模块,然后展开他的目录,就可以找到一个BrokerStartup类,这个类是用来启动Broker 进程的。
使用debug模式启动Broker
接着我们就可以使用debug模式启动BrokerStartup类了,右击他点击Debug BrokerStartup.main(),就可以启动他。
接着我们会看到如下的一段提示,就说明broker启动成功了:
Connected to the target VM, address: '127.0.0.1:55275', transport: 'socket'
The broker[broker-a, 192.168.3.9:10911] boot success. serializeType=JSON
在example模块下面就有,我们看下面的图。
在图中我们可以在quickstart包下找到Producer和Consumer两个例子类,而且大家可以看到,在这里包括事务消息、顺序消息,等等,很多高阶功能的例子在这里都有。
整个RocketMQ集群的元数据都集中在了NameServer里,包括有多少Broker,有哪些Topic,有哪些 Producer,有哪些Consumer,目前集群里有多少消息,等等。
所以如果我们能想办法跑到NameServer里去,自然就可以知道很多东西 但是那不行,因为NameServer并没有对我们打开一扇门让我们进去知道这些东西。 但是RocketMQ里既然有大量的信息可以让我们进行监控和查看,他自然会提供一些办法来让我们看到,这就是他最大的优势之一,一 个可视化的管理界面。
这个可视化的工作台可以说是非常强大的,他几乎满足了我们大部分对RocketMQ集群监控的需求,我们一步一步看看他都有哪些功能。
使用RocketMQ-dashboard项目。
首先刚进入界面,会看到类似下面的东西:
在这个界面里可以让你看到Broker的大体消息负载,还有各个Topic的消息负载,另外还可以选择日期要看哪一天的监控数据,都可以看到。
接着大家点击上边导航栏里的“集群”,就会进入集群的一个监控界面。
在这个图里可以看到非常有用的一些信息,你可以看到各个Broker的分组,哪些是Master,哪些是Slave,他们各自的机器地址和端口号,还有版本号。包括最重要的,就是他们每台机器的生产消息TPS和消费消息TPS,还有消息总数。这是非常重要的,通过这个TPS统计,就是每秒写入或者被消费的消息数量,就可以看出RocketMQ集群的TPS和并发访问量。
点击上边导航栏的“主题”,可以看到下面的界面,通过这个界面就可以对Topic进行管理了,比如你可以在这里创建、删除和管理 Topic,查看Topic的一些装填、配置,等等,可以对Topic做各种管理。
接着点击上边导航栏里的“消费者”和“生产者”,就可以看到访问MQ集群的消费者和生产者了,还可以做对应的一些管理。
着点击导航栏里的“消息”和“消息轨迹”,又可以对消息进行查询和管理。
大体上这个工作台的监控和管理功能就是这些了,所以大家可以在这里看到,我们这个工作台,就可以对集群整体的消息数量以及消息 TPS,还有各个Broker的消息数量和消息TPS进行监控。 同时我们还可以对Broker、Topic、消费者、生产者、消息这些东西进行对应的查询和管理,非常的便捷。
新版本RocketMQ配置Web管理界面:解决找不到rocketmq-console目录问题_沮丧的南瓜-CSDN博客
1、在example模块下面就有,我们看下面的图。
在图中我们可以在quickstart包下找到Producer和Consumer两个例子类,而且大家可以看到在这里包括事务消息、顺序消息,等等,很多高阶功能的例子在这里都有。
2、首先我们需要启动rocketmq-console工程,接着我们就进入Topic菜单,新建一个名称为TopicTest的Topic即可,新建完之后在Topic列表就可以看到下面的内容了。
3**、修改和运行RocketMQ自带的Producer示例程序**
接着我们执行运行上面的程序就可以了,他会发送1条消息到Broker里去,我们观察一下控制台的日志打印,可以看到下面的内容,就 说明我们已经成功的把消息发送到Broker里去了。SendResult [ sendStatus=SEND_OK, msgId=240882076C741D108933A7A47083FBA958FA18B4AAC284718C870000, offsetMsgId=C0A8010200002A9F000000000002ED92, messageQueue=MessageQueue [ topic=TopicTest, brokerName=broker-a, queueId=12], queueOffset=63]
4、 修改和运行RocketMQ自带的Consume示例程序
接着修改RocketMQ自带的Consumer示例程序:
接着运行上述程序,可以看到消费到了1条消息,如下所示:
Consumer Started.
ConsumeMessageThread_1 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=12, storeSize=214, queueOffset=63, sysFlag=0, bornTimestamp=1635239634058, bornHost=/192.168.1.2:62904, storeTimestamp=1635239634332, storeHost=/192.168.1.2:10911, msgId=C0A8010200002A9F000000000002ED92, commitLogOffset=191890, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=64, CONSUME_START_TIME=1635239680059, UNIQ_KEY=240882076C741D108933A7A47083FBA958FA18B4AAC284718C870000, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}]]
我们的RocketMQ的源码环境彻底搭建完毕了,而且可以在本地启动以及收发消息,其实到这里为止,我们就可以去调试和分析RocketMQ的源码了。
会用场景来驱动源码的分析,也就是说,RocketMQ使用的时候。
那就是RocketMQ要玩儿起来的话,必须是先启动他的NameServer,因为后续Broker启动的时候,都是要向NameServer注册的,然后Producer发送消息的时候,需要从NameServer获取Broker机器信息,才能发送消息到Broker去。
实际是基于rocketmq-master源码中的distribution/bin目录中的mqnamesrv这个脚本来启动
mqnamesrv脚本文件如下:最重要的是最后一行:
sh ${ROCKETMQ_HOME}/bin/runserver.sh org.apache.rocketmq.namesrv.NamesrvStartup $@
大家都看到,上面那行命令中用sh命令执行了runserver.sh这个脚本,然后通过这个脚本去启动了NamesrvStartup这个Java类,那么runserver.sh这个脚本中最为关键的启动NamesrvStartup类的命令是什么呢,如下:
#!/bin/sh
if [ -z "$ROCKETMQ_HOME" ] ; then
## resolve links - $0 may be a link to maven's home
PRG="$0"
# need this for relative symlinks
while [ -h "$PRG" ] ; do
ls=`ls -ld "$PRG"`
link=`expr "$ls" : '.*-> \(.*\)$'`
if expr "$link" : '/.*' > /dev/null; then
PRG="$link"
else
PRG="`dirname "$PRG"`/$link"
fi
done
saveddir=`pwd`
ROCKETMQ_HOME=`dirname "$PRG"`/..
# make it fully qualified
ROCKETMQ_HOME=`cd "$ROCKETMQ_HOME" && pwd`
cd "$saveddir"
fi
export ROCKETMQ_HOME
sh ${ROCKETMQ_HOME}/bin/runserver.sh org.apache.rocketmq.namesrv.NamesrvStartup $@
#!/bin/sh
error_exit ()
{
echo "ERROR: $1 !!"
exit 1
}
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
[ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"
export JAVA_HOME
export JAVA="$JAVA_HOME/bin/java"
export BASE_DIR=$(dirname $0)/..
export CLASSPATH=.:${BASE_DIR}/conf:${CLASSPATH}
DIR_SIZE_IN_MB=600
choose_gc_log_directory()
{
case "`uname`" in
Darwin)
if [ ! -d "/Volumes/RAMDisk" ]; then
# create ram disk on Darwin systems as gc-log directory
DEV=`hdiutil attach -nomount ram://$((2 * 1024 * DIR_SIZE_IN_MB))` > /dev/null
diskutil eraseVolume HFS+ RAMDisk ${DEV} > /dev/null
echo "Create RAMDisk /Volumes/RAMDisk for gc logging on Darwin OS."
fi
GC_LOG_DIR="/Volumes/RAMDisk"
;;
*)
# check if /dev/shm exists on other systems
if [ -d "/dev/shm" ]; then
GC_LOG_DIR="/dev/shm"
else
GC_LOG_DIR=${BASE_DIR}
fi
;;
esac
}
choose_gc_options()
{
# Example of JAVA_MAJOR_VERSION value : '1', '9', '10', '11', ...
# '1' means releases befor Java 9
JAVA_MAJOR_VERSION=$("$JAVA" -version 2>&1 | sed -r -n 's/.* version "([0-9]*).*$/\1/p')
if [ -z "$JAVA_MAJOR_VERSION" ] || [ "$JAVA_MAJOR_VERSION" -lt "9" ] ; then
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC"
JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps"
JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
else
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"
JAVA_OPT="${JAVA_OPT} -Xlog:gc*:file=${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log:time,tags:filecount=5,filesize=30M"
fi
}
choose_gc_log_directory
choose_gc_options
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages"
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib:${JAVA_HOME}/lib/ext"
#JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"
$JAVA ${JAVA_OPT} $@
大家可以看到,说白了,上述命令大致简化一下就是类似这样的一行命令:
java -server -Xms4g -Xmx4g -Xmn2g org.apache.rocketmq.namesrv.NamesrvStartup
这行命令只要是学习过Java基础的人应该都能理解。
通过java命令 + 一个有main()方法的类,就是会启动一个JVM进程,通过这个JVM进程来执行NamesrvStartup类中的main() 方法,这个main()方法里就完成了NameServer启动的所有流程和工作,那么既然NameServer是一个JVM进程,肯定可以设 置JVM参数了,所以上面你看到的一大堆-Xms4g之类的东西,都是JVM的参数。
启动过程总结:
使用mqnamesrv脚本启动NameServer的时候,本质就是基于java命令启动了一个JVM进程,执行 NamesrvStartup类中的main()方法,完成NameServer启动的全部流程和逻辑,同时启动NameServer这个JVM进程的时 候,有一大堆的默认JVM参数,你当然可以在这里修改这些JVM参数,甚至进行优化。
注意:自己仔细分析一下mqnamesrv脚本中的每一行逻辑,以及 runserver.sh中的每一行shell脚本代码的逻辑,透彻的理解一个中间件系统的进程是如何启动起来的。
NamesrvController controller = createNamesrvController(args);
这行代码很明显,就是在创建一个NamesrvController类,这个类似乎是**NameServer中的一个核心组件。**我们可以大胆的推测一下
NamesrvController组件,实际上就是NameServer中的核心组件,用来负责接受网络请求的!
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
//PackageConflictDetect.detectFastjson();
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}
final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(9876);
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
if (commandLine.hasOption('p')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
MixAll.printObjectProperties(console, namesrvConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
System.exit(0);
}
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
return controller;
}
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
//PackageConflictDetect.detectFastjson();
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}
因为很明显上面的代码并不存在什么核心逻辑,你从他的代码的字面意思就可以大致猜测出来,他里面包含了很多 CommandLine相关的字眼,那么顾名思义,这就是一段跟命令行参数相关的代码! 你其实大致推测一下都知道,我们在启动NameServer的时候,是使用mqnamesrv命令来启动的,启动的时候可能会 在命令行里给他带入一些参数,所以很可能就是在这个地方,上面那块代码,就是解析一下我们传递进去的一些命令行参数而已!
final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(9876);
这里 会看到他创建了NamesrvConfig和NettyServerConfig两个关键的配置类!
从他的类名,我们就可以推测出来,NamesrvConfig包含的是NameServer自身运行的一些配置参数,NettyServerConfig包含的是用于接收网络请求的Netty服务器的配置参数。
在这里也能明确感觉到,NameServer对外接收Broker和客户端的网络请求的时候,底层应该是基于Netty实现的网络服务器!
NameServer他默认固定的监听请求的端口号就是9876,因为他直接在代码里写死了这个端口号了,所以NettyServer应该就是监听了9876这个端口号,来接收Broker和客户端的请求的!
基于Netty实现的服务器用于接收网络请求:
上面的NettyServerConfig一看就很明确了,那里的参数就是用来配置NettyServer的,配置好NettyServer之后,就可以监听9876端口号,然后Broker和客户端有请求过来,他就可以处理了。
比如说你在启动NameServer的时候,用-c选项带上了一个配置文件的地址,然后此时他启动的时候,运行到上面的代码,就会把你配置文件里的配置,放入两个核心配置类里去。
比如你有一个配置文件是:nameserver.properties,里面有一个配置是serverWorkerThreads=16,那么上面的代码 就会读取出来这个配置,然后覆盖到NettyServerConfig里去!
NameServer启动的时候后,刚开始就是在初始化和解析 NameServerConfig、NettyServerConfig相关的配置信息,但是一般情况下,我们其实不会特意设置什么配置,所以 他这里一般都是用默认配置的!
其实我们知道NameServer刚启动就会初始化和解析一些核心配置信息,尤其是NettyServer的一些网络配置信息,然后初始化完毕配置信息之后,他就会打印这些配置信息,我们此时可以看一下之前讲解源码环境搭建的时候,不是指定了NameServer的启动日志么?
实际上翻看一下NameServer的启动日志,会看到如下的内容:
2020-02-05 15:10:05 INFO main - rocketmqHome=rocketmq-nameserver
2020-02-05 15:10:05 INFO main - kvConfigPath=namesrv/kvConfig.json
2020-02-05 15:10:05 INFO main - configStorePath=namesrv/namesrv.properties
2020-02-05 15:10:05 INFO main - productEnvName=center
2020-02-05 15:10:05 INFO main - clusterTest=false
2020-02-05 15:10:05 INFO main - orderMessageEnable=false
2020-02-05 15:10:05 INFO main - listenPort=9876
2020-02-05 15:10:05 INFO main - serverWorkerThreads=8
2020-02-05 15:10:05 INFO main - serverCallbackExecutorThreads=0
2020-02-05 15:10:05 INFO main - serverSelectorThreads=3
2020-02-05 15:10:05 INFO main - serverOnewaySemaphoreValue=256
2020-02-05 15:10:05 INFO main - serverAsyncSemaphoreValue=64
2020-02-05 15:10:05 INFO main - serverChannelMaxIdleTimeSeconds=120
2020-02-05 15:10:05 INFO main - serverSocketSndBufSize=65535
2020-02-05 15:10:05 INFO main - serverSocketRcvBufSize=65535
2020-02-05 15:10:05 INFO main - serverPooledByteBufAllocatorEnable=true
2020-02-05 15:10:05 INFO main - useEpollNativeSelector=false
直接构造了NamesrvController这个组件,同时传递了NamesrvConfig和NettyServerConfig两
个核心配置类给他。
我们可以看到箭头的指向,两个核心配置类在初始化完毕之后,都是交给了NamesrvController这个核心的组件的
NamesrvController被创建之后,我们最关心的其实就是他里面的Netty服务器得启动,这样
NameServer才能在默认的9876这个端口上接收Broker和客户端的网络请求,比如Broker注册自己,客户端拉取 Broker路由数据,等等。
start(controller)这个代码,他就是启动了NamesrvController这个核心的组 件!
public static NamesrvController start(final NamesrvController controller) throws Exception {
if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
...
}
最为关键的一行代码就是boolean initResult = controller.initialize()这个地方,他其实就是对
NamesrvController执行了initialize初始化的操作。 既然是初始化,那么我们可以大胆的推测下,NamesrvController里我们最为关注的,不就是Netty服务器么,那么这个初始化的地方,是不是就是把他内部的Netty服务器给初始化构造出来了呢?
public boolean initialize() {
this.kvConfigManager.load();
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
...
}
**kvConfigManager.load():**我们大致推测可能就是在里面有一些kv配置数据,是这个组件管理的,然后这里可能就是从磁盘上加载了kv配置。
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService)
构造了一个NettyRemotingServer,也就是Netty网络服务器。
在NamesrvController组件被构造好之后,接着进行初始化的时候,首先就是把核心的NettyRemotingServer网络服务器组件给构造了出来。
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
final ChannelEventListener channelEventListener) {
super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
this.serverBootstrap = new ServerBootstrap();
this.nettyServerConfig = nettyServerConfig;
...
}
这个ServerBootstrap,就是Netty里的一个核心的类,他就是代表了一个Netty网络服务器,通过这个东西,最终可以让Netty监听一个端口号上的网络请求。
NettyRemotingServer是一个RocketMQ自己开发的网络服务器组件,但是其实底层就是基于Netty的原始API实现的一个ServerBootstrap,是用作真正的网络服务器的。
阅读源码的一个小技巧:
就是在阅读源码的时候,有些源码是要细看的,但是有些源码你可以大致猜测一下他的作用,就直接略过去了,抓住真正的重点去看!
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/mingyuli/article/details/120973402
内容来源于网络,如有侵权,请联系作者删除!