Netty快速入门系列(四)-理解Pipeline以及Handler
Netty快速入门系列(四)-理解Pipeline以及Handler
理解 ChannelPipeline 和 ChannelHandler 是非常重要的,因为 Netty 处理 I/O 事件就是依靠它们。在实际的通信功能开发中,大部分的时间也将花在 Handler 的编写上。
1. ChannelHandler
1.1. 什么是 ChannelHandler
前面章节说过,ChannelHandler 是一个 Netty 中的接口,负责处理Channel中的各种事件和I/O操作,通常我们的业务逻辑代码就是写在 ChannelHandler 的实现里面。
这里面可以包括数据的编码、数据的解码等,从开发的角度来说,大部分时候写通信应用都是在跟 ChannelHandler 打交道。
ChannelHandler 主要的作用包括如下:
- 处理各种 I/O 事件,例如读取数据、写入数据、建立连接、关闭连接等。
- 将网络上传输过来的字节数据解码为 Java 对象,或将 Java 对象编码为字节数据。
- 捕获和处理网络操作中的一些异常,防止将异常传播到应用层,引起 Bug。
可以看到,基本上通讯应用相关的逻辑,都在这个 ChannelHandler 里面。
这个接口类似于 Mina 中的 IoHandler 接口,都适用于处理 I/O 相关的事件。所以这个接口非常的重要。
1.2 ChannelHandler的类型
在客户端与服务端通信的过程中,数据从客户端发向服务端的过程叫出站,反之称为入站。
ChannelHandler 目前有 2 个子接口,分别是:
- ChannelInboundHandler 处理入站的 I/O 操作,比如连接建立、读取对端发送过来的数据等等。
- ChannelOutboundHandler 处理出站的 I/O 操作,例如连接关闭、发送数据给对端等等。
这两个接口只会分别处理入站的事件以及出站的事件操作,同时处理入站和出站事件。
1.2.1 入站处理器接口
入站处理器说的是ChannelInboundHandler ,它是一个接口。 通常我们不会直接使用,比较常用的是使用下面 2 个实现类。
- ChannelInboundHandlerAdapter 适配器类。简化了 ChannelInboundHandler 接口的实现,这个类将所有的接口方法都提供了空实现。
- SimpleChannelInboundHandler 抽象类。通常我们会实现这个抽象类,然后重写一些有必要的方法。
看一个入站事件处理 Handler 例子:
public class MyChannelInboundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("Channel 注册到 EventLoop");
// 这里可以做一些Channel注册后的初始化操作,比如设置Channel的属性,或者发送初始化消息等。
// 一般用的不多
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("Channel 从 EventLoop 中移除");
// 这里可以做一些Channel注销后的清理操作,比如释放Channel占用的资源,或者发送通知消息等。
// 一般用的不多
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Channel已经建立连接");
// 这里一般是绑定当前 channel 跟账号、设备ID等等之类的关联关系。或者其他的初始化操作。
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Channel 断开连接");
// 这里一般是更新账号离线状态、设备离线状态等操作
// 有时候断线重连也有在这里写的做法
super.channelInactive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("Channel读取到数据: " + msg);
// 可以对读取到的数据进行处理,比如解码数据,进行业务逻辑处理,或者发送响应消息。
// 处理完成后,可以选择性将数据传递给下一个 Handler
ctx.fireChannelRead(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("Channel 读取数据完成");
// 这里可以进行一些数据读取完成后的操作,比如刷新缓冲区,或者发送响应消息等等。
super.channelReadComplete(ctx);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
System.out.println("触发用户事件: " + evt);
// 这里处理用户自定义事件,比如处理心跳事件,或者执行特定的业务逻辑等等。
super.userEventTriggered(ctx, evt);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
System.out.println("Channel可写性发生变化");
// 这里做一些Channel可写性变化后的操作,暂停或恢复数据的发送,或者调整缓冲区的大小等
// 可以做流量控制、控制发送给对端数据的频率
super.channelWritabilityChanged(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("捕获到异常: " + cause);
// 在此处可以进行异常处理,记录错误日志,或者发送错误响应消息等
// 有些场景下,会选择关闭Channel,这个看实际场景
ctx.close();
}
}
1.2.2 出站处理器接口
出站处理器说的是 ChannelOutboundHandler ,它也是一个接口,主要的实现类有:
- ChannelOutboundHandlerAdapter。 适配器类,默认将 ChannelOutboundHandler 的接口方法提供了空实现,子类可以按需重写相关的方法。
看一个出站事件处理 Handler 例子:
public class MyChannelOutboundHandler extends ChannelOutboundHandlerAdapter {
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
System.out.println("Channel 绑定到本地地址: " + localAddress);
// Channel绑定后的操作,比如记录绑定信息、设置Channel的属性等等
super.bind(ctx, localAddress, promise);
}
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
System.out.println("Channel 连接到远程地址: " + remoteAddress);
// 这里做一些Channel连接后的操作,记录连接信息,或者发送认证消息等等
super.connect(ctx, remoteAddress, localAddress, promise);
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
System.out.println("Channel 断开连接");
// Channel断开连接后的操作,记录断开连接信息,或者释放Channel占用的资源等等
super.disconnect(ctx, promise);
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
System.out.println("Channel 关闭");
// Channel关闭后的操作,记录关闭信息、释放Channel占用的资源等
super.close(ctx, promise);
}
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
System.out.println("Channel 从EventLoop中注销");
// 记录注销信息,或者释放Channel占用的资源等,实际项目中用的不多
super.deregister(ctx, promise);
}
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
System.out.println("Channel 读取数据");
// Channel读取数据前的操作
super.read(ctx);
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("Channel写入数据: " + msg);
// 数据写入前的操作,比如编码数据、数据的统计等等
super.write(ctx, msg, promise);
}
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
System.out.println("Channel 刷新数据到底层传输");
// 数据刷新前的操作,比如记录刷新操作
super.flush(ctx);
}
}
2. ChannelPipeline
ChannelPipeline 是一个存储 ChannelHandler 对象的链表,当请求进来以后会依次触发链表中的 Handler 进行处理。
2.1 工作机制
当一个 I/O 事件发生时,它会沿着 ChannelPipeline 中的处理器链进行传播。数据的处理方向是单向的,入站事件从前往后处理,出站事件从后往前处理。
入站事件(如读取数据)从链的头部开始,依次经过每个 ChannelInboundHandler。就像下面这张图一样:
当一个入站事件发生时,它会从链表的头部开始,逐个传递给每个 Handler。上面的入站事件Handler 的调用顺序为: Handler1 > Handler2 > Handler3 > 其他Handler。
出站事件(如写入数据)从链的尾部开始,依次经过每个 ChannelOutboundHandler。Handler 的调用顺序就变成: 其他Handler > Handler3 > Handler2 > Handler1 。
出站的事件发生时,就是从链表的尾部开始,逐步传递给每一个 Handler。
在 Bootstrap 中这样配置 Pipeline:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
public class NettyServer {
public static void main(String[] args) throws Exception {
final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
final EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
final ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
// 重点
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
final ChannelPipeline p = ch.pipeline();
p.addLast(new Handler1());
p.addLast(new Handler2());
p.addLast(new Handler3());
// 添加其他 handler
}
});
ChannelFuture f = b.bind(8080).sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
2.2 常见的操作
ChannelPipeline 常见的操作包括添加 Handler、移除 Handler。
- addFirst()、addLast() 在 Pipeline 链表的头部或尾部部添加 Handler。
- addBefore() 、addAfter() 在特定 Handler 前面或者后面添加新的 Handler。
- remove() 移除指定的 Handler。
- replace() 替换指定的 Handler。
可以看出,上面的操作都是链表的一些操作。通常 ChannelPipeline 在初始化时用的比较多,要么就是在运行过程中移除某个 Handler。