七的博客

手写RPC框架系列(九) - RPC请求处理以及应答

RPC手写系列

手写RPC框架系列(九) - RPC请求处理以及应答

在上一章节中,我们详细介绍了如何使用 Netty 实现 RPC 框架的网络通信层,包括服务端和客户端的实现、自定义的通信协议,以及消息的编码和解码。本章将继续探讨 RPC 请求的处理流程以及如何将处理结果返回给调用方。

本章内容大致如下:

  • RPC请求以及应答的实体类抽象。
  • 客户端怎么去发送请求以及维护请求的维护。
  • 服务端怎么去接收请求以及处理本地方法调用。
  • 服务端怎么给客户端应答请求结果。
  • 客户端怎么接收请求结果。

1. RPC 请求以及 RPC 响应实体抽象

在实现具体的 RPC 请求处理以及应答前,我们需要定义表示 RPC 请求以及应答的结构。 这些实体类将在客户端以及服务端之前传输,携带方法调用的相关信息。

1.1 请求参数 RpcRequest

package com.suny.rpc.nettyrpc.core.model;

import com.suny.rpc.nettyrpc.core.enums.PacketType;
import com.suny.rpc.nettyrpc.core.model.packet.Packet;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

import java.io.Serializable;


@Setter
@Getter
@EqualsAndHashCode(callSuper = true)
@ToString
public class RpcRequest extends Packet implements Serializable {

    private static final long serialVersionUID = 6290632141408825905L;

    /**
     * 请求流水号
     */
    private String sequence;

    /**
     * 类名
     */
    private String className;

    /**
     * 方法名
     */
    private String methodName;


    /**
     * 参数类型. 有序
     */
    private Class<?>[] parameterType;

    /**
     * 参数. 有序
     */
    private Object[] parameters;

    @Override
    public PacketType getPacketType() {
        return PacketType.RPC_REQUEST;
    }

}

1.2 响应参数 RpcResponse

package com.suny.rpc.nettyrpc.core.model;

import com.suny.rpc.nettyrpc.core.enums.PacketType;
import com.suny.rpc.nettyrpc.core.model.packet.Packet;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

import java.io.Serializable;


@Getter
@EqualsAndHashCode(callSuper = true)
@ToString
@Setter
public class RpcResponse extends Packet implements Serializable {

    private static final long serialVersionUID = 2964937493568971128L;

    /**
     * 流水号
     */
    private String sequence;

    /**
     * 异常信息
     */
    private Throwable throwable;

    /**
     * 结果
     */
    private Object result;


    public RpcResponse(String sequence, Throwable throwable) {
        this.sequence = sequence;
        this.throwable = throwable;
    }

    public RpcResponse(String sequence, Object result) {
        this.sequence = sequence;
        this.result = result;
    }

    /**
     * 只提供给反序列化使用
     */
    @Deprecated
    public RpcResponse() {
    }

    @Override
    public PacketType getPacketType() {
        return PacketType.RPC_RESPONSE;
    }
}

2. 客户端发送 RPC 请求

在《手写RPC框架系列(三) - 在RPC框架中实现代理模式》章节中,我们的代理实现如下:

package com.suny.rpc.nettyrpc.core.client;

import com.suny.rpc.nettyrpc.core.model.RpcRequest;
import com.suny.rpc.nettyrpc.core.model.RpcResponse;
import com.suny.rpc.nettyrpc.core.network.RpcRequestSender;
import lombok.extern.slf4j.Slf4j;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.UUID;

/**
 * 客户端代理
 */
@Slf4j
public class RpcClientProxy implements InvocationHandler {

    public <T> T getProxy(Class<T> clazz) {
        return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this);
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // todo 发送请求给远程服务接口,然后获取应答结果
      
    }
}

invoke() 方法中,我们暂时还没有写具体的逻辑,这里的逻辑也比较简单,就是以下几个流程:

  • 组装 RpcRequest 请求。
  • 通过 Netty 发送给服务端。
  • 接收 Netty 返回的调用结果。
  • 拿到 Netty 返回的调用结果中的数据作为 invoke() 方法的返回值。

2.1 完善客户端代理类逻辑

根据上面的逻辑,我们先初步补充 RpcClientProxy 类代码如下:

public class RpcClientProxy implements InvocationHandler {

    public <T> T getProxy(Class<T> clazz) {
        return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this);
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        RpcRequest rpcRequest = new RpcRequest();
        rpcRequest.setSequence(UUID.randomUUID().toString());
        rpcRequest.setClassName(method.getDeclaringClass().getName());
        rpcRequest.setMethodName(method.getName());
        rpcRequest.setParameterType(method.getParameterTypes());
        rpcRequest.setParameters(args);

        final RpcResponse rpcResponse = sendRpcRequest(rpcRequest);
        return rpcResponse.getResult();
    }
}

2.2 请求发送处理

这里的 sendRpcRequest() 方法暂时还没有编写,这里面有一定的业务逻辑,需要结合注册中心等进行请求发起,所以我们抽象抽取一个 RpcRequestSender 接口来处理请求发送:

package com.suny.rpc.nettyrpc.core.network;

import com.suny.rpc.nettyrpc.core.model.RpcRequest;
import com.suny.rpc.nettyrpc.core.model.RpcResponse;

public interface RpcRequestSender {

    /**
     * 发送 rpc 请求
     *
     * @param rpcRequest 请求参数
     * @return 请求结果
     */
    RpcResponse sendRpcRequest(RpcRequest rpcRequest);
}

给定一个默认的实现,大致逻辑如下:

  1. 初始化异步结果容器:使用 CompletableFuture<RpcResponse> 来承载异步操作的结果。
  2. 拉取目标服务的调用信息
    • 从 RPC 请求中获取服务类名。
    • 调用 rpcServiceDiscovery.getServiceInstance 以获取服务的实例地址(IP和端口)。
  3. 检查服务实例:如果服务实例为空或格式不正确,则抛出异常表示没有找到服务提供者。
  4. 获取网络通道
    • 根据解析的 IP 地址和端口号,从 ChannelManager 获取对应的 Channel
    • 检查这个 Channel 是否为空或不处于活动状态,如果是,则抛出异常。
  5. 发送 RPC 请求
    • 将 RPC 请求通过 channel.writeAndFlush() 方法发送。
    • 添加监听器,以处理发送成功或失败的事件:
      • 成功:记录日志信息。
      • 失败:关闭通道,标记 future 为异常完成状态,并记录错误日志。
  6. 等待并返回响应
    • 使用 future.get() 阻塞当前线程,直到异步操作完成并返回 RPC 响应结果或发生异常。

代码如下:

package com.suny.rpc.nettyrpc.core.network;

import com.suny.rpc.nettyrpc.core.client.RequestFutureManager;
import com.suny.rpc.nettyrpc.core.discovery.RpcServiceDiscovery;
import com.suny.rpc.nettyrpc.core.model.RpcRequest;
import com.suny.rpc.nettyrpc.core.model.RpcResponse;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;


@Service
@Slf4j
public class NettyRpcRequestSenderImpl implements RpcRequestSender {

    @Autowired
    private RpcServiceDiscovery rpcServiceDiscovery;

    @SneakyThrows
    @Override
    public RpcResponse sendRpcRequest(RpcRequest rpcRequest) {
        CompletableFuture<RpcResponse> future = new CompletableFuture<>();

        // 查找地址
        final String className = rpcRequest.getClassName();
        final String serviceInstance = rpcServiceDiscovery.getServiceInstance(className);

        if (StringUtils.isBlank(serviceInstance)) {
            throw new RuntimeException(className + "暂无服务提供者");
        }

        String[] split = serviceInstance.split(":");
        // 查找 channel
        final Channel channel = ChannelManager.get(new InetSocketAddress(split[0], Integer.parseInt(split[1])));
        if (channel == null || !channel.isActive()) {
            throw new IllegalStateException();
        }


        RequestFutureManager.addFuture(rpcRequest.getSequence(), future);

        channel.writeAndFlush(rpcRequest).addListener((ChannelFutureListener) f -> {
            if (f.isSuccess()) {
                log.info("客户端消息发送成功! [{}]", rpcRequest);
            } else {
                f.channel().close();
                future.completeExceptionally(f.cause());
                log.error("客户端发送消息失败:", f.cause());
            }
        });


        return future.get();
    }
}

2.3 等待 RPC 结果信息维护

RequestFutureManager 类主要作用是用于管理请求和响应。它通过 RESPONSE_FUTURE_MAP 来确保每个请求都能在接收到响应时正确地完成相应的异步操作。

package com.suny.rpc.nettyrpc.core.client;

import com.suny.rpc.nettyrpc.core.model.RpcResponse;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;


@Slf4j
public class RequestFutureManager {

    private static final Map<String, CompletableFuture<RpcResponse>> RESPONSE_FUTURE_MAP = new ConcurrentHashMap<>();


    public static void addFuture(String sequence, CompletableFuture<RpcResponse> future) {
        RESPONSE_FUTURE_MAP.put(sequence, future);
    }


    public static void removeAndComplete(RpcResponse rpcResponse) {
        final String sequence = rpcResponse.getSequence();
        final CompletableFuture<RpcResponse> future = RESPONSE_FUTURE_MAP.remove(sequence);
        if (future == null) {
            log.info("未找到请求 {} 的待处理任务", sequence);
        } else {
            future.complete(rpcResponse);
        }
    }


}

3. 服务端处理 RPC 请求

服务端接收到RPC请求后,需要进行以下处理:

  1. 通过 MessageDecoder 将接收到的字节流解码为RpcRequest对象。
  2. 根据 RpcRequest 对象中的类名、方法名和参数类型,使用反射找到对应的 Java 服务实现类和方法。
  3. 调用该方法,并获取返回结果。
  4. 将返回结果封装为一个RpcResponse对象。
  5. 通过MessageEncoderRpcResponse对象编码为字节流。
  6. 通过 Netty 将编码后的响应发送给客户端。

3.1 完善接收方法处理逻辑

在前面章节中,RpcServerSimpleChannelInboundHandlerImpl 中只是实现了一个空方法。

public class RpcServerSimpleChannelInboundHandlerImpl extends SimpleChannelInboundHandler<Object> {


    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("服务端收到请求 {}.", pmsg);
        }

    }
}

3.1 定义 RPC 方法管理

定义一个 RpcRequestProcessor 来维护所有的 RPC 方法以及 Bean 实例。核心方法是 process() , 用于处理本地方法的反射调用:

  • 首先从请求中获取服务类名(className),并尝试从 BEAN_MAP 获取对应的服务实例。

  • 如果找不到对应的服务实例,抛出异常。

  • 使用反射机制获取服务实例的方法(通过请求中提供的方法名和参数类型),并调用该方法。

  • 方法调用结果被返回作为 RPC 请求的响应。同时记录一条调试日志,表示 RPC 服务方法调用成功。

  • 如果在反射调用过程中发生异常(比如方法找不到、方法不可访问、方法执行错误),捕获这些异常并抛出运行时异常。

package com.suny.rpc.nettyrpc.core.process;

import com.suny.rpc.nettyrpc.core.model.RpcRequest;
import lombok.extern.slf4j.Slf4j;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;


@Slf4j
public class RpcRequestProcessor {

    private static final Map<String, Object> BEAN_MAP = new ConcurrentHashMap<>();

    public static void addRpcBean(String serviceName, Object bean) {
        BEAN_MAP.put(serviceName, bean);
    }

    public static Object getBean(String serviceName) {
        return BEAN_MAP.get(serviceName);
    }

    public static void remove(String serviceName) {
        BEAN_MAP.remove(serviceName);
    }

    /**
     * 处理 rpc 请求
     *
     * @param rpcRequest rpc 请求参数
     * @return rpc请求结果
     */
    public static Object process(RpcRequest rpcRequest) {
        try {
            final String className = rpcRequest.getClassName();
            final Object bean = getBean(className);
            if (bean == null) {
                throw new RuntimeException("未找到 " + className + " 对应实例");
            }

            final Method method = bean.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterType());
            final Object invoke = method.invoke(bean, rpcRequest.getParameters());
            log.debug("RPC服务 {}.{} 调用成功", rpcRequest.getClassName(), rpcRequest.getMethodName());
            return invoke;
        } catch (InvocationTargetException | NoSuchMethodException | IllegalAccessException e) {
            throw new RuntimeException("RPC调用失败", e);
        }
    }

}

3.2 本地方法调用

package com.suny.rpc.nettyrpc.core.server;

import com.suny.rpc.nettyrpc.core.enums.PacketType;
import com.suny.rpc.nettyrpc.core.model.RpcRequest;
import com.suny.rpc.nettyrpc.core.model.RpcResponse;
import com.suny.rpc.nettyrpc.core.model.packet.HeartBeatPacket;
import com.suny.rpc.nettyrpc.core.model.packet.Packet;
import com.suny.rpc.nettyrpc.core.process.RpcRequestProcessor;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class RpcServerSimpleChannelInboundHandlerImpl extends SimpleChannelInboundHandler<Packet> {


    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Packet msg) throws Exception {
        final PacketType packetType = msg.getPacketType();

        log.info("服务端收到请求 {} . 请求体: {}", packetType, msg);
        if (packetType == PacketType.RPC_REQUEST) {
            final RpcRequest rpcRequest = (RpcRequest) msg;

            final Object result = RpcRequestProcessor.process(rpcRequest);

            final RpcResponse rpcResponse = new RpcResponse(rpcRequest.getSequence(), result);

            ctx.writeAndFlush(rpcResponse).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            log.info("服务端处理请求 {}-{} 完毕. 应答结果: {}", rpcRequest.getSequence(), packetType, rpcResponse);
        }

    }
}

直接调用 RpcRequestProcessor.process() 方法即可,方法会去反射调用本地方法,然后拿到返回值。

4. 服务端返回 RPC 调用结果

服务端在处理完RPC请求后,需要将调用结果返回给客户端:

  1. 服务端将 RpcResponse 对象通过 MessageEncoder 编码为字节流。
  2. 通过 Netty 将编码后的响应发送给客户端。

完整的代码如下:

package com.suny.rpc.nettyrpc.core.server;

import com.suny.rpc.nettyrpc.core.enums.PacketType;
import com.suny.rpc.nettyrpc.core.model.RpcRequest;
import com.suny.rpc.nettyrpc.core.model.RpcResponse;
import com.suny.rpc.nettyrpc.core.model.packet.HeartBeatPacket;
import com.suny.rpc.nettyrpc.core.model.packet.Packet;
import com.suny.rpc.nettyrpc.core.process.RpcRequestProcessor;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;


@Slf4j
public class RpcServerSimpleChannelInboundHandlerImpl extends SimpleChannelInboundHandler<Packet> {


    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Packet msg) throws Exception {
        final PacketType packetType = msg.getPacketType();

        log.info("服务端收到请求 {} . 请求体: {}", packetType, msg);
            final RpcRequest rpcRequest = (RpcRequest) msg;

            final Object result = RpcRequestProcessor.process(rpcRequest);

            final RpcResponse rpcResponse = new RpcResponse(rpcRequest.getSequence(), result);
            ctx.writeAndFlush(rpcResponse).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            log.info("服务端处理请求 {}-{} 完毕. 应答结果: {}", rpcRequest.getSequence(), packetType, rpcResponse);
        }

    }
}

5. 客户端接收 RPC 请求结果

客户端在发送RPC请求后,需要等待并接收服务端返回的调用结果:

  1. 客户端通过 MessageDecoder 将接收到的字节流解码为 RpcResponse 对象。
  2. 根据 RpcResponse 对象的请求ID,找到对应的 CompletableFuture 对象。
  3. 然后调用 CompletableFuturecomplete() 方法,唤醒客户端的方法调用。

这个逻辑在之前的 NettyRpcRequestSenderImpl 中以及给定了实现:

public class NettyRpcRequestSenderImpl implements RpcRequestSender {

    @Autowired
    private RpcServiceDiscovery rpcServiceDiscovery;

    @SneakyThrows
    @Override
    public RpcResponse sendRpcRequest(RpcRequest rpcRequest) {
        CompletableFuture<RpcResponse> future = new CompletableFuture<>();

        
        final String className = rpcRequest.getClassName();
        final String serviceInstance = rpcServiceDiscovery.getServiceInstance(className);

        if (StringUtils.isBlank(serviceInstance)) {
            throw new RuntimeException(className + "暂无服务提供者");
        }

        String[] split = serviceInstance.split(":");
        final Channel channel = ChannelManager.get(new InetSocketAddress(split[0], Integer.parseInt(split[1])));
        if (channel == null || !channel.isActive()) {
            throw new IllegalStateException();
        }


        RequestFutureManager.addFuture(rpcRequest.getSequence(), future);

        channel.writeAndFlush(rpcRequest).addListener((ChannelFutureListener) f -> {
            if (f.isSuccess()) {
                log.info("客户端消息发送成功! [{}]", rpcRequest);
            } else {
                f.channel().close();
                future.completeExceptionally(f.cause());
                log.error("客户端发送消息失败:", f.cause());
            }
        });

        // 阻塞等待并接收服务端返回的调用结果
        return future.get();
    }
}

6. 总结

本章我们详细探讨了RPC请求的处理以及应答的流程,主要内容包括:

  1. RPC请求和应答实体的抽象,定义了 RpcRequestRpcResponse 两个类,用于在客户端和服务端之间传输方法调用的相关信息。
  2. 客户端发送RPC请求的流程,包括构建 RpcRequest 对象、编码、发送请求、创建 CompletableFuture 对象以及关联请求ID和Future等步骤。
  3. 服务端处理 RPC 请求的流程,包括解码请求、使用反射找到对应的服务实现类和方法、调用方法获取结果、将结果封装为 RpcResponse 对象、编码并发送响应等步骤。
  4. 服务端返回 RPC 调用结果的过程,将 RpcResponse 对象编码并通过 Netty 的 Channel 发送给客户端。
  5. 客户端接收 RPC 请求结果的过程,包括解码响应、根据请求 ID 找到对应的 CompletableFuture 对象、将结果或异常设置到 Future 中,以及客户端代码如何获取调用结果等步骤。

通过以上步骤,我们一步步实现了RPC请求的处理和应答流程。客户端发送请求,服务端处理请求并返回结果,客户端接收结果并设置到 Future 中,完成整个RPC调用过程。