The Road to RocketMQ Mastery (10): An Analysis of RocketMQ Network Communication (Part 1)

x33g5p2x · Reposted 2021-12-19 · Category: Other

Introduction

We all know that RocketMQ is high-performance message middleware. Its performance shows not only in its excellent message throughput but also in its Netty-based communication layer. The next few articles walk through RocketMQ's communication module. By analyzing how RocketMQ designs this module, we can borrow from a mature middleware's design whenever we need to build a communication layer for middleware of our own, adapting it to our business needs.

  • Communication architecture overview
  • Message encoding and decoding
  • NameServer startup as an example
  • Summary

1. Communication Architecture Overview

RocketMQ's network communication lives in the remoting module. From this module we can see that RocketMQ uses Netty for the underlying transport and defines a custom wire protocol under the protocol package.

The main class relationships are as follows:

(1) The RemotingService interface

RemotingService is the top-level interface. It defines three core methods: starting the Netty service, shutting it down, and registering an RPC hook that runs logic before and after each request.

public interface RemotingService {
    // start the service
    void start();

    // stop the service
    void shutdown();
	
    // register an RPC hook
    void registerRPCHook(RPCHook rpcHook);
}

(2) The RPCHook interface

The RPCHook interface defines the logic that runs before a request is sent and after a response is received:

public interface RPCHook {
    void doBeforeRequest(final String remoteAddr, final RemotingCommand request);

    void doAfterResponse(final String remoteAddr, final RemotingCommand request,
        final RemotingCommand response);
}
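To see how such a hook would be used, here is a minimal sketch of a logging hook. The `RemotingCmd` and `RpcHook` types below are simplified stand-ins for RocketMQ's `RemotingCommand` and `RPCHook` (defined here only so the example is self-contained), and `RecordingHook` is a hypothetical implementation, not part of RocketMQ:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for RocketMQ's RemotingCommand and RPCHook.
interface RemotingCmd { int getCode(); }

interface RpcHook {
    void doBeforeRequest(String remoteAddr, RemotingCmd request);
    void doAfterResponse(String remoteAddr, RemotingCmd request, RemotingCmd response);
}

// Hypothetical hook that records every request it sees, e.g. for auditing or tracing.
class RecordingHook implements RpcHook {
    final List<String> log = new ArrayList<>();

    @Override
    public void doBeforeRequest(String remoteAddr, RemotingCmd request) {
        log.add("before " + remoteAddr + " code=" + request.getCode());
    }

    @Override
    public void doAfterResponse(String remoteAddr, RemotingCmd request, RemotingCmd response) {
        log.add("after " + remoteAddr + " code=" + request.getCode());
    }
}
```

Because the hook sees both the request and the response, it is a natural place for cross-cutting concerns such as access logging, metrics, or ACL checks, without touching the processors themselves.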

(3) Server and client interfaces

The RemotingServer and RemotingClient interfaces both extend RemotingService and add their own server- or client-specific operations.
The RemotingServer interface:

public interface RemotingServer extends RemotingService {

	// register a processor for a request code; incoming requests are dispatched
	// to the processor registered for their requestCode
    void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
        final ExecutorService executor);

    void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);

    int localListenPort();

    Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);

    RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
        final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
        RemotingTimeoutException;

    void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback) throws InterruptedException,
        RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;

	// one-way send: fire the request and ignore the result
    void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
        RemotingSendRequestException;

}
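The registerProcessor/getProcessorPair pair implies a dispatch table mapping each request code to a (processor, executor) pair, with a default pair as the fallback. A minimal sketch of that idea (the class and method names below are ours, not RocketMQ's, and the processor interface is simplified):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;

// Illustrative sketch of the requestCode -> (processor, executor) dispatch table
// implied by registerProcessor/getProcessorPair.
class ProcessorTable {
    interface RequestProcessor { String process(int requestCode); }

    private final Map<Integer, Entry<RequestProcessor, ExecutorService>> table = new HashMap<>();
    private Entry<RequestProcessor, ExecutorService> defaultPair;

    void register(int requestCode, RequestProcessor p, ExecutorService e) {
        table.put(requestCode, new SimpleEntry<>(p, e));
    }

    void registerDefault(RequestProcessor p, ExecutorService e) {
        defaultPair = new SimpleEntry<>(p, e);
    }

    // Falls back to the default processor when no code-specific one is registered.
    Entry<RequestProcessor, ExecutorService> lookup(int requestCode) {
        return table.getOrDefault(requestCode, defaultPair);
    }
}
```

Pairing each processor with its own executor is the interesting design choice: different request types can be handled on different thread pools, so a slow request type cannot starve the others.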

2. Message Encoding and Decoding

(1) Protocol design

The wire format consists of four parts: a 4-byte total-length field; a 4-byte field whose high byte carries the serialization type and whose low three bytes carry the header length; the header data; and the optional body data. (The original post illustrated this with a diagram sourced from the web; the layout can also be read directly from the encode/decode code below.)

(2) Encoding

The remoting module defines a custom protocol for its messages: every outgoing and incoming message is wrapped in a RemotingCommand object.

public ByteBuffer encode() {
        // 1> header length size
        int length = 4;

        // 2> header data length
        byte[] headerData = this.headerEncode();
        length += headerData.length;

        // 3> body data length
        if (this.body != null) {
            length += body.length;
        }

        ByteBuffer result = ByteBuffer.allocate(4 + length);

        // length
        result.putInt(length);

        // header length
        result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));

        // header data
        result.put(headerData);

        // body data;
        if (this.body != null) {
            result.put(this.body);
        }

        result.flip();

        return result;
    }
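The markProtocolType call above packs two things into one 4-byte field: the serialization type in the high byte and the header length in the low three bytes. A sketch of that bit packing, consistent with the getHeaderLength/getProtocolType accessors used in decode (method names here are illustrative):

```java
// Sketch of the bit packing behind markProtocolType: the high byte of the 4-byte
// field carries the serialization type, the low three bytes carry the header length
// (so a header can be at most 2^24 - 1 bytes long).
class ProtocolTypeField {
    static int pack(byte serializeType, int headerLength) {
        return (serializeType << 24) | (headerLength & 0xFFFFFF);
    }

    static int headerLength(int field) {
        return field & 0xFFFFFF;
    }

    static byte protocolType(int field) {
        return (byte) ((field >> 24) & 0xFF);
    }
}
```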

(3) Decoding

public static RemotingCommand decode(final ByteBuffer byteBuffer) {
        // total length of the remaining frame; the 4-byte total-length field has
        // already been stripped by the frame decoder before this method runs
        int length = byteBuffer.limit();
        int oriHeaderLen = byteBuffer.getInt();
        int headerLength = getHeaderLength(oriHeaderLen);
        // read the header data
        byte[] headerData = new byte[headerLength];
        byteBuffer.get(headerData);

        RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));

        int bodyLength = length - 4 - headerLength;
        byte[] bodyData = null;
        if (bodyLength > 0) {
            bodyData = new byte[bodyLength];
            // read the message body
            byteBuffer.get(bodyData);
        }
        cmd.body = bodyData;

        return cmd;
    }
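Putting encode and decode together, the frame layout is: [4B total length][4B serialize type + header length][header][body]. Note that in RocketMQ the 4-byte total-length field is stripped by NettyDecoder (a length-field-based frame decoder) before RemotingCommand.decode runs, which is why decode reads the frame size from byteBuffer.limit(). The round-trip sketch below reads the length field itself instead of relying on a frame decoder, and uses raw bytes in place of a real serialized header; the class is illustrative only:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Round-trip sketch of the frame layout used by encode()/decode() above:
// [4B total length][4B: 1B serialize type + 3B header length][header][body].
class FrameDemo {
    static ByteBuffer encode(byte type, byte[] header, byte[] body) {
        int length = 4 + header.length + body.length;          // everything after the length field
        ByteBuffer buf = ByteBuffer.allocate(4 + length);
        buf.putInt(length);                                    // total length
        buf.putInt((type << 24) | (header.length & 0xFFFFFF)); // type + header length
        buf.put(header);
        buf.put(body);
        buf.flip();
        return buf;
    }

    static byte[] decodeBody(ByteBuffer buf) {
        int length = buf.getInt();                             // total length
        int oriHeaderLen = buf.getInt();
        int headerLength = oriHeaderLen & 0xFFFFFF;
        byte[] header = new byte[headerLength];
        buf.get(header);
        byte[] body = new byte[length - 4 - headerLength];     // same arithmetic as decode()
        buf.get(body);
        return body;
    }
}
```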

3. NameServer Startup as an Example

Having covered the remoting module's core interfaces, let's look at a concrete implementation path. During its own startup, NameServer starts as a Netty server. Here we skip the other configuration work done during NameServer startup and focus on how the Netty server is brought up. The rough startup flow is as follows:

NameServer is the component that actually brings up the underlying network listener as a Netty server; as we know, its role is to let Brokers register with it and to let clients pull routing information from it.
During startup, NameServer creates a NettyRemotingServer. NettyRemotingServer is RocketMQ's own networking component, built under the hood on Netty's ServerBootstrap. The start method is shown below; again, we only look at how the Netty server is started.

public static NamesrvController start(final NamesrvController controller) throws Exception {

        if (null == controller) {
            throw new IllegalArgumentException("NamesrvController is null");
        }
		// initialize the controller
        boolean initResult = controller.initialize();
        if (!initResult) {
            controller.shutdown();
            System.exit(-3);
        }
		// register a JVM shutdown hook via the Runtime class
        Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                controller.shutdown();
                return null;
            }
        }));

        controller.start();

        return controller;
    }

The initialize method is shown below:

public boolean initialize() {

		// load configuration
        this.kvConfigManager.load();
		// build the Netty server
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
		// worker thread pool for handling requests
        this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
		// register the default request processor (backed by the worker pool) with the Netty server
        this.registerProcessor();

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);

        if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
            // Register a listener to reload SslContext
            try {
                fileWatchService = new FileWatchService(
                    new String[] {
                        TlsSystemConfig.tlsServerCertPath,
                        TlsSystemConfig.tlsServerKeyPath,
                        TlsSystemConfig.tlsServerTrustCertPath
                    },
                    new FileWatchService.Listener() {
                        boolean certChanged, keyChanged = false;
                        @Override
                        public void onChanged(String path) {
                            if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                                log.info("The trust certificate changed, reload the ssl context");
                                reloadServerSslContext();
                            }
                            if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                                certChanged = true;
                            }
                            if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                                keyChanged = true;
                            }
                            if (certChanged && keyChanged) {
                                log.info("The certificate and private key changed, reload the ssl context");
                                certChanged = keyChanged = false;
                                reloadServerSslContext();
                            }
                        }
                        private void reloadServerSslContext() {
                            ((NettyRemotingServer) remotingServer).loadSslContext();
                        }
                    });
            } catch (Exception e) {
                log.warn("FileWatchService created error, can't load the certificate dynamically");
            }
        }

        return true;
    }

Once initialization completes, the controller is started; we can see that what actually starts is the NettyRemotingServer.

public void start() throws Exception {
        this.remotingServer.start();

        if (this.fileWatchService != null) {
            this.fileWatchService.start();
        }
    }

The NettyRemotingServer startup process is shown below:

@Override
    public void start() {
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyServerConfig.getServerWorkerThreads(),
            new ThreadFactory() {

                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                }
            });
		// configure and start the Netty server
        ServerBootstrap childHandler =
        	// various network options
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                // set up the request handlers; when the Netty server receives a request, these handlers process it
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
                                new HandshakeHandler(TlsSystemConfig.tlsMode))
                            .addLast(defaultEventExecutorGroup,
                            	// encoding/decoding
                                new NettyEncoder(),
                                new NettyDecoder(),
                                // idle-connection management
                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                // connection management
                                new NettyConnectManageHandler(),
                                // request handling
                                new NettyServerHandler()
                            );
                    }
                });

        if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
            childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        }

        try {
        	// start the Netty server and bind to the configured port
            ChannelFuture sync = this.serverBootstrap.bind().sync();
            InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
            this.port = addr.getPort();
        } catch (InterruptedException e1) {
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }

        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }

        this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
                    NettyRemotingServer.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }

4. Summary

From the analysis above, we can see that RocketMQ builds its own layer on top of raw Netty. (The original post closed with a diagram summarizing the Netty-related part of NameServer startup, omitted here.) In subsequent articles we will look more closely at how RocketMQ uses the Netty framework efficiently.
