七的博客

Redisson2.0源码分析6-集群以及哨兵连接管理

源码分析

Redisson2.0源码分析6-集群以及哨兵连接管理

集群以及哨兵两种模式是 Redis 里面比较复杂的模式了,需要提前了解一些 Redis 的前置知识点才能更好的看得懂源码。比较典型的有:

  • 集群跟哨兵模式作用。
  • 集群模式间怎么进行通信。 怎么进行分片。集群节点间怎么进行通信。
  • 哨兵模式工作原理。哨兵之间怎么进行通信。怎么选主节点。怎么节点下线。

1. 集群模式

1.1 集群模式节点信息 ClusterNodeInfo

package org.redisson.connection;

import java.net.URI;
import java.util.ArrayList;
import java.util.List;

import org.redisson.misc.URIBuilder;

public class ClusterNodeInfo {

    // 表示节点的不同状态和角色
    public enum Flag {NOFLAGS, SLAVE, MASTER, MYSELF, FAIL, HANDSHAKE, NOADDR};

    private String nodeId;  // 节点的唯一标识符
    private URI address;   // 节点地址
    private List<Flag> flags = new ArrayList<Flag>();  //  节点的标志列表,表示节点的状态和角色
    private String slaveOf;  // 如果是从节点,表示其主节点的ID

    private int startSlot;  // 节点负责的起始槽位
    private int endSlot;  // 节点负责的结束槽位

    // 省略 set get

}

1.2 集群模式分区信息 ClusterPartition

package org.redisson.connection;

import java.net.URI;
import java.util.ArrayList;
import java.util.List;

import org.redisson.misc.URIBuilder;

public class ClusterPartition {

    private int startSlot; // 当前集群分区负责的起始槽位
    private int endSlot; // 当前集群分区负责的结束槽位
    private boolean masterFail; // 主节点是否失效
    private URI masterAddress; // 主节点的地址
    private List<URI> slaveAddresses = new ArrayList<URI>(); // 从节点的地址列表
  

    // 省略 set get

}

1.3 集群模式连接管理 ClusterConnectionManager

集群模式连接管理也是很复杂,逻辑特别多。主要的逻辑包括:

  • 管理 Redis 集群中各个节点的连接,这里面包括主节点以及从节点的连接。
  • 定时监控 Redis 集群的状态变化,然后做一些操作。比如主从角色的切换、节点故障处理、分片迁移等等。
  • 主从切换这块也是比较复杂的逻辑,检测到主节点故障时会自动切换到新的主节点。
  • 管理集群中的 slot 分配,确保槽位正确映射。

monitorClusterChange() 方法是核心逻辑比较多的地方,包括主从角色切换、节点故障处理。

checkSlotsChange() 方法则包含分片迁移的逻辑。

package org.redisson.connection;

import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.ScheduledFuture;

import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.redisson.ClusterServersConfig;
import org.redisson.Config;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.ClusterNodeInfo.Flag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClusterConnectionManager extends MasterSlaveConnectionManager {

    private final Logger log = LoggerFactory.getLogger(getClass());

    // 集群节点的Redis客户端列表
    private final List<RedisClient> nodeClients = new ArrayList<RedisClient>();  
    
    // 上一次解析的集群分区信息, 键为槽位范围的结束槽位
    private final Map<Integer, ClusterPartition> lastPartitions = new HashMap<Integer, ClusterPartition>();
    
    // 监控集群变化的调度任务
    private ScheduledFuture<?> monitorFuture;

    public ClusterConnectionManager(ClusterServersConfig cfg, Config config) {
        init(config);
	
        // 初始化主从配置
        this.config = create(cfg);
        init(this.config);

        // 为每个节点地址创建Redis客户端
        for (URI addr : cfg.getNodeAddresses()) {
            RedisClient client = createClient(addr.getHost(), addr.getPort(), cfg.getTimeout());
            try {
                // 连接到Redis节点  > 获取集群节点信息
                RedisConnection connection = client.connect();
                String nodesValue = connection.sync(RedisCommands.CLUSTER_NODES);

                // 获取集群节点信息 > 解析集群分区信息
                Map<Integer, ClusterPartition> partitions = parsePartitions(nodesValue);
                for (ClusterPartition partition : partitions.values()) {
                    // 为每个分区添加主节点条目
                    addMasterEntry(partition, cfg);
                }

                break;

            } catch (RedisConnectionException e) {
                // 连接失败,记录警告信息
                log.warn(e.getMessage(), e);
            } finally {
                // 异步关闭客户端连接
                client.shutdownAsync();
            }
        }
		
        // 监控集群变化
        monitorClusterChange(cfg);
    }

    @Override
    protected void initEntry(MasterSlaveServersConfig config) {
    }

    // 将一个集群分区的主节点添加到管理器中,并配置相应的主从关系
    private void addMasterEntry(ClusterPartition partition, ClusterServersConfig cfg) {
        // 如果主节点失效,记录警告信息并返回
        if (partition.isMasterFail()) {
            log.warn("master: {} for slot range: {}-{} add failed. Reason - server has FAIL flag", partition.getMasterAddress(), partition.getStartSlot(), partition.getEndSlot());
            return;
        }

        // 创建连接到主节点的客户端
        RedisClient client = createClient(partition.getMasterAddress().getHost(), partition.getMasterAddress().getPort(), cfg.getTimeout());
        try {
            // 连接到主节点
            RedisConnection c = client.connect();
            // 发送 CLUSTER INFO 命令获取集群信息
            Map<String, String> params = c.sync(RedisCommands.CLUSTER_INFO);

            // 如果集群状态为 "fail",记录警告信息并返回
            if ("fail".equals(params.get("cluster_state"))) {
                log.warn("master: {} for slot range: {}-{} add failed. Reason - cluster_state:fail", partition.getMasterAddress(), partition.getStartSlot(), partition.getEndSlot());
                return;
            }
        } finally {
             // 异步关闭客户端
            client.shutdownAsync();
        }
		
        // 创建主从配置
        MasterSlaveServersConfig config = create(cfg);
        log.info("master: {} for slot range: {}-{} added", partition.getMasterAddress(), partition.getStartSlot(), partition.getEndSlot());

        // 设置主节点地址
        config.setMasterAddress(partition.getMasterAddress());

        SingleEntry entry = new SingleEntry(partition.getStartSlot(), partition.getEndSlot(), this, config);
        entries.put(partition.getEndSlot(), entry);
        // 分区信息放入lastPartitions 中
        lastPartitions.put(partition.getEndSlot(), partition);
    }

    // 监控集群状态变化
    private void monitorClusterChange(final ClusterServersConfig cfg) {
        // 定时去检查集群状态信息
        // 确保每次任务执行完后,才会执行下一次的任务
        monitorFuture = GlobalEventExecutor.INSTANCE.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    for (URI addr : cfg.getNodeAddresses()) {
                        final RedisClient client = createClient(addr.getHost(), addr.getPort(), cfg.getTimeout());
                        try {
	                        // 连接到节点并获取集群节点信息   
                            // redis 里面每一个节点都可以获取到完整的集群状态
                            RedisConnection connection = client.connect();
                            // 获取当前集群节点的状态信息
                            // CLUSTER NODES 命令返回集群中所有节点的详细信息,包括节点 ID、地址、主从角色、状态等
                            String nodesValue = connection.sync(RedisCommands.CLUSTER_NODES);

                            log.debug("cluster nodes state: {}", nodesValue);

                            // 解析节点信息,更新分区状态
                            Map<Integer, ClusterPartition> partitions = parsePartitions(nodesValue);
                            for (ClusterPartition newPart : partitions.values()) {
                                for (ClusterPartition part : lastPartitions.values()) {
                                    if (newPart.getMasterAddress().equals(part.getMasterAddress())) {

                                        log.debug("found endslot {} for {} fail {}", part.getEndSlot(), part.getMasterAddress(), newPart.isMasterFail());

                                        if (newPart.isMasterFail()) {
                                            // 匹配当前分区的主节点地址
                                            ClusterPartition newMasterPart = partitions.get(part.getEndSlot());
                                            if (!newMasterPart.getMasterAddress().equals(part.getMasterAddress())) {
                                                log.debug("changing master from {} to {} for {}",
                                                        part.getMasterAddress(), newMasterPart.getMasterAddress(), newMasterPart.getEndSlot());
                                                URI newUri = newMasterPart.getMasterAddress();
                                                URI oldUri = part.getMasterAddress();

                                                changeMaster(newMasterPart.getEndSlot(), newUri.getHost(), newUri.getPort());
                                                slaveDown(newMasterPart.getEndSlot(), oldUri.getHost(), oldUri.getPort());
                                                 // 更新主节点地址
                                                part.setMasterAddress(newMasterPart.getMasterAddress());
                                            }
                                        }
                                        break;
                                    }
                                }
                            }

                            // 检查槽位变化
                            checkSlotsChange(cfg, partitions);

                            break;

                        } catch (RedisConnectionException e) {
                            // 连接失败,跳过当前节点
                        } finally {
                            client.shutdownAsync();
                        }
                    }

                } catch (Exception e) {
                    log.error(e.getMessage(), e);
                }

            }

        }, cfg.getScanInterval(), cfg.getScanInterval(), TimeUnit.MILLISECONDS);
    }

    // 检查槽变动,根据变化更新集群的主从节点配置
    private void checkSlotsChange(ClusterServersConfig cfg, Map<Integer, ClusterPartition> partitions) {
        Set<Integer> removeSlots = new HashSet<Integer>(lastPartitions.keySet());
        removeSlots.removeAll(partitions.keySet());
        lastPartitions.keySet().removeAll(removeSlots);
        // 记录需要移除的槽位数量
        if (!removeSlots.isEmpty()) {
            log.info("{} slots found to remove", removeSlots.size());
        }

        // 移除主节点并异步关闭连接
        Map<Integer, MasterSlaveEntry> removeAddrs = new HashMap<Integer, MasterSlaveEntry>();
        for (Integer slot : removeSlots) {
            MasterSlaveEntry entry = removeMaster(slot);
            entry.shutdownMasterAsync();
            removeAddrs.put(slot, entry);
        }

        // 记录需要添加的槽位数量....
        Set<Integer> addSlots = new HashSet<Integer>(partitions.keySet());
        addSlots.removeAll(lastPartitions.keySet());
        if (!addSlots.isEmpty()) {
            log.info("{} slots found to add", addSlots.size());
        }

        // 为新增的槽位添加主节点条目
        for (Integer slot : addSlots) {
            ClusterPartition partition = partitions.get(slot);
            addMasterEntry(partition, cfg);
        }

         // 将从节点标记为下线
        for (Entry<Integer, MasterSlaveEntry> entry : removeAddrs.entrySet()) {
            InetSocketAddress url = entry.getValue().getClient().getAddr();
            slaveDown(entry.getKey(), url.getHostName(), url.getPort());
        }
    }

    // 解析Redis集群信息字符串
    private Map<String, String> parseInfo(String value) {
        Map<String, String> result = new HashMap<String, String>();
        // 通过换行符分割字符串,然后通过:分割,构建键值对返回。
        for (String entry : value.split("\r\n|\n")) {
            String[] parts = entry.split(":");
            result.put(parts[0], parts[1]);
        }
        return result;
    }

    // 解析字符串 Redis 集群节点信息 , 转成对象返回
    private Map<Integer, ClusterPartition> parsePartitions(String nodesValue) {
        Map<String, ClusterPartition> partitions = new HashMap<String, ClusterPartition>();
        Map<Integer, ClusterPartition> result = new HashMap<Integer, ClusterPartition>();

        // 解析节点信息字符串为 ClusterNodeInfo 集合
        List<ClusterNodeInfo> nodes = parse(nodesValue);
        for (ClusterNodeInfo clusterNodeInfo : nodes) {
            String id = clusterNodeInfo.getNodeId();

             // 如果节点是从节点,使用其主节点的ID
            if (clusterNodeInfo.getFlags().contains(Flag.SLAVE)) {
                id = clusterNodeInfo.getSlaveOf();
            }

            ClusterPartition partition = partitions.get(id);
             // 分区不存在创建新的分区
            if (partition == null) {
                partition = new ClusterPartition();
                partitions.put(id, partition);
            }

            // 节点标记为失败,则设置分区的主节点失效标志
            if (clusterNodeInfo.getFlags().contains(Flag.FAIL)) {
                partition.setMasterFail(true);
            }

            // 从节点,添加到分区的从节点地址列表
            if (clusterNodeInfo.getFlags().contains(Flag.SLAVE)) {
                partition.addSlaveAddress(clusterNodeInfo.getAddress());
            } else {
                // 主节点,设置分区的槽位范围和主节点地址
                partition.setStartSlot(clusterNodeInfo.getStartSlot());
                partition.setEndSlot(clusterNodeInfo.getEndSlot());
                result.put(clusterNodeInfo.getEndSlot(), partition);
                partition.setMasterAddress(clusterNodeInfo.getAddress());
            }
        }
        return result;
    }

    // 集群配置转主从配置
    private MasterSlaveServersConfig create(ClusterServersConfig cfg) {
        MasterSlaveServersConfig c = new MasterSlaveServersConfig();
        c.setRetryInterval(cfg.getRetryInterval());
        c.setRetryAttempts(cfg.getRetryAttempts());
        c.setTimeout(cfg.getTimeout());
        c.setPingTimeout(cfg.getPingTimeout());
        c.setLoadBalancer(cfg.getLoadBalancer());
        c.setPassword(cfg.getPassword());
        c.setDatabase(cfg.getDatabase());
        c.setClientName(cfg.getClientName());
        c.setMasterConnectionPoolSize(cfg.getMasterConnectionPoolSize());
        c.setSlaveConnectionPoolSize(cfg.getSlaveConnectionPoolSize());
        c.setSlaveSubscriptionConnectionPoolSize(cfg.getSlaveSubscriptionConnectionPoolSize());
        c.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection());
        return c;
    }

    // 解析 Redis 集群节点信息字符串,转为 ClusterNodeInfo 对象
    private List<ClusterNodeInfo> parse(String nodesResponse) {
        List<ClusterNodeInfo> nodes = new ArrayList<ClusterNodeInfo>();
        for (String nodeInfo : nodesResponse.split("\n")) {
            ClusterNodeInfo node = new ClusterNodeInfo();
            String[] params = nodeInfo.split(" ");

            String nodeId = params[0];
            node.setNodeId(nodeId); // 设置节点ID

            String addr = params[1];
            node.setAddress(addr); // 设置节点地址

            String flags = params[2];
            for (String flag : flags.split(",")) {
                // 解析并添加节点标志
                String flagValue = flag.toUpperCase().replaceAll("\\?", "");
                node.addFlag(ClusterNodeInfo.Flag.valueOf(flagValue));
            }

            String slaveOf = params[3];
            if (!"-".equals(slaveOf)) {
                // 节点是从节点,设置其主节点ID
                node.setSlaveOf(slaveOf);
            }

            if (params.length > 8) {
                // 设置节点的槽位范围
                String slots = params[8];
                String[] parts = slots.split("-");
                node.setStartSlot(Integer.valueOf(parts[0]));
                node.setEndSlot(Integer.valueOf(parts[1]));
            }

            nodes.add(node);
        }
        return nodes;
    }

    @Override
    public void shutdown() {
        monitorFuture.cancel(true);
        super.shutdown();

        for (RedisClient client : nodeClients) {
            client.shutdown();
        }
    }
}

2. 哨兵模式

2.1 哨兵模式连接管理 SentinelConnectionManager

这个类主要用于管理Redis的哨兵模式连接。主要的作用包括:

  • 管理多个 Redis 哨兵节点,接收哨兵的通知消息,比如主节点切换、从节点上下线等。
  • 自动发现、配置主从节点。
  • 主从节点的状态监控、主从节点的自动切换。
package org.redisson.connection;

import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;

import org.redisson.Config;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.SentinelServersConfig;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.misc.URIBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.util.internal.PlatformDependent;

public class SentinelConnectionManager extends MasterSlaveConnectionManager {

    private final Logger log = LoggerFactory.getLogger(getClass());

    // 保存哨兵节点
    private final ConcurrentMap<String, RedisClient> sentinels = PlatformDependent.newConcurrentHashMap();
    // 当前主节点
    private final AtomicReference<String> currentMaster = new AtomicReference<String>();
    // 已冻结的从节点
    private final ConcurrentMap<String, Boolean> freezeSlaves = PlatformDependent.newConcurrentHashMap();
    // 从节点
    private final ConcurrentMap<String, Boolean> slaves = PlatformDependent.newConcurrentHashMap();


    public SentinelConnectionManager(SentinelServersConfig cfg, Config config) {
        init(config);


        // 初始化哨兵配置
        final MasterSlaveServersConfig c = new MasterSlaveServersConfig();
        c.setRetryInterval(cfg.getRetryInterval());
        c.setRetryAttempts(cfg.getRetryAttempts());
        c.setTimeout(cfg.getTimeout());
        c.setPingTimeout(cfg.getPingTimeout());
        c.setLoadBalancer(cfg.getLoadBalancer());
        c.setPassword(cfg.getPassword());
        c.setDatabase(cfg.getDatabase());
        c.setMasterConnectionPoolSize(cfg.getMasterConnectionPoolSize());
        c.setSlaveConnectionPoolSize(cfg.getSlaveConnectionPoolSize());
        c.setSlaveSubscriptionConnectionPoolSize(cfg.getSlaveSubscriptionConnectionPoolSize());
        c.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection());

        // 保存断开的节点
        List<String> disconnectedSlaves = new ArrayList<String>();
        for (URI addr : cfg.getSentinelAddresses()) {
            RedisClient client = createClient(addr.getHost(), addr.getPort(), c.getTimeout());
            try {
                 // 连接到每个哨兵节点
                RedisConnection connection = client.connect();

                // 获取主节点地址  > 拿到主节点信息 > 设置主节点
                List<String> master = connection.sync(RedisCommands.SENTINEL_GET_MASTER_ADDR_BY_NAME, cfg.getMasterName());
                String masterHost = master.get(0) + ":" + master.get(1);
                c.setMasterAddress(masterHost);
                currentMaster.set(masterHost);
                log.info("master: {} added", masterHost);

                // 获取从节点地址集合  > 遍历设置从节点
                List<Map<String, String>> sentinelSlaves = connection.sync(RedisCommands.SENTINEL_SLAVES, cfg.getMasterName());
                for (Map<String, String> map : sentinelSlaves) {
                    String ip = map.get("ip");
                    String port = map.get("port");
                    String flags = map.get("flags");

                    String host = ip + ":" + port;

                    c.addSlaveAddress(host);
                    slaves.put(host, true);
                    log.info("slave: {} added, params: {}", host, map);

                    if (flags.contains("s_down") || flags.contains("disconnected")) {
                        disconnectedSlaves.add(host);
                    }
                }
                break;
            } catch (RedisConnectionException e) {
                // 连接失败,跳过当前哨兵节点
            } finally {
                client.shutdownAsync();
            }
        }

        if (currentMaster.get() == null) {
            // 如果无法连接到任何主节点,抛出异常
            throw new IllegalStateException("Can't connect to servers!");
        }
        init(c);

        // 标记断开连接的从节点为下线
        for (String host : disconnectedSlaves) {
            String[] parts = host.split(":");
            slaveDown(parts[0], parts[1]);
        }

        // 注册每个哨兵节点监听事件
        for (URI addr : cfg.getSentinelAddresses()) {
            registerSentinel(cfg, addr);
        }
    }

    // 注册一个哨兵节点。
    // 设置事件监听器,在主从节点状态变化时进行处理
    private void registerSentinel(final SentinelServersConfig cfg, final URI addr) {
        // 连接到指定的哨兵节点
        RedisClient client = createClient(addr.getHost(), addr.getPort());
        // 将新的哨兵客户端放入 sentinels 中,存在返回旧的客户端
        RedisClient oldClient = sentinels.putIfAbsent(addr.getHost() + ":" + addr.getPort(), client);
        // 已经存在客户端连接,则不再重复注册
        if (oldClient != null) {
            return;
        }

        try {
            // 建立一个发布/订阅连接
            RedisPubSubConnection pubsub = client.connectPubSub();

            // 添加一个发布/订阅监听器以处理哨兵事件
            pubsub.addListener(new BaseRedisPubSubListener<String>() {

                @Override
                public void onMessage(String channel, String msg) {
                    // 新的哨兵节点加入
                    if ("+sentinel".equals(channel)) {
                        onSentinelAdded(cfg, msg);
                    }
                    // 新的从节点加入
                    if ("+slave".equals(channel)) {
                        onSlaveAdded(addr, msg);
                    }
                    // 节点下线
                    if ("+sdown".equals(channel)) {
                        onSlaveDown(addr, msg);
                    }
                    // 节点上线
                    if ("-sdown".equals(channel)) {
                        onSlaveUp(addr, msg);
                    }
                    // 主节点发生变化
                    if ("+switch-master".equals(channel)) {
                        onMasterChange(cfg, addr, msg);
                    }
                }

                @Override
                public boolean onStatus(PubSubType type, String channel) {
                    // 处理订阅状态的变化
                    if (type == PubSubType.SUBSCRIBE) {
                        log.debug("subscribed to channel: {} from Sentinel {}:{}", channel, addr.getHost(), addr.getPort());
                    }
                    return true;
                }
            });

            // 订阅与哨兵相关的 channel 
            pubsub.subscribe(StringCodec.INSTANCE, "+switch-master", "+sdown", "-sdown", "+slave", "+sentinel");
            log.info("sentinel: {}:{} added", addr.getHost(), addr.getPort());
        } catch (RedisConnectionException e) {
            log.warn("can't connect to sentinel: {}:{}", addr.getHost(), addr.getPort());
        }
    }

    // 新的从节点加入处理
    protected void onSentinelAdded(SentinelServersConfig cfg, String msg) {
        // 解析消息并添加该从节点
        String[] parts = msg.split(" ");
        if ("sentinel".equals(parts[0])) {
            String ip = parts[2];
            String port = parts[3];

            String addr = ip + ":" + port;
            URI uri = URIBuilder.create(addr);
            // 注册哨兵
            registerSentinel(cfg, uri);
        }
    }

    // 处理从节点添加
    protected void onSlaveAdded(URI addr, String msg) {
        String[] parts = msg.split(" ");

        if (parts.length > 4
                 && "slave".equals(parts[0])) {
            // 提取从节点的IP地址和端口
            String ip = parts[2];
            String port = parts[3];

            String slaveAddr = ip + ":" + port;

            // 检查从节点是否已经存在于 slaves
            if (slaves.putIfAbsent(slaveAddr, true) == null) {
                addSlave(ip, Integer.valueOf(port));
                log.info("slave: {} added", slaveAddr);
            }
        } else {
            log.warn("onSlaveAdded. Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort());
        }
    }

    // 处理从节点下线
    private void onSlaveDown(URI sentinelAddr, String msg) {
        String[] parts = msg.split(" ");

        if (parts.length > 3) {
            if ("slave".equals(parts[0])) {
                String ip = parts[2];
                String port = parts[3];
                // 将从节点标记为下线。
                slaveDown(ip, port);
            } else if ("sentinel".equals(parts[0])) {
                String ip = parts[2];
                String port = parts[3];

                String addr = ip + ":" + port;
                // 移除该哨兵节点,并异步关闭其连接。
                RedisClient sentinel = sentinels.remove(addr);
                if (sentinel != null) {
                    sentinel.shutdownAsync();
                    log.info("sentinel: {} has down", addr);
                }
            } else if ("master".equals(parts[0])) {
                // 忽略主节点下线事件
            } else {
                log.warn("onSlaveDown. Invalid message: {} from Sentinel {}:{}", msg, sentinelAddr.getHost(), sentinelAddr.getPort());
            }
        } else {
            log.warn("onSlaveDown. Invalid message: {} from Sentinel {}:{}", msg, sentinelAddr.getHost(), sentinelAddr.getPort());
        }
    }

    // 处理从节点下线
    private void slaveDown(String ip, String port) {
        // 将从节点标记为冻结,避免重复处理下线事件
        String addr = ip + ":" + port;
        if (freezeSlaves.putIfAbsent(addr, true) == null) {
            slaveDown(0, ip, Integer.valueOf(port));
            log.info("slave: {} has down", addr);
        }
    }

    // 处理从节点上线的情况
    protected void onSlaveUp(URI addr, String msg) {
        String[] parts = msg.split(" ");

        if (parts.length > 4
                 && "slave".equals(parts[0])) {
            String ip = parts[2];
            String port = parts[3];

            String slaveAddr = ip + ":" + port;
            if (freezeSlaves.remove(slaveAddr) != null) {
                // 将从节点标记为上线
                slaveUp(ip, Integer.valueOf(port));
                log.info("slave: {} has up", slaveAddr);
            }
        } else {
            log.warn("onSlaveUp. Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort());
        }
    }

    // 处理主节点切换
    private void onMasterChange(SentinelServersConfig cfg, URI addr, String msg) {
        String[] parts = msg.split(" ");

        if (parts.length > 3) {
            if (cfg.getMasterName().equals(parts[0])) {
                String ip = parts[3];
                String port = parts[4];

                String current = currentMaster.get();
                String newMaster = ip + ":" + port;
                if (!newMaster.equals(current)
                        && currentMaster.compareAndSet(current, newMaster)) {
                    // 执行主节点切换逻辑
                    changeMaster(0, ip, Integer.valueOf(port));
                    log.info("master has changed from {} to {}", current, newMaster);
                }
            }
        } else {
            log.warn("Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort());
        }
    }

    // 将从节点添加到连接管理中
    private void addSlave(String host, int port) {
        getEntry(0).addSlave(host, port);
    }

    // 将从节点标记为上线
    private void slaveUp(String host, int port) {
        getEntry(0).slaveUp(host, port);
    }

    @Override
    public void shutdown() {
        super.shutdown();

        for (RedisClient sentinel : sentinels.values()) {
            sentinel.shutdown();
        }
    }
}