Netty源码解析e——线程池模型之线程NioEventLoop

x33g5p2x  于2022-02-21 转载在 其他  
字(5.2k)|赞(0)|评价(0)|浏览(255)

上一篇详细分析了NioEventLoopGroup作用和源码,本文来看看NioEventLoop。NioEventLoop源码比NioEventLoopGroup源码复杂得多,每个NioEventLoop对象都与NIO中的多路复用器Selector一样,要管理成千上万条链路,所有链路数据的读/写事件都由它来发起。本文通过对NioEventLoop的功能、底层设计等对其源码进行深度剖析。

继承关系

NioEventLoop的功能

核心功能:

  • 开启Selector并初始化
  • 把ServerSocketChannel注册到Selector上
  • 处理各种I/O事件,如OP_ACCEPT、OP_CONNECT、OP_READ、OP_WRITE事件
  • 执行定时调度任务
  • 解决JDK空轮询bug

整体功能:

NioEventLoop这些功能的具体实现大部分都是委托其他类来完成的,其本身只完成数据流的接入工作。这种设计减轻了NioEventLoop的负担,同时增强了其扩展性。NioEventLoop的整体功能如上图所示。第二层为NioEventLoop的4个核心方法。对于每条EventLoop线程来说,由于链路注册到Selector上的具体实现都是委托给Unsafe类来完成的,因此register()方法存在于其父类SingleThreadEventLoop中。接下来对一些关键方法一一进行解读。

NioEventLoop开启Selector

当初始化NioEventLoop时,通过openSelector()方法开启Selector。在rebuildSelector()方法中也可调用openSelector()方法。在NIO中开启Selector(1行代码),只需调用Selector.open()或SelectorProvider的openSelector()方法即可。Netty为Selector设置了优化开关,如果开启优化开关,则通过反射加载sun.nio.ch.SelectorImpl对象,并通过已经优化过的SelectedSelectionKeySet替换sun.nio.ch.SelectorImpl对象中的selectedKeys和publicSelectedKeys两个HashSet集合。其中,selectedKeys为就绪Key的集合,拥有所有操作事件准备就绪的选择Key;publicSelectedKeys为外部访问就绪Key的集合代理,由selectedKeys集合包装成不可修改的集合。

SelectedSelectionKeySet具体做了什么优化呢?主要是数据结构改变了,用数组替代了HashSet,重写了add()和iterator()方法,使数组的遍历效率更高。开启优化开关,需要将系统属性io.netty.noKeySetOptimization设置为true。开启Selector的代码解读如下:
查看代码

private SelectorTuple openSelector() {
        final Selector unwrappedSelector;
        try {
            //创建selector
            unwrappedSelector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }
     
        //判断是否开始优化开关,默认为不开启,直接返回selector
        if (DISABLE_KEY_SET_OPTIMIZATION) {
            return new SelectorTuple(unwrappedSelector);
        }

        //通过反射创建selectorImpl对象
        Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    return Class.forName(
                            "sun.nio.ch.SelectorImpl",
                            false,
                            PlatformDependent.getSystemClassLoader());
                } catch (Throwable cause) {
                    return cause;
                }
            }
        });

        if (!(maybeSelectorImplClass instanceof Class) ||
            // ensure the current selector implementation is what we can instrument.
            !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
            if (maybeSelectorImplClass instanceof Throwable) {
                Throwable t = (Throwable) maybeSelectorImplClass;
                logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
            }
            return new SelectorTuple(unwrappedSelector);
        }

        final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
        //使用优化后的SelectedSelectionKeySet对象
        //使用JDK的sun.nio.ch.SelectorImpl.selectedKeys替换掉
        final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

        Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                    Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

                    if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
                        // Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
                        // This allows us to also do this in Java9+ without any extra flags.
                        long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
                        long publicSelectedKeysFieldOffset =
                                PlatformDependent.objectFieldOffset(publicSelectedKeysField);

                        if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
                            PlatformDependent.putObject(
                                    unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
                            PlatformDependent.putObject(
                                    unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
                            return null;
                        }
                        // We could not retrieve the offset, lets try reflection as last-resort.
                    }

                    //设置为可写
                    Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
                    if (cause != null) {
                        return cause;
                    }
                    cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
                    if (cause != null) {
                        return cause;
                    }

                    //通过反射的方式把selectorselectedKeys和publicSelectedKeys
                    //使用Netty构造的selectorselectedKeys替换JDK的selectedKeySet
                    selectedKeysField.set(unwrappedSelector, selectedKeySet);
                    publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                    return null;
                } catch (NoSuchFieldException e) {
                    return e;
                } catch (IllegalAccessException e) {
                    return e;
                }
            }
        });

        if (maybeException instanceof Exception) {
            selectedKeys = null;
            Exception e = (Exception) maybeException;
            logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
            return new SelectorTuple(unwrappedSelector);
        }
        //把selectedKeySet赋值给NioEventLoop属性,并返回Selector元数据.
        selectedKeys = selectedKeySet;
        logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
        return new SelectorTuple(unwrappedSelector,
                                 new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
    }

NioEventLoop的run()方法

run()方法主要分三部分:select(boolean oldWakenUp),用来轮询就绪的Channel;process SelectedKeys,用来处理轮询到的SelectionKey;runAllTasks,用来执行队列任务。

第一部分,select(boolean oldWakenUp):主要目的是轮询看看是否有准备就绪的Channel。在轮询过程中会调用NIOSelector的selectNow()和select(timeoutMillis)方法。由于对这两个方法的调用进行了很明显的区分,因此调用这两个方法的条件也有所不同,具体逻辑如下。

(1)当定时任务需要触发且之前未轮询过时,会调用selectNow()方法立刻返回。

(2)当定时任务需要触发且之前轮询过(空轮询或阻塞超时轮询)直接返回时,没必要再调用selectNow()方法。

(3)若taskQueue队列中有任务,且从EventLoop线程进入select()方法开始后,一直无其他线程触发唤醒动作,则需要调用selectNow()方法,并立刻返回。因为在运行select(booleanoldWakenUp)之前,若有线程触发了wakeUp动作,则需要保证tsakQueue队列中的任务得到了及时处理,防止等待timeoutMillis超时后处理。

(4)当select(timeoutMillis)阻塞运行时,在以下4种情况下会正常唤醒线程:其他线程执行了wakeUp唤醒动作、检测到就绪Key、遇上空轮询、超时自动醒来。唤醒线程后,除了空轮询会继续轮询,其他正常情况会跳出循环。具体代码解读如下:

相关文章