Netty-Channel

Posted by klaus_turbo on 2020-10-21
Netty

什么是Channel

Channel(当然是Netty自己抽象的Channel)是netty的网络操作对象,也可以这么说,Channel是与网络Socket套接字的联系。但是归根究底,Channel是一个用于I/O操作的组件,负责诸如:读,写,连接,绑定等操作。

Channel能做写什么事情

Channel为用户提供了:

  • Channel的当前状态(例如,是否打开?是否已连接?)
  • Channel的ChannelConfig配置参数(例如,接收缓冲区的大小)
  • 通道支持的I / O操作(例如,读,写,连接和绑定)
  • ChannelPipeline处理所有与该通道相关的I/O事件和请求

Channel的工作原理

Channel的功能介绍

我们接下来就来了解一下Channel大概涵盖了那些功能。

1
ChannelId id();

返回的是一个ChannelId对象,通过调用ChannelId相关的方法,获取到的是字符串类型的Id,这个Id代表了当前这个Channel通道,是Channel的唯一标识,可以理解为是一个key。他的可能生成策略如下:

  1. 机器的MAC地址等可以代表全局唯一的信息;
  2. 当前的进程ID;
  3. 当前系统的毫秒/纳秒;
  4. 32位的随机整型数;
  5. 32位自增的序列数;
1
EventLoop eventLoop();

这个方法返回的是EventLoop.Channel需要注册到多路复用器上,用于处理I/O事件,通过此方法可以获取到当前Channel注册的EventLoop。EventLoop本质上就是处理网络读写的Reactor线程。在Netty中,它不单单只是用于处理网络事件,也可以用来执行定时任务和用户自定义的NioTask等任务。(具体可以阅读EventLoop的相关博客内容)

1
Channel parent();

对于服务端的Channel来说,它的父Channel为null。对于客户端来说,它的parent就是创建它的ServerSocketChannel。

1
ChannelConfig config();

获取当前Channel的配置信息。

1
boolean isOpen();

用于判断当前Channel是否已经打开。

1
boolean isRegistered();

判断当前Channel是否已经注册到EventLoop上面。

1
boolean isActive();

判断当前channel是否处于激活状态。

1
SocketAddress localAddress();

获取当前Channel绑定的本地地址,若当前channel未绑定,则返回null。

1
SocketAddress remoteAddress();

获取当前Channel连接的远端地址,若当前Channel没有被连接,则返回null。

以上是一些Channel的一些方法的定义,由于本身Channel是一个抽象的接口,所以Channel里面只有方法的定义而没有涉及到一些具体的操作,详情还是要结合源码来一起分析Channel的。

Channel源码分析

由于Channel是一个顶层接口,其实现类非常的多,如果读者有兴趣想进一步了解的,可以从官网上下载其源码进行分析Channel到底有哪些实现类,以及这些实现类到底能做些什么事情。这里我们就主要分析其两个比较主要的实现类,NioServerSocketChannel和NioSocketChannel。

Channel继承关系类图

首先看 NioServerSocketChannel 的继承关系类图:

netty_channel_class_extends.png

其次是 NioSocketChannel 的继承关系类图:

netty_nio_socket_channel.png

刨除Channel顶层接口实现的几个接口以及AbstractChannel抽象类继承的抽象类之外,NioServerSocketChannel 和 NioSocketChannel的继承关系还是相对简单的。

下面我们就来具体的分析。

AbstractChannel的源码分析

成员变量的定义

先来看一下AbstractChannel成员变量的定义:

1
2
3
4
5
6
7
8
9
10
11
12
private final Channel parent;// 父Channel
private final ChannelId id;// Channel全局唯一的ID
private final Unsafe unsafe;// Unsafe实例
private final DefaultChannelPipeline pipeline;// 当前Channel对应的PipeLine
private volatile SocketAddress localAddress;
private volatile SocketAddress remoteAddress;
private volatile EventLoop eventLoop;
private volatile boolean registered;
private boolean closeInitiated;
private Throwable initialCloseCause;
private boolean strValActive;
private String strVal;

结合前面Channel的功能介绍,不难发现,这里面定义的一些个成员变量基本都是围绕着Channel的功能展开的,聚合了所有Channel使用到的能力对象,由AbstractChannel提供初始化和统一的封装,对于一些与子类强相关的方法则一抽象的形式去定义,由子类自己去具体实现。如下图:

1
2
3
4
5
6
7
8
9
10
11
12
13
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}

protected AbstractChannel(Channel parent, ChannelId id) {
this.parent = parent;
this.id = id;
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
核心API的分析

在Channel进行I/O操作的时候,他会触发对应的事件方法。Netty基于事件驱动,所以也就是说当Channel进行I/O操作的时候会产生响应的I/O事件,然后事件在PipeLine里面传播,然后由对应的ChannelHandler对事件进行拦截处理,有点类似于AOP。
大体看一下AbstractChannel的基本I/O操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}

@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return pipeline.connect(remoteAddress, promise);
}

@Override
public ChannelFuture close(ChannelPromise promise) {
return pipeline.close(promise);
}

@Override
public Channel read() {
pipeline.read();
return this;
}

@Override
public ChannelFuture write(Object msg) {
return pipeline.write(msg);
}

根据上面图不难看出AbstractChannel的一些I/O操作也都是调用了DefaultChannelPipeline的方法来进行I/O的操作的,那也就是可以这么理解,AbstractChannel最最主要的作用就是对Channel一些功能点的初始化,其中最重要的是对DefaultChannelPipeline的初始化操作,因为AbstractChannel后续的I/O操作也都是调用它的方法来实现的,至此,AbstractChannel的源码我们也基本有了一定的了解。

下面我们再来看看在类图中AbstractChannel的下一级子类的源码。

AbstractNioChannel源码分析

成员变量定义

老规矩,先从成员变量来看:

1
2
3
4
5
6
7
8
9
10
11
private final SelectableChannel ch;
// 代表JDK SelectionKey的OP_READ
protected final int readInterestOp;
volatile SelectionKey selectionKey;-
boolean readPending;
// 连接操作结果
private ChannelPromise connectPromise;
// 连接超时定时器
private ScheduledFuture<?> connectTimeoutFuture;
// 请求通讯地址
private SocketAddress requestedRemoteAddress;

第一个参数:这里定义了一个SelectableChannel,由于NIO Channel,NioSocketChannel和NioServerSocketChannel需要共用,所以定义了一个JDK NIO的SocketChannel和ServerSocketChannel的公共父类SelectableChannel用于设置SelectableChannel参数和进行I/O操作。

第二个参数:readInterestOp,如注解所示,这代表了JDK SelectionKey的OP_READ。

第三个参数:一个由Volatile修饰的selectionKey,这个key是Channel注册到EventLoop(Selector)之后返回的一个代表了这个Channel的key,通过这个key可以获取到对应的Channel,也代表了当前的Channel是处于什么操作:

  • OP_ACCEPT:有新的网络连接可以 accept,值为 16
  • OP_CONNECT:代表连接已经建立,值为 8
  • OP_READ:代表读操作,值为 1
  • OP_WRITE:代表写操作,值为 4

由于Channel会面临多个业务线程并发的写操作,所以使用volatile来让其他线程及时感知到当前Channel的状态。

核心API源码分析

首先是同于Channel注册的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (; ; ) {
try {
selectionKey =javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
eventLoop().selectNow();
selected = true;
} else {
throw e;
}
}
}
}

这里主要的是:

1
selectionKey =javaChannel().register(eventLoop().unwrappedSelector(), 0, this);

其中方法javaChannel()返回的其实就是在它成员变量里面定义了的SelectableChannel。SelectableChannel是JDK的NIO自带的通道视图。
看到这其实可以看出,底层的Netty其实还是调用了JDk的NIO来实现的。在register方法中的eventLoop()方法返回的是其父类的成员变量定义的eventLoop,是一个NioEventLoop,然后获取其成员变量中定义的Selector多路复用器。在传入register注册方法里面的ops是0,则说明这里的注册对仍和I/O操作事件都不感兴趣,这里只是简单的将当前的Channel注册进Selector中。如果注册成功,则返回selectionKey,通过selectionKey可以从Selector中获取Channel对象。(SelectionKey是绑定Selector和Channel的中间纽带,之间是一对一的关系)
如果当前注册返回的selectionKey已经被取消,则抛出CancelledKeyException异常,捕获该异常进行处理。如果第一次处理该异常,调用多路复用器的selectNow()方法将已经取消的selectionKey从多路复用器中删除。操作成功之后,将selected置为true,说明之前失效的selectionkey已经被删除。 继续发起下一轮注册操作,如果成功则退出,如果仍然发生CancelledKeyException异常,说明我们无法删除已经被取消的selectionkey,按理由这种操作不该发生,所以直接抛出异常到上层,由其自行处理。

接下来是另一个方法是:

1
2
3
4
5
6
7
8
9
10
11
12
@Override
protected void doBeginRead() throws Exception {
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}

在准备读取数据的时候需要先设置网络操作位为读,设置完之后才可以监听读事件。

至此AbstractNioChannel的源码基本就阅读完毕了,根据上面的分析,基本可以这么理解,AbstractNioChannel的作用是将当前的Channel通过NioEventLoop绑定到Selector多路复用器上面,这里是一个注册的作用。

下面我们继续往下看类图的下一部分AbstractNioByteChannel:

AbstractNioByteChannel源码分析

成员变量
1
2
3
4
5
6
7
8
private final Runnable flushTask = new Runnable() {
@Override
public void run() {
// Calling flush0 directly to ensure we not try to flush messages that were added via write(...) in the
// meantime.
((AbstractNioUnsafe) unsafe()).flush0();
}
};

AbstractNioByteChannel的成员变量就是一个Runnable类型的flushTask,负责继续写半包消息。

API源码分析

这一块比较主要的方法是doWrite

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = config().getWriteSpinCount();
do {
Object msg = in.current();
if (msg == null) {
// Wrote all messages.
clearOpWrite();
// Directly return here so incompleteWrite(...) is not called.
return;
}
writeSpinCount -= doWriteInternal(in, msg);
} while (writeSpinCount > 0);// 发送的消息不为空则继续doWriteInternal
incompleteWrite(writeSpinCount < 0);
}

从上至下,首先通过:

1
config().getWriteSpinCount();

获取此次写总共需要执行的循环次数。【循环次数是指一次发送没有完成(写半包),继续循环发送的次数】
然后从传递的参数ChannelOutboundBuffer调用current()方法弹出一条消息,判断消息是否是null,如果是null,则说明消息发送数组里面所有等待发送的消息都发送完毕,清除写半包标记,结束循环。来看一下是如何清除写半包标识的:

1
2
3
4
5
6
7
8
9
10
protected final void clearOpWrite() {
final SelectionKey key = selectionKey();
if (!key.isValid()) {
return;
}
final int interestOps = key.interestOps();
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
}
}

从当前Selectionkey中获取网络操作位,然后与selectionkey.OP_WRITE做按位与,如果不等于0,说明当前的selectionkey是iswritable的,需要清除写操作位。 清除方法很简单,就是selectionkey.op_write取非之后与愿操作位按位与操作,清除selectionkey的写操作位。

再来看一下这边的循环次数是如何获取的,先看doWriteInternal代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
      if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (!buf.isReadable()) {
in.remove();
return 0;
}
final int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount > 0) {
in.progress(localFlushedAmount);
if (!buf.isReadable()) {
in.remove();
}
return 1;
}
}

这里是有两部分的,一部分是对普通ByteBuf的逻辑,还有一部分是对FileRegion的判断,这里主要是ByteBuf。
首先,判断的当然是当前消息的类型是否是ByteBuf类型,如果是,则进行强转,然后判断当前的小时是否是可读的,若消息不可读,则直接从发送环形数组中删除当前消息,继续循环其他的消息。
然后继续向下走,判断消息可读,然后将当前buf的数据写入底层Chnannel,返回的是发送总数,所以这里的localFlushedAmount是指本次发送的字节数。我们来看看这个doWriteBytes底层实现:

1
2
3
4
5
@Override
protected int doWriteBytes(ByteBuf buf) throws Exception {
final int expectedWrittenBytes = buf.readableBytes();
return buf.readBytes(javaChannel(), expectedWrittenBytes);
}

不难发现,其实际是将buf中可读的字节都写出到了目标Channel中。
回到doWriteInternal方法,我们就不用疑惑是否会出现localFlushedAmount大于0(也代表了写出了数据),然后buf还是可读的这种情况的出现了。在写出完毕之后,会调用ChannelOutboundBuffer更新发送进度信息。然后接着判断当前消息是否可读,不可读便从当前环形数组将当前消息删除。

至此,我们的doWrite方法已经快接近尾声了,循环部分已经分析完毕了,我么接着来看最后一个方法–incompleteWrite():

1
2
3
4
5
6
7
8
9
10
incompleteWrite(writeSpinCount < 0);

protected final void incompleteWrite(boolean setOpWrite) {
if (setOpWrite) {
setOpWrite();
} else {
clearOpWrite();
eventLoop().execute(flushTask);
}
}

这里在调用incompleteWrite方法时候传入了循环次数来与0进行比较,如果循环次数不是小于0的,说明之前还有消息没有写完,因为循环没有结束嘛,所以当前如果消息没有写完,则调用setOpWrite()方法去重新为当前Channel的SelectionKey设置写操作位。(注意⚠️:当SelectionKey的OP_WRITE的操作位被设置,那么当前Channel对应的Selector多路复用器就会不断的轮询对应的Channel用于处理没有处理完的半包消息,直到写操作位被清除为止。)
如果没有设置写操作位,那么就需要启用一个单独的Runnable,将其加入到EventLoop之中,由Runnable来负责处理写半包消息,他的实现也很简单,就是调用了flush()来发送缓冲区的消息。

好了,AbstractBioByteChannel的源码到此就分析的差不多了。

下面就接着来分析NioServerSocketChannel的父类 – AbstractNioMessageChannel

AbstractNioMessageChannel的源码分析

由于这个类没有成员变量,我们就直接跳到API源码分析。

API源码分析

这里主要实现方法是doWrite():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
final SelectionKey key = selectionKey();
final int interestOps = key.interestOps();
for (;;) {
Object msg = in.current();
if (msg == null) {
// Wrote all messages.
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
}
break;
}
try {
boolean done = false;
for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
if (doWriteMessage(msg, in)) {
done = true;
break;
}
}
if (done) {
in.remove();
} else {
// Did not write all messages.
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
key.interestOps(interestOps | SelectionKey.OP_WRITE);
}
break;
}
} catch (Exception e) {
if (continueOnWriteError()) {
in.remove(e);
} else {
throw e;
}
}
}
}

嗯姆。。。不对,等等!!!这代码好像在哪见过!没错,这里的代码和前面分析的AbstractNioByteChannel的doWrite方法几乎一模一样,AbstractNioByteChannel的doWrite多了一些封装,而这里写的比较直白一些。

如果仔细看,并且你还记得之前的AbstractNioByteChannel的大概的实现方式,你会发现,这里的doWriteMessage发送的是一个直接的pojo,而AbstractNioByteChannel发送的是ByteBuf或者FileRegion,还有就是,这里的处理半包的方式只采用了Selector多路复用器对对应的带有写操作位的Channel的轮询操作来处理未发送完的半包消息的。

接下来我们将会分析在类图最下面的两个类,NioServerSocketChannel和NioSocketChannel。

NioServerSocketChannel源码分析

这部分的源码比较简单,先来看一下它的成员变量和静态方法。

1
2
3
4
5
6
7
8
9
10
11
private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
private final ServerSocketChannelConfig config;

首先是定义了一个ChannelMetadata用于存储元信息,然后是定义了一个SelectorProvider,然后用这个provider来打开通道。
最后是一个ServerSocketChannelConfig用来配置TCP参数。

来看一下NioServerSocketChannel的构造函数:

1
2
3
4
5
6
7
8
9
10
11
12
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

public NioServerSocketChannel(SelectorProvider provider) {
this(newSocket(provider));
}

public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

在仅有的三个构造函数之中,除了最后一个由调用者传递的ServerSocketChannel之外,其余两个构造函数都会在初始化的时候调用前面定义好的newSocket方法来生成一个ServerSocketChannel,而这个ServerSocketChannel是JDK NIO的Channel,然后调用到最后一个重载构造函数实现。最后你会发现,这里参数的传递到了AbstractNioChannel的构造函数中去:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}

throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}

下面来看一下 Channel 的 doReadMessages :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}

首先是通过Netty的SocketUtils工具类接受客户端的连接,看一下传入的JavaChannel():

1
2
3
4
@Override
protected ServerSocketChannel javaChannel() {
return (ServerSocketChannel) super.javaChannel();
}

往下走,会来到AbstractNioChannel :

1
2
3
4
5
private final SelectableChannel ch;

protected SelectableChannel javaChannel() {
return ch;
}

不多分析这里的SelectableChannel,接着分析doReadMessages。
在接受了新的客户端连接之后,首先判断创建的连接是否是空,若不是空连接,则利用当前的NioServerSocketChannel和SocketChannel来创建一个新的NioSocketChannel,并将其加入List buf中,返回1,代表读取连接数据成功。
对于NioServerSocketChannel的读取操作就是接受客户端的连接,创建NioSocketChannel。

对于一些NioServerSocketChannel无需调用的方法,作者直接是返回了UnsupportedOperationException的异常来提示。

至此NioServerSocketChannel就分析完毕了,记住最重要的一点,NioServerSocketChannel的最要作用就是接受连接,创建NioSocketChannel。

接下来,我们来分析一下NioSocketChannel的实现。

NioSocketChannel源码分析

连接操作

重点分析与客户端连接有关的方法,首先来看连接 doConnect():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress != null) {
doBind0(localAddress);
}
boolean success = false;
try {
boolean connected =
SocketUtils.connect(javaChannel(), remoteAddress);
if (!connected) {
selectionKey().interestOps(SelectionKey.OP_CONNECT);
}
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
}
}

首先判断本地socket地址是否存在,若不为空,则调用doBind0方法去绑定本地Socket地址。
然后发起TCP连接远端Socket地址,若连接成功,则返回true,连接失败,则直接抛出I/O异常,若暂时没有连接上,不知道具体状态,则返回false。
若没有连接成功(不是失败),则重置Channel的SelectionKey的操作位为连接操作位。若是在连接的时候抛出了I/O异常,则success参数就为false,调用doClose()关闭连接。

写半包

先看一下源码的写半包的处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
SocketChannel ch = javaChannel();
int writeSpinCount = config().getWriteSpinCount();
do {
if (in.isEmpty()) {
clearOpWrite();
return;
}
int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
int nioBufferCnt = in.nioBufferCount();

switch (nioBufferCnt) {
case 0:
writeSpinCount -= doWrite0(in);
break;
case 1: {
ByteBuffer buffer = nioBuffers[0];
int attemptedBytes = buffer.remaining();
final int localWrittenBytes = ch.write(buffer);
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
default: {
long attemptedBytes = in.nioBufferSize();
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
maxBytesPerGatheringWrite);
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
}
} while (writeSpinCount > 0);
incompleteWrite(writeSpinCount < 0);
}

实在太多了,这一部分的处理也是比较复杂的,但是别急,都是写方法,如果你对于之前的源码分析都掌握了的话,其实这部分的源码也是换汤不换药的。
定义的SocketChannel基本都是从一个统一的方法javaChannel()里面获取的。
既然要读,那首先我们就得知道这个数据我们得读多少次能读完,代码的体现就是通过:

1
config().getWriteSpinCount()

来获取循环次数。这个是定义在ChannelConfig里面的,默认循环次数是16,取决于JVM运行的平台。
来到循环体里面,如果buf是空的,则清除写操作位,并return;
若不是空的,则继续向下走:

1
int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();

获取设置的单个ByteBuf的最大字节数,也就是设定了每个ByteBuf允许存储的最大字节的数量,获得了ByteBuf的最大字节数之后紧接着就是获取ByteBuf数组:

1
ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);

通过ChannelOutboundBuffer获取ByteBuf数组,其中传递的参数是设置好的单个ByteBuf的最大字节数,以及允许的最大缓冲区数量,然后返回ByteBuf数组。

1
int nioBufferCnt = in.nioBufferCount();

调用了buf的nioBufferCount()方法,来获取需要发送的ByteBuffer数组个数。nioBufferCount()方法必须在调用了nioBuffers()之后才可以调用。

进入条件语句之后,若需要发送的ByteBuffer为0,则说明无消息需要发送,但是可能会有别的事情需要处理,所以调用了doWrite0,并将自旋减去其返回的数值。
当需要发送的ByteBuffer个数为1,则直接取数组内的第一个ByteBuffer去发送,然后是获取缓冲区剩余的可写字节数赋值给attemptedBytes。在调用了SocketChannel的write()方法之后,方法会返回写入SocketChannel的字节数,判断写入的字节数,小于等于0说明TCP缓冲区已经满了,很肯能无法在继续写入,因此从循环中挑出,同时调用incompleteWrite(true)将写半包标识设置为true,用于向多路复用器注册写操作位,告诉多路复用器有没有发送完的半包消息,需要轮训出就绪的SocketChannel继续发送。
在处理完这些事情之后,ChannelOutbountBuffer的该部分缓冲区已经使用完毕了,于是会调用ChannelOutboundBuffer的清除方法:

1
in.removeBytes(localWrittenBytes);

参数是之前写入SocketChannel的字节数。来看一下他的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public void removeBytes(long writtenBytes) {
for (; ; ) {
Object msg = current();
if (!(msg instanceof ByteBuf)) {
assert writtenBytes == 0;
break;
}
final ByteBuf buf = (ByteBuf) msg;
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;
if (readableBytes <= writtenBytes) {
if (writtenBytes != 0) {
progress(readableBytes);
writtenBytes -= readableBytes;
}
remove();
} else { // readableBytes > writtenBytes
if (writtenBytes != 0) {
buf.readerIndex(readerIndex + (int) writtenBytes);
progress(writtenBytes);
}
break;
}
}
clearNioBuffers();
}

逻辑比较简单,首先是从当前ChannelOutBoundBuffer弹出第一条发送的ByteBuf,然后获取读索引和可读字节数(可读字节数 = 写索引 - 读索引),
将可读字节数与已经发送的字节数进行比较,可读字节数小于等于已发送字节数说明当前的bytebuf已经完全被发送出去了,那么就更新ChannelOutboundBuffer的发送进度信息以及将写指针前移,然后清空该部分内存空间。若可读字节数大于已写出字节数,说明出现了半包问题,那就需要更新可读索引,将将读索引后移writtenBytes个位置,这样当下次轮询到的时候就会重新从正确的读索引处重新开始读取消息,然后也是更新ChannelOutboundBuffer的发送进度信息。

好了,Channel的部分源码解读到这就差不多了,由于篇幅限制,Channel的Unsafe就不再这里继续展开了,笔者会另开一章来专门介绍Unsafe的源码,敬请期待吧。

谢谢阅读。

本人关于图片作品版权的声明:

  1. 本人在此刊载的原创作品,其版权归属本人所有。

  2. 任何传统媒体、商业公司或其他网站未经本人的授权许可,不得擅自从本人转载、转贴或者以任何其他方式复制、使用上述作品。

  3. 传统媒体、商业公司或其他网站对上述作品的任何使用,均须事先与本人联系。

  4. 对于侵犯本人的合法权益的公司、媒体、网站和人员,本人聘请的律师受本人的委托,将采取必要的措施,通过包括法律诉讼在内的途径来维护本人的合法权益。

特此声明,敬请合作。