Netty快速入门系列(三)-Netty基本概念与术语
Netty快速入门系列(三)-Netty基本概念与术语
动手实践了一个 HelloWorld 之后,就可以开始了解一些 Netty 比较核心的一些概念。主要是有下面这些:
- Bootstrap 。 用来启动服务端或者客户端的类。
- EventLoop、EventLoopGroup 。 处理连接、处理I/O事件等等。
- Channel。 Channel 代表一个网络连接。
- ChannelHandler。处理 I/O 事件。
- ChannelPipeline 。 包含了一系列的 ChannelHandler。
- ChannelFuture。 代表异步 I/O 操作的结果。
- ChannelOption。 用于配置 Channel 的参数。
- ChannelHandlerContext。 表示 ChannelHandler 和 ChannelPipeline 之间的上下文对象,可以用来获取 Channel、触发各种 I/O 事件等。
- Attribute。 用于存储 Channel 或 ChannelHandlerContext 的自定义属性。
- Codec。 编码、解码的接口。
- ByteBuf。用于操作字节数据的容器。
整个 Netty 中我觉得就这么些概念是比较重要的,当然一些包括网络通讯的基本概念不包含在这个里面。
要想熟练的运用 Netty , 上面这些概念必须得要搞懂。不过光看是看不懂的,一篇文章也不可能讲解的很细致。看只是为了先有个大概的概念,在实践中加深理解。
1. Bootstrap
Bootstrap 是 Netty 的一个启动辅助类 。它可以用一种简单的方式完成 Netty 程序的启动、初始化、配置参数等等。
主要作用:
- 设置 EventLoopGroup。
- 指定 Channel 类型。
- 设置 ChannelOption 。
- 配置 ChannelHandler。
- 客户端应用可以通过 Bootstrap.connect() 方法来连接到服务端。
- 服务端应用通过 ServerBootstra.bind() 方法来绑定到指定计算机端口。
所以 Netty 也是提供了两个 Bootstrap 类型:
- ServerBootstrap 用于启动服务端。
- Bootstrap 用于启动客户端。
这两个也是非常的好区分,服务端的名称前面增加 Server,客户端的则没有。
在配置 Channel 上还有一点小差异,比如 ServerBootstrap 配置的是 ServerSocketChannel 。 而 Bootstrap 配置的是 SocketChannel。可以看出来,这个也是在名字上做了区分。
对比下初始化方式,ServerBootstrap :
final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
final EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
final ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new MyServerHandler());
}
});
ChannelFuture f = b.bind(8080).sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
ServerBootStrap 用于服务端绑定一个本地端口,然后等待其他客户端的连接。所以会绑定两个 EventLoopGroup,通常称为 bossGroup 和 workerGroup。
Bootstrap :
final EventLoopGroup group = new NioEventLoopGroup();
try {
final Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new MyClientHandler());
}
});
ChannelFuture f = b.connect("localhost", 8080).sync();
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
Bootstrap 是客户端主动去连接其他服务端接口,所以只绑定一个 EventLoopGroup。
2. Channel
Netty 中的 Channel 就是代表一个网络连接或者说能够进行 I/O 操作的组件。 不过通常都是说一个建立好的网络连接是 Channel 。 这个 Channel 是一个抽象接口,实现类包括 NioSocketChannel、NioServerSocketChannel 等。
Channel 的几个特点:
- Channel 的所有 I/O 操作,如读、写、连接、绑定都是异步的,会返回一个
ChannelFuture
对象。 跟 JDK 的 Future 是类似的,在操作完成时的时候可以触发回调函数。 - Channel 是线程安全的,可以在多个线程中共享访问。
- Channel 生命周期包括打开、绑定、连接、断开和关闭等状态。
- Channel 可以保存自定义属性 Attribute,方便在 Channel 的整个生命周期中传递一些数据。
演示下 Channel 常用的几个方法:
final Bootstrap b = new Bootstrap();
// ... 省略配置
ChannelFuture f = b.connect("localhost", 8080).sync();
// 获取 Channel
Channel channel = f.channel();
// 设置当前连接的设备ID到 channel 中
final Attribute<String> attr = channel.attr(AttributeKey.valueOf("deviceId"));
attr.set("1062");
// 设置当前 Channel 绑定的的设备ID
final Attribute<String> attr = channel.attr(AttributeKey.valueOf("deviceId"));
final String devId = attr.get();
// 调用 channel.writeAndFlush() 发送数据给服务端
channel.writeAndFlush(Unpooled.copiedBuffer("Hello Netty", CharsetUtil.UTF_8));
// 关闭连接
ChannelFuture closeFuture = channel.close();
closeFuture.addListener(future -> {
if (future.isSuccess()) {
System.out.println("Channel 关闭成功");
} else {
System.err.println("Close 关闭失败");
}
});
3. EventLoop、EventLoopGroup
EventLoop 也是 Netty 中很核心的一个概念。但是 EventLoop 并不是 Netty 特有的概念,而是一种设计模式。 简单点来说就是一种循环机制,用于等待和处理事件。
在一个典型的 EventLoop 中,会有一个线程不断循环的来轮询事件,处理事件,并再次等待新的事件。比如在编写原生的 JDK NIO 的时候,就会有下面的代码:
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
serverSocketChannel.configureBlocking(false);
// 打开 Selector
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// 阻塞等待事件
selector.select();
Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
keyIterator.remove();
if (key.isAcceptable()) {
// 处理客户端连接
} else if (key.isReadable()) {
// 处理读事件
}
}
}
这里的 while(true) 就是类似一个 EventLoop 的逻辑,不停地去查询 Channel 是否可读可写等。
Netty 中的 EventLoop 主要是一个单线程的循环,不断调用 Selector.select() 方法。 每个 EventLoop 负责处理一个或多个 Channel 的所有 I/O 事件和任务。
总结下 EventLoop 的主要职责:
- 监听并处理 I/O 事件,比如读、写、连接。
- 运行用户提交的一些普通任务以及定时任务。
- 管理 Channel 的注册、激活、注销和关闭等一些生命周期事件。
EventLoopGroup 看名字就可以猜到是一组 EventLoop 的集合,通常用于管理和分配 EventLoop。Netty 中的
EventLoopGroup 可以划分成 2 类,分别是:
- Boss EventLoopGroup 只负责处理网络连接的接入。
- Worker EventLoopGroup 负责处理已建立连接的 I/O 操作,通俗点来说就是处理业务逻辑等。
这个在之前的代码中多次出现:
// 处理网络连接的接入
final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 处理连接的 I/O 事件
final EventLoopGroup workerGroup = new NioEventLoopGroup();
举个形象的例子理解这两个 Group。 bossGroup类似于餐厅大门的【迎宾】角色,负责将你引导进餐厅即可,不干别的事情。workerGroup 相当于【服务员】的角色,服务员会问你想吃什么、想点什么、后续上菜、最后结账等等。
4. ChannelHandler
ChannelHandler 是一个 Netty 中的接口,负责处理Channel中的各种事件和I/O操作,通常我们的业务逻辑代码就是写在 ChannelHandler 的实现里面。
6. ChannelPipeline
ChannelPipeline 是一个 Handler 链表,包含多个 ChannelHandler 实例,按照在处理链中的顺序来处理事件。跟数据结构中的双向链表原理是一样的。
7. ChannelFuture
ChannelFuture 其实就是 Java Future 接口的扩展,专门为 Netty 的 Channel 一些操作设计的。
所以跟 Java Future 的作用几乎也是一模一样,都是基于回调跟监听器的机制,提供回调函数在操作完成时进行触发。
主要方法:
- isDone() 检查操作是否完成。
- isSuccess() 检查操作是否成功完成。
- cause() 如果操作失败,返回失败的原因 Throwable。
- addListener() 添加 ChannelFutureListener,在操作完成时通知。
- sync() 等待操作完成,如果操作失败则抛出异常。
- await() 等待操作完成,但不抛出异常。
演示用法:
final ChannelFuture future = channel.connect(new InetSocketAddress("127.0.0.1", 8080));
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
System.out.println("连接建立成功");
} else {
System.err.println("连接建立失败");
future.cause().printStackTrace();
}
}
});
8. ChannelOption
ChannelOption 是 Netty 中用于配置 Channel 的一个配置类,主要是用于设置 Channel 的各种属性。
下面是一些常见的设置,如果不确定具体作用最好不要乱改:
- ChannelOption.SO_RCVBUF。 调整接收缓冲区大小。
- ChannelOption.SO_SNDBUF。 调整发送缓冲区大小。
- ChannelOption.SO_KEEPALIVE。 是否启用或禁用 TCP 保持连接。
- ChannelOption.SO_REUSEADDR 。 是否允许重用本地地址和端口。
- ChannelOption.TCP_NODELAY。 是否开启 Nagle 算法,用于减少小数据包的发送。
- ChannelOption.CONNECT_TIMEOUT_MILLIS。 设置连接超时时间(毫秒)。
- ChannelOption.SO_BACKLOG 。 设置服务器接受连接的队列长度。
这些参数大部分都是原生 JDK Socket API 里面的一些设置参数,少部分是操作系统的配置参数。
例子:
final ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true);
9. ChannelHandlerContext
看名字中包含 Context,可以猜到跟上下文有关系。ChannelHandlerContext 就是提供跟 ChannelPipeline 和 Channel 交互的上下文接口。
用的比较多的方法是下面几个:
- 获取 Channel 。
// ChannelHandlerContext ctx
Channel channel = ctx.channel();
- 关闭 Channel。
ctx.close();
- 写入消息到 Channel。
// ChannelHandlerContext ctx
ctx.write(msg);
ctx.writeAndFlush(msg);
- 将读操作传递给下一个 ChannelInboundHandler。
// ChannelHandlerContext ctx
ctx.fireChannelRead(msg);
- 获取 Channel ,然后调用Channel 的 writeAndFlush(msg) 方法写入数据。
// ChannelHandlerContext ctx
ctx.channel().writeAndFlush(msg)
- 获取 ChannelPipeline ,然后添加 Handler 或者移除 Handler。
// ChannelHandlerContext ctx
ChannelPipeline pipeline = ctx.pipeline();
pipeline.addLast(new MyHandler())
- 获取 EventExecutor ,然后用 EventExecutor 执行耗时的操作。
// ChannelHandlerContext ctx
EventExecutor executor = ctx.executor();
完整的演示下用法:
public class MyChannelHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("Received message: " + msg);
// 将消息传递给下一个处理器
ctx.fireChannelRead(msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
// 关闭 Channel
ctx.close();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Channel is active");
// 写入消息到 Channel 并刷新
ctx.writeAndFlush("Welcome!");
}
}
10. Attribute
Attribute 是一种用于在 Channel 上保存和访问自定义数据的机制。
比如一个连接建立后,要给当前连接绑定一个标识,比如这个连接是属于哪个用户、属于哪个设备等等。这个时候就可以给 Channel 设置一个 userId 或者 devId 标识。
用法很简单:
AttributeKey<String> USER_ID_KEY = AttributeKey.valueOf("userId ");
// 将用户ID =1062 绑定到当前 Channel 上
ctx.channel().attr(USER_ID_KEY).set("1062");
这样后续就可以直接判断这个连接所属的用户。也可以扩展到其他属性,比如连接时间、最近发送消息时间等等。
获取的时候就这样获取:
AttributeKey<String> USER_ID_KEY = AttributeKey.valueOf("userId ");
final String userId = ctx.channel().attr(USER_ID_KEY).get();
11. ByteBuf
ByteBuf 是 Netty 提供的一个字节操作容器 , 它提供了一组 API 用于操作字节。
主要是 Java NIO 的 ByteBuffer 用起来比较麻烦,而且比较容易出错。
ByteBuf 的实现有多种,包括堆内存实现、直接内存实现等等,实现比较多。
- Heap ByteBuf 。 基于 JVM 堆内存的缓冲区。创建和销毁成本比较低,适合短生命周期的缓冲区。
- Direct ByteBuf。 使用直接内存的缓冲区,所以避免了堆内存到直接内存的复制,适合频繁的 I/O 操作。
- Composite ByteBuf。 组合几个 ByteBuf 实例,可以避免数据复制,将多个缓冲区组合在一起进行处理。
演示下常用的操作:
- 创建 ByteBuf
// 通过 ByteBufAllocator 创建 ByteBuf
// 非池化的 ByteBuf
final ByteBuf buf = Unpooled.buffer(256);
// 池化的 ByteBuf
final ByteBuf pooledBuf = PooledByteBufAllocator.DEFAULT.buffer(256);
- 写数据
final ByteBuf buf = Unpooled.buffer(256);
// 写入一个 int
buf.writeInt(100);
// 写入一个 byte 数组
buf.writeBytes(new byte[]{1, 2, 3, 4});
- 读数据
// 读取一个 int
final int value = buf.readInt();
byte[] bytes = new byte[4];
// 读取一个 byte 数组
buf.readBytes(bytes);
- 获取容量和可读写字节数
// 获取总容量
final int capacity = buf.capacity();
// 获取可读字节数
final int readableBytes = buf.readableBytes();
// 获取可写字节数
final int writableBytes = buf.writableBytes();
- 切换读写指针
// 设置 readerIndex
buf.readerIndex(0);
// 设置 writerIndex
buf.writerIndex(0);
12. Codec
Codec 就是编解码,Netty 里面也提供一组接口以及类用来实现自定义的编解码逻辑。
编码就是把 Java 对象转成字节数据,这样才可以在网络上传输。编码一般会称为 Encoder , Netty中的接口是 MessageToByteEncoder 、MessageToMessageEncoder 这几个接口。
解码就是把字节数据转成 Java 对象,这样方便后续的业务逻辑处理。解码一般称为 Decoder,Netty 中的接口是 ByteToMessageDecoder 、MessageToMessageDecoder 接口。