NioEventLoop——Netty实现多Reactor多线程模式
引言
从开发或者执行流程上,Reactor模式可以被清晰地分成三大步——注册、轮询、分发。
- 第一步,通道注册到选择器上;
- 第二步,反应器在选择器里进行
IO事件轮询; - 第三步,反应器把
IO事件分发到处理器。
Netty的Reactor模型,和经典Reactor模型的元素对应关系如下,
Netty中的Channel,对经典的Channel做了封装;Netty的Handler,定义了自己Handler的规范;Netty的反应器,NioEventLoop。
Netty的Reactor模型,和经典Reactor模型的元素对应关系
EventLoop
Netty中的EventLoop类,就是对应于Reactor反应器的Reactor反应器角色。
它是一个多线程的多反应器模式,
- 监听线程,
boss线程,可以是多个; IO传输线程,反应器是NioEventLoop,多个EventLoop组成一个组,workerLoopGroup。
要搞清楚
NioEventLoop,首先需要搞清楚两个重要的关系,
NioEventLoop与NIO原生Selector的关系Netty Channel与NIO原生Channel的关系
NioEventLoop类型绑定了两大关键元素——Thread、Java NIO Selector,
先看一下Reactor的Java NIO Selector选择器,
再看下Reactor的线程Thread,
NioEventLoop的Thread定义在父类SingleThreadEventExecutor中
NioEventLoop的Thread
Thread属性的作用,主要是用于轮询NIO Selector并处理IO事件;- 处理其他的非
IO任务。
NioEventLoop的Thread线程什么时候启动呢?
在Reactor模式中,线程是轮询用的。所以,Reactor线程的启动,一般在Channel在Selector上注册之前,注册工作也是由Thread来完成的。
EventLoop和Netty Channel的关系
一对多的关系,一个EventLoop,可以注册很多不同的Netty Channel。
选取NioSocketChannel类,作为Channel通道的代表,进行说明。
在AbstractNioChannel里,有一个SelectableChannel成员,这个成员是Java JDK原生channel成员,也就是说Netty的Channel封装了JDK原生Channel。
回到正题,
Reactor(对应Netty中EventLoop)三步曲
- 第一步:注册
- 将
channel通道的就绪事件,注册到选择器Selector。- 一个
Reactor对应一个选择器Selector,一个Reactor拥有一个Selector成员属性。- 第二步:轮询
- 轮询的代码,是
Reactor重要的一个组成部分,或者说核心的部分。轮询选择器是否有就绪事件。- 第三步:分发
- 将就绪事件,分发到事件附件的处理器
handler中,由handler完成实际的处理。
EventLoop三步曲
第一步:注册
在Netty中,注册流程还是比较复杂的,Channel注册选择器Selector调用流程如下,
1
2
3
4
5
6
AbstractBootstrap.initAndRegister
--> MultithreadEventLoopGroup.register
--> SingleThreadEventLoop.register
--> AbstractChannel.AbstractUnsafe.register
--> AbstractChannel.register0
--> AbstractNioChannel.doRegister
注册的入口代码,在启动类AbstractBootstrap的initAndRegister方法中。
第一个步骤,是在引导类里边,AbstractBootstrap.initAndRegister,我们关注的是流程的最后一步,AbstractNioChannel.doRegister,拿到原生的选择器和原生的通道,完成通道的注册工作,
解释下上面
SocketChannel注册到eventLoop的selector上的代码。
前面讲到,AbstractNioChannel通道类有一个本地Java通道成员ch,在AbstractNioChannel的构造函数中,被初始化。
Netty的Channel通过javaChannel()方法,取得了Java本地Channel。它返回的是一个Java NIO SocketChannel。
然后,将这个SocketChannel注册到与eventLoop关联的selector上了。
通过最后一步,Netty终于将这个SocketChannel注册到与eventLoop关联的selector上了。
再看下经典的选择器注册,调用selectionKey.attach(object),就是在socketChannel注册到选择器上返回选择键后,把Acceptor处理器AcceptorHandler绑定到选择键实例上。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Reactor() throws IOException {
//Reactor初始化
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
InetSocketAddress address = new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP, NioDemoConfig.SOCKET_SERVER_PORT);
//非阻塞
serverSocket.configureBlocking(false);
//分步处理,第一步,接收accept事件
SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
// SelectionKey.OP_ACCEPT
serverSocket.socket().bind(address);
logger.info("服务端已经开始监听:" + address);
// attach callback object, AcceptorHandler
sk.attach(new AcceptorHandler());
}
channel.register的第三个参数,是可以设置selectionKey的附加对象的,和调用selectionKey.attach(object)的效果一样。
1
SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT, new AcceptorHandler());
对比下Netty的方法,处理不太一样,这里选择键的附件,不是分两步做,而是放在了第三个参数里边,它把NioSocketChannel实例作为附件放到选择键上,这个附件在后面轮询和分发时会用到。
1
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
doRegister()所传递的第三个参数是this,它就是一个NioSocketChannel的实例。意思是,SocketChannel对象自身,以附加字段的方式添加到了selectionKey中,供事件就绪后使用。
第二步:轮询
Netty的轮询也相对复杂一点。
Q:反应器的线程在哪里?
A:NioEventLoop的父类SingleThreadEventExecutor中,有一个Thread thread属性,存储了一个本地Java线程。
Q:线程在哪里启动呢?
A:SocketChannel注册到eventLoop的Selector前。
回顾下,通道注册的流程,最后一步是完成原生通道到原生选择器的注册工作,
倒数第三步,AbstractChannel.AbstractUnsafe.register,完成了线程启动,
AbstractChannel.AbstractUnsafe.register注册时
这里的eventLoop.execute()方法调用,就是启动EventLoop线程的入口。
register方法里边,有两种情况,
- 第一种情况,反应器线程是启动的,并且当前线程正好是反应器线程(
inEventLoop),两个条件都满足的情况下,就直接执行注册工作(register0,就是最后一步的注册)。 - 第二种情况,也就是大多数时候,
EventLoop线程往往没有启动,这时就得先启动线程,再注册。
进到任务提交方法,execute方法中调用startThread()启动线程,
SingleThreadEventExecutor.execute
线程启动之前,任务的添加,只要不是能立即执行的任务,都需要加入到队列里边,
回到线程的启动,EventLoop线程的启动,如果Reactor线程没启动,就在startThread里边,调用了doStartThread方法,启动线程,
SingleThreadEventExecutor.startThread
STATE_UPDATER是SingleThreadEventExecutor内部维护的一个属性,它的作用是标识当前的thread的状态。在初始的时候,state == ST_NOT_STARTED,因此第一次调用startThread时,就会进入到if条件调用到doStartThread()。
这是EventLoop比较关键的一个方法,线程池,每执行一个任务都会创建一个线程,这个线程往往都是一个新线程, 首先,把当前线程作为反应器线程,之后,执行事件轮询的业务方法。
executor是反应器组的成员,类型是ThreadPerTaskExecutor,一个特殊的线程池,每执行一个任务,就创建一个新线程。
线程的创建来源于MultithreadEventExecutorGroup线程工厂
线程的数量,从哪里来的?
在最开始时,装配引导类时,有boss group、worker group,boss group线程数,指定1个连接监听的线程就够了;worker group线程数,事件处理,默认不传就是CPU核数,比如8核CPU就是16个线程。
有多少个线程数,就有多少个反应器,newChild就是创建一个EventLoop,对应的就是一个线程,线程创建的时机和EventLoop创建的时机不一样,线程是在EventLoop执行任务的时候创建的(doStartThread),创建完线程之后,EventLoop的核心方法run就跑起来了。
NioEventLoop.run()方法,就是IO事件轮询的方法,是一个死循环,比如Selector发生了可读事件,就会被run方法轮询到,run方法还做了异步任务(非IO任务)的执行,register0(Java的Channel绑定到Selector)也是由run方法执行的,
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
public final class NioEventLoop extends SingleThreadEventLoop {
// ...
private volatile int ioRatio = 50;
// ...
@Override
protected void run() {
for (;;) {
try {
// select 策略选择
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
// 非阻塞的select策略
case SelectStrategy.CONTINUE:
continue;
// 阻塞的select策略
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
// 不需要select,目前已经有可执行的任务了
default:
}
// 执行网络IO事件和任务调度
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
// 处理网络IO事件
processSelectedKeys();
} finally {
// Ensure we always run tasks.
// 处理系统Task和自定义Task
runAllTasks();
}
} else {
// 根据ioRatio计算非IO最多执行的时间
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
}
NioEventLoop事件轮询包含两种策略,非阻塞的select策略、阻塞的select策略。
在这个死循环里边,processSelectedKeys就在分发IO事件,runAllTasks,就是处理任务队列的任务。
NioEventLoop中维护了一个线程,线程启动时会调用NioEventLoop的run方法,执行IO任务和非IO任务,
IO任务:即selectionKey中ready的事件,如accept、connect、read、write等,由processSelectedKeys方法触发。- 非
IO任务:添加到taskQueue中的任务,如register0、bind0等任务,由runAllTasks方法触发。
两种任务的执行时间比由变量ioRatio控制,默认为50,则表示允许非IO任务执行的时间与IO任务的执行时间相等。
第三步:分发
先看一下整个查询的过程,事件分发有个前提,IO事件已经通过选择器查询出来了。
run方法里死循环的第一步,就是事件的查询(把选择器里边的事件查出来),这个查询相较于经典Reactor模式有点复杂,但是原理类似,这里分多种不同策略,比如阻塞式查询、非阻塞查询,接下来就到了processSelectedKeys,做事件分发。
processSelectedKeys处理IO事件,这里有两种情况,第一种是优化过的,第二种是普通的。Netty对Selector做过优化,Netty会尝试获取权限去优化原生Selector,如果可以,selectedKeys不为null,就会走优化过的处理方式。优化不成功,就还是用原始经典的Selector选择器事件处理方式进行事件处理,这里先不做优化的展开介绍。两种方式主要是遍历selectionKey的方式不同,具体处理事件的调用逻辑是完全一致的。
NioEventLoop.processSelectedKeys方法实现
下面介绍经典的选择键的处理,processSelectedKeysPlain,迭代selectedKeys获取就绪的IO事件,为每个事件都调用processSelectedKey来处理它。
NioEventLoop.processSelectedKeysPlain方法实现
在前面的channel注册时,将NioSocketChannel以附加字段的方式添加到了selectionKey中。在这里,通过k.attachment()取得这个通道对象,然后就调用processSelectedKey来处理这个IO事件和通道。
处理函数,会处理不同的事件,比如可写、可读,第一个参数是事件选择键,第二个是选择键的附件,比如对通道注册来说,附件就是通道,
NioEventLoop.processSelectedKey方法实现
NioEventLoop.processSelectedKey方法中处理了三个事件,
- OP_READ,可读事件,即Channel中收到了新数据可供上层读取。
- OP_WRITE,可写事件,即上层可以向Channel写入数据。
- OP_CONNECT,连接建立事件,即TCP连接已经建立,Channel处于active状态。
通道可读,会调用NioByteUnsafe.read方法,核心是doReadBytes,把通道里边的数据(发送过来的数据)读取出来,放到ByteBuf,
读取NioSocketChannel,NioSocketChannel.doReadBytes方法从socket revbuf读取数据,但每次读取前都需要记录缓冲区中可写区域的大小,用于判断缓冲区是否读满,继而决定是否继续读取数据。
NioSocketChannel.doReadBytes方法实现
bytebuf.writeBytes,javaChannel方法拿到Java NIO的原生通道,把通道的数据读取到Bytebuf里,读取的量是通过公式(可适配的)算出来的,比如按照之前监视的数据推断这次应该从通道里读多少数据。
把数据从通道里边读出来之后,下一步就是把读出来的数据分发出去。把bytebuf通过通道channel的流水线分发出去,分发给谁呢?就是流水线上面的处理器。流水线处理器就会读到bytebuf,bytebuf就这样一步步进入到业务处理器进行处理。
NioByteUnsafe.read 方法的后半部分,
- 触发
pipeline.fireChannelRead(byteBuf),通过pipeline触发读事件。 - 判断是否继续读有两个标准,一是不能超过最大的读取次数(默认
16次);二是缓冲区的数据每次都要读满,比如分配2KB ByteBuf,则必须读取2KB的数据。
整个事件的分发,就是这样的流程。EventLoop三步曲,就是这些了。
EventLoop无锁化设计
EventLoop是一个Reactor模型的事件处理器,一个EventLoop对应一个线程(EventLoop只有一个线程),其内部会维护一个selector和taskQueue,负责处理网络IO请求和内部任务,这里的selector和taskQueue是线程内部的。
selector的IO事件处理,以及任务队列的任务处理,都在EventLoop线程串行的执行(内部的任务和事件的查询、处理、分发都是串行的,也就不存在线程安全问题,无锁化设计,性能非常高)。
NioEventLoop任务队列核心原理
任务队列,对应的成员是taskQueue,位置在SingleThreadEventExecutor,这个队列比较特殊,不是普通的队列,在newTaskQueue里边创建的,创建的是Mpsc队列,实现由JCTools工具包提供,
Mpsc,Multi Producer Single Consumer(Lock less, bounded and unbounded),多生产者单一消费者无锁队列(有界和无界都有实现),来自JCTools工具包。
早在1996年就有论文提出了无锁队列的概念,再到后来Disruptor,高性能已得到生产的验证。Jctools中的高性能队列,其性能丝毫不输于Disruptor。
JCTools(Java Concurrency Tools)提供了一系列非阻塞并发数据结构(标准Java中缺失的),当存在线程争抢的时候,非阻塞并发数据结构比阻塞并发数据结构能提供更好的性能JCTools是一个开源工具包,在Apache License 2.0下发布,并在Netty、Rxjava等诸多框架中被广泛使用。源码参见JCTools的开源Github仓库。
在Maven中引入JCTools坐标就能使用JCTools了,
1
2
3
4
5
<dependency>
<groupId>org.jctools</groupId>
<artifactId>jctools-core</artifactId>
<version>3.0.0</version>
</dependency>
JCTools提供的数据结构,第一大类就是Map,JCTools提供的非阻塞Map,
ConcurrentAutoTable(后面几个map/set结构的基础)NonBlockingHashMap(ConcurrentHashMap的增强)NonBlockingHashMapLongNonBlockingHashSetNonBlockingIdentityHashMapNonBlockingSetInt
另外,提供了非阻塞队列,Netty用到的是MPSC,JCTools提供的非阻塞队列分为4类,可以根据不同的应用场景选择使用,
SPSC-单一生产者单一消费者(有界和无界)MPSC-多生产者单一消费者(有界和无界)SPMC-单生产者多消费者(有界)MPMC-多生产者多消费者(有界)
任务队列的内容分为两部分,
- 任务的提交
- 任务的处理
任务提交
Task任务的提交,有3种典型使用场景,
- 用户提交的普通任务
- 用户提交的定时任务
- 非
Reactor线程调用Channel的各种方法,例如在推送系统的业务线程里面,根据用户的标识,找到对应的Channel引用,然后调用Write类方法向该用户推送消息,就会进入到这种场景。最终的Write会提交到任务队列中后被异步消费。
普通任务提交,使用eventLoop的execute方法,通过ChannelHandlerContext获取channel,通过channel获取eventLoop,然后调用execute方法即可放入到任务队列,代码如下,
1
2
3
4
5
6
7
8
9
10
11
12
Channel channel = ctx.channel();
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1);
// dosomething(...)
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
例子就是通道注册时,
AbstractChannel.AbstractUnsafe.register注册时
定时任务提交,和普通任务类似,还是使用通道,ctx.executor()返回的是EventLoop,说明定时任务的提交,还是用的反应器EventLoop,只不过方法用的是schedule。
Netty提供了一些添加定时任务的接口,它就是NioEventLoop的父类AbstractScheduledEventExecutor的schedule方法,挑一个来看看(其它重载底层队列都一样),
定时任务也大同小异,都是通过ChannelHandlerContext获取channel,通过channel获取eventLoop,然后调用schedule方法即放入到任务队列,代码如下,
1
2
3
4
5
6
7
8
9
10
11
12
13
//使用定时器,发送心跳报文
public void heartBeat(ChannelHandlerContext ctx,
ProtoMsg.Message heartbeatMsg) {
ctx.executor().schedule(() -> {
if (ctx.channel().isActive()) {
log.info("发送 HEART_BEAT 消息 to server");
ctx.writeAndFlush(heartbeatMsg);
//递归调用,发送下一次的心跳
heartBeat(ctx, heartbeatMsg);
}
}, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);
}
schedule第一个参数和普通任务一样,传入一个线程即可,第二个参数是延时时间,第三个参数是延时单位,这里用的是秒。
channel的各种方法,是由Reactor线程执行的,非Reactor线程执行的话,就必须做一个任务,加入到任务队列,由Reactor线程后续调度执行。
1
2
3
4
5
// 非反应器线程的消息发送操作
// 写Protobuf数据帧
public synchronized void writeAndFlush(Object pkg) {
channel.writeAndFlush(pkg);
}
流程如下,如果A线程要调用EventLoop channel发消息,必须把发消息的操作变成一个任务,加入到EventLoop的TaskQueue,再由EventLoop里自己的线程从任务队列里边把任务取出来,再去执行消息的发送操作。
当用户线程(业务线程)发起write操作时,Netty会进行判断,如果发现不是NioEventLoop线程(反应器线程),则将发送消息封装成WriteTask,放入NioEventLoop的任务队列,由NioEventLoop线程后续去执行。
看一下过程大概对应的代码,用户线程(业务线程)发起write操作时的入口,
1
io.netty.channel.AbstractChannelHandlerContext.write(java.lang.Object, boolean, io.netty.channel.ChannelPromise)
AbstractChannelHandlerContext.writeAndFlush方法
AbstractChannelHandlerContext.write方法
write里边,做一个判断,如果是Reactor线程自己在做(inEventLoop),直接干活;如果不是,就封装成一个任务,安全执行。整个过程和普通任务提交类似,只不过是定义了一个任务的类型,
AbstractChannelHandlerContext.safeExecute方法
这里的executor执行的是Netty自己实现的SingleThreadEventExecutor.execute()方法,这里把任务相关的参数封装了一下,普通的Runnable没有这么多成员。
为了节省内存,创建了一个可回收的对象池,IO通道有大量工作要做,并且很多都是异步的,不同的非Reactor线程发消息,任务实例需要复用。对象池就可以把这些实例回收,不需要每次都新建,可以反复使用,高效利用内存。
AbstractChannelHandlerContext.AbstractWriteTask,WriteAndFlushTask的父类
execute,任务提交方法,把传进来的task放到任务队列里边,不同Netty版本实现有些微小差异,
SingleThreadEventExecutor.execute
SingleThreadEventExecutor将通道注册的任务加到taskQueue队列
至此,异步任务成功加入taskQueue。
taskQueue是mpsc队列,即多生产者单消费者队列,Netty使用mpsc,将外部线程的task聚集起来,在Reactor线程内部用单线程来无锁,串行执行。
任务调度
异步任务的调度执行路径,代码调用路径如下,
1
2
3
io.netty.channel.nio.NioEventLoop#run
--> io.netty.util.concurrent.SingleThreadEventExecutor#runAllTasks(long)
--> io.netty.util.concurrent.AbstractEventExecutor#safeExecute
这里safeExecute执行的task,就是前面write写入时包装的AbstractWriteTask,对应实现类路径为io.netty.channel.AbstractChannelHandlerContext.AbstractWriteTask.run。AbstractWriteTask的run经过一些系统处理操作,最终会调用io.netty.channel.ChannelOutboundBuffer.addMessage方法,将发送消息加入发送队列(链表)。
之前看过,查询到事件之后,就走到了processSelectedKeys,如果IO处理的比例是100%,就是所有查询出来的IO事件的活都要干,那就处理完所有IO事件,再处理所有任务队列里边的任务。
默认情况下,IO处理的比例是50,首先把时间记录下来,然后处理IO事件,得到处理IO事件的时间,根据比例计算任务处理的事件该多久。两边加起来的比例等于1,上面IO事件处理时间没办法压缩,可以控制的是任务处理时间。
看不加时间限制的runAllTasks方法,从定时任务里边,把到期了的任务取出来,把这些定时任务加到任务队列里边,
定时任务队列scheduledTaskQueue,是一个专门的任务队列,定义在AbstractScheduledEventExecutor,EventLoop的基类之一。
AbstractScheduledEventExecutor.scheduledTaskQueue
提交的定时任务,是提交到ScheduledTaskQueue里边了,只有它到期了,才把它取出来,加入到TaskQueue,直到把所有的定时任务都取出来。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private boolean fetchFromScheduledTaskQueue() {
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
//从定时任务队列中抓取第一个定时任务
//寻找截止时间为nanoTime的任务
Runnable scheduledTask = pollScheduledTask(nanoTime);
//如果该定时任务队列不为空,则塞到普通任务队列里面
while (scheduledTask != null) {
//如果添加到普通任务队列过程中失败
if (!taskQueue.offer(scheduledTask)) {
//则重新添加到定时任务队列中
// No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
//继续从定时任务队列中拉取任务
//方法执行完成之后,所有符合运行条件的定时任务队列,都添加到了普通任务队列中
scheduledTask = pollScheduledTask(nanoTime);
}
return true;
}
runAllTasksFrom方法,执行任务队列里边所有的任务,
SingleThreadEventExecutor.runAllTasksFrom方法
safeExecute(task)执行任务
AbstractEventExecutor.safeExecute方法
举个例子,前面说的消息发送(前面write写入时),封装成了AbstractWriteTask,把要发送的消息、上下文都放到这个task实例里边,AbstractWriteTask的run只执行最终的通道写出,最终会调用到addMessage,将发送的消息加入到发送列表,最终发送出去。任务的执行都在run方法里边完成。
具体工作和通道的消息发送流程有关,有需要的话,后续做专题介绍。
ChannelConfig通道配置类
首先看,内存分配器怎么作用到通道上面的呢?
通过通道配置选项
下面看下通道配置类和通道配置选项。
什么是
ChannelConfig?
ChannelConfig通道配置类用于管理各种通道的配置选项。每个特定的Channel实现类都有自己对应的ChannelConfig实现类,比如,
NioSocketChannel对应的配置类为NioSocketChannelConfig;NioServerSocketChannel对应的配置类为NioServerSocketChannelConfig。\
Netty的通道非常多,每种通道都有各自通道配置选项,专门设计了通道配置类来干这个事情。
以SocketChannel通道配置类为例,
- 传输通道配置类
- 监听通道配置类
- 顶层接口,
ChannelConfig - 有一个默认实现配置类,
DefaultChannelConfig
如何通过
Channel接口获取ChannelConfig的实例?
在Channel接口中定义了一个方法config(),用于获取特定通道实现的配置,Channel子类需要实现这个接口。
1
2
3
4
5
public interface Channel extends AttributeMap, Comparable<Channel> {
...
ChannelConfig config();
...
}
ChannelConfig实例的创建时机
通常Channel实例,在创建的时候,就会创建其对应的ChannelConfig实例。
在构造函数里边,例如,NioServerSocketChannel和NioSocketChannel都是在构造方法中创建了其对应的ChannelConfig实现。
1
2
3
4
5
6
7
8
9
10
11
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
private final ServerSocketChannelConfig config;
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
// 构造方法中创建NioServerSocketChannelConfig实例
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
}
1
2
3
4
5
6
7
8
9
10
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel
// ...
private final SocketChannelConfig config;
// ...
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
// 构造方法中创建SocketChannelConfig实例
config = new NioSocketChannelConfig(this, socket.socket());
}
// ...
ChannelConfig的配置项ChannelOption
Netty设计一个ChannelOption类,用于封装ChannelConfig配置项支持的所有选项。
ChannelConfig与ChannelOption的关系是什么呢?
ChannelConfig类似Map,ChannelOption是Map的key,ChannelConfig定义了相关方法来获取和修改Map中的值。
1
2
3
4
5
6
7
8
9
10
11
12
13
public interface ChannelConfig {
//获取所有参数
Map<ChannelOption<?>, Object> getOptions();
//设置所有参数
boolean setOptions(Map<ChannelOption<?>, ?> options);
//获取以某个ChannelOption为key的参数值
<T> T getOption(ChannelOption<T> option);
//替换某个ChannelOption为key的参数值
<T> boolean setOption(ChannelOption<T> option, T value);
}
举个例子,ChannelConfig类的选项修改。当我们想修改一个Map中的参数时,使用setOption方法。
例如,希望为NioSocketChannel的内存分配器配置为PooledByteBufAllocator,则可以使用类似以下方式来设置,
1
2
3
Channel ch = ...;
SocketChannelConfig cfg = (SocketChannelConfig) ch.getConfig();
cfg.setOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
Netty预定了很多配置项,ChannelConfig支持的通用ChannelOption
ChannelOption.CONNECT_TIMEOUT_MILLIS连接超时设置ChannelOption.WRITE_SPIN_COUNTChannelOption.AUTO_READChannelOption.MAX_MESSAGES_PER_READChannelOption.RCVBUF_ALLOCATORChannelOption.ALLOCATOR内存分配器ChannelOption.WRITE_BUFFER_HIGH_WATER_MARKChannelOption.WRITE_BUFFER_LOW_WATER_MARKChannelOption.MESSAGE_SIZE_ESTIMATORChannelOption.AUTO_CLOSE
和socket的传输设计有关系,SocketChannelConfig在ChannelConfig基础上额外支持的ChannelOption
ChannelOption.SO_KEEPALIVEChannelOption.SO_REUSEADDRChannelOption.SO_LINGERChannelOption.TCP_NODELAYChannelOption.SO_RCVBUFChannelOption.SO_SNDBUFChannelOption.IP_TOSChannelOption.ALLOW_HALF_CLOSURE
ChannelOption的对应set/get方法,事实上这里的每一种ChannelOption,除了可以使用setOption方法来进行设置,
1
ch.setOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
在ChannelConfig接口中都为其设置了对应的快捷set/get方法。
1
2
3
Channel ch = ...;
SocketChannelConfig cfg = (SocketChannelConfig) ch.getConfig();
cfg.setAllocator(PooledByteBufAllocator.DEFAULT);














