Netty快速入门系列(五)-理解协议编解码Codec
Netty快速入门系列(五)-理解协议编解码Codec
在网络过程中,数据的传输跟接收都是按字节流的形式进行。 但是我们写代码的话,通常会希望直接跟编程语言里面的对象打交道。 协议编解码就是这两种形式之间的一个转换。只有理解了协议编解码,才能去尝试编写生产级别的 Netty 应用。
1. 什么是协议编解码
编码通常就是把 Java 对象转成字节数据,这样才可以在网络上传输。
解码通常就是把字节数据转成 Java 对象,这样方便后续的业务逻辑处理。
2. 为什么需要进行编解码
总结下有以下几点原因:
- 网络传输中只能传递字节流,但是应用程序通常处理的是编程语言数据对象。
- 通过编解码可以减少数据大小,可以提升网络通信效率。
- 保证通信双方按照相同的数据格式去通信,避免数据传输产生歧义。
3. Netty 中的编解码器
3.1 编码器(Encoder)
编码器就是负责将发给对端的 Java 对象转换为字节或其他消息对象。在Netty中,编码器通常会继承下面 2 个类:
- MessageToByteEncoder: 将 Java 对象编码为 ByteBuf。
- MessageToMessageEncoder: 将一种 Java 对象编码为另一种 Java 对象。
这两个类其实看名字应该就比较好理解。Message 可以理解为是 Java中的对象,Byte就是字节数据。这样组合起来就可以根据类名推测编码器的意思
3.1.1 MessageToByteEncoder
先定义一个消息实体类,这个消息实体类里面目前就一个字段 msg ,代表发送给对端的消息字符串:
public class MessageRequest {
private String msg;
public MyMessage(String msg) {
this.msg = msg;
}
// set get
}
假设协议报文格式是:【4个字节消息长度】【任意字节大小的消息内容】
public class MyMessageToByteEncoder extends MessageToByteEncoder<String> {
@Override
protected void encode(ChannelHandlerContext ctx, MessageRequest messageRequest, ByteBuf out) throws Exception {
final String msg = messageRequest.getMsg();
// 将字符串消息转换为 byte 数组
byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);
// 第一步, 报文先先写一个 int 类型的消息长度
out.writeInt(bytes.length);
// 第二部,写入消息内容。
out.writeBytes(bytes);
}
}
在 ChannelInitializer 中配置这个编码器 :
public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 添加 MessageRequestEncoder
pipeline.addLast(new MyMessageToByteEncoder());
// 其他处理器...
}
}
然后业务逻辑中对 channel 发送消息即可。
final MessageRequest request = new MessageRequest("Hello, World!");
channel.writeAndFlush(request);
当执行上面代码时,会发生下面的流程:
- MyMessageToByteEncoder接收 EnhancedMessageRequest,将其编码为 ByteBuf。
- 最终的 ByteBuf 被写入到底层的 Channel 中,对端就会收到这个数据。
3.1.2 MessageToMessageEncoder
MessageToMessageEncoder 是将 Java 对象转换为 Java 对象,所以通常会跟 MessageToByteEncoder 结合进行使用。 单独使用其实是没有很大价值的,因为最终数据都是要通过字节流的形式进行传输。
再看一个 MessageToMessageEncoder 编码器的例子。假设上面的 MessageRequest 对象还缺少了一个消息ID,那么可以写一个编码器进行转化一下,统一给 MessageRequest 对象添加消息ID。
新建一个 EnhancedMessage 对象:
public class EnhancedMessageRequest {
private String msgId;
private String msg;
public EnhancedMessageRequest(String msg, String msgId) {
this.msg = msg;
this.msgId = msgId;
}
// set get
}
新建一个 MessageRequestToEnhancedMessageRequestEncoder 将 MessageRequest 对象转换为 EnhancedMessageRequest 对象:
public class MessageRequestToEnhancedMessageRequestEncoder extends MessageToMessageEncoder<MessageRequest> {
@Override
protected void encode(ChannelHandlerContext ctx, MessageRequest messageRequest, List<Object> out) throws Exception {
final String msg = messageRequest.getMsg();
final EnhancedMessageRequest enhancedMessage = new EnhancedMessageRequest(
UUID.randomUUID().toString(), // 生成一个唯一的消息ID
msg // 使用原始消息内容
);
// 把 EnhancedMessage 添加到输出列表
out.add(enhancedMessage);
}
}
再新建一个 EnhancedMessageRequestToByteEncoder 将 EnhancedMessageRequest 转换为 ByteBuf:
public class EnhancedMessageRequestToByteEncoder extends MessageToByteEncoder<EnhancedMessageRequest> {
@Override
protected void encode(ChannelHandlerContext ctx, EnhancedMessageRequest request, ByteBuf out) throws Exception {
// 将消息 ID 转换为 byte 数组
final String msgId = request.getMsgId();
byte[] msgIdBytes = msgId.getBytes(StandardCharsets.UTF_8);
// 先写入消息ID长度以及数据
out.writeInt(msgIdBytes.length);
out.writeBytes(msgIdBytes);
// 将消息转换为 byte 数组
final String msg = request.getMsg();
byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8);
// 先写入消息长度以及数据
out.writeInt(msgBytes.length);
out.writeBytes(msgBytes);
}
}
在 ChannelInitializer 中组合这两个编码器:
public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
// MessageRequest >>> EnhancedMessageRequest
pipeline.addLast(new MessageRequestToEnhancedMessageRequestEncoder());
// EnhancedMessageRequest >>> ByteBuf
pipeline.addLast(new EnhancedMessageRequestToByteEncoder());
// 其他处理器...
}
}
代码中使用:
final MessageRequest request = new MessageRequest("Hello, World!");
channel.writeAndFlush(request);
当执行上面代码时,会发生下面的流程:
- MessageRequestToEnhancedMessageRequestEncoder 接收到 MessageRequest ,将其转换为 EnhancedMessageRequest。
- EnhancedMessageRequestToByteEncoder 接收 EnhancedMessageRequest,将其编码为 ByteBuf。
- 最终的 ByteBuf 被写入到底层的 Channel 中,对端就会收到这个数据。
3.2 解码器(Decoder)
解码器就是负责将对端发过来的字节或消息对象转换为应用可以处理的消息对象。在Netty中,解码器通常继承自以下类:
- ByteToMessageDecoder:将 ByteBuf 解码为 Java 对象。
- MessageToMessageDecoder:将一种 Java 对象解码为另一种 Java 对象。
下面演示的例子就是将上面编码器发送的数据进行解码,一些对象结构也是一模一样的。
3.2.1 ByteToMessageDecoder
以上面的协议协议报文格式是【4个字节消息长度】【任意字节大小的消息内容】进行解码。
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
public class MyByteToMessageDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 确保有足够的字节可读
if (in.readableBytes() < 4) {
// 需要更多数据,这里说明数据没有读完
return;
}
// 标记当前读取位置
in.markReaderIndex();
// 读取消息长度, 第一个 int 即消息长度
final int length = in.readInt();
// 检查是否有足够的字节可读
if (in.readableBytes() < length) {
// 需要更多数据,这里说明数据没有读完
in.resetReaderIndex();
return;
}
// 读取消息内容
final byte[] bytes = new byte[length];
in.readBytes(bytes);
// 将字节数组转换为字符串
final String msg = new String(bytes, StandardCharsets.UTF_8);
final MessageRequest messageRequest = new MessageRequest(msg);
// 将解码后的对象添加到输出列表
out.add(messageRequest);
}
}
3.2.2 MessageToMessageDecoder
MessageToMessageDecoder 是将 Java 对象转换为 Java 对象,所以通常会跟 ByteToMessageDecoder 结合进行使用。
新建一个 ByteToEnhancedMessageRequestDecoder 将字节数据转换成 EnhancedMessageRequest 这个对象:
public class ByteToEnhancedMessageRequestDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 确保有足够的字节可读取消息ID长度,不够重新读取
if (in.readableBytes() < 4) {
return;
}
in.markReaderIndex();
// 前4个字节是消息ID长度
final int msgIdLength = in.readInt();
// 检查是否有足够的字节可读取消息ID,不够重新读取
if (in.readableBytes() < msgIdLength) {
in.resetReaderIndex();
return;
}
// 读取消息ID
final byte[] msgIdBytes = new byte[msgIdLength];
in.readBytes(msgIdBytes);
final String msgId = new String(msgIdBytes, StandardCharsets.UTF_8);
// 确保有足够的字节可读取消息内容长度
if (in.readableBytes() < 4) {
in.resetReaderIndex();
return;
}
final int msgLength = in.readInt();
// 检查是否有足够的字节可读取
if (in.readableBytes() < msgLength) {
in.resetReaderIndex();
return;
}
// 读取消息内容,然后创建 EnhancedMessageRequest 对象并添加到输出列表
final byte[] msgBytes = new byte[msgLength];
in.readBytes(msgBytes);
final String msg = new String(msgBytes, StandardCharsets.UTF_8);
final EnhancedMessageRequest enhancedMessage = new EnhancedMessageRequest(msg, msgId);
out.add(enhancedMessage);
}
}
新建 EnhancedMessageRequestToMessageRequestDecoder 解码器将 EnhancedMessageRequest 对象转换为 MessageRequest 对象:
public class EnhancedMessageRequestToMessageRequestDecoder extends MessageToMessageDecoder<EnhancedMessageRequest> {
@Override
protected void decode(ChannelHandlerContext ctx, EnhancedMessageRequest enhancedMessage, List<Object> out) throws Exception {
final String msg = enhancedMessage.getMsg();
final MessageRequest messageRequest = new MessageRequest(msg);
// 将 MessageRequest 对象添加到输出列表
out.add(messageRequest);
}
}
在 ChannelInitializer 中组合这两个编码器:
public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
// 下面的顺序很重要
// 编码器,这是上一个章节的内容
// pipeline.addLast(new MessageRequestToEnhancedMessageRequestEncoder());
// pipeline.addLast(new EnhancedMessageRequestToByteEncoder());
// 添加解码器
pipeline.addLast(new ByteToEnhancedMessageRequestDecoder());
pipeline.addLast(new EnhancedMessageRequestToMessageRequestDecoder());
// 添加其他的编解码器
// pipeline.addLast(new MyBusinessHandler());
}
}
执行流程如下:
3.3 编解码器(Codec)
编解码器(Codec)就是编码器和解码器的组合,它可以同时处理入站和出站数据。跟上面的编码器、解码器没什么不一样,只是结合起来了。
优势在于:
- 将编码和解码逻辑合并在一个类中,可以减少重复代码。
- 一个类中更容易保证编码和解码逻辑的一致性,减少错误。
Netty 中主要继承下面 2 个类来实现编解码器。
3.3.1 ByteToMessageCodec
ByteToMessageCodec 就是 结合了 ByteToMessageDecoder 和 MessageToByteEncoder 的功能。
在请求入站的时候,将字节数据转换为消息对象。 在请求出站的时候,将消息对象转换为字节数据。
上个简单的例子:
public class MyByteToEnhancedMessageRequestCodec extends ByteToMessageCodec<EnhancedMessageRequest> {
@Override
protected void encode(ChannelHandlerContext ctx, EnhancedMessageRequest request, ByteBuf out) throws Exception {
// 将 EnhancedMessageRequest 对象编码为字节,写入ByteBuf
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 从 ByteBuf 读取字节,解码为 EnhancedMessageRequest 对象
}
}
3.3.2 MessageToMessageCodec
MessageToMessageCodec 就是结合了 MessageToMessageDecoder 和 MessageToMessageEncoder 的功能。
请求入站的时候将一种消息类型转换为另一种消息类型。 在请求出站的时候,将转换后的消息类型再转换回原始类型。
public class MyMessageToMessageCodec extends MessageToMessageCodec<EnhancedMessageRequest, MessageRequest> {
@Override
protected void encode(ChannelHandlerContext ctx, MessageRequest msg, List<Object> out) throws Exception {
// 将 MessageRequest 转换为 EnhancedMessageRequest
}
@Override
protected void decode(ChannelHandlerContext ctx, EnhancedMessageRequest msg, List<Object> out) throws Exception {
// 将 EnhancedMessageRequest 转换为 MessageRequest
}
}