Netty-ChannelPipeline,ChannelHandler和ChannelHandlerContext

Posted by klaus_turbo on 2020-10-27
Netty

什么是ChannelPipeline?

抽象的来说,ChannelPipeline是一个拦截或者处理 Channel 的入站和出站事件的 ChannelHandler 实例链。

具象的来说,ChannelPipeline是ChannelHandler的容器,负责管理ChannelHandler和事件的调度与拦截。

ChannelPipeline提供了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式以及ChannelPipeline中的ChannelHandlers如何进行交互。

对于每一个Channel来说,Netty会为其单独创建一个ChannelPipeline并将其与该创建的Pipeline绑定。绑定后,Channel与ChannelPipeline之间的耦合是永久的。Channel不能绑定另一条ChannelPipeline或者将当前已经绑定的ChannelPipeline与自己捆绑。当然这些个操作Netty已经帮我们做好了。

我们来看下面这一张图:
netty_pipeline.png

图内标注了 InboundHandler 和 OutbountHandler ,表示在ChannelPipeline 中的消息流转是有方向的。同时也展示出了这入站和出站的来源。通常来说,InboundHandler 处理由图中底部由Netty内部I/O产生的入站数据。一个入站的I/O事件将从底部向上传播,出站的I/O事件将从上往下传播,通常Netty将入站开始的地方称为头部,出站开始的地方称为尾部。

在 ChannelPipeline 传播事件时,它会判断 ChannelPipeline 中的下一个 ChannelHandler 的类型是否和事件的运动方向相匹配。如果不匹配,ChannelPipeline 将跳过该 ChannelHandler 并前进到下一个,直到它找到和该事件所期望的方向相匹配的为止。(当然,ChannelHandler 也可以同时实现 ChannelInboundHandler 接口和 ChannelOutboundHandler 接口。)

ChannelPipeline源码分析

首先来看一下ChannelPipeline的类关系图:

netty_channel_pipeline_class.png

它的继承关系十分的简单,直接实现类也就只有一个DefaultChannelPipeline。由于ChannelPipeline的自身操作主要集中在链的操作上面,而一些例如传播事件等都是通过转发的操作进行的,我们就着重分析其链的操作。

几个常用的操作大概有下面这几种:

1
2
3
4
5
6
7
8
9
10
11
ChannelPipeline addFirst(String name, ChannelHandler handler);

ChannelPipeline addLast(String name, ChannelHandler handler);

ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);

ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);

ChannelPipeline remove(ChannelHandler handler);

ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler);

从方法名字应该不难看出这些方法的作用吧。添加的方法我们这里就来分析一个addBefore(),来看一下它的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public final ChannelPipeline addBefore(
EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
final AbstractChannelHandlerContext ctx;
synchronized (this) {
// 做重复校验
checkMultiplicity(handler);
name = filterName(name, handler);
// 通过名字获取ctx实例
ctx = getContextOrDie(baseName);
// 新增ChannelHandler
newCtx = newContext(group, name, handler);
addBefore0(ctx, newCtx);
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}

由于ChannelPipeline支持运行期的动态修改,所以可能存在I/O线程与用户线程的并发访问或者是多个用户线程的并发访问。所以在方法内部使用了同步块。

重复性校验主要的作用是检查当前加入进来的ChannelHandler已经加进来了了,若已经加入那么这个Handler是否使用了@Sharable注解注释了,否则将会抛出异常:

1
2
3
4
5
6
7
8
9
10
11
private static void checkMultiplicity(ChannelHandler handler) {
if (handler instanceof ChannelHandlerAdapter) {
ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
if (!h.isSharable() && h.added) {
throw new ChannelPipelineException(
h.getClass().getName() +
" is not a @Sharable handler, so can't be added or removed multiple times.");
}
h.added = true;
}
}

然后是通过filterName()这个方法对当前要添加的handler进行取名:

1
2
3
4
5
6
7
8
9
private String filterName(String name, ChannelHandler handler) {
// 无名字则生成一个
if (name == null) {
return generateName(handler);
}
// 判断名字是否重复
checkDuplicateName(name);
return name;
}

下面是核心方法:

1
2
3
ctx = getContextOrDie(baseName);

newCtx = newContext(group, name, handler);

首先是通过getContextOrDie()方法获取当前handler添加的后一个handler的handlerContext。addBefore方法针对两个handler,一个是我们要添加的handler,另一个是已经添加进去的那个handler,在方法中的表述就是newCtx和ctx。来看一下获取ctx(我们要添加进去的handler的后一个handler的handlerContext):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 1
private AbstractChannelHandlerContext getContextOrDie(String name) {
AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(name);
if (ctx == null) {
throw new NoSuchElementException(name);
} else {
return ctx;
}
}
// 2
public final ChannelHandlerContext context(String name) {
return context0(ObjectUtil.checkNotNull(name, "name"));
}
// 3
private AbstractChannelHandlerContext context0(String name) {
AbstractChannelHandlerContext context = head.next;
while (context != tail) {
if (context.name().equals(name)) {
return context;
}
context = context.next;
}
return null;
}

获取ctx的主要看方法3,由于在初始化的时候pipeline定义了一个头和尾,所以我们可以看到在获取的时候我们总是去除了头尾,遍历context链表,直到有匹配的。
回到方法1,若我们没有匹配到传入的name,则会抛出一个NoSuchElementException异常。反之则返回ctx。
对于新的context,是通过调用了Contex构造方法来产生一个新的context,这个方法在几个添加handler的方法中是差不多的,这就是“每当handler加入到pipeline中的时候就会生成一个对应的contex”。
然后是正式的将我们构造好的context加入到当前的context链中去的操作,通过方法addBefore0()实现:

1
2
3
4
5
6
private static void addBefore0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
newCtx.prev = ctx.prev;
newCtx.next = ctx;
ctx.prev.next = newCtx;
ctx.prev = newCtx;
}

这里就是普通的链表操作,当前context持有的前后context引用的变换。

接着看:

1
2
3
4
5
6
// 1
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}

首先是代码块1,这里对Channel是否已经注册到Eventloop进行了判断,若未注册,将handler的状态由INIT–>ADD_PENDING,并将添加Handler这个事件加入到一个专门处理延迟事件的链表中(Channel注册后需要处理的任务):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
assert !registered;
// 判断是否是添加handler的事件
PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
PendingHandlerCallback pending = pendingHandlerCallbackHead;
if (pending == null) {
pendingHandlerCallbackHead = task;
} else {
// 将当前task加入链尾
while (pending.next != null) {
pending = pending.next;
}
pending.next = task;
}
}

看一下这个task的真面目:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// 父类
private abstract static class PendingHandlerCallback implements Runnable {
final AbstractChannelHandlerContext ctx;
PendingHandlerCallback next;

PendingHandlerCallback(AbstractChannelHandlerContext ctx) {
this.ctx = ctx;
}

abstract void execute();
}

// 实现类
private final class PendingHandlerAddedTask extends PendingHandlerCallback {
PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
super(ctx);
}
@Override
public void run() {
callHandlerAdded0(ctx);
}
@Override
void execute() {
EventExecutor executor = ctx.executor();
// 判断执行的线程是否是原本分配的线程
if (executor.inEventLoop()) {
callHandlerAdded0(ctx);
} else {
try {
// 执行callHandlerAdded0(ctx)
executor.execute(this);
} catch (RejectedExecutionException e) {
if (logger.isWarnEnabled()) {
logger.warn(
"Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
executor, ctx.name(), e);
}
// 事件执行异常,将新加的handlerContext从列表中移除
atomicRemoveFromHandlerList(ctx);
//设置handler的状态为REMOVE_COMPLETE
ctx.setRemoved();
}
}
}
}

这就是其添加的时候定义的一个task,来看这个task执行的内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 1
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
ctx.callHandlerAdded();
} catch (Throwable t) {
// ...省略异常之后handler的移除操作
}
}
// 2
final void callHandlerAdded() throws Exception {
if (setAddComplete()) {
handler().handlerAdded(this);
}
}

// 3 设置handler的状态为ADD_COMPLETE
final boolean setAddComplete() {
for (; ; ) {
int oldState = handlerState;
if (oldState == REMOVE_COMPLETE) {
return false;
}
if (HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
return true;
}
}
}

task的内容主要是通知这个handler被加进来了。
在方法2中,我们必须在调用handlerAdded之前调用setAddComplete。
否则,如果handlerAdded方法生成任何pipeline事件ctx.handler()都会错过它们,因为状态不允许。

以上是对当前Channel还未注册到Eventloop是的处理,若已经注册了,则如下:

1
2
3
4
5
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}

首先是获取绑定的exector,再判断这个excutor是否是自己注册时候绑定的eventloop(),若是,则继续执行上面的callHandlerAdded0()方法去通知这个handler已经添加了,若不是channel注册时候绑定的那个eventloop,则:

1
2
3
4
5
6
7
8
9
private void callHandlerAddedInEventLoop(final AbstractChannelHandlerContext newCtx, EventExecutor executor) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
}

到这了你会发现怎么这么眼熟,没错,这里就是用exector去执行了handler添加通知的事情,和上面一样的操作。

addBefore的分析到这就结束了,由于其他的一些操作与这个都是大同小异,对于其他的操作这里就不自赘述了,至此,ChannelPipeline就分析结束了。

什么是ChannelHandler?

正如上一小节所说的ChannelHandler是用于拦截或者处理Channel的入站和出站事件的。它可以选择性的拦截和处理自己感兴趣的事件,也可以通过透传和终止事件的传递。基于ChannelHandler接口,用户可以方便的进行业务逻辑定制,例如打印日志,统一封装异常信息,性能统计和消息编解码等。

ChannelHandler支持两种注解:

  • Sharable:允许多个ChannelPipeLine共用同一个ChannelHandler。
  • Skip:被Skip注解的方法不会被调用,直接忽略掉

根据入站和出站的不同,Netty将ChannelHandler扩展了两个重要的子类:

  • ChannelInboundHandler:负责处理入站的数据以及各种状态变化
  • ChannelOutboundHandler:负责处理出站的数据并且允许拦截所有的数据

ChannelHandler和ChannelPipeline的关系如下图所示:

netty_channle_pipeline_in_out.png

什么是ChannelHandlerContext?

ChannelHandlerContext 代表了 ChannelHandler 和 ChannelPipeline 之间的关联,每当有 ChannelHandler 添加到 ChannelPipeline 中时,都会创建 ChannelHandlerContext。

ChannelHandlerContext 的主要功能是管理它所关联的 ChannelHandler 和在同一个 ChannelPipeline 中的其他 ChannelHandler 之间的交互。

ChannelHandlerContext 有很多的方法,其中一些方法也存在于 Channel 和 ChannelPipeline 本身上,但是有一点重要的不同。如果调用 Channel 或者 ChannelPipeline 上的这些方法,它们将沿着整个 ChannelPipeline 进行传播。 而调用位于 ChannelHandlerContext 上的相同方法,则将从当前所关联的 ChannelHandler 开始,并且只会传播给位于该 ChannelPipeline 中的下一个能够处理该事件的 ChannelHandler。

值得注意的是:

  • ChannelHandlerContext 和 ChannelHandler 之间的关联(绑定)是永远不会改变的,所以缓存对它的引用是安全的;

  • 如同我们在本节开头所解释的一样,相对于其他类的同名方法,ChannelHandlerContext的方法将产生更短的事件流,应该尽可能地利用这个特性来获得最大的性能。

Channel,ChannelPipeline,ChannelHandler和ChannelHandlerContext四者之间的关系如下:

netty_channel_handler_process.png

ChannelHandler,ChannelPipeline和ChannelHandlerContext三者的对应关系如下:

netty_handler_context_pipeline_relation.png

本人关于图片作品版权的声明:

  1. 本人在此刊载的原创作品,其版权归属本人所有。

  2. 任何传统媒体、商业公司或其他网站未经本人的授权许可,不得擅自从本人转载、转贴或者以任何其他方式复制、使用上述作品。

  3. 传统媒体、商业公司或其他网站对上述作品的任何使用,均须事先与本人联系。

  4. 对于侵犯本人的合法权益的公司、媒体、网站和人员,本人聘请的律师受本人的委托,将采取必要的措施,通过包括法律诉讼在内的途径来维护本人的合法权益。

特此声明,敬请合作。