文章

NioEventLoop——Netty实现多Reactor多线程模式

NioEventLoop——Netty实现多Reactor多线程模式

引言

从开发或者执行流程上,Reactor模式可以被清晰地分成三大步——注册、轮询、分发。

  • 第一步,通道注册到选择器上;
  • 第二步,反应器在选择器里进行IO事件轮询;
  • 第三步,反应器把IO事件分发到处理器。

NettyReactor模型,和经典Reactor模型的元素对应关系如下,

  • Netty中的Channel,对经典的Channel做了封装;
  • NettyHandler,定义了自己Handler的规范;
  • Netty的反应器,NioEventLoop

Desktop View Netty的Reactor模型,和经典Reactor模型的元素对应关系

EventLoop

Netty中的EventLoop类,就是对应于Reactor反应器的Reactor反应器角色。

Desktop View Netty中的Reactor模式示意图

它是一个多线程的多反应器模式,

  • 监听线程,boss线程,可以是多个;
  • IO传输线程,反应器是NioEventLoop,多个EventLoop组成一个组,workerLoopGroup

要搞清楚NioEventLoop,首先需要搞清楚两个重要的关系,

  • NioEventLoopNIO原生Selector的关系
  • Netty ChannelNIO原生Channel的关系

NioEventLoop类型绑定了两大关键元素——ThreadJava NIO Selector

Desktop View NioEventLoop的两大关键元素

先看一下ReactorJava NIO Selector选择器,

Desktop View NioEventLoop类定义

再看下Reactor的线程Thread

Desktop View NioEventLoop的Thread定义在父类SingleThreadEventExecutor中

NioEventLoopThread

  • Thread属性的作用,主要是用于轮询NIO Selector并处理IO事件;
  • 处理其他的非IO任务。

NioEventLoopThread线程什么时候启动呢?
Reactor模式中,线程是轮询用的。所以,Reactor线程的启动,一般在ChannelSelector上注册之前,注册工作也是由Thread来完成的。

EventLoopNetty Channel的关系
一对多的关系,一个EventLoop,可以注册很多不同的Netty Channel

Desktop View EventLoopNetty Channel,一对多

选取NioSocketChannel类,作为Channel通道的代表,进行说明。

Desktop View NioSocketChannel类的层次结构

AbstractNioChannel里,有一个SelectableChannel成员,这个成员是Java JDK原生channel成员,也就是说NettyChannel封装了JDK原生Channel

Desktop View AbstractNioChannel类定义

回到正题,Reactor(对应NettyEventLoop)三步曲

  • 第一步:注册
    • channel通道的就绪事件,注册到选择器Selector
    • 一个Reactor对应一个选择器Selector,一个Reactor拥有一个Selector成员属性。
  • 第二步:轮询
    • 轮询的代码,是Reactor重要的一个组成部分,或者说核心的部分。轮询选择器是否有就绪事件。
  • 第三步:分发
    • 将就绪事件,分发到事件附件的处理器handler中,由handler完成实际的处理。

EventLoop三步曲

第一步:注册

Netty中,注册流程还是比较复杂的,Channel注册选择器Selector调用流程如下,

1
2
3
4
5
6
AbstractBootstrap.initAndRegister
--> MultithreadEventLoopGroup.register
--> SingleThreadEventLoop.register
--> AbstractChannel.AbstractUnsafe.register
--> AbstractChannel.register0
--> AbstractNioChannel.doRegister

注册的入口代码,在启动类AbstractBootstrapinitAndRegister方法中。

第一个步骤,是在引导类里边,AbstractBootstrap.initAndRegister,我们关注的是流程的最后一步,AbstractNioChannel.doRegister,拿到原生的选择器和原生的通道,完成通道的注册工作,

Desktop View AbstractNioChannel.doRegister

解释下上面SocketChannel注册到eventLoopselector上的代码。
前面讲到,AbstractNioChannel通道类有一个本地Java通道成员ch,在AbstractNioChannel的构造函数中,被初始化。
NettyChannel通过javaChannel()方法,取得了Java本地Channel。它返回的是一个Java NIO SocketChannel
然后,将这个SocketChannel注册到与eventLoop关联的selector上了。
通过最后一步,Netty终于将这个SocketChannel注册到与eventLoop关联的selector上了。

再看下经典的选择器注册,调用selectionKey.attach(object),就是在socketChannel注册到选择器上返回选择键后,把Acceptor处理器AcceptorHandler绑定到选择键实例上。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Reactor() throws IOException {
    //Reactor初始化
    selector = Selector.open();
    serverSocket = ServerSocketChannel.open();
    InetSocketAddress address = new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP, NioDemoConfig.SOCKET_SERVER_PORT);
    //非阻塞
    serverSocket.configureBlocking(false);

    //分步处理,第一步,接收accept事件
    SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);

    // SelectionKey.OP_ACCEPT
    serverSocket.socket().bind(address);
    logger.info("服务端已经开始监听:" + address);

    // attach callback object, AcceptorHandler
    sk.attach(new AcceptorHandler());
}

channel.register的第三个参数,是可以设置selectionKey的附加对象的,和调用selectionKey.attach(object)的效果一样。

1
SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT, new AcceptorHandler());

对比下Netty的方法,处理不太一样,这里选择键的附件,不是分两步做,而是放在了第三个参数里边,它把NioSocketChannel实例作为附件放到选择键上,这个附件在后面轮询和分发时会用到。

1
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);

doRegister()所传递的第三个参数是this,它就是一个NioSocketChannel的实例。意思是,SocketChannel对象自身,以附加字段的方式添加到了selectionKey中,供事件就绪后使用。

第二步:轮询

Netty的轮询也相对复杂一点。

Q:反应器的线程在哪里?
A:NioEventLoop的父类SingleThreadEventExecutor中,有一个Thread thread属性,存储了一个本地Java线程。

Q:线程在哪里启动呢?
A:SocketChannel注册到eventLoopSelector前。

回顾下,通道注册的流程,最后一步是完成原生通道到原生选择器的注册工作,

倒数第三步,AbstractChannel.AbstractUnsafe.register,完成了线程启动,

Desktop View AbstractChannel.AbstractUnsafe.register注册时

这里的eventLoop.execute()方法调用,就是启动EventLoop线程的入口。

register方法里边,有两种情况,

  • 第一种情况,反应器线程是启动的,并且当前线程正好是反应器线程(inEventLoop),两个条件都满足的情况下,就直接执行注册工作(register0,就是最后一步的注册)。
  • 第二种情况,也就是大多数时候,EventLoop线程往往没有启动,这时就得先启动线程,再注册。

进到任务提交方法,execute方法中调用startThread()启动线程,

Desktop View SingleThreadEventExecutor.execute

线程启动之前,任务的添加,只要不是能立即执行的任务,都需要加入到队列里边,

Desktop View 将通道注册的任务,加到taskQueue

回到线程的启动,EventLoop线程的启动,如果Reactor线程没启动,就在startThread里边,调用了doStartThread方法,启动线程,

Desktop View SingleThreadEventExecutor.startThread

STATE_UPDATERSingleThreadEventExecutor内部维护的一个属性,它的作用是标识当前的thread的状态。在初始的时候,state == ST_NOT_STARTED,因此第一次调用startThread时,就会进入到if条件调用到doStartThread()

这是EventLoop比较关键的一个方法,线程池,每执行一个任务都会创建一个线程,这个线程往往都是一个新线程, 首先,把当前线程作为反应器线程,之后,执行事件轮询的业务方法。

executor是反应器组的成员,类型是ThreadPerTaskExecutor,一个特殊的线程池,每执行一个任务,就创建一个新线程。

Desktop View 线程的创建来源于MultithreadEventExecutorGroup线程工厂

线程的数量,从哪里来的?
在最开始时,装配引导类时,有boss groupworker groupboss group线程数,指定1个连接监听的线程就够了;worker group线程数,事件处理,默认不传就是CPU核数,比如8CPU就是16个线程。
有多少个线程数,就有多少个反应器,newChild就是创建一个EventLoop,对应的就是一个线程,线程创建的时机和EventLoop创建的时机不一样,线程是在EventLoop执行任务的时候创建的(doStartThread),创建完线程之后,EventLoop的核心方法run就跑起来了。

Desktop View 启动服务时,设置bossGroup、workerGroup

NioEventLoop.run()方法,就是IO事件轮询的方法,是一个死循环,比如Selector发生了可读事件,就会被run方法轮询到,run方法还做了异步任务(非IO任务)的执行,register0JavaChannel绑定到Selector)也是由run方法执行的,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
public final class NioEventLoop extends SingleThreadEventLoop {
    
    // ...
    private volatile int ioRatio = 50;
    // ...
    
    @Override
    protected void run() {
        for (;;) {
            try {
                // select 策略选择
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    // 非阻塞的select策略
                    case SelectStrategy.CONTINUE:
                        continue;
                    // 阻塞的select策略
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));

                        // 'wakenUp.compareAndSet(false, true)' is always evaluated
                        // before calling 'selector.wakeup()' to reduce the wake-up
                        // overhead. (Selector.wakeup() is an expensive operation.)
                        //
                        // However, there is a race condition in this approach.
                        // The race condition is triggered when 'wakenUp' is set to
                        // true too early.
                        //
                        // 'wakenUp' is set to true too early if:
                        // 1) Selector is waken up between 'wakenUp.set(false)' and
                        //    'selector.select(...)'. (BAD)
                        // 2) Selector is waken up between 'selector.select(...)' and
                        //    'if (wakenUp.get()) { ... }'. (OK)
                        //
                        // In the first case, 'wakenUp' is set to true and the
                        // following 'selector.select(...)' will wake up immediately.
                        // Until 'wakenUp' is set to false again in the next round,
                        // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                        // any attempt to wake up the Selector will fail, too, causing
                        // the following 'selector.select(...)' call to block
                        // unnecessarily.
                        //
                        // To fix this problem, we wake up the selector again if wakenUp
                        // is true immediately after selector.select(...).
                        // It is inefficient in that it wakes up the selector for both
                        // the first case (BAD - wake-up required) and the second case
                        // (OK - no wake-up required).

                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                        // fall through
                    // 不需要select,目前已经有可执行的任务了
                    default:
                }

                // 执行网络IO事件和任务调度
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        // 处理网络IO事件
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        // 处理系统Task和自定义Task
                        runAllTasks();
                    }
                } else {
                    // 根据ioRatio计算非IO最多执行的时间
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
}

NioEventLoop事件轮询包含两种策略,非阻塞的select策略、阻塞的select策略。

在这个死循环里边,processSelectedKeys就在分发IO事件,runAllTasks,就是处理任务队列的任务。

NioEventLoop中维护了一个线程,线程启动时会调用NioEventLooprun方法,执行IO任务和非IO任务,

  • ‌IO任务‌:即selectionKeyready的事件,如acceptconnectreadwrite等,由processSelectedKeys方法触发。
  • IO任务‌:添加到taskQueue中的任务,如register0bind0等任务,由runAllTasks方法触发。

两种任务的执行时间比由变量ioRatio控制,默认为50,则表示允许非IO任务执行的时间与IO任务的执行时间相等。

第三步:分发

先看一下整个查询的过程,事件分发有个前提,IO事件已经通过选择器查询出来了。

run方法里死循环的第一步,就是事件的查询(把选择器里边的事件查出来),这个查询相较于经典Reactor模式有点复杂,但是原理类似,这里分多种不同策略,比如阻塞式查询、非阻塞查询,接下来就到了processSelectedKeys,做事件分发。

processSelectedKeys处理IO事件,这里有两种情况,第一种是优化过的,第二种是普通的。NettySelector做过优化,Netty会尝试获取权限去优化原生Selector,如果可以,selectedKeys不为null,就会走优化过的处理方式。优化不成功,就还是用原始经典的Selector选择器事件处理方式进行事件处理,这里先不做优化的展开介绍。两种方式主要是遍历selectionKey的方式不同,具体处理事件的调用逻辑是完全一致的。

Desktop View NioEventLoop.processSelectedKeys方法实现

下面介绍经典的选择键的处理,processSelectedKeysPlain,迭代selectedKeys获取就绪的IO事件,为每个事件都调用processSelectedKey来处理它。

Desktop View NioEventLoop.processSelectedKeysPlain方法实现

在前面的channel注册时,将NioSocketChannel以附加字段的方式添加到了selectionKey中。在这里,通过k.attachment()取得这个通道对象,然后就调用processSelectedKey来处理这个IO事件和通道。

处理函数,会处理不同的事件,比如可写、可读,第一个参数是事件选择键,第二个是选择键的附件,比如对通道注册来说,附件就是通道,

Desktop View NioEventLoop.processSelectedKey方法实现

NioEventLoop.processSelectedKey方法中处理了三个事件,

  • OP_READ,可读事件,即Channel中收到了新数据可供上层读取。
  • OP_WRITE,可写事件,即上层可以向Channel写入数据。
  • OP_CONNECT,连接建立事件,即TCP连接已经建立,Channel处于active状态。

通道可读,会调用NioByteUnsafe.read方法,核心是doReadBytes,把通道里边的数据(发送过来的数据)读取出来,放到ByteBuf

Desktop View NioByteUnsafe.read方法实现

读取NioSocketChannelNioSocketChannel.doReadBytes方法从socket revbuf读取数据,但每次读取前都需要记录缓冲区中可写区域的大小,用于判断缓冲区是否读满,继而决定是否继续读取数据。

Desktop View NioSocketChannel.doReadBytes方法实现

bytebuf.writeBytesjavaChannel方法拿到Java NIO的原生通道,把通道的数据读取到Bytebuf里,读取的量是通过公式(可适配的)算出来的,比如按照之前监视的数据推断这次应该从通道里读多少数据。

把数据从通道里边读出来之后,下一步就是把读出来的数据分发出去。把bytebuf通过通道channel的流水线分发出去,分发给谁呢?就是流水线上面的处理器。流水线处理器就会读到bytebufbytebuf就这样一步步进入到业务处理器进行处理。

NioByteUnsafe.read 方法的后半部分,

  • 触发pipeline.fireChannelRead(byteBuf),通过pipeline触发读事件。
  • 判断是否继续读有两个标准,一是不能超过最大的读取次数(默认16次);二是缓冲区的数据每次都要读满,比如分配2KB ByteBuf,则必须读取2KB的数据。

整个事件的分发,就是这样的流程。EventLoop三步曲,就是这些了。

EventLoop无锁化设计

EventLoop是一个Reactor模型的事件处理器,一个EventLoop对应一个线程(EventLoop只有一个线程),其内部会维护一个selectortaskQueue,负责处理网络IO请求和内部任务,这里的selectortaskQueue是线程内部的。

selectorIO事件处理,以及任务队列的任务处理,都在EventLoop线程串行的执行(内部的任务和事件的查询、处理、分发都是串行的,也就不存在线程安全问题,无锁化设计,性能非常高)。

NioEventLoop任务队列核心原理

任务队列,对应的成员是taskQueue,位置在SingleThreadEventExecutor,这个队列比较特殊,不是普通的队列,在newTaskQueue里边创建的,创建的是Mpsc队列,实现由JCTools工具包提供,

Desktop View NioEventLoop.newTaskQueue方法实现

MpscMulti Producer Single Consumer (Lock less, bounded and unbounded),多生产者单一消费者无锁队列(有界和无界都有实现),来自JCTools工具包。
早在1996年就有论文提出了无锁队列的概念,再到后来Disruptor,高性能已得到生产的验证。Jctools中的高性能队列,其性能丝毫不输于Disruptor

JCToolsJava Concurrency Tools)提供了一系列非阻塞并发数据结构(标准Java中缺失的),当存在线程争抢的时候,非阻塞并发数据结构比阻塞并发数据结构能提供更好的性能JCTools是一个开源工具包,在Apache License 2.0下发布,并在NettyRxjava等诸多框架中被广泛使用。源码参见JCTools的开源Github仓库

Maven中引入JCTools坐标就能使用JCTools了,

1
2
3
4
5
<dependency>
    <groupId>org.jctools</groupId>
    <artifactId>jctools-core</artifactId>
    <version>3.0.0</version>
</dependency>

JCTools提供的数据结构,第一大类就是MapJCTools提供的非阻塞Map

  • ConcurrentAutoTable(后面几个map/set结构的基础)
  • NonBlockingHashMapConcurrentHashMap的增强)
  • NonBlockingHashMapLong
  • NonBlockingHashSet
  • NonBlockingIdentityHashMap
  • NonBlockingSetInt

另外,提供了非阻塞队列,Netty用到的是MPSCJCTools提供的非阻塞队列分为4类,可以根据不同的应用场景选择使用,

  • SPSC-单一生产者单一消费者(有界和无界)
  • MPSC-多生产者单一消费者(有界和无界)
  • SPMC-单生产者多消费者(有界)
  • MPMC-多生产者多消费者(有界)

任务队列的内容分为两部分,

  • 任务的提交
  • 任务的处理

任务提交

Task任务的提交,有3种典型使用场景,

  • 用户提交的普通任务
  • 用户提交的定时任务
  • Reactor线程调用Channel的各种方法,例如在推送系统的业务线程里面,根据用户的标识,找到对应的Channel引用,然后调用Write类方法向该用户推送消息,就会进入到这种场景。最终的Write会提交到任务队列中后被异步消费。

普通任务提交,使用eventLoopexecute方法,通过ChannelHandlerContext获取channel,通过channel获取eventLoop,然后调用execute方法即可放入到任务队列,代码如下,

1
2
3
4
5
6
7
8
9
10
11
12
Channel channel = ctx.channel();
channel.eventLoop().execute(new Runnable() {
    @Override
    public void run() {
        try {
            Thread.sleep(1);
            // dosomething(...)
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
});

例子就是通道注册时,

Desktop View AbstractChannel.AbstractUnsafe.register注册时

定时任务提交,和普通任务类似,还是使用通道,ctx.executor()返回的是EventLoop,说明定时任务的提交,还是用的反应器EventLoop,只不过方法用的是schedule

Netty提供了一些添加定时任务的接口,它就是NioEventLoop的父类AbstractScheduledEventExecutorschedule方法,挑一个来看看(其它重载底层队列都一样),

定时任务也大同小异,都是通过ChannelHandlerContext获取channel,通过channel获取eventLoop,然后调用schedule方法即放入到任务队列,代码如下,

1
2
3
4
5
6
7
8
9
10
11
12
13
//使用定时器,发送心跳报文
public void heartBeat(ChannelHandlerContext ctx,
                    ProtoMsg.Message heartbeatMsg) {
    ctx.executor().schedule(() -> {
        if (ctx.channel().isActive()) {
            log.info("发送 HEART_BEAT 消息 to server");
            ctx.writeAndFlush(heartbeatMsg);

            //递归调用,发送下一次的心跳
            heartBeat(ctx, heartbeatMsg);
        }
    }, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);
}

schedule第一个参数和普通任务一样,传入一个线程即可,第二个参数是延时时间,第三个参数是延时单位,这里用的是秒。

channel的各种方法,是由Reactor线程执行的,非Reactor线程执行的话,就必须做一个任务,加入到任务队列,由Reactor线程后续调度执行。

1
2
3
4
5
// 非反应器线程的消息发送操作
// 写Protobuf数据帧
public synchronized void writeAndFlush(Object pkg) {
    channel.writeAndFlush(pkg);
}

流程如下,如果A线程要调用EventLoop channel发消息,必须把发消息的操作变成一个任务,加入到EventLoopTaskQueue,再由EventLoop里自己的线程从任务队列里边把任务取出来,再去执行消息的发送操作。

Desktop View 非反应器线程的消息发送操作

当用户线程(业务线程)发起write操作时,Netty会进行判断,如果发现不是NioEventLoop线程(反应器线程),则将发送消息封装成WriteTask,放入NioEventLoop的任务队列,由NioEventLoop线程后续去执行。

看一下过程大概对应的代码,用户线程(业务线程)发起write操作时的入口,

1
io.netty.channel.AbstractChannelHandlerContext.write(java.lang.Object, boolean, io.netty.channel.ChannelPromise)

Desktop View AbstractChannelHandlerContext.writeAndFlush方法

Desktop View AbstractChannelHandlerContext.write方法

write里边,做一个判断,如果是Reactor线程自己在做(inEventLoop),直接干活;如果不是,就封装成一个任务,安全执行。整个过程和普通任务提交类似,只不过是定义了一个任务的类型,

Desktop View AbstractChannelHandlerContext.safeExecute方法

这里的executor执行的是Netty自己实现的SingleThreadEventExecutor.execute()方法,这里把任务相关的参数封装了一下,普通的Runnable没有这么多成员。

为了节省内存,创建了一个可回收的对象池,IO通道有大量工作要做,并且很多都是异步的,不同的非Reactor线程发消息,任务实例需要复用。对象池就可以把这些实例回收,不需要每次都新建,可以反复使用,高效利用内存。

Desktop View AbstractChannelHandlerContext.AbstractWriteTask,WriteAndFlushTask的父类

execute,任务提交方法,把传进来的task放到任务队列里边,不同Netty版本实现有些微小差异,

Desktop View SingleThreadEventExecutor.execute

Desktop View SingleThreadEventExecutor将通道注册的任务加到taskQueue队列

至此,异步任务成功加入taskQueue

taskQueuempsc队列,即多生产者单消费者队列,Netty使用mpsc,将外部线程的task聚集起来,在Reactor线程内部用单线程来无锁,串行执行。

任务调度

异步任务的调度执行路径,代码调用路径如下,

1
2
3
io.netty.channel.nio.NioEventLoop#run 
--> io.netty.util.concurrent.SingleThreadEventExecutor#runAllTasks(long) 
--> io.netty.util.concurrent.AbstractEventExecutor#safeExecute

这里safeExecute执行的task,就是前面write写入时包装的AbstractWriteTask,对应实现类路径为io.netty.channel.AbstractChannelHandlerContext.AbstractWriteTask.runAbstractWriteTaskrun经过一些系统处理操作,最终会调用io.netty.channel.ChannelOutboundBuffer.addMessage方法,将发送消息加入发送队列(链表)。

之前看过,查询到事件之后,就走到了processSelectedKeys,如果IO处理的比例是100%,就是所有查询出来的IO事件的活都要干,那就处理完所有IO事件,再处理所有任务队列里边的任务。

默认情况下,IO处理的比例是50,首先把时间记录下来,然后处理IO事件,得到处理IO事件的时间,根据比例计算任务处理的事件该多久。两边加起来的比例等于1,上面IO事件处理时间没办法压缩,可以控制的是任务处理时间。

看不加时间限制的runAllTasks方法,从定时任务里边,把到期了的任务取出来,把这些定时任务加到任务队列里边,

Desktop View NioEventLoop.runAllTasks方法实现

定时任务队列scheduledTaskQueue,是一个专门的任务队列,定义在AbstractScheduledEventExecutorEventLoop的基类之一。

Desktop View AbstractScheduledEventExecutor.scheduledTaskQueue

提交的定时任务,是提交到ScheduledTaskQueue里边了,只有它到期了,才把它取出来,加入到TaskQueue,直到把所有的定时任务都取出来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private boolean fetchFromScheduledTaskQueue() {
    long nanoTime = AbstractScheduledEventExecutor.nanoTime();
    //从定时任务队列中抓取第一个定时任务
    //寻找截止时间为nanoTime的任务
    Runnable scheduledTask  = pollScheduledTask(nanoTime);
    //如果该定时任务队列不为空,则塞到普通任务队列里面
    while (scheduledTask != null) {
        //如果添加到普通任务队列过程中失败
        if (!taskQueue.offer(scheduledTask)) {
            //则重新添加到定时任务队列中
            // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
            scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
            return false;
        }
        //继续从定时任务队列中拉取任务
        //方法执行完成之后,所有符合运行条件的定时任务队列,都添加到了普通任务队列中
        scheduledTask  = pollScheduledTask(nanoTime);
    }
    return true;
}

runAllTasksFrom方法,执行任务队列里边所有的任务,

Desktop View SingleThreadEventExecutor.runAllTasksFrom方法

safeExecute(task)执行任务

Desktop View AbstractEventExecutor.safeExecute方法

举个例子,前面说的消息发送(前面write写入时),封装成了AbstractWriteTask,把要发送的消息、上下文都放到这个task实例里边,AbstractWriteTaskrun只执行最终的通道写出,最终会调用到addMessage,将发送的消息加入到发送列表,最终发送出去。任务的执行都在run方法里边完成。

具体工作和通道的消息发送流程有关,有需要的话,后续做专题介绍。

ChannelConfig通道配置类

首先看,内存分配器怎么作用到通道上面的呢?
通过通道配置选项

Desktop View ChannelConfig通道配置类示例

下面看下通道配置类和通道配置选项。

什么是ChannelConfig
ChannelConfig通道配置类用于管理各种通道的配置选项。每个特定的Channel实现类都有自己对应的ChannelConfig实现类,比如,

  • NioSocketChannel对应的配置类为NioSocketChannelConfig
  • NioServerSocketChannel对应的配置类为NioServerSocketChannelConfig。\

Netty的通道非常多,每种通道都有各自通道配置选项,专门设计了通道配置类来干这个事情。

Desktop View ChannelConfig的部分继承关系

SocketChannel通道配置类为例,

  • 传输通道配置类
  • 监听通道配置类
  • 顶层接口,ChannelConfig
  • 有一个默认实现配置类,DefaultChannelConfig

如何通过Channel接口获取ChannelConfig的实例?
Channel接口中定义了一个方法config(),用于获取特定通道实现的配置,Channel子类需要实现这个接口。

1
2
3
4
5
public interface Channel extends AttributeMap, Comparable<Channel> {
    ...
    ChannelConfig config();
    ...
}

ChannelConfig实例的创建时机
通常Channel实例,在创建的时候,就会创建其对应的ChannelConfig实例。

在构造函数里边,例如,NioServerSocketChannelNioSocketChannel都是在构造方法中创建了其对应的ChannelConfig实现。

1
2
3
4
5
6
7
8
9
10
11
public class NioServerSocketChannel extends AbstractNioMessageChannel
        implements io.netty.channel.socket.ServerSocketChannel {

    private final ServerSocketChannelConfig config;

    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        // 构造方法中创建NioServerSocketChannelConfig实例
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }
}
1
2
3
4
5
6
7
8
9
10
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel
// ...
private final SocketChannelConfig config;
// ...
public NioSocketChannel(Channel parent, SocketChannel socket) {
    super(parent, socket);
    // 构造方法中创建SocketChannelConfig实例
    config = new NioSocketChannelConfig(this, socket.socket());
}
// ...

ChannelConfig的配置项ChannelOption

Netty设计一个ChannelOption类,用于封装ChannelConfig配置项支持的所有选项。

Desktop View ChannelOption的部分继承关系

ChannelConfigChannelOption的关系是什么呢?
ChannelConfig类似MapChannelOptionMapkeyChannelConfig定义了相关方法来获取和修改Map中的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
public interface ChannelConfig {
    //获取所有参数
    Map<ChannelOption<?>, Object> getOptions();

    //设置所有参数
    boolean setOptions(Map<ChannelOption<?>, ?> options);

    //获取以某个ChannelOption为key的参数值
    <T> T getOption(ChannelOption<T> option);

    //替换某个ChannelOption为key的参数值
    <T> boolean setOption(ChannelOption<T> option, T value);
}

举个例子,ChannelConfig类的选项修改。当我们想修改一个Map中的参数时,使用setOption方法。

例如,希望为NioSocketChannel的内存分配器配置为PooledByteBufAllocator,则可以使用类似以下方式来设置,

1
2
3
Channel ch = ...;
SocketChannelConfig cfg = (SocketChannelConfig) ch.getConfig();
cfg.setOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

Netty预定了很多配置项,ChannelConfig支持的通用ChannelOption

  • ChannelOption.CONNECT_TIMEOUT_MILLIS 连接超时设置
  • ChannelOption.WRITE_SPIN_COUNT
  • ChannelOption.AUTO_READ
  • ChannelOption.MAX_MESSAGES_PER_READ
  • ChannelOption.RCVBUF_ALLOCATOR
  • ChannelOption.ALLOCATOR 内存分配器
  • ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK
  • ChannelOption.WRITE_BUFFER_LOW_WATER_MARK
  • ChannelOption.MESSAGE_SIZE_ESTIMATOR
  • ChannelOption.AUTO_CLOSE

socket的传输设计有关系,SocketChannelConfigChannelConfig基础上额外支持的ChannelOption

  • ChannelOption.SO_KEEPALIVE
  • ChannelOption.SO_REUSEADDR
  • ChannelOption.SO_LINGER
  • ChannelOption.TCP_NODELAY
  • ChannelOption.SO_RCVBUF
  • ChannelOption.SO_SNDBUF
  • ChannelOption.IP_TOS
  • ChannelOption.ALLOW_HALF_CLOSURE

ChannelOption的对应set/get方法,事实上这里的每一种ChannelOption,除了可以使用setOption方法来进行设置,

1
ch.setOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

ChannelConfig接口中都为其设置了对应的快捷set/get方法。

1
2
3
Channel ch = ...;
SocketChannelConfig cfg = (SocketChannelConfig) ch.getConfig();
cfg.setAllocator(PooledByteBufAllocator.DEFAULT);
本文由作者按照 CC BY 4.0 进行授权