七的博客

Netty快速入门系列(三)-Netty基本概念与术语

网络通信

Netty快速入门系列(三)-Netty基本概念与术语

动手实践了一个 HelloWorld 之后,就可以开始了解一些 Netty 比较核心的一些概念。主要是有下面这些:

  • Bootstrap 。 用来启动服务端或者客户端的类。
  • EventLoop、EventLoopGroup 。 处理连接、处理I/O事件等等。
  • Channel。 Channel 代表一个网络连接。
  • ChannelHandler。处理 I/O 事件。
  • ChannelPipeline 。 包含了一系列的 ChannelHandler。
  • ChannelFuture。 代表异步 I/O 操作的结果。
  • ChannelOption。 用于配置 Channel 的参数。
  • ChannelHandlerContext。 表示 ChannelHandler 和 ChannelPipeline 之间的上下文对象,可以用来获取 Channel、触发各种 I/O 事件等。
  • Attribute。 用于存储 Channel 或 ChannelHandlerContext 的自定义属性。
  • Codec。 编码、解码的接口。
  • ByteBuf。用于操作字节数据的容器。

整个 Netty 中我觉得就这么些概念是比较重要的,当然一些包括网络通讯的基本概念不包含在这个里面。

要想熟练的运用 Netty , 上面这些概念必须得要搞懂。不过光看是看不懂的,一篇文章也不可能讲解的很细致。看只是为了先有个大概的概念,在实践中加深理解。

1. Bootstrap

Bootstrap 是 Netty 的一个启动辅助类 。它可以用一种简单的方式完成 Netty 程序的启动、初始化、配置参数等等。

主要作用:

  • 设置 EventLoopGroup。
  • 指定 Channel 类型。
  • 设置 ChannelOption 。
  • 配置 ChannelHandler。
  • 客户端应用可以通过 Bootstrap.connect() 方法来连接到服务端。
  • 服务端应用通过 ServerBootstra.bind() 方法来绑定到指定计算机端口。

所以 Netty 也是提供了两个 Bootstrap 类型:

  • ServerBootstrap 用于启动服务端。
  • Bootstrap 用于启动客户端。

这两个也是非常的好区分,服务端的名称前面增加 Server,客户端的则没有。

在配置 Channel 上还有一点小差异,比如 ServerBootstrap 配置的是 ServerSocketChannel 。 而 Bootstrap 配置的是 SocketChannel。可以看出来,这个也是在名字上做了区分。

对比下初始化方式,ServerBootstrap :

final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
final EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
    final ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) {
                    ch.pipeline().addLast(new MyServerHandler());
                }
            });
    ChannelFuture f = b.bind(8080).sync();
    f.channel().closeFuture().sync();
} finally {
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
}

ServerBootStrap 用于服务端绑定一个本地端口,然后等待其他客户端的连接。所以会绑定两个 EventLoopGroup,通常称为 bossGroup 和 workerGroup。

Bootstrap :

final EventLoopGroup group = new NioEventLoopGroup();
try {
    final Bootstrap b = new Bootstrap();
    b.group(group)
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) {
                    ch.pipeline().addLast(new MyClientHandler());
                }
            });
    ChannelFuture f = b.connect("localhost", 8080).sync();
    f.channel().closeFuture().sync();
} finally {
    group.shutdownGracefully();
}

Bootstrap 是客户端主动去连接其他服务端接口,所以只绑定一个 EventLoopGroup。

2. Channel

Netty 中的 Channel 就是代表一个网络连接或者说能够进行 I/O 操作的组件。 不过通常都是说一个建立好的网络连接是 Channel 。 这个 Channel 是一个抽象接口,实现类包括 NioSocketChannel、NioServerSocketChannel 等。

Channel 的几个特点:

  • Channel 的所有 I/O 操作,如读、写、连接、绑定都是异步的,会返回一个 ChannelFuture 对象。 跟 JDK 的 Future 是类似的,在操作完成时的时候可以触发回调函数。
  • Channel 是线程安全的,可以在多个线程中共享访问。
  • Channel 生命周期包括打开、绑定、连接、断开和关闭等状态。
  • Channel 可以保存自定义属性 Attribute,方便在 Channel 的整个生命周期中传递一些数据。

演示下 Channel 常用的几个方法:

    final Bootstrap b = new Bootstrap();
    // ... 省略配置

    ChannelFuture f = b.connect("localhost", 8080).sync();

    // 获取 Channel
    Channel channel = f.channel();

	 // 设置当前连接的设备ID到 channel 中
     final Attribute<String> attr = channel.attr(AttributeKey.valueOf("deviceId"));
     attr.set("1062");
    
      // 设置当前 Channel 绑定的的设备ID
     final Attribute<String> attr = channel.attr(AttributeKey.valueOf("deviceId"));
     final String devId = attr.get();

    // 调用 channel.writeAndFlush() 发送数据给服务端
    channel.writeAndFlush(Unpooled.copiedBuffer("Hello Netty", CharsetUtil.UTF_8));

    // 关闭连接
    ChannelFuture closeFuture = channel.close();
    closeFuture.addListener(future -> {
        if (future.isSuccess()) {
            System.out.println("Channel 关闭成功");
        } else {
            System.err.println("Close 关闭失败");
        }
    });

3. EventLoop、EventLoopGroup

EventLoop 也是 Netty 中很核心的一个概念。但是 EventLoop 并不是 Netty 特有的概念,而是一种设计模式。 简单点来说就是一种循环机制,用于等待和处理事件。

在一个典型的 EventLoop 中,会有一个线程不断循环的来轮询事件,处理事件,并再次等待新的事件。比如在编写原生的 JDK NIO 的时候,就会有下面的代码:

 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
 serverSocketChannel.socket().bind(new InetSocketAddress(8080));
 serverSocketChannel.configureBlocking(false);

 // 打开 Selector
 Selector selector = Selector.open();
 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

 while (true) {
            // 阻塞等待事件
            selector.select(); 
            Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();

            while (keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();
                keyIterator.remove();

                if (key.isAcceptable()) {
                    // 处理客户端连接
                } else if (key.isReadable()) {
                    // 处理读事件
                }
          }
     }

这里的 while(true) 就是类似一个 EventLoop 的逻辑,不停地去查询 Channel 是否可读可写等。

Netty 中的 EventLoop 主要是一个单线程的循环,不断调用 Selector.select() 方法。 每个 EventLoop 负责处理一个或多个 Channel 的所有 I/O 事件和任务。

总结下 EventLoop 的主要职责:

  • 监听并处理 I/O 事件,比如读、写、连接。
  • 运行用户提交的一些普通任务以及定时任务。
  • 管理 Channel 的注册、激活、注销和关闭等一些生命周期事件。

EventLoopGroup 看名字就可以猜到是一组 EventLoop 的集合,通常用于管理和分配 EventLoop。Netty 中的

EventLoopGroup 可以划分成 2 类,分别是:

  • Boss EventLoopGroup 只负责处理网络连接的接入。
  • Worker EventLoopGroup 负责处理已建立连接的 I/O 操作,通俗点来说就是处理业务逻辑等。

这个在之前的代码中多次出现:

// 处理网络连接的接入
final EventLoopGroup bossGroup = new NioEventLoopGroup(1);

// 处理连接的 I/O 事件
final EventLoopGroup workerGroup = new NioEventLoopGroup();

举个形象的例子理解这两个 Group。 bossGroup类似于餐厅大门的【迎宾】角色,负责将你引导进餐厅即可,不干别的事情。workerGroup 相当于【服务员】的角色,服务员会问你想吃什么、想点什么、后续上菜、最后结账等等。

4. ChannelHandler

ChannelHandler 是一个 Netty 中的接口,负责处理Channel中的各种事件和I/O操作,通常我们的业务逻辑代码就是写在 ChannelHandler 的实现里面。

6. ChannelPipeline

ChannelPipeline 是一个 Handler 链表,包含多个 ChannelHandler 实例,按照在处理链中的顺序来处理事件。跟数据结构中的双向链表原理是一样的。

7. ChannelFuture

ChannelFuture 其实就是 Java Future 接口的扩展,专门为 Netty 的 Channel 一些操作设计的。

所以跟 Java Future 的作用几乎也是一模一样,都是基于回调跟监听器的机制,提供回调函数在操作完成时进行触发。

主要方法:

  • isDone() 检查操作是否完成。
  • isSuccess() 检查操作是否成功完成。
  • cause() 如果操作失败,返回失败的原因 Throwable。
  • addListener() 添加 ChannelFutureListener,在操作完成时通知。
  • sync() 等待操作完成,如果操作失败则抛出异常。
  • await() 等待操作完成,但不抛出异常。

演示用法:

final ChannelFuture future = channel.connect(new InetSocketAddress("127.0.0.1", 8080));

future.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) {
        if (future.isSuccess()) {
            System.out.println("连接建立成功");
        } else {
            System.err.println("连接建立失败");
            future.cause().printStackTrace();
        }
    }
});

8. ChannelOption

ChannelOption 是 Netty 中用于配置 Channel 的一个配置类,主要是用于设置 Channel 的各种属性。

下面是一些常见的设置,如果不确定具体作用最好不要乱改:

  • ChannelOption.SO_RCVBUF。 调整接收缓冲区大小。
  • ChannelOption.SO_SNDBUF。 调整发送缓冲区大小。
  • ChannelOption.SO_KEEPALIVE。 是否启用或禁用 TCP 保持连接。
  • ChannelOption.SO_REUSEADDR 。 是否允许重用本地地址和端口。
  • ChannelOption.TCP_NODELAY。 是否开启 Nagle 算法,用于减少小数据包的发送。
  • ChannelOption.CONNECT_TIMEOUT_MILLIS。 设置连接超时时间(毫秒)。
  • ChannelOption.SO_BACKLOG 。 设置服务器接受连接的队列长度。

这些参数大部分都是原生 JDK Socket API 里面的一些设置参数,少部分是操作系统的配置参数。

例子:

final ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    
    .option(ChannelOption.SO_BACKLOG, 128)
    .childOption(ChannelOption.SO_KEEPALIVE, true)
    .childOption(ChannelOption.TCP_NODELAY, true);

9. ChannelHandlerContext

看名字中包含 Context,可以猜到跟上下文有关系。ChannelHandlerContext 就是提供跟 ChannelPipeline 和 Channel 交互的上下文接口。

用的比较多的方法是下面几个:

  • 获取 Channel 。
  // ChannelHandlerContext ctx
  
  Channel channel = ctx.channel();
  • 关闭 Channel。
  ctx.close();
  • 写入消息到 Channel。
  // ChannelHandlerContext ctx
  
  ctx.write(msg);
  ctx.writeAndFlush(msg);
  • 将读操作传递给下一个 ChannelInboundHandler。
  // ChannelHandlerContext ctx
  
  ctx.fireChannelRead(msg);
  • 获取 Channel ,然后调用Channel 的 writeAndFlush(msg) 方法写入数据。
  // ChannelHandlerContext ctx
  
  ctx.channel().writeAndFlush(msg)
  • 获取 ChannelPipeline ,然后添加 Handler 或者移除 Handler。
  // ChannelHandlerContext ctx
  
  ChannelPipeline pipeline = ctx.pipeline();
  
  pipeline.addLast(new MyHandler())
  • 获取 EventExecutor ,然后用 EventExecutor 执行耗时的操作。
  // ChannelHandlerContext ctx
  
  EventExecutor executor = ctx.executor();

完整的演示下用法:

public class MyChannelHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("Received message: " + msg);

        // 将消息传递给下一个处理器
        ctx.fireChannelRead(msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();

        // 关闭 Channel
        ctx.close();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Channel is active");

        // 写入消息到  Channel 并刷新
        ctx.writeAndFlush("Welcome!");
    }
}

10. Attribute

Attribute 是一种用于在 Channel 上保存和访问自定义数据的机制。

比如一个连接建立后,要给当前连接绑定一个标识,比如这个连接是属于哪个用户、属于哪个设备等等。这个时候就可以给 Channel 设置一个 userId 或者 devId 标识。

用法很简单:

AttributeKey<String> USER_ID_KEY = AttributeKey.valueOf("userId ");

// 将用户ID =1062 绑定到当前 Channel 上
ctx.channel().attr(USER_ID_KEY).set("1062");

这样后续就可以直接判断这个连接所属的用户。也可以扩展到其他属性,比如连接时间、最近发送消息时间等等。

获取的时候就这样获取:

AttributeKey<String> USER_ID_KEY = AttributeKey.valueOf("userId ");

final String userId = ctx.channel().attr(USER_ID_KEY).get();

11. ByteBuf

ByteBuf 是 Netty 提供的一个字节操作容器 , 它提供了一组 API 用于操作字节。

主要是 Java NIO 的 ByteBuffer 用起来比较麻烦,而且比较容易出错。

ByteBuf 的实现有多种,包括堆内存实现、直接内存实现等等,实现比较多。

  • Heap ByteBuf 。 基于 JVM 堆内存的缓冲区。创建和销毁成本比较低,适合短生命周期的缓冲区。
  • Direct ByteBuf。 使用直接内存的缓冲区,所以避免了堆内存到直接内存的复制,适合频繁的 I/O 操作。
  • Composite ByteBuf。 组合几个 ByteBuf 实例,可以避免数据复制,将多个缓冲区组合在一起进行处理。

演示下常用的操作:

  • 创建 ByteBuf
// 通过 ByteBufAllocator 创建 ByteBuf

// 非池化的 ByteBuf
final ByteBuf buf = Unpooled.buffer(256); 

// 池化的 ByteBuf
final ByteBuf pooledBuf = PooledByteBufAllocator.DEFAULT.buffer(256); 
  • 写数据
final ByteBuf buf = Unpooled.buffer(256);

// 写入一个 int
buf.writeInt(100); 

// 写入一个 byte 数组
buf.writeBytes(new byte[]{1, 2, 3, 4}); 
  • 读数据
// 读取一个 int
final int value = buf.readInt(); 
byte[] bytes = new byte[4];

// 读取一个 byte 数组
buf.readBytes(bytes); 
  • 获取容量和可读写字节数
 // 获取总容量
final int capacity = buf.capacity();

// 获取可读字节数
final int readableBytes = buf.readableBytes(); 

// 获取可写字节数
final int writableBytes = buf.writableBytes(); 
  • 切换读写指针
// 设置 readerIndex
buf.readerIndex(0); 

// 设置 writerIndex
buf.writerIndex(0); 

12. Codec

Codec 就是编解码,Netty 里面也提供一组接口以及类用来实现自定义的编解码逻辑。

编码就是把 Java 对象转成字节数据,这样才可以在网络上传输。编码一般会称为 Encoder , Netty中的接口是 MessageToByteEncoder 、MessageToMessageEncoder 这几个接口。

解码就是把字节数据转成 Java 对象,这样方便后续的业务逻辑处理。解码一般称为 Decoder,Netty 中的接口是 ByteToMessageDecoder 、MessageToMessageDecoder 接口。