手写RPC框架系列(八) - 编写 RPC 网络通信
手写RPC框架系列(八) - 编写 RPC 网络通信
这个章节将开始编写 RPC 网络通信部分,稍微会有点小难度,并会涉及到一些新的概念。
本章节的内容大概如下:
- 编写基本的 Netty 服务端以及客户端代码
- 设计 RPC 通信协议 (包括协议头以及协议体)
- 实现消息编码解码器
- 创建业务处理器以处理 RPC 请求
- 将编解码器以及业务处理器加入 Netty 的 ChannelPipeLine
- 理解心跳机制以及在 RPC 中的应用
1. 通信协议的设计
在计算机体系中,存在着很多的网络通信协议。 什么是通信协议呢,简单的来说就是一段通信双方事先约定好的按照规定的格式去编码以及解码的数据。 通过约定好的结构,最终达到传输数据的目的。在前面章节中,我们选择使用 TCP 协议进行数据传输,那么 TCP 协议必然也有自己的协议报文格式。
下图中为 TCP 协议报文的格式:
(截图来自维基)
在这次 RPC 通信中,我们的协议设计为如下:
+----------+----------------+-------------+------------+------------------+
| 魔数 | 序列化算法 | 指令类型 | 长度 | 数据 |
| (1 byte) | (1 byte) | (1 byte) | (4 bytes) | (变长) |
+----------+----------------+-------------+------------+------------------+
协议报文结构
- 魔数 (1 字节)
- 这是协议报文的第一个字节,作为标识符用来确认接收的数据包是否是预期的格式或来自预期的发送者。
- 序列化算法 (1 字节)
- 这一字节表示使用的序列化算法。在这段代码中,
type
这个字段告诉接收端应该使用哪种解码器来解码后续的数据。
- 这一字节表示使用的序列化算法。在这段代码中,
- 指令类型 (1 字节)
- 这一字节用来定义数据包的类型或者说命令类型。它帮助理解如何处理这个数据包。
- 长度 (4 字节)
- 这四个字节代表后续数据部分的长度,即数据的字节数。这是一个整数值,告诉接收端在后续读取数据时应该读取多少字节。
- 数据 (变长)
- 这部分包含实际的数据,其长度由上一部分的长度字段指定。
2. 抽象通信协议到实体类
2.1 抽象 Packet
package com.suny.rpc.nettyrpc.core.model.packet;
import com.suny.rpc.nettyrpc.core.enums.PacketType;
import lombok.AccessLevel;
import lombok.Data;
import lombok.Setter;
import java.io.Serializable;
/**
* 抽象消息体.
*/
@Data
public abstract class Packet implements Serializable {
private static final long serialVersionUID = 5038767224481675128L;
/**
* 魔数. 标识协议
*/
@Setter(AccessLevel.NONE)
private byte magicNumber = 66;
/**
* 获取消息类型
*
* @return 消息类型
*/
public abstract PacketType getPacketType();
}
2.2 定义 Packet 类型枚举
package com.suny.rpc.nettyrpc.core.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
@AllArgsConstructor
@Getter
public enum PacketType {
/**
* 心跳包
*/
HEART_BEAT((byte) 0),
/**
* 请求包
*/
RPC_REQUEST((byte) 1),
/**
* 应答包
*/
RPC_RESPONSE((byte) 2);
private byte type;
}
2.3 定义 RPC 请求 packet
package com.suny.rpc.nettyrpc.core.model;
import com.suny.rpc.nettyrpc.core.enums.PacketType;
import com.suny.rpc.nettyrpc.core.model.packet.Packet;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import java.io.Serializable;
@Setter
@Getter
@EqualsAndHashCode(callSuper = true)
@ToString
public class RpcRequest extends Packet implements Serializable {
private static final long serialVersionUID = 6290632141408825905L;
/**
* 请求流水号
*/
private String sequence;
/**
* 类名
*/
private String className;
/**
* 方法名
*/
private String methodName;
/**
* 参数类型. 有序
*/
private Class<?>[] parameterType;
/**
* 参数. 有序
*/
private Object[] parameters;
@Override
public PacketType getPacketType() {
return PacketType.RPC_REQUEST;
}
}
2.4 定义 RPC 应答 packet
package com.suny.rpc.nettyrpc.core.model;
import com.suny.rpc.nettyrpc.core.enums.PacketType;
import com.suny.rpc.nettyrpc.core.model.packet.Packet;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import java.io.Serializable;
@Getter
@EqualsAndHashCode(callSuper = true)
@ToString
@Setter
public class RpcResponse extends Packet implements Serializable {
private static final long serialVersionUID = 2964937493568971128L;
/**
* 流水号
*/
private String sequence;
/**
* 异常信息
*/
private Throwable throwable;
/**
* 结果
*/
private Object result;
public RpcResponse(String sequence, Throwable throwable) {
this.sequence = sequence;
this.throwable = throwable;
}
public RpcResponse(String sequence, Object result) {
this.sequence = sequence;
this.result = result;
}
/**
* 只提供给反序列化使用
*/
@Deprecated
public RpcResponse() {
}
@Override
public PacketType getPacketType() {
return PacketType.RPC_RESPONSE;
}
}
2.5 定义 packet 管理类
package com.suny.rpc.nettyrpc.core.codec;
import com.suny.rpc.nettyrpc.core.enums.PacketType;
import com.suny.rpc.nettyrpc.core.model.RpcRequest;
import com.suny.rpc.nettyrpc.core.model.RpcResponse;
import com.suny.rpc.nettyrpc.core.model.packet.HeartBeatPacket;
import com.suny.rpc.nettyrpc.core.model.packet.Packet;
import java.util.HashMap;
import java.util.Map;
public class PacketClassManager {
private static final Map<Byte, Class<? extends Packet>> PACKET_CLASS_MAP = new HashMap<>();
static {
PACKET_CLASS_MAP.put(PacketType.HEART_BEAT.getType(), HeartBeatPacket.class);
PACKET_CLASS_MAP.put(PacketType.RPC_REQUEST.getType(), RpcRequest.class);
PACKET_CLASS_MAP.put(PacketType.RPC_RESPONSE.getType(), RpcResponse.class);
}
public static Class<? extends Packet> getPacketClass(byte packetType) {
return PACKET_CLASS_MAP.get(packetType);
}
}
3. 通信协议的编解码
简单的来说,通信协议的编解码就是:
- 编码: 将 Java 对象编码成字节流以便在网络上传输。
样例:
7B 22 73 65 71 75 65 6E 63 65 22 3A 22 30 30 31 22 2C 22 63 6C 61 73 73 4E 61 6D 65 22 3A 22 63 6F 6D 2E 65 78 61 6D 70 6C 65 2E 55 73 65 72 53 65 72 76 69 63 65 22 2C 22 6D 65 74 68 6F 64 4E 61 6D 65 22 3A 22 66 69 6E 64 55 73 65 72 42 79 49 64 22 2C 22 70 61 72 61 6D 65 74 65 72 54 79 70 65 22 3A 5B 22 6A 61 76 61 2E 6C 61 6E 67 2E 53 74 72 69 6E 67 22 5D 2C 22 70 61 72 61 6D 65 74 65 72 73 22 3A 5B 22 31 32 33 34 35 22 5D 2C 22 6D 61 67 69 63 4E 75 6D 62 65 72 22 3A 36 36 7D
- 解码: 将网络上的字节流解码成 Java 对象。
样例:
{
"sequence": "b272d6e4-92c5-4390-be15-2a5641046c97",
"className": "com.suny.rpc.nettyrpc.server.biz.UserService",
"methodName": "selectById",
"parameterType": ["java.lang.String"],
"parameters": ["10001"],
"magicNumber": 66
}
3.1 抽象序列化协议相关接口
编写具体的编解码实现前,我们要先抽象出序列化算法的接口来,给 RPC 通信切换协议带来可能。
3.1.1 抽象序列化接口
package com.suny.rpc.nettyrpc.core.serialize;
public interface Serializer {
/**
* 序列化对象
*
* @param obj 待序列化对象
* @return 字节数组
*/
byte[] serialize(Object obj);
/**
* 反序列化
*
* @param bytes 字节数组
* @param clazz 目标类型
* @param <T> 泛型
* @return 反序列化对象
*/
<T> T deSerialize(byte[] bytes, Class<T> clazz);
/**
* 获取序列化、反序列化类型
*
* @return 序列化类型
*/
SerializerType getSerializerType();
}
3.1.2 序列化协议枚举定义
package com.suny.rpc.nettyrpc.core.serialize;
import lombok.AllArgsConstructor;
import lombok.Getter;
@AllArgsConstructor
@Getter
public enum SerializerType {
/**
* json类型
*/
JSON((byte) 1);
private byte type;
}
3.1.3 JSON序列化实现
package com.suny.rpc.nettyrpc.core.serialize;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.parser.ParserConfig;
import com.alibaba.fastjson.serializer.SerializerFeature;
import org.springframework.stereotype.Service;
@Service
public class SimpleJsonSerializerImpl implements Serializer {
static{
ParserConfig.getGlobalInstance().addAccept("com.suny.rpc");
}
@Override
public byte[] serialize(Object obj) {
return JSON.toJSONBytes(obj, SerializerFeature.WriteClassName);
}
@Override
public <T> T deSerialize(byte[] bytes, Class<T> clazz) {
return JSON.parseObject(bytes, clazz);
}
@Override
public SerializerType getSerializerType() {
return SerializerType.JSON;
}
}
3.1.4 序列化协议工厂类
package com.suny.rpc.nettyrpc.core.serialize;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* 序列化工厂
*/
@Component
public class SerializerFactory {
private final List<Serializer> serializerList;
public SerializerFactory(List<Serializer> serializerList) {
this.serializerList = serializerList;
}
public Serializer getSerializer(byte type) {
for (Serializer serializer : serializerList) {
final SerializerType serializerType = serializer.getSerializerType();
if (serializer.getSerializerType().getType() == type) {
return serializer;
}
}
return null;
}
}
3.2 编码
编码的主要流程如下:
- 确定序列化类型: 我们暂时就固定了序列化的类型是 JSON,所有的
Packet
对象都将使用 JSON 格式进行序列化。 - 编码协议头数据:
- 魔数:用于标识数据包的来源和格式,确保数据包没有被错误地解析。
- 序列化算法:标识了用于序列化和反序列化数据的算法,使得接收端知道如何处理后续的字节数据。
- 指令类型:用于描述数据包的具体用途或行为,指导接收端如何处理这个数据包。
- 序列化数据:
- 使用所选的序列化方式(JSON)将
Packet
对象转换为字节数据。
- 使用所选的序列化方式(JSON)将
- 写入数据长度和数据本身:
- 首先写入序列化后的数据长度,这是为了让接收端知道应该读取多少字节。
- 然后写入实际的序列化数据。
package com.suny.rpc.nettyrpc.core.codec;
import com.suny.rpc.nettyrpc.core.model.packet.Packet;
import com.suny.rpc.nettyrpc.core.serialize.SerializerFactory;
import com.suny.rpc.nettyrpc.core.serialize.SerializerType;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class MessageEncoder extends MessageToByteEncoder<Packet> {
private final SerializerFactory serializerFactory;
public MessageEncoder(SerializerFactory serializerFactory) {
this.serializerFactory = serializerFactory;
}
@Override
protected void encode(ChannelHandlerContext ctx, Packet msg, ByteBuf out) throws Exception {
final byte type = SerializerType.JSON.getType();
// 魔数
out.writeByte(msg.getMagicNumber());
// 序列化算法
out.writeByte(type);
// 指令类型
out.writeByte(msg.getPacketType().getType());
// 长度
final byte[] bytes = serializerFactory.getSerializer(type).serialize(msg);
out.writeInt(bytes.length);
// 数据
out.writeBytes(bytes);
}
}
3.3 解码
解码的主要流程如下:
- 跳过魔数:魔数是用来在接收数据时确认数据包的有效性和来源的。我们暂时不关心这个字段,跳过即可。
- 读取序列化算法:确定哪种序列化算法被用于数据包的编码,这是必须的步骤,因为解码器需要使用相应的算法将数据包的内容从字节转换回对象。
- 读取消息类型:为了识别识别数据包的具体类型(例如 RPC请求、RPC应答、心跳等)。这一信息是用于将字节数据反序列化为正确的
Packet
类型的对象。 - 读取消息长度:读取接下来的数据段长度,以便精确地从数据流中提取正确数量的字节数据。这是为了确保消息边界的正确处理,并避免解码错误。
- 反序列化数据: 将读取的字节数据转换回原始的数据结构(
Packet
对象)。- 首先,根据序列化类型获取对应的序列化器(
Serializer
)。 - 如果找不到合适的序列化器,抛出异常。
- 使用序列化器将字节数据转换为具体的
Packet
对象。
- 首先,根据序列化类型获取对应的序列化器(
- 输出解码对象:将解码后的 Java 对象传递给处理链中的下一个处理器。将解码后的
Packet
添加到out
列表中,Netty 将会自动处理这个列表中的对象,传递给下一个 ChannelHandler 处理。
package com.suny.rpc.nettyrpc.core.codec;
import com.suny.rpc.nettyrpc.core.model.packet.Packet;
import com.suny.rpc.nettyrpc.core.serialize.Serializer;
import com.suny.rpc.nettyrpc.core.serialize.SerializerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
@Slf4j
public class MessageDecoder extends ByteToMessageDecoder {
private final SerializerFactory serializerFactory;
public MessageDecoder(SerializerFactory serializerFactory) {
this.serializerFactory = serializerFactory;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 魔数
in.skipBytes(1);
// 序列化算法
final byte serializeType = in.readByte();
// 消息类型
final byte messageType = in.readByte();
final Class<? extends Packet> packetClass = PacketClassManager.getPacketClass(messageType);
if (packetClass == null) {
throw new UnsupportedOperationException("不支持" + messageType + "消息类型");
}
// 长度
final int messageLength = in.readInt();
// 反序列化数据
final Serializer serializer = serializerFactory.getSerializer(serializeType);
if (serializer == null) {
throw new UnsupportedOperationException("不支持" + serializeType + "序列化算法");
}
final byte[] bytes = new byte[messageLength];
in.readBytes(bytes);
final Packet packet = serializer.deSerialize(bytes, packetClass);
out.add(packet);
}
}
3. 编写 Netty 服务端代码
首先,我们需要创建一个 Netty 服务端来监听客户端的连接请求。在 rpc-core
模块中新建包 com.suny.rpc.nettyrpc.core.server
用于存放服务端相关的代码。
3.1 Netty 服务维护类
package com.suny.rpc.nettyrpc.core.server;
import com.suny.rpc.nettyrpc.core.codec.MessageDecoder;
import com.suny.rpc.nettyrpc.core.codec.MessageEncoder;
import com.suny.rpc.nettyrpc.core.serialize.SerializerFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
@Slf4j
public class NettyServer {
public NettyServer(int nettyPort, SerializerFactory serializerFactory) {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
DefaultEventExecutorGroup defaultEventExecutorGroup = new DefaultEventExecutorGroup(Runtime.getRuntime().availableProcessors() * 2);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.SO_BACKLOG, 128)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new MessageEncoder(serializerFactory));
pipeline.addLast(new MessageDecoder(serializerFactory));
pipeline.addLast(new RpcServerSimpleChannelInboundHandlerImpl());
}
});
final ChannelFuture future;
try {
future = bootstrap.bind(nettyPort).sync();
log.info("Netty 服务启动完成. {}", nettyPort);
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("Netty 启动失败", e);
} finally {
log.info("Netty 服务开始关闭");
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
defaultEventExecutorGroup.shutdownGracefully();
}
}
}
3.2 初始化服务端 RPC 业务处理
在 com.suny.rpc.nettyrpc.core.server
包中新建类 RpcServerSimpleChannelInboundHandlerImpl
,文件内容如下:
package com.suny.rpc.nettyrpc.core.server;
import com.suny.rpc.nettyrpc.core.enums.PacketType;
import com.suny.rpc.nettyrpc.core.model.RpcRequest;
import com.suny.rpc.nettyrpc.core.model.RpcResponse;
import com.suny.rpc.nettyrpc.core.model.packet.HeartBeatPacket;
import com.suny.rpc.nettyrpc.core.model.packet.Packet;
import com.suny.rpc.nettyrpc.core.process.RpcRequestProcessor;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class RpcServerSimpleChannelInboundHandlerImpl extends SimpleChannelInboundHandler<Object> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("服务端收到请求 {}.", pmsg);
}
}
}
4. 编写 Netty 客户端代码
接下来,我们需要编写客户端代码,用于发送RPC请求到服务端,这里我们新建一个包 com.suny.rpc.nettyrpc.core.client
用于维护跟客户端相关的类。
4.1 Netty 客户端维护类
package com.suny.rpc.nettyrpc.core.client;
import com.suny.rpc.nettyrpc.core.codec.MessageDecoder;
import com.suny.rpc.nettyrpc.core.codec.MessageEncoder;
import com.suny.rpc.nettyrpc.core.network.ChannelManager;
import com.suny.rpc.nettyrpc.core.serialize.SerializerFactory;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@Slf4j
public class NettyClient {
private final Bootstrap bootstrap;
private final EventLoopGroup eventLoopGroup;
public NettyClient(SerializerFactory serializerFactory) {
eventLoopGroup = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new MessageEncoder(serializerFactory));
pipeline.addLast(new MessageDecoder(serializerFactory));
pipeline.addLast(new RpcClientChannelInboundHandlerImpl());
}
});
ChannelManager.setBootstrap(bootstrap);
}
}
4.2 初始化客户端 RPC 业务处理
在 com.suny.rpc.nettyrpc.core.client
包中新建类 RpcClientChannelInboundHandlerImpl
,文件内容如下:
package com.suny.rpc.nettyrpc.core.client;
import com.suny.rpc.nettyrpc.core.enums.PacketType;
import com.suny.rpc.nettyrpc.core.model.RpcResponse;
import com.suny.rpc.nettyrpc.core.model.packet.HeartBeatPacket;
import com.suny.rpc.nettyrpc.core.model.packet.Packet;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class RpcClientChannelInboundHandlerImpl extends SimpleChannelInboundHandler<Packet> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Packet msg) throws Exception {
log.info("客户端收到消息: [{}]", msg.toString());
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("客户端处理错误: ", cause);
ctx.close();
}
}
5. 总结
这个章节我们基于 Netty 实现了基本的 RPC 通信逻辑,目前是缺少对 RPC 请求的一个实际处理,下个章节我们将完善实际的请求处理部分。