手写RPC框架系列(九) - RPC请求处理以及应答
手写RPC框架系列(九) - RPC请求处理以及应答
在上一章节中,我们详细介绍了如何使用 Netty 实现 RPC 框架的网络通信层,包括服务端和客户端的实现、自定义的通信协议,以及消息的编码和解码。本章将继续探讨 RPC 请求的处理流程以及如何将处理结果返回给调用方。
本章内容大致如下:
- RPC请求以及应答的实体类抽象。
- 客户端怎么去发送请求以及维护请求的维护。
- 服务端怎么去接收请求以及处理本地方法调用。
- 服务端怎么给客户端应答请求结果。
- 客户端怎么接收请求结果。
1. RPC 请求以及 RPC 响应实体抽象
在实现具体的 RPC 请求处理以及应答前,我们需要定义表示 RPC 请求以及应答的结构。 这些实体类将在客户端以及服务端之前传输,携带方法调用的相关信息。
1.1 请求参数 RpcRequest
package com.suny.rpc.nettyrpc.core.model;
import com.suny.rpc.nettyrpc.core.enums.PacketType;
import com.suny.rpc.nettyrpc.core.model.packet.Packet;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import java.io.Serializable;
@Setter
@Getter
@EqualsAndHashCode(callSuper = true)
@ToString
public class RpcRequest extends Packet implements Serializable {
private static final long serialVersionUID = 6290632141408825905L;
/**
* 请求流水号
*/
private String sequence;
/**
* 类名
*/
private String className;
/**
* 方法名
*/
private String methodName;
/**
* 参数类型. 有序
*/
private Class<?>[] parameterType;
/**
* 参数. 有序
*/
private Object[] parameters;
@Override
public PacketType getPacketType() {
return PacketType.RPC_REQUEST;
}
}
1.2 响应参数 RpcResponse
package com.suny.rpc.nettyrpc.core.model;
import com.suny.rpc.nettyrpc.core.enums.PacketType;
import com.suny.rpc.nettyrpc.core.model.packet.Packet;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import java.io.Serializable;
@Getter
@EqualsAndHashCode(callSuper = true)
@ToString
@Setter
public class RpcResponse extends Packet implements Serializable {
private static final long serialVersionUID = 2964937493568971128L;
/**
* 流水号
*/
private String sequence;
/**
* 异常信息
*/
private Throwable throwable;
/**
* 结果
*/
private Object result;
public RpcResponse(String sequence, Throwable throwable) {
this.sequence = sequence;
this.throwable = throwable;
}
public RpcResponse(String sequence, Object result) {
this.sequence = sequence;
this.result = result;
}
/**
* 只提供给反序列化使用
*/
@Deprecated
public RpcResponse() {
}
@Override
public PacketType getPacketType() {
return PacketType.RPC_RESPONSE;
}
}
2. 客户端发送 RPC 请求
在《手写RPC框架系列(三) - 在RPC框架中实现代理模式》章节中,我们的代理实现如下:
package com.suny.rpc.nettyrpc.core.client;
import com.suny.rpc.nettyrpc.core.model.RpcRequest;
import com.suny.rpc.nettyrpc.core.model.RpcResponse;
import com.suny.rpc.nettyrpc.core.network.RpcRequestSender;
import lombok.extern.slf4j.Slf4j;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.UUID;
/**
* 客户端代理
*/
@Slf4j
public class RpcClientProxy implements InvocationHandler {
public <T> T getProxy(Class<T> clazz) {
return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this);
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// todo 发送请求给远程服务接口,然后获取应答结果
}
}
在 invoke()
方法中,我们暂时还没有写具体的逻辑,这里的逻辑也比较简单,就是以下几个流程:
- 组装 RpcRequest 请求。
- 通过 Netty 发送给服务端。
- 接收 Netty 返回的调用结果。
- 拿到 Netty 返回的调用结果中的数据作为
invoke()
方法的返回值。
2.1 完善客户端代理类逻辑
根据上面的逻辑,我们先初步补充 RpcClientProxy
类代码如下:
public class RpcClientProxy implements InvocationHandler {
public <T> T getProxy(Class<T> clazz) {
return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this);
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setSequence(UUID.randomUUID().toString());
rpcRequest.setClassName(method.getDeclaringClass().getName());
rpcRequest.setMethodName(method.getName());
rpcRequest.setParameterType(method.getParameterTypes());
rpcRequest.setParameters(args);
final RpcResponse rpcResponse = sendRpcRequest(rpcRequest);
return rpcResponse.getResult();
}
}
2.2 请求发送处理
这里的 sendRpcRequest()
方法暂时还没有编写,这里面有一定的业务逻辑,需要结合注册中心等进行请求发起,所以我们抽象抽取一个 RpcRequestSender
接口来处理请求发送:
package com.suny.rpc.nettyrpc.core.network;
import com.suny.rpc.nettyrpc.core.model.RpcRequest;
import com.suny.rpc.nettyrpc.core.model.RpcResponse;
public interface RpcRequestSender {
/**
* 发送 rpc 请求
*
* @param rpcRequest 请求参数
* @return 请求结果
*/
RpcResponse sendRpcRequest(RpcRequest rpcRequest);
}
给定一个默认的实现,大致逻辑如下:
- 初始化异步结果容器:使用
CompletableFuture<RpcResponse>
来承载异步操作的结果。 - 拉取目标服务的调用信息:
- 从 RPC 请求中获取服务类名。
- 调用
rpcServiceDiscovery.getServiceInstance
以获取服务的实例地址(IP和端口)。
- 检查服务实例:如果服务实例为空或格式不正确,则抛出异常表示没有找到服务提供者。
- 获取网络通道:
- 根据解析的 IP 地址和端口号,从
ChannelManager
获取对应的Channel
。 - 检查这个
Channel
是否为空或不处于活动状态,如果是,则抛出异常。
- 根据解析的 IP 地址和端口号,从
- 发送 RPC 请求:
- 将 RPC 请求通过
channel.writeAndFlush()
方法发送。 - 添加监听器,以处理发送成功或失败的事件:
- 成功:记录日志信息。
- 失败:关闭通道,标记
future
为异常完成状态,并记录错误日志。
- 将 RPC 请求通过
- 等待并返回响应:
- 使用
future.get()
阻塞当前线程,直到异步操作完成并返回 RPC 响应结果或发生异常。
- 使用
代码如下:
package com.suny.rpc.nettyrpc.core.network;
import com.suny.rpc.nettyrpc.core.client.RequestFutureManager;
import com.suny.rpc.nettyrpc.core.discovery.RpcServiceDiscovery;
import com.suny.rpc.nettyrpc.core.model.RpcRequest;
import com.suny.rpc.nettyrpc.core.model.RpcResponse;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
@Service
@Slf4j
public class NettyRpcRequestSenderImpl implements RpcRequestSender {
@Autowired
private RpcServiceDiscovery rpcServiceDiscovery;
@SneakyThrows
@Override
public RpcResponse sendRpcRequest(RpcRequest rpcRequest) {
CompletableFuture<RpcResponse> future = new CompletableFuture<>();
// 查找地址
final String className = rpcRequest.getClassName();
final String serviceInstance = rpcServiceDiscovery.getServiceInstance(className);
if (StringUtils.isBlank(serviceInstance)) {
throw new RuntimeException(className + "暂无服务提供者");
}
String[] split = serviceInstance.split(":");
// 查找 channel
final Channel channel = ChannelManager.get(new InetSocketAddress(split[0], Integer.parseInt(split[1])));
if (channel == null || !channel.isActive()) {
throw new IllegalStateException();
}
RequestFutureManager.addFuture(rpcRequest.getSequence(), future);
channel.writeAndFlush(rpcRequest).addListener((ChannelFutureListener) f -> {
if (f.isSuccess()) {
log.info("客户端消息发送成功! [{}]", rpcRequest);
} else {
f.channel().close();
future.completeExceptionally(f.cause());
log.error("客户端发送消息失败:", f.cause());
}
});
return future.get();
}
}
2.3 等待 RPC 结果信息维护
RequestFutureManager
类主要作用是用于管理请求和响应。它通过 RESPONSE_FUTURE_MAP
来确保每个请求都能在接收到响应时正确地完成相应的异步操作。
package com.suny.rpc.nettyrpc.core.client;
import com.suny.rpc.nettyrpc.core.model.RpcResponse;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
public class RequestFutureManager {
private static final Map<String, CompletableFuture<RpcResponse>> RESPONSE_FUTURE_MAP = new ConcurrentHashMap<>();
public static void addFuture(String sequence, CompletableFuture<RpcResponse> future) {
RESPONSE_FUTURE_MAP.put(sequence, future);
}
public static void removeAndComplete(RpcResponse rpcResponse) {
final String sequence = rpcResponse.getSequence();
final CompletableFuture<RpcResponse> future = RESPONSE_FUTURE_MAP.remove(sequence);
if (future == null) {
log.info("未找到请求 {} 的待处理任务", sequence);
} else {
future.complete(rpcResponse);
}
}
}
3. 服务端处理 RPC 请求
服务端接收到RPC请求后,需要进行以下处理:
- 通过
MessageDecoder
将接收到的字节流解码为RpcRequest
对象。 - 根据
RpcRequest
对象中的类名、方法名和参数类型,使用反射找到对应的 Java 服务实现类和方法。 - 调用该方法,并获取返回结果。
- 将返回结果封装为一个
RpcResponse
对象。 - 通过
MessageEncoder
将RpcResponse
对象编码为字节流。 - 通过 Netty 将编码后的响应发送给客户端。
3.1 完善接收方法处理逻辑
在前面章节中,RpcServerSimpleChannelInboundHandlerImpl
中只是实现了一个空方法。
public class RpcServerSimpleChannelInboundHandlerImpl extends SimpleChannelInboundHandler<Object> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("服务端收到请求 {}.", pmsg);
}
}
}
3.1 定义 RPC 方法管理
定义一个 RpcRequestProcessor
来维护所有的 RPC 方法以及 Bean 实例。核心方法是 process()
, 用于处理本地方法的反射调用:
首先从请求中获取服务类名(
className
),并尝试从BEAN_MAP
获取对应的服务实例。如果找不到对应的服务实例,抛出异常。
使用反射机制获取服务实例的方法(通过请求中提供的方法名和参数类型),并调用该方法。
方法调用结果被返回作为 RPC 请求的响应。同时记录一条调试日志,表示 RPC 服务方法调用成功。
如果在反射调用过程中发生异常(比如方法找不到、方法不可访问、方法执行错误),捕获这些异常并抛出运行时异常。
package com.suny.rpc.nettyrpc.core.process;
import com.suny.rpc.nettyrpc.core.model.RpcRequest;
import lombok.extern.slf4j.Slf4j;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
public class RpcRequestProcessor {
private static final Map<String, Object> BEAN_MAP = new ConcurrentHashMap<>();
public static void addRpcBean(String serviceName, Object bean) {
BEAN_MAP.put(serviceName, bean);
}
public static Object getBean(String serviceName) {
return BEAN_MAP.get(serviceName);
}
public static void remove(String serviceName) {
BEAN_MAP.remove(serviceName);
}
/**
* 处理 rpc 请求
*
* @param rpcRequest rpc 请求参数
* @return rpc请求结果
*/
public static Object process(RpcRequest rpcRequest) {
try {
final String className = rpcRequest.getClassName();
final Object bean = getBean(className);
if (bean == null) {
throw new RuntimeException("未找到 " + className + " 对应实例");
}
final Method method = bean.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterType());
final Object invoke = method.invoke(bean, rpcRequest.getParameters());
log.debug("RPC服务 {}.{} 调用成功", rpcRequest.getClassName(), rpcRequest.getMethodName());
return invoke;
} catch (InvocationTargetException | NoSuchMethodException | IllegalAccessException e) {
throw new RuntimeException("RPC调用失败", e);
}
}
}
3.2 本地方法调用
package com.suny.rpc.nettyrpc.core.server;
import com.suny.rpc.nettyrpc.core.enums.PacketType;
import com.suny.rpc.nettyrpc.core.model.RpcRequest;
import com.suny.rpc.nettyrpc.core.model.RpcResponse;
import com.suny.rpc.nettyrpc.core.model.packet.HeartBeatPacket;
import com.suny.rpc.nettyrpc.core.model.packet.Packet;
import com.suny.rpc.nettyrpc.core.process.RpcRequestProcessor;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class RpcServerSimpleChannelInboundHandlerImpl extends SimpleChannelInboundHandler<Packet> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Packet msg) throws Exception {
final PacketType packetType = msg.getPacketType();
log.info("服务端收到请求 {} . 请求体: {}", packetType, msg);
if (packetType == PacketType.RPC_REQUEST) {
final RpcRequest rpcRequest = (RpcRequest) msg;
final Object result = RpcRequestProcessor.process(rpcRequest);
final RpcResponse rpcResponse = new RpcResponse(rpcRequest.getSequence(), result);
ctx.writeAndFlush(rpcResponse).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
log.info("服务端处理请求 {}-{} 完毕. 应答结果: {}", rpcRequest.getSequence(), packetType, rpcResponse);
}
}
}
直接调用 RpcRequestProcessor.process()
方法即可,方法会去反射调用本地方法,然后拿到返回值。
4. 服务端返回 RPC 调用结果
服务端在处理完RPC请求后,需要将调用结果返回给客户端:
- 服务端将
RpcResponse
对象通过MessageEncoder
编码为字节流。 - 通过 Netty 将编码后的响应发送给客户端。
完整的代码如下:
package com.suny.rpc.nettyrpc.core.server;
import com.suny.rpc.nettyrpc.core.enums.PacketType;
import com.suny.rpc.nettyrpc.core.model.RpcRequest;
import com.suny.rpc.nettyrpc.core.model.RpcResponse;
import com.suny.rpc.nettyrpc.core.model.packet.HeartBeatPacket;
import com.suny.rpc.nettyrpc.core.model.packet.Packet;
import com.suny.rpc.nettyrpc.core.process.RpcRequestProcessor;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class RpcServerSimpleChannelInboundHandlerImpl extends SimpleChannelInboundHandler<Packet> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Packet msg) throws Exception {
final PacketType packetType = msg.getPacketType();
log.info("服务端收到请求 {} . 请求体: {}", packetType, msg);
final RpcRequest rpcRequest = (RpcRequest) msg;
final Object result = RpcRequestProcessor.process(rpcRequest);
final RpcResponse rpcResponse = new RpcResponse(rpcRequest.getSequence(), result);
ctx.writeAndFlush(rpcResponse).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
log.info("服务端处理请求 {}-{} 完毕. 应答结果: {}", rpcRequest.getSequence(), packetType, rpcResponse);
}
}
}
5. 客户端接收 RPC 请求结果
客户端在发送RPC请求后,需要等待并接收服务端返回的调用结果:
- 客户端通过
MessageDecoder
将接收到的字节流解码为RpcResponse
对象。 - 根据
RpcResponse
对象的请求ID,找到对应的CompletableFuture
对象。 - 然后调用
CompletableFuture
的complete()
方法,唤醒客户端的方法调用。
这个逻辑在之前的 NettyRpcRequestSenderImpl
中以及给定了实现:
public class NettyRpcRequestSenderImpl implements RpcRequestSender {
@Autowired
private RpcServiceDiscovery rpcServiceDiscovery;
@SneakyThrows
@Override
public RpcResponse sendRpcRequest(RpcRequest rpcRequest) {
CompletableFuture<RpcResponse> future = new CompletableFuture<>();
final String className = rpcRequest.getClassName();
final String serviceInstance = rpcServiceDiscovery.getServiceInstance(className);
if (StringUtils.isBlank(serviceInstance)) {
throw new RuntimeException(className + "暂无服务提供者");
}
String[] split = serviceInstance.split(":");
final Channel channel = ChannelManager.get(new InetSocketAddress(split[0], Integer.parseInt(split[1])));
if (channel == null || !channel.isActive()) {
throw new IllegalStateException();
}
RequestFutureManager.addFuture(rpcRequest.getSequence(), future);
channel.writeAndFlush(rpcRequest).addListener((ChannelFutureListener) f -> {
if (f.isSuccess()) {
log.info("客户端消息发送成功! [{}]", rpcRequest);
} else {
f.channel().close();
future.completeExceptionally(f.cause());
log.error("客户端发送消息失败:", f.cause());
}
});
// 阻塞等待并接收服务端返回的调用结果
return future.get();
}
}
6. 总结
本章我们详细探讨了RPC请求的处理以及应答的流程,主要内容包括:
- RPC请求和应答实体的抽象,定义了
RpcRequest
和RpcResponse
两个类,用于在客户端和服务端之间传输方法调用的相关信息。 - 客户端发送RPC请求的流程,包括构建
RpcRequest
对象、编码、发送请求、创建CompletableFuture
对象以及关联请求ID和Future等步骤。 - 服务端处理 RPC 请求的流程,包括解码请求、使用反射找到对应的服务实现类和方法、调用方法获取结果、将结果封装为
RpcResponse
对象、编码并发送响应等步骤。 - 服务端返回 RPC 调用结果的过程,将
RpcResponse
对象编码并通过 Netty 的Channel
发送给客户端。 - 客户端接收 RPC 请求结果的过程,包括解码响应、根据请求 ID 找到对应的
CompletableFuture
对象、将结果或异常设置到 Future 中,以及客户端代码如何获取调用结果等步骤。
通过以上步骤,我们一步步实现了RPC请求的处理和应答流程。客户端发送请求,服务端处理请求并返回结果,客户端接收结果并设置到 Future 中,完成整个RPC调用过程。