七的博客

手写RPC框架系列(五) - RPC服务的自动扫描与依赖注入

RPC手写系列

手写RPC框架系列(五) - RPC服务的自动扫描与依赖注入

在前几个章节中我们已经开始实现 RPC 框架的核心功能,包括服务的注册发现、代理模式的引入等。但是,每次调用服务时手动创建代理对象显然是不现实的,而且服务的实现类也需要自动注册到注册中心。

本章节将介绍如何利用 Spring 的扩展点,实现 RPC 服务的自动扫描与依赖注入。主要内容包括:

  • 回顾 Spring 的 Bean 注册以及依赖注入机制
  • 自定义 RPC 服务注解及扫描注解
  • 基于 Spring 扩展点实现 RPC 服务自动注册
  • 自定义 RPC 依赖注入注解
  • 基于 Spring 扩展点实现 RPC 依赖自动注入
  • 使用示例

1. Spring 项目中的 Bean 注册以及依赖注入

在介绍 RPC 服务的自动注册与注入前,我们先回顾一下 Spring 的 Bean 注册与依赖注入机制。

@Service
public class UserServiceImpl implement  UserService{

    @Autowired
    private UserDao userDao;

    @Override
    public User select(String uid) {
        return userDao.selectByUid(uid);
    }
}

这段代码主要涉及到的 Spring 知识点:

  • @Service 将 UserService 声明为 Spring 中的一个 bean,实例将会被 Spring 管理。
  • @Autowired 注解将注入 UserDao 实例到 userDao变量 中。

使用依赖注入这种方式,可以让我们很容易替换一个接口的实现。假设我们现在场景是,UserDao 对应的实现类是部署在另一台网络上的机器上,当我们调用 userDao.selectByUid(uid) 时,框架可以帮我们透明地去调用对应机器上的服务接口,就像调用本地的函数一样方便。

理解上面的 Spring 的 Bean 的注册以及注入是实现 RPC 服务的注册以及注入的前提!

2. 实现 RPC 服务的自动扫描

所谓的 RPC 服务自动扫描,其实就是模仿 Spring 注册 Bean 的流程,将 Bean 的信息保存到 Zookeeper 中。

  • Spring 将 Bean 的信息保存到 Bean Factory 中。
  • RPC 框架将 Bean 的信息保存到 Zookeeper 中。

我们的策略是在 Spring 容器启动时,扫描我们定义的注解类,并将服务实现类的信息保存到 Zookeeper 中。

2.1 定义 RPC 服务注解

参照 Spring 的注解 @Service ,这里我们自定义一个 @RpcSerivce 注解 ,用于标记需要暴露的服务实现类。

package com.suny.rpc.nettyrpc.core.annotations;

import java.lang.annotation.*;

/**
 * RPC 服务注解,用于代替 spring @service
 *
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface RpcService {

}


  1. @Target(ElementType.TYPE): 表示这个注解只能标注在类型上,即类、接口、枚举等。

  2. @Retention(RetentionPolicy.RUNTIME): 表示这个注解的生命周期是运行时。这样,我们就可以在运行时通过反射获取到这个注解。

2.2 定义 RPC 服务扫描注解

自定义 @RpcServiceScan 注解,用于扫描 RPC 服务,类似于 Spring 的 @ComponentScan:

package com.suny.rpc.nettyrpc.core.annotations;

import com.suny.rpc.nettyrpc.core.spring.RpcBeanRegistrar;
import org.springframework.context.annotation.Import;

import java.lang.annotation.*;

/**
 * 参考 spring @ComponentScan
 */
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Import(RpcBeanRegistrar.class)
@Documented
public @interface RpcServiceScan {
    String[] basePackages() default {};
}

2.3 处理 RPC Bean 注册

接下来就是将实例纳入 Spring 容器中,这里我们需要使用到 ImportBeanDefinitionRegistrar 这个接口,可以先不要深入细节,我们先来过一眼这个接口里面有什么

package org.springframework.context.annotation;

import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor;
import org.springframework.beans.factory.support.BeanNameGenerator;
import org.springframework.core.type.AnnotationMetadata;


public interface ImportBeanDefinitionRegistrar {

	/**
	 * Register bean definitions as necessary based on the given annotation metadata of
	 * the importing {@code @Configuration} class.
	 */
	default void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry,BeanNameGenerator importBeanNameGenerator) {
		registerBeanDefinitions(importingClassMetadata, registry);
	}

	/**
	 * Register bean definitions as necessary based on the given annotation metadata of
	 * the importing {@code @Configuration} class.
	 */
	default void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
	}

}

我们需要自定义一个自己的注册类,然后实现该接口的函数来注册 bean。

package com.suny.rpc.nettyrpc.core.spring;

import com.suny.rpc.nettyrpc.core.annotations.RpcServiceScan;
import com.suny.rpc.nettyrpc.core.annotations.RpcService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.context.ResourceLoaderAware;
import org.springframework.context.annotation.ClassPathBeanDefinitionScanner;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.annotation.AnnotationAttributes;
import org.springframework.core.io.ResourceLoader;
import org.springframework.core.type.AnnotationMetadata;
import org.springframework.core.type.StandardAnnotationMetadata;
import org.springframework.core.type.filter.AnnotationTypeFilter;

import java.lang.annotation.Annotation;

/**
 * 处理 rpc bean 注册
 *
 */
@Slf4j
public class RpcBeanRegistrar implements ImportBeanDefinitionRegistrar, ResourceLoaderAware {

    /**
     * rpc-core 模块包全路径限定名
     */
    public static final String CORE_MODULE_PACKAGE = "com.suny.rpc.nettyrpc.core";

    private ResourceLoader resourceLoader;

    @Override
    public void setResourceLoader(ResourceLoader resourceLoader) {
        this.resourceLoader = resourceLoader;
    }

    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
        // 扫描自定义的 @service 注解
        AnnotationAttributes annotationAttributes = AnnotationAttributes.fromMap(importingClassMetadata.getAnnotationAttributes(RpcServiceScan.class.getName()));
        String[] rpcPackages = new String[0];

        if (annotationAttributes != null) {
            rpcPackages = annotationAttributes.getStringArray("basePackages");
        }

        // 保险
        if (rpcPackages.length == 0) {
            // throw new RuntimeException("必须指定 rpc 包扫描路径");
            rpcPackages = new String[]{((StandardAnnotationMetadata) importingClassMetadata).getIntrospectedClass().getPackage().getName()};
        }

        RpcBeanDefinitionScanner serviceScanner = new RpcBeanDefinitionScanner(registry, RpcService.class);

        // 核心模块下的 spring bean 也需要扫描
        serviceScanner.scan(CORE_MODULE_PACKAGE);

        // 自定义扫描注解
        int rpcServiceNum = serviceScanner.scan(rpcPackages);
        log.info("Rpc Service 注册数量 [{}]", rpcServiceNum);

    }


    /**
     * 扫描策略
     */
    static class RpcBeanDefinitionScanner extends ClassPathBeanDefinitionScanner {

        public RpcBeanDefinitionScanner(BeanDefinitionRegistry registry, Class<? extends Annotation> clazz) {
            super(registry);
            super.addIncludeFilter(new AnnotationTypeFilter((clazz)));
        }
    }


}

RpcBeanRegistrar 的主要作用如下:

  1. 自动扫描与注册服务

    • 利用 ClassPathBeanDefinitionScanner 扫描所有带有 @RpcService 注解的类。

    • 这些类自动注册为 Spring 容器中的 Bean,免除了手动定义 Bean 的需求。

  2. 通过注解定制扫描路径

    • 使用 @RpcServiceScan 注解可以自定义扫描的包路径。

    • 如果未指定扫描路径,则默认扫描声明 RpcServiceScan 的类所在的包。

    • 提供了灵活的配置选项,适应不同的项目结构。

  3. 实现关键接口

    • 实现 ImportBeanDefinitionRegistrar 接口允许类在 Spring 的配置阶段注册额外的 Bean,增强了框架的灵活性。
    • 实现 ResourceLoaderAware 接口,允许类访问和使用 Spring 的资源加载器,这对于资源的管理和访问是必要的。

2.4 Spring Boot 启动类中添加扫描注解

同时不要忘记在 SpringBoot 启动类上加 @RpcServiceScan 注解,用来扫描我们自定义的 bean :

import com.suny.rpc.nettyrpc.core.annotations.RpcServiceScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@RpcServiceScan
@SpringBootApplication
public class RpcApplication {


    public static void main(String[] args) {
        SpringApplication.run(RpcApplication.class, args);
    }
}

3. 实现 RPC 服务的依赖注入

接下来需要在服务消费端自动创建服务代理 , 并注入到Spring容器中 , 这样开发者就可以像使用本地 Bean 一样使用远程服务了。

3.1 定义 Reference 注解

为了处理依赖注入,我们需要再参考 @Autowired 定义注解 @Reference,此处的注解为参考 Dubbo 的注解。

package com.suny.rpc.nettyrpc.core.annotations;

import java.lang.annotation.*;

/**
 * 服务引用注解. 用于代替 @Autowired . 参考 dubbo @Reference
 *
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD})
public @interface Reference {

}

这个注解只能标注在字段上,用于告诉 RPC 框架,这个字段需要注入一个 RPC 服务代理。

3.2 实现 RPC Bean 的注入

接下来需要在 Spring 启动过程中,将 RPC Bean 注入到被依赖的地方,这里又涉及到另外一个接口 BeanPostProcessor,接口定义如下:

package org.springframework.beans.factory.config;

import org.springframework.beans.BeansException;
import org.springframework.lang.Nullable;

public interface BeanPostProcessor {

	
	@Nullable
	default Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
		return bean;
	}

	@Nullable
	default Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
		return bean;
	}

}


这是 Spring 留的一个拓展接口,可以在 bean 初始化前以及初始化后做一些业务处理。

newBean();   // 实例化 bean
postProcessBeforeInitialization();  // 初始化前业务处理
initBean();  // 初始化 bean
postProcessAfterInitialization(); // 初始化后业务处理

我们自定义一个后置处理器,处理器中总共做了两件事:

  1. 将 RPC bean注册到注册中心中。
  2. 将 RPC bean 注入到变量中。
package com.suny.rpc.nettyrpc.core.spring;

import com.suny.rpc.nettyrpc.core.annotations.Reference;
import com.suny.rpc.nettyrpc.core.annotations.RpcService;
import com.suny.rpc.nettyrpc.core.client.RpcClientProxy;
import com.suny.rpc.nettyrpc.core.network.RpcRequestSender;
import com.suny.rpc.nettyrpc.core.registry.RpcServiceRegistry;
import com.suny.rpc.nettyrpc.core.registry.param.RpcServiceRegistryParam;
import com.suny.rpc.nettyrpc.core.server.NettyServerProperties;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.stereotype.Component;

import java.lang.reflect.Field;
import java.net.InetAddress;

/**
 * Rpc bean 处理
 *
 * @author sunjianrong
 * @date 2021/8/22 下午3:20
 */
@Component
@Slf4j
public class RpcBeanProcessor implements BeanPostProcessor {

    private final RpcRequestSender requestSender;
    private final RpcServiceRegistry rpcServiceRegistry;
    private final NettyServerProperties nettyServerProperties;

    public RpcBeanProcessor(RpcServiceRegistry rpcServiceRegistry, RpcRequestSender requestSender, NettyServerProperties nettyServerProperties) {
        this.rpcServiceRegistry = rpcServiceRegistry;
        this.requestSender = requestSender;
        this.nettyServerProperties = nettyServerProperties;
    }


    @SneakyThrows
    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        // 如果当前 bean 有RpcService注解就特殊处理,否则直接跳过
        Service rpcServiceAnnotation = bean.getClass().getAnnotation(RpcService.class);

        if (rpcServiceAnnotation == null) {
            return bean;
        }

        // 以下代码为注册接口到 Rpc 注册中心中,可先忽略
        RpcServiceRegistryParam registryParam = new RpcServiceRegistryParam();
        registryParam.setServiceName(bean.getClass().getInterfaces()[0].getCanonicalName());
        registryParam.setIp(InetAddress.getLocalHost().getHostAddress());
        registryParam.setPort(nettyServerProperties.getServerPort());
        registryParam.setRpcBean(bean);

        rpcServiceRegistry.register(registryParam);
        return bean;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        // 处理 rpc reference 注入
        Class<?> beanClass = bean.getClass();
        Field[] declaredFields = beanClass.getDeclaredFields();

        for (Field field : declaredFields) {
            // 如果当前 bean 中属性有 @Reference 注解就特殊处理
            Reference annotation = field.getAnnotation(Reference.class);
            if (annotation == null) {
                continue;
            }

            log.info("【@Reference 注入】{}.{}", beanClass.getName(), field.getName());
            
            // 将代理过后的 bean 注入到字段中
            final RpcClientProxy rpcClientProxy = new RpcClientProxy(requestSender);
            final Object proxy = rpcClientProxy.getProxy(field.getType());
            field.setAccessible(true);
            try {
                field.set(bean, proxy);
            } catch (IllegalAccessException e) {
                throw new RuntimeException("属性" + beanName + field.getName() + "赋值失败");
            }

        }

        return bean;
    }
}

4. 使用示例

最后,我们来看一个完整的使用示例,体验一下RPC服务的自动扫描和依赖注入功能。

4.1 注册服务接口

// 注册成 rpc bean
@RpcService
public class UserService {
    
    public User select(String uid){
        return null;
    }
}

4.2 消费者调用远程服务

@Service
public class RpcConsumer{
 
    // 注入 RPC Bean
    @Reference
    private UserService userService;

    @Override
    public User select(String uid) {
        return userService.selectByUid(uid);
    }
}

5. 总结

在这个章节中主要是介绍了怎么结合 Spring 来实现 RPC Bean 的注册以及注入,可以很方便地像调用本地方法一样调用 RPC 方法。