Netty详解

x33g5p2x  于2021-11-23 转载在 其他  
字(4.1k)|赞(0)|评价(0)|浏览(481)

Netty简介

Netty是一个基于Java的高性能的网络框架,基于TCP和UDP,是对Java中NIO的一个封装。

我们可以根据自己的需求快速的开发一款自己需要的应用层协议。

Netty核心组件

EventLoopGroup
EventLoopGroup 是 EventLoop的一个数组,默认为CPU个数*2,也可以自定义。

EventLoop
一个单线程的Executor,用来绑定一个selector,可以同时绑定多个Channel,读取多个Channel的事件,主要有两种任务:
1、查询selector中监视的channel的事件,并执行对应的handler的方法
2、执行用户的自定义任务。 用户通过channel.eventLoop().execute(Runnable runnable)放入的自定义任务,也可以放入定时任务。

这两个任务是是在一个线程中执行的,通过一个ratio变量来控制二者执行的时间片占比

EventLoop中还有一个executor的属性,这个executor用来执行EventLoop的初始化。如果EventLoop还未初始化,executor会新生成一个Thread来启动EventLoop

当调用EventLoop.execute(Runnable runnable)的时候,如果还没有启动对应的线程,就会利用executor来启动

Channel
用来接受和写入对应的数据,对应于之前的socket。

Handler
一个Channel对应着多个Handler,Handler分为InboundHandler和OutboundHandler两种。

主要方法有read()、write()等方法。

当Channel接收数据的时候,InboundHandler会对其进行处理

当Channel向外发送数据的时候,OutBoundHandler会对其进行处理

Pipeline
一个Channel对应着有一个Pipeline,Pipeline是ChannelHandler的一个链表。

当接收数据的时候,Pipeline会从head 到 tail 依次执行对应InboundHanlder的方法

当发送数据的时候,Pipeline会从tail 到 tail 依次执行对应的OutBoundHandler的方法

Netty线程模型

Netty的线程模型也是基于Reactor的,如果不了解Reactor模式的,可以看我之前写的文章Reactor设计模式

Netty工作流程

Netty中分为ServerBootstrap 和 Bootstrap。

1、ServerBootstrap 初始化

ServerBootstrap 是服务器创建的引导类。
其中需要传入两个EventLoopGroup,分别为parentGroup和childGroup。
parentGroup 用来 处理 新channel的连接,childGroup 用来 处理 channel的读取等操作

下面说一下EventLoopEvent的创建流程

如上图所示,executor被初始化成一个ThreadPerTaskExecutor, execute()的时候都会新开一个线程执行对应的runnable。

我们注意下children这个变量,就是对应的EventLoop数组,

对应的newChild(executor,args)就是生成一个EventLoop对象。

EventLoopGroup的核心就是EventLoop数组。

EventLoop对应着一个线程和selector,可以监听多个channel,ServerChannel通过轮询的方式将新连接的channel分配给EventLoop数组中的EventLoop

Channel和EventLoop的绑定

再说下init(Channel )

void init(Channel channel) throws Exception {
   
        ChannelPipeline p = channel.pipeline();

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        //这里,加入了对应的新建连接对应的处理handler
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

ServerBootstrapAcceptor 对应的 channelRead()如下

public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;

			//加入对应的hanlder,也是我们初始化的时候定义的
            child.pipeline().addLast(childHandler);

            setChannelOptions(child, childOptions, logger);

            for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }

            try {
                //重点,将channel注册到childGroup中
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }

next()就是查找childGroup的下一个EventLoop,也就是轮询的使用EventLoop

然后转到对应的EventLoop的register()方法

到这步熟悉了吧,利用unsafe()将channel和对应的eventLoop的Selector绑定在一起。

EventLoop详解

Netty中常用的就是NioEventLoop,我们看下它的run()方法

protected void run() {
        for (;;) {
            try {
            
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                       
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }

这里我只列出了核心代码,可以看出EventLoop开了一个死循环,做了两个事情
1、processSelectedKeys()

由上图所示,processSelectedKeys()主要是用来处理selector监听的channel。

如上图所示,通过unsafe.read()来读取对应的数据

看上图,来到了我们熟悉的pipeline

executor.isEventLoop()是判断当前线程是否是EventLoop绑定的线程,如果是就直接执行invokeChannelRead(m).
如果不是,就将其包装为runnable加入到 taskQueue中。

因为read()只能由EventLoop线程发起,所以会直接执行invokeChannelRead(m).

如果handler对应的处理过程比较长,建议在handler中使用线程池来执行对应的任务,否则会影响其他连接的业务处理

如上图所示,调用了hanlder().channelRead(this,msg),这就是调用了对应的handler。

但是并未传递到后面的handler,当想要将对应的数据传递到后续的handler的时候,需要用户手动的调用

ctx.fireChannelRead(msg);

2、runAllTasks()

在InboundHandler和OutboundHandler的对应的回调方法中,暴露了ChannelHandlerContext,用户可以通过

ctx.channel().eventLoop().execute().execute(Runnable runnable)

来提交对应的任务,对应的任务会加入到EventLoop的taskQueue中。

客户端 Bootstrap启动

客户端的Bootstrap是ServerBootstrap的简化版本,只有一个childGroup(), 也是初始化对应的channel,然后绑定到一个EventLoop中。

相关文章