手写RPC框架系列(五) - RPC服务的自动扫描与依赖注入
手写RPC框架系列(五) - RPC服务的自动扫描与依赖注入
在前几个章节中我们已经开始实现 RPC 框架的核心功能,包括服务的注册发现、代理模式的引入等。但是,每次调用服务时手动创建代理对象显然是不现实的,而且服务的实现类也需要自动注册到注册中心。
本章节将介绍如何利用 Spring 的扩展点,实现 RPC 服务的自动扫描与依赖注入。主要内容包括:
- 回顾 Spring 的 Bean 注册以及依赖注入机制
- 自定义 RPC 服务注解及扫描注解
- 基于 Spring 扩展点实现 RPC 服务自动注册
- 自定义 RPC 依赖注入注解
- 基于 Spring 扩展点实现 RPC 依赖自动注入
- 使用示例
1. Spring 项目中的 Bean 注册以及依赖注入
在介绍 RPC 服务的自动注册与注入前,我们先回顾一下 Spring 的 Bean 注册与依赖注入机制。
@Service
public class UserServiceImpl implement UserService{
@Autowired
private UserDao userDao;
@Override
public User select(String uid) {
return userDao.selectByUid(uid);
}
}
这段代码主要涉及到的 Spring 知识点:
@Service
将 UserService 声明为 Spring 中的一个 bean,实例将会被 Spring 管理。@Autowired
注解将注入 UserDao 实例到 userDao变量 中。
使用依赖注入这种方式,可以让我们很容易替换一个接口的实现。假设我们现在场景是,UserDao
对应的实现类是部署在另一台网络上的机器上,当我们调用 userDao.selectByUid(uid)
时,框架可以帮我们透明地去调用对应机器上的服务接口,就像调用本地的函数一样方便。
理解上面的 Spring 的 Bean 的注册以及注入是实现 RPC 服务的注册以及注入的前提!
2. 实现 RPC 服务的自动扫描
所谓的 RPC 服务自动扫描,其实就是模仿 Spring 注册 Bean 的流程,将 Bean 的信息保存到 Zookeeper 中。
- Spring 将 Bean 的信息保存到 Bean Factory 中。
- RPC 框架将 Bean 的信息保存到 Zookeeper 中。
我们的策略是在 Spring 容器启动时,扫描我们定义的注解类,并将服务实现类的信息保存到 Zookeeper 中。
2.1 定义 RPC 服务注解
参照 Spring 的注解 @Service
,这里我们自定义一个 @RpcSerivce
注解 ,用于标记需要暴露的服务实现类。
package com.suny.rpc.nettyrpc.core.annotations;
import java.lang.annotation.*;
/**
* RPC 服务注解,用于代替 spring @service
*
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface RpcService {
}
@Target(ElementType.TYPE): 表示这个注解只能标注在类型上,即类、接口、枚举等。
@Retention(RetentionPolicy.RUNTIME): 表示这个注解的生命周期是运行时。这样,我们就可以在运行时通过反射获取到这个注解。
2.2 定义 RPC 服务扫描注解
自定义 @RpcServiceScan
注解,用于扫描 RPC 服务,类似于 Spring 的 @ComponentScan
:
package com.suny.rpc.nettyrpc.core.annotations;
import com.suny.rpc.nettyrpc.core.spring.RpcBeanRegistrar;
import org.springframework.context.annotation.Import;
import java.lang.annotation.*;
/**
* 参考 spring @ComponentScan
*/
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Import(RpcBeanRegistrar.class)
@Documented
public @interface RpcServiceScan {
String[] basePackages() default {};
}
2.3 处理 RPC Bean 注册
接下来就是将实例纳入 Spring 容器中,这里我们需要使用到 ImportBeanDefinitionRegistrar
这个接口,可以先不要深入细节,我们先来过一眼这个接口里面有什么
package org.springframework.context.annotation;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor;
import org.springframework.beans.factory.support.BeanNameGenerator;
import org.springframework.core.type.AnnotationMetadata;
public interface ImportBeanDefinitionRegistrar {
/**
* Register bean definitions as necessary based on the given annotation metadata of
* the importing {@code @Configuration} class.
*/
default void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry,BeanNameGenerator importBeanNameGenerator) {
registerBeanDefinitions(importingClassMetadata, registry);
}
/**
* Register bean definitions as necessary based on the given annotation metadata of
* the importing {@code @Configuration} class.
*/
default void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
}
}
我们需要自定义一个自己的注册类,然后实现该接口的函数来注册 bean。
package com.suny.rpc.nettyrpc.core.spring;
import com.suny.rpc.nettyrpc.core.annotations.RpcServiceScan;
import com.suny.rpc.nettyrpc.core.annotations.RpcService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.context.ResourceLoaderAware;
import org.springframework.context.annotation.ClassPathBeanDefinitionScanner;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.annotation.AnnotationAttributes;
import org.springframework.core.io.ResourceLoader;
import org.springframework.core.type.AnnotationMetadata;
import org.springframework.core.type.StandardAnnotationMetadata;
import org.springframework.core.type.filter.AnnotationTypeFilter;
import java.lang.annotation.Annotation;
/**
* 处理 rpc bean 注册
*
*/
@Slf4j
public class RpcBeanRegistrar implements ImportBeanDefinitionRegistrar, ResourceLoaderAware {
/**
* rpc-core 模块包全路径限定名
*/
public static final String CORE_MODULE_PACKAGE = "com.suny.rpc.nettyrpc.core";
private ResourceLoader resourceLoader;
@Override
public void setResourceLoader(ResourceLoader resourceLoader) {
this.resourceLoader = resourceLoader;
}
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
// 扫描自定义的 @service 注解
AnnotationAttributes annotationAttributes = AnnotationAttributes.fromMap(importingClassMetadata.getAnnotationAttributes(RpcServiceScan.class.getName()));
String[] rpcPackages = new String[0];
if (annotationAttributes != null) {
rpcPackages = annotationAttributes.getStringArray("basePackages");
}
// 保险
if (rpcPackages.length == 0) {
// throw new RuntimeException("必须指定 rpc 包扫描路径");
rpcPackages = new String[]{((StandardAnnotationMetadata) importingClassMetadata).getIntrospectedClass().getPackage().getName()};
}
RpcBeanDefinitionScanner serviceScanner = new RpcBeanDefinitionScanner(registry, RpcService.class);
// 核心模块下的 spring bean 也需要扫描
serviceScanner.scan(CORE_MODULE_PACKAGE);
// 自定义扫描注解
int rpcServiceNum = serviceScanner.scan(rpcPackages);
log.info("Rpc Service 注册数量 [{}]", rpcServiceNum);
}
/**
* 扫描策略
*/
static class RpcBeanDefinitionScanner extends ClassPathBeanDefinitionScanner {
public RpcBeanDefinitionScanner(BeanDefinitionRegistry registry, Class<? extends Annotation> clazz) {
super(registry);
super.addIncludeFilter(new AnnotationTypeFilter((clazz)));
}
}
}
RpcBeanRegistrar 的主要作用如下:
自动扫描与注册服务:
利用
ClassPathBeanDefinitionScanner
扫描所有带有@RpcService
注解的类。这些类自动注册为 Spring 容器中的 Bean,免除了手动定义 Bean 的需求。
通过注解定制扫描路径:
使用
@RpcServiceScan
注解可以自定义扫描的包路径。如果未指定扫描路径,则默认扫描声明
RpcServiceScan
的类所在的包。提供了灵活的配置选项,适应不同的项目结构。
实现关键接口:
- 实现
ImportBeanDefinitionRegistrar
接口允许类在 Spring 的配置阶段注册额外的 Bean,增强了框架的灵活性。 - 实现
ResourceLoaderAware
接口,允许类访问和使用 Spring 的资源加载器,这对于资源的管理和访问是必要的。
- 实现
2.4 Spring Boot 启动类中添加扫描注解
同时不要忘记在 SpringBoot 启动类上加 @RpcServiceScan
注解,用来扫描我们自定义的 bean :
import com.suny.rpc.nettyrpc.core.annotations.RpcServiceScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@RpcServiceScan
@SpringBootApplication
public class RpcApplication {
public static void main(String[] args) {
SpringApplication.run(RpcApplication.class, args);
}
}
3. 实现 RPC 服务的依赖注入
接下来需要在服务消费端自动创建服务代理 , 并注入到Spring容器中 , 这样开发者就可以像使用本地 Bean 一样使用远程服务了。
3.1 定义 Reference 注解
为了处理依赖注入,我们需要再参考 @Autowired
定义注解 @Reference
,此处的注解为参考 Dubbo 的注解。
package com.suny.rpc.nettyrpc.core.annotations;
import java.lang.annotation.*;
/**
* 服务引用注解. 用于代替 @Autowired . 参考 dubbo @Reference
*
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD})
public @interface Reference {
}
这个注解只能标注在字段上,用于告诉 RPC 框架,这个字段需要注入一个 RPC 服务代理。
3.2 实现 RPC Bean 的注入
接下来需要在 Spring 启动过程中,将 RPC Bean 注入到被依赖的地方,这里又涉及到另外一个接口 BeanPostProcessor
,接口定义如下:
package org.springframework.beans.factory.config;
import org.springframework.beans.BeansException;
import org.springframework.lang.Nullable;
public interface BeanPostProcessor {
@Nullable
default Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
@Nullable
default Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
}
这是 Spring 留的一个拓展接口,可以在 bean 初始化前以及初始化后做一些业务处理。
newBean(); // 实例化 bean
postProcessBeforeInitialization(); // 初始化前业务处理
initBean(); // 初始化 bean
postProcessAfterInitialization(); // 初始化后业务处理
我们自定义一个后置处理器,处理器中总共做了两件事:
- 将 RPC bean注册到注册中心中。
- 将 RPC bean 注入到变量中。
package com.suny.rpc.nettyrpc.core.spring;
import com.suny.rpc.nettyrpc.core.annotations.Reference;
import com.suny.rpc.nettyrpc.core.annotations.RpcService;
import com.suny.rpc.nettyrpc.core.client.RpcClientProxy;
import com.suny.rpc.nettyrpc.core.network.RpcRequestSender;
import com.suny.rpc.nettyrpc.core.registry.RpcServiceRegistry;
import com.suny.rpc.nettyrpc.core.registry.param.RpcServiceRegistryParam;
import com.suny.rpc.nettyrpc.core.server.NettyServerProperties;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.stereotype.Component;
import java.lang.reflect.Field;
import java.net.InetAddress;
/**
* Rpc bean 处理
*
* @author sunjianrong
* @date 2021/8/22 下午3:20
*/
@Component
@Slf4j
public class RpcBeanProcessor implements BeanPostProcessor {
private final RpcRequestSender requestSender;
private final RpcServiceRegistry rpcServiceRegistry;
private final NettyServerProperties nettyServerProperties;
public RpcBeanProcessor(RpcServiceRegistry rpcServiceRegistry, RpcRequestSender requestSender, NettyServerProperties nettyServerProperties) {
this.rpcServiceRegistry = rpcServiceRegistry;
this.requestSender = requestSender;
this.nettyServerProperties = nettyServerProperties;
}
@SneakyThrows
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
// 如果当前 bean 有RpcService注解就特殊处理,否则直接跳过
Service rpcServiceAnnotation = bean.getClass().getAnnotation(RpcService.class);
if (rpcServiceAnnotation == null) {
return bean;
}
// 以下代码为注册接口到 Rpc 注册中心中,可先忽略
RpcServiceRegistryParam registryParam = new RpcServiceRegistryParam();
registryParam.setServiceName(bean.getClass().getInterfaces()[0].getCanonicalName());
registryParam.setIp(InetAddress.getLocalHost().getHostAddress());
registryParam.setPort(nettyServerProperties.getServerPort());
registryParam.setRpcBean(bean);
rpcServiceRegistry.register(registryParam);
return bean;
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
// 处理 rpc reference 注入
Class<?> beanClass = bean.getClass();
Field[] declaredFields = beanClass.getDeclaredFields();
for (Field field : declaredFields) {
// 如果当前 bean 中属性有 @Reference 注解就特殊处理
Reference annotation = field.getAnnotation(Reference.class);
if (annotation == null) {
continue;
}
log.info("【@Reference 注入】{}.{}", beanClass.getName(), field.getName());
// 将代理过后的 bean 注入到字段中
final RpcClientProxy rpcClientProxy = new RpcClientProxy(requestSender);
final Object proxy = rpcClientProxy.getProxy(field.getType());
field.setAccessible(true);
try {
field.set(bean, proxy);
} catch (IllegalAccessException e) {
throw new RuntimeException("属性" + beanName + field.getName() + "赋值失败");
}
}
return bean;
}
}
4. 使用示例
最后,我们来看一个完整的使用示例,体验一下RPC服务的自动扫描和依赖注入功能。
4.1 注册服务接口
// 注册成 rpc bean
@RpcService
public class UserService {
public User select(String uid){
return null;
}
}
4.2 消费者调用远程服务
@Service
public class RpcConsumer{
// 注入 RPC Bean
@Reference
private UserService userService;
@Override
public User select(String uid) {
return userService.selectByUid(uid);
}
}
5. 总结
在这个章节中主要是介绍了怎么结合 Spring 来实现 RPC Bean 的注册以及注入,可以很方便地像调用本地方法一样调用 RPC 方法。