七的博客

手写RPC框架系列(四) - 实现服务注册与发现机制

RPC手写系列

手写RPC框架系列(四) - 实现服务注册与发现机制

在上一个章节中,我们讲述了动态代理的基本概念以及基于动态代理的 RPC 服务调用。这一章节中将逐步讲解服务注册中心相关内容。

本章节的大致内容如下:

  • 为什么要用服务注册中心
  • 服务注册中心的作用
  • 服务注册中心的选型
  • 实现服务注册中心要点
  • 结合 Zookeeper 实现注册中心

1. 为什么要使用服务注册中心

上章节中代理模式实现类中的逻辑之所以置空,是因为我们还有一个问题没有解决。 那就是RPC 请求应该发送到哪里去,客户端作为消费方只有明确知道服务提供方的网络地址 ( IP 地址和端口),才能够发起调用。

通常的做法会以下几种:

  • 在程序配置文件中,明确指定服务提供方的 IP 地址以及端口,RPC 框架从配置文件中解析服务信息。
  • 通过一个中间程序保存服务提供方的信息,消费方从这个中间件中拉取服务信息。这种方案也称作注册中心。
方式 优点 缺点
使用配置文件 - 简单且易于理解
- 不依赖额外的组件
- 适用于服务提供方较少、较稳定的场景
- 扩展性差
- 难以实现服务的动态发现
- 不支持服务的负载均衡和failover
使用注册中心 - 高扩展性
- 支持服务的动态发现
- 支持服务的负载均衡和failover
- 有利于系统的监控和运维
- 增加了系统复杂性
- 可能存在单点故障
- 网络开销较大

综上所述, 使用注册中心相比配置文件,在扩展性、动态性、可用性方面有明显的优势,特别是在大型、动态的分布式系统中。但是注册中心也引入了额外的复杂性和运维成本。在实际选择时,需要根据系统的需求等因素进行权衡。

在 RPC 框架项目中我们决定实现的是比较贴合实际项目场景的方案,故我们选择使用注册中心方案。

2. 服务注册中心的作用

服务注册中心是整个服务注册以及发现机制的核心,它的主要职责包括:

  • 提供服务注册接口,让服务提供方能够注册以及注销服务。
  • 提供服务发现接口,让服务消费方能够查询以及获取服务的地址信息。
  • 对注册的服务进行健康检查,以保证服务的可用性。

服务注册中心交互流程

3. 服务注册中心的选型

梳理下常用的注册中心机制以及优缺点,虽然注册中心很多,但是基本的工作原理都跟下面 2 种类似。

3.1 基于数据库

每个服务提供者启动的时候,往数据库指定的数据库表写入自身的一些基础信息,如IP、端口等。当下线时,把数据库表中对应的基础信息删除。服务调用方定时去轮询数据库,比对本地缓存中的服务节点数据来进行更新

优点:

  • 做法比较地简洁,不需要额外引入第三方的依赖。

缺点:

  • 性能上有很大的问题,在处理数据一致性时需要去轮询数据库,给数据库造成很大的压力。
  • 当服务节点由于网络原因下线时,不能及时地更新数据库中节点信息。

3.2 基于第三方中间件

基本原理同数据库做法,服务提供者启动时,可往中间件中写入节点信息。当下线时,可以将对应的信息删除。感知节点变化则会有所差异,如采用 Zookeeper 来实现,可以使用临时节点来存储服务节点信息,当心跳检测超时时,节点就会被自动剔除。当服务提供者下线时,临时节点会被自动删除。 也可以采用 Redis 来则可以使用发布订阅模式来感知进行节点的变化。

优点: - 比较容易做到节点变化及时通知到调用方。 - 不用去轮询,提升资源的利用率。

缺点: - 需要额外的中间件依赖,容易成为单点故障。 - RPC的稳定性受限于中间件的稳定性。

常见的服务注册中心包括 Consul 、Eureka 、Zookeeper 等,这些注册中心在业内生产系统上有大量的实践,本项目我们决定使用 Zookeeper 来实现服务注册、发现等。

4. 实现服务注册中心要点

为了理解服务注册以及发现的基本原理,我们结合 Zookeeper 实现一个简单的注册中心。

4.1. 实现服务注册

服务注册的目的是让服务提供者能够将其提供的服务信息发布到注册中心,以便服务消费者能够发现以及调用这些服务。实现服务注册的逻辑如下:

  1. 服务提供者启动时,需要将节点注册到 Zookeeper 中。

    • 项目启动时扫描自定义注解的接口( 比如 @RocService ) 来识别需要暴露的服务接口。
    • 将接口元数据信息 ( 接口名、版本号、服务提供者地址 ) 封装成一个对象。
    • 在 Zookeeper 中的指定节点 (如 /registry/services ) 中创建一个临时节点中,节点名可以包含服务名以及服务标识。
    • 将接口元数据信息存储在该临时节点的数据中。
    • 服务调用方可以直接去 Zookeeper 中读取服务提供者的信息,并根据需要进行服务调用。
  2. 当服务提供者关闭或者由于服务不可用时,要及时地告知服务调用方。服务的关闭分以下几种情况:

    • 服务提供方主动关闭,这种情况下服务提供方需要将 Zookeeper 节点信息主动删除。
    • 服务提供方意外崩溃或者网络中断导致被动关闭,不能及时删除节点。这种情况下只能靠调用方定时去检测,剔除该节点。如使用 Zookeeper 临时节点的话,当超过一定的时间,节点就会被自动删除。

4.3. 实现服务发现

实现服务发现的目的是让服务消费者能够动态获取到可用的服务提供者列表,并根据一定的策略选择合适的服务提供者进行调用。 实现服务发现的逻辑如下:

  1. 服务消费者在启动时,需要从 Zookeeper 中获取可用的服务提供者列表,并缓存到本地的服务提供者列表中。同时需要在 Zookeeper 中设置一个 Watcher ,监听服务节点的变化事件,以便于及时更新本地的服务提供者列表。

  2. 当服务消费者需要发起 RPC 请求时,可以从本地的服务提供者列表中选择一个合适的服务提供者。当有多个可用的服务节点时,可以进行负载均衡。可参考业界常用的一些负载均衡方案。 如一致性哈希、随机、轮询、加权轮询等。

  3. 当服务节点不可用时,可以及时地将请求转移到其它节点。 一个是需要通过心跳等策略去感知服务节点是否可用,再一个是每次调用存储返回值,如果返回值为超时等情况,需要及时地将服务节点剔除,或者间接性地放行部分请求去尝试节点是否可用。

5. 实现基于 Zookeeper 的服务注册中心

下面我们将开始结合 Zookeeper 实现一个简单版本的注册中心

5.1 封装 Zookeeper 工具类

工具类主要提供下面几个函数:

  • 创建 Zookeeper 节点。
  • 查询指定 RPC 服务下的服务实例地址。
package com.suny.rpc.nettyrpc.core.ext.zookeeper;

import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.curator.framework.api.GetDataBuilder;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;

/**
 * Zookeeper 辅助类
 */
@Slf4j
@Component
public class ZookeeperHelper implements DisposableBean {

    public static final String ZOOKEEPER_ADDRESS = "127.0.0.1:2181";
    public static final String BASE_RPC_PATH = "/rpc";
    private static volatile CuratorFramework zookeeperClient;

    private static final Map<String, List<String>> RPC_SERVICE_ADDRESS_MAP = new ConcurrentHashMap<>();
    public static final Set<String> PATH_SET = new ConcurrentSkipListSet<>();


    /**
     * 创建服务节点
     *
     * @param rpcServiceName 服务类名
     */
    public void createServiceInstanceNode(String rpcServiceName) {
        createServiceInstanceNode(rpcServiceName, null);
    }

    /**
     * 创建服务节点
     *
     * @param rpcServiceName 服务类名
     */
    public void createServiceInstanceNode(String rpcServiceName, String data) {
        checkInit();
        final String serviceNode = BASE_RPC_PATH + "/" + rpcServiceName + "/node";
        try {
            if (data == null) {
                zookeeperClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(serviceNode);
            } else {
                zookeeperClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(serviceNode, data.getBytes(StandardCharsets.UTF_8));
            }
        } catch (Exception e) {
            log.info("节点 {} 创建失败", serviceNode, e);
            throw new RuntimeException("创建节点" + serviceNode + "失败", e);
        }
    }


    public List<String> getServiceInstanceNode(String rpcServiceName) {
        checkInit();
        final String serviceNodeName = BASE_RPC_PATH + "/" + rpcServiceName;
        try {
            List<String> serviceNodeList = zookeeperClient.getChildren().forPath(serviceNodeName);
            List<String> res = new ArrayList<>();
            for (String childrenNode : serviceNodeList) {
                try {
                    final byte[] bytes = getZookeeperClient().getData().forPath(serviceNodeName + "/" + childrenNode);
                    res.add(new String(bytes));
                } catch (Exception e) {
                    // throw new RuntimeException("读取节点值" + rpcServiceName + "失败");
                    log.info("读取节点值 {} 失败", rpcServiceName, e);
                }
            }
            return res;
        } catch (Exception e) {
            log.info("获取 {} 子节点列表失败", serviceNodeName, e);
            return Collections.emptyList();
        }
    }

    public Map<String, List<String>> getAllServiceInstanceNode() {
        Map<String, List<String>> res = new HashMap<>();

        try {
            List<String> serviceNodeList = zookeeperClient.getChildren().forPath(BASE_RPC_PATH);
            for (String s : serviceNodeList) {
                res.put(s, getServiceInstanceNode(s));
            }

            return res;
        } catch (Exception e) {
            log.info("获取 {} 子节点列表失败", BASE_RPC_PATH, e);
            return res;
        }
    }


    private void checkInit() {
        if (zookeeperClient != null && zookeeperClient.getState() == CuratorFrameworkState.STARTED) {
            return;
        }

        ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 3);
        synchronized (CuratorFramework.class) {
            if (zookeeperClient == null) {
                zookeeperClient = CuratorFrameworkFactory.builder().connectString(ZOOKEEPER_ADDRESS).retryPolicy(retry).build();
            }
            zookeeperClient.start();

            try {
                if (!zookeeperClient.blockUntilConnected(30, TimeUnit.SECONDS)) {
                    throw new RuntimeException("Zookeeper 连接超时");
                }
            } catch (InterruptedException e) {
                log.info("响应线程中断");
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * @return zookeeper 操作客户端
     */
    public CuratorFramework getZookeeperClient() {
        checkInit();
        return zookeeperClient;
    }


    private void registerWatcher(String rpcServiceName) throws Exception {
        checkInit();
        String path = BASE_RPC_PATH + "/" + rpcServiceName;
        PathChildrenCache pathChildrenCache = new PathChildrenCache(zookeeperClient, path, true);
        PathChildrenCacheListener listener = (f, e) -> {
            final GetDataBuilder data = f.getData();
            if (data != null) {
                switch (e.getType()) {
                    case CHILD_ADDED:
                        log.debug("添加节点");
                        break;
                    case CHILD_REMOVED:
                        log.debug("删除节点");
                        break;
                    case CHILD_UPDATED:
                        log.debug("节点更新");
                        break;
                    default:
                        log.debug("未处理事件类型 {}", e.getType());
                }
            }
            List<String> list = f.getChildren().forPath(path);
            RPC_SERVICE_ADDRESS_MAP.put(rpcServiceName, list);
        };

        pathChildrenCache.getListenable().addListener(listener);
        pathChildrenCache.start();

    }

    @Override
    public void destroy() throws Exception {
        log.info("Zookeeper开始关闭!");
        zookeeperClient.close();
    }
}

5.2 定义注册中心类型枚举

package com.suny.rpc.nettyrpc.core.enums;

import lombok.AllArgsConstructor;
import lombok.Getter;

/**
 * 注册中心类型
 */
@AllArgsConstructor
@Getter
public enum RegistryCenterType {

    /**
     * Zookeeper
     */
    ZOOKEEPER("Zookeeper");

    private final String name;
}

5.3 注册中心接口抽象

package com.suny.rpc.nettyrpc.core;

import com.suny.rpc.nettyrpc.core.enums.RegistryCenterType;

/**
 * 注册中心接口
 */
public interface IRegistryCenter {

    /**
     * 获取当前注册中心类型
     *
     * @return 当前注册中心类型
     */
    RegistryCenterType getRegistryCenterType();
}

5.4 抽象 RPC 服务注册接口以及实现

5.4.1 定义 RPC 服务注册接口

package com.suny.rpc.nettyrpc.core.registry;

import com.suny.rpc.nettyrpc.core.IRegistryCenter;
import com.suny.rpc.nettyrpc.core.registry.param.RpcServiceRegistryParam;
import com.suny.rpc.nettyrpc.core.registry.param.RpcServiceUnRegistryParam;

/**
 * RPC 注册接口
 */
public interface RpcServiceRegistry extends IRegistryCenter {

    /**
     * 注册服务
     * @param registryParam 注册参数
     */
    void register(RpcServiceRegistryParam registryParam);

    /**
     * 反注册参数
     * @param unRegistryParam 反注册参数
     */
    void unRegister(RpcServiceUnRegistryParam unRegistryParam);
}

5.4.2 定义 RPC 服务注册抽象类

提供抽象的 RPC 服务注册实现,主要是将服务的基本信息注册到一个 Map 中,供后续使用。

package com.suny.rpc.nettyrpc.core.registry.impl;

import com.suny.rpc.nettyrpc.core.process.RpcRequestProcessor;
import com.suny.rpc.nettyrpc.core.registry.RpcServiceRegistry;
import com.suny.rpc.nettyrpc.core.registry.param.RpcServiceRegistryParam;
import com.suny.rpc.nettyrpc.core.registry.param.RpcServiceUnRegistryParam;
import lombok.extern.slf4j.Slf4j;

/**
 * 抽象 RPC 服务注册类
 */
@Slf4j
public abstract class AbstractRpcServiceRegistry implements RpcServiceRegistry {


    @Override
    public void register(RpcServiceRegistryParam registryParam) {
        doRegister(registryParam);
        // 后续章节将会出现该类
        RpcRequestProcessor.addRpcBean(registryParam.getServiceName(), registryParam.getRpcBean());
        log.info("【{} RPC服务注册】{} >> {}:{}", getRegistryCenterType().getName(), registryParam.getServiceName(), registryParam.getIp(), registryParam.getPort());
    }

    abstract void doRegister(RpcServiceRegistryParam registryParam);

    @Override
    public void unRegister(RpcServiceUnRegistryParam unRegistryParam) {
        doUnRegister(unRegistryParam);
        
        // 后续章节将会出现该类
        RpcRequestProcessor.remove(unRegistryParam.getServiceName());
        log.info("【{} RPC服务反注册】{} >> {}:{}", getRegistryCenterType().getName(), unRegistryParam.getServiceName(), unRegistryParam.getIp(), unRegistryParam.getPort());
    }

    abstract void doUnRegister(RpcServiceUnRegistryParam unRegistryParam);
}

5.4.3 实现 Zookeeper 服务注册

package com.suny.rpc.nettyrpc.core.registry.impl;

import com.suny.rpc.nettyrpc.core.enums.RegistryCenterType;
import com.suny.rpc.nettyrpc.core.ext.zookeeper.ZookeeperHelper;
import com.suny.rpc.nettyrpc.core.registry.param.RpcServiceRegistryParam;
import com.suny.rpc.nettyrpc.core.registry.param.RpcServiceUnRegistryParam;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;

/**
 * Zookeeper 服务注册实现
 */
@Primary
@Component
@Slf4j
public class ZookeeperRpcServiceRegistryImpl extends AbstractRpcServiceRegistry {

    private final ZookeeperHelper zookeeperHelper;

    public ZookeeperRpcServiceRegistryImpl(ZookeeperHelper zookeeperHelper) {
        this.zookeeperHelper = zookeeperHelper;
    }

    @Override
    public RegistryCenterType getRegistryCenterType() {
        return RegistryCenterType.ZOOKEEPER;
    }

    @Override
    void doRegister(RpcServiceRegistryParam registryParam) {
        String address = registryParam.getIp() + ":" + registryParam.getPort();
        zookeeperHelper.createServiceInstanceNode(registryParam.getServiceName(), address);
    }

    @Override
    void doUnRegister(RpcServiceUnRegistryParam unRegistryParam) {
        // final InetSocketAddress inetSocketAddress = new InetSocketAddress(unRegistryParam.getIp(), unRegistryParam.getPort());
        // zookeeperHelper.removeNode(inetSocketAddress);
    }

}

5.5 抽象 RPC 服务发现以及实现

5.5.1 定义 RPC 服务发现接口

package com.suny.rpc.nettyrpc.core.discovery;

import com.suny.rpc.nettyrpc.core.IRegistryCenter;

import java.util.List;
import java.util.Map;

/**
 * RPC 服务发现接口
 */
public interface RpcServiceDiscovery extends IRegistryCenter {

    /**
     * 拉取指定服务地址集合
     *
     * @param serviceName 服务名称
     * @return 服务地址集合
     */
    List<String> getServiceInstanceList(String serviceName);

    /**
     * 获取一个服务实例
     *
     * @param serviceName 服务名称
     * @return 服务实例
     */
    String getServiceInstance(String serviceName);

    /**
     * 拉取所有服务信息
     * @return 所有服务信息
     */
    @Deprecated
    Map<String, List<String>> getAllServiceInstance();

}

5.5.2 定义 RPC 服务发现抽象类

提供抽象的 RPC 服务发现实现,主要是拉取要调用的服务信息。

package com.suny.rpc.nettyrpc.core.discovery.impl;

import com.suny.rpc.nettyrpc.core.discovery.RpcServiceDiscovery;
import lombok.extern.slf4j.Slf4j;

import java.util.List;

/**
 * RPC 服务发现抽象类
 */
@Slf4j
public abstract class AbstractRpcServiceDiscovery implements RpcServiceDiscovery {
    @Override
    public List<String> getServiceInstanceList(String serviceName) {
        return doGetServiceInstanceList(serviceName);
    }

    abstract List<String> doGetServiceInstanceList(String serviceName);

    @Override
    public String getServiceInstance(String serviceName) {
        final List<String> list = doGetServiceInstanceList(serviceName);
        if (list == null) {
            return null;
        }

        return list.stream().findFirst().orElse(null);
    }

}

5.5.3 实现 Zookeeper 服务发现

package com.suny.rpc.nettyrpc.core.discovery.impl;

import com.suny.rpc.nettyrpc.core.enums.RegistryCenterType;
import com.suny.rpc.nettyrpc.core.ext.zookeeper.ZookeeperHelper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Service;

import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
 * Zookeeper 服务发现实现
 */
@Service
@Slf4j
@Primary
public class ZookeeperRpcServiceDiscoveryImpl extends AbstractRpcServiceDiscovery {

    private final ZookeeperHelper zookeeperHelper;

    public ZookeeperRpcServiceDiscoveryImpl(ZookeeperHelper zookeeperHelper) {
        this.zookeeperHelper = zookeeperHelper;
    }


    @Override
    List<String> doGetServiceInstanceList(String serviceName) {
        List<String> childrenNodes = zookeeperHelper.getServiceInstanceNode(serviceName);
        if (CollectionUtils.isEmpty(childrenNodes)) {
            // throw new RuntimeException("未找到" + serviceName + "服务节点");
            return Collections.emptyList();
        }

        return childrenNodes;
    }


    @Override
    public Map<String, List<String>> getAllServiceInstance() {
        return zookeeperHelper.getAllServiceInstanceNode();
    }


    @Override
    public RegistryCenterType getRegistryCenterType() {
        return RegistryCenterType.ZOOKEEPER;
    }
}

6. 总结

这个章节主要讲解了使用注册中心的原因、注册中心选型、实现简单的注册中心的几个步骤等。目前的代码暂时无法运行,下一个章节我们将继续完善 RPC 框架。