七的博客

Netty快速入门系列(四)-理解Pipeline以及Handler

网络通信

Netty快速入门系列(四)-理解Pipeline以及Handler

理解 ChannelPipeline 和 ChannelHandler 是非常重要的,因为 Netty 处理 I/O 事件就是依靠它们。在实际的通信功能开发中,大部分的时间也将花在 Handler 的编写上。

1. ChannelHandler

1.1. 什么是 ChannelHandler

前面章节说过,ChannelHandler 是一个 Netty 中的接口,负责处理Channel中的各种事件和I/O操作,通常我们的业务逻辑代码就是写在 ChannelHandler 的实现里面。

这里面可以包括数据的编码、数据的解码等,从开发的角度来说,大部分时候写通信应用都是在跟 ChannelHandler 打交道。

ChannelHandler 主要的作用包括如下:

  • 处理各种 I/O 事件,例如读取数据、写入数据、建立连接、关闭连接等。
  • 将网络上传输过来的字节数据解码为 Java 对象,或将 Java 对象编码为字节数据。
  • 捕获和处理网络操作中的一些异常,防止将异常传播到应用层,引起 Bug。

可以看到,基本上通讯应用相关的逻辑,都在这个 ChannelHandler 里面。

这个接口类似于 Mina 中的 IoHandler 接口,都适用于处理 I/O 相关的事件。所以这个接口非常的重要。

1.2 ChannelHandler的类型

在客户端与服务端通信的过程中,数据从客户端发向服务端的过程叫出站,反之称为入站。

ChannelHandler 目前有 2 个子接口,分别是:

  • ChannelInboundHandler 处理入站的 I/O 操作,比如连接建立、读取对端发送过来的数据等等。
  • ChannelOutboundHandler 处理出站的 I/O 操作,例如连接关闭、发送数据给对端等等。

这两个接口只会分别处理入站的事件以及出站的事件操作,同时处理入站和出站事件。

1.2.1 入站处理器接口

入站处理器说的是ChannelInboundHandler ,它是一个接口。 通常我们不会直接使用,比较常用的是使用下面 2 个实现类。

  • ChannelInboundHandlerAdapter 适配器类。简化了 ChannelInboundHandler 接口的实现,这个类将所有的接口方法都提供了空实现。
  • SimpleChannelInboundHandler 抽象类。通常我们会实现这个抽象类,然后重写一些有必要的方法。

看一个入站事件处理 Handler 例子:

public class MyChannelInboundHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Channel 注册到 EventLoop");
        
        // 这里可以做一些Channel注册后的初始化操作,比如设置Channel的属性,或者发送初始化消息等。
        // 一般用的不多
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Channel 从 EventLoop 中移除");
        
        // 这里可以做一些Channel注销后的清理操作,比如释放Channel占用的资源,或者发送通知消息等。
        // 一般用的不多
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Channel已经建立连接");
        
      
        // 这里一般是绑定当前 channel 跟账号、设备ID等等之类的关联关系。或者其他的初始化操作。
        super.channelActive(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Channel 断开连接");
        
        // 这里一般是更新账号离线状态、设备离线状态等操作
        // 有时候断线重连也有在这里写的做法
        super.channelInactive(ctx);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("Channel读取到数据: " + msg);
        
        // 可以对读取到的数据进行处理,比如解码数据,进行业务逻辑处理,或者发送响应消息。
        
        // 处理完成后,可以选择性将数据传递给下一个 Handler
        ctx.fireChannelRead(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Channel 读取数据完成");
        
        // 这里可以进行一些数据读取完成后的操作,比如刷新缓冲区,或者发送响应消息等等。
        super.channelReadComplete(ctx);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        System.out.println("触发用户事件: " + evt);
        
        // 这里处理用户自定义事件,比如处理心跳事件,或者执行特定的业务逻辑等等。
        super.userEventTriggered(ctx, evt);
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Channel可写性发生变化");
        
        // 这里做一些Channel可写性变化后的操作,暂停或恢复数据的发送,或者调整缓冲区的大小等
        // 可以做流量控制、控制发送给对端数据的频率
        super.channelWritabilityChanged(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("捕获到异常: " + cause);
        
        // 在此处可以进行异常处理,记录错误日志,或者发送错误响应消息等
        
        // 有些场景下,会选择关闭Channel,这个看实际场景
        ctx.close();
    }
}

1.2.2 出站处理器接口

出站处理器说的是 ChannelOutboundHandler ,它也是一个接口,主要的实现类有:

  • ChannelOutboundHandlerAdapter。 适配器类,默认将 ChannelOutboundHandler 的接口方法提供了空实现,子类可以按需重写相关的方法。

看一个出站事件处理 Handler 例子:

public class MyChannelOutboundHandler extends ChannelOutboundHandlerAdapter {
    @Override
    public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
        System.out.println("Channel 绑定到本地地址: " + localAddress);
        
        // Channel绑定后的操作,比如记录绑定信息、设置Channel的属性等等
        super.bind(ctx, localAddress, promise);
    }

    @Override
    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
        System.out.println("Channel 连接到远程地址: " + remoteAddress);
        
        
        // 这里做一些Channel连接后的操作,记录连接信息,或者发送认证消息等等
        super.connect(ctx, remoteAddress, localAddress, promise);
    }

    @Override
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        System.out.println("Channel 断开连接");
        
        // Channel断开连接后的操作,记录断开连接信息,或者释放Channel占用的资源等等
        super.disconnect(ctx, promise);
    }

    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        System.out.println("Channel 关闭");
        
        // Channel关闭后的操作,记录关闭信息、释放Channel占用的资源等
        super.close(ctx, promise);
    }

    @Override
    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        System.out.println("Channel 从EventLoop中注销");
        
        // 记录注销信息,或者释放Channel占用的资源等,实际项目中用的不多
        super.deregister(ctx, promise);
    }

    @Override
    public void read(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Channel 读取数据");
        
        // Channel读取数据前的操作
        super.read(ctx);
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("Channel写入数据: " + msg);
        
        // 数据写入前的操作,比如编码数据、数据的统计等等
        super.write(ctx, msg, promise);
    }

    @Override
    public void flush(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Channel 刷新数据到底层传输");
        
        // 数据刷新前的操作,比如记录刷新操作
        super.flush(ctx);
    }
}

2. ChannelPipeline

ChannelPipeline 是一个存储 ChannelHandler 对象的链表,当请求进来以后会依次触发链表中的 Handler 进行处理。

2.1 工作机制

当一个 I/O 事件发生时,它会沿着 ChannelPipeline 中的处理器链进行传播。数据的处理方向是单向的,入站事件从前往后处理,出站事件从后往前处理。

入站事件(如读取数据)从链的头部开始,依次经过每个 ChannelInboundHandler。就像下面这张图一样:

ChannelPipeLine流程

当一个入站事件发生时,它会从链表的头部开始,逐个传递给每个 Handler。上面的入站事件Handler 的调用顺序为: Handler1 > Handler2 > Handler3 > 其他Handler。

出站事件(如写入数据)从链的尾部开始,依次经过每个 ChannelOutboundHandler。Handler 的调用顺序就变成: 其他Handler > Handler3 > Handler2 > Handler1 。

出站的事件发生时,就是从链表的尾部开始,逐步传递给每一个 Handler。

在 Bootstrap 中这样配置 Pipeline:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.SocketChannel;

public class NettyServer {
    public static void main(String[] args) throws Exception {
        final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        final EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            final ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
                
             // 重点   
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     final ChannelPipeline p = ch.pipeline();
                     p.addLast(new Handler1());
                     p.addLast(new Handler2());
                     p.addLast(new Handler3());
                     // 添加其他 handler 
                 }
             });

            ChannelFuture f = b.bind(8080).sync();
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

2.2 常见的操作

ChannelPipeline 常见的操作包括添加 Handler、移除 Handler。

  • addFirst()、addLast() 在 Pipeline 链表的头部或尾部部添加 Handler。
  • addBefore() 、addAfter() 在特定 Handler 前面或者后面添加新的 Handler。
  • remove() 移除指定的 Handler。
  • replace() 替换指定的 Handler。

可以看出,上面的操作都是链表的一些操作。通常 ChannelPipeline 在初始化时用的比较多,要么就是在运行过程中移除某个 Handler。