Redisson2.0源码分析6-集群以及哨兵连接管理
Redisson2.0源码分析6-集群以及哨兵连接管理
集群以及哨兵两种模式是 Redis 里面比较复杂的模式了,需要提前了解一些 Redis 的前置知识点才能更好的看得懂源码。比较典型的有:
- 集群跟哨兵模式作用。
- 集群模式间怎么进行通信。 怎么进行分片。集群节点间怎么进行通信。
- 哨兵模式工作原理。哨兵之间怎么进行通信。怎么选主节点。怎么节点下线。
1. 集群模式
1.1 集群模式节点信息 ClusterNodeInfo
package org.redisson.connection;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import org.redisson.misc.URIBuilder;
public class ClusterNodeInfo {
// 表示节点的不同状态和角色
public enum Flag {NOFLAGS, SLAVE, MASTER, MYSELF, FAIL, HANDSHAKE, NOADDR};
private String nodeId; // 节点的唯一标识符
private URI address; // 节点地址
private List<Flag> flags = new ArrayList<Flag>(); // 节点的标志列表,表示节点的状态和角色
private String slaveOf; // 如果是从节点,表示其主节点的ID
private int startSlot; // 节点负责的起始槽位
private int endSlot; // 节点负责的结束槽位
// 省略 set get
}
1.2 集群模式分区信息 ClusterPartition
package org.redisson.connection;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import org.redisson.misc.URIBuilder;
public class ClusterPartition {
private int startSlot; // 当前集群分区负责的起始槽位
private int endSlot; // 当前集群分区负责的结束槽位
private boolean masterFail; // 主节点是否失效
private URI masterAddress; // 主节点的地址
private List<URI> slaveAddresses = new ArrayList<URI>(); // 从节点的地址列表
// 省略 set get
}
1.3 集群模式连接管理 ClusterConnectionManager
集群模式连接管理也是很复杂,逻辑特别多。主要的逻辑包括:
- 管理 Redis 集群中各个节点的连接,这里面包括主节点以及从节点的连接。
- 定时监控 Redis 集群的状态变化,然后做一些操作。比如主从角色的切换、节点故障处理、分片迁移等等。
- 主从切换这块也是比较复杂的逻辑,检测到主节点故障时会自动切换到新的主节点。
- 管理集群中的 slot 分配,确保槽位正确映射。
monitorClusterChange() 方法是核心逻辑比较多的地方,包括主从角色切换、节点故障处理。
checkSlotsChange() 方法则包含分片迁移的逻辑。
package org.redisson.connection;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.ScheduledFuture;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.redisson.ClusterServersConfig;
import org.redisson.Config;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.ClusterNodeInfo.Flag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ClusterConnectionManager extends MasterSlaveConnectionManager {
private final Logger log = LoggerFactory.getLogger(getClass());
// 集群节点的Redis客户端列表
private final List<RedisClient> nodeClients = new ArrayList<RedisClient>();
// 上一次解析的集群分区信息, 键为槽位范围的结束槽位
private final Map<Integer, ClusterPartition> lastPartitions = new HashMap<Integer, ClusterPartition>();
// 监控集群变化的调度任务
private ScheduledFuture<?> monitorFuture;
public ClusterConnectionManager(ClusterServersConfig cfg, Config config) {
init(config);
// 初始化主从配置
this.config = create(cfg);
init(this.config);
// 为每个节点地址创建Redis客户端
for (URI addr : cfg.getNodeAddresses()) {
RedisClient client = createClient(addr.getHost(), addr.getPort(), cfg.getTimeout());
try {
// 连接到Redis节点 > 获取集群节点信息
RedisConnection connection = client.connect();
String nodesValue = connection.sync(RedisCommands.CLUSTER_NODES);
// 获取集群节点信息 > 解析集群分区信息
Map<Integer, ClusterPartition> partitions = parsePartitions(nodesValue);
for (ClusterPartition partition : partitions.values()) {
// 为每个分区添加主节点条目
addMasterEntry(partition, cfg);
}
break;
} catch (RedisConnectionException e) {
// 连接失败,记录警告信息
log.warn(e.getMessage(), e);
} finally {
// 异步关闭客户端连接
client.shutdownAsync();
}
}
// 监控集群变化
monitorClusterChange(cfg);
}
@Override
protected void initEntry(MasterSlaveServersConfig config) {
}
// 将一个集群分区的主节点添加到管理器中,并配置相应的主从关系
private void addMasterEntry(ClusterPartition partition, ClusterServersConfig cfg) {
// 如果主节点失效,记录警告信息并返回
if (partition.isMasterFail()) {
log.warn("master: {} for slot range: {}-{} add failed. Reason - server has FAIL flag", partition.getMasterAddress(), partition.getStartSlot(), partition.getEndSlot());
return;
}
// 创建连接到主节点的客户端
RedisClient client = createClient(partition.getMasterAddress().getHost(), partition.getMasterAddress().getPort(), cfg.getTimeout());
try {
// 连接到主节点
RedisConnection c = client.connect();
// 发送 CLUSTER INFO 命令获取集群信息
Map<String, String> params = c.sync(RedisCommands.CLUSTER_INFO);
// 如果集群状态为 "fail",记录警告信息并返回
if ("fail".equals(params.get("cluster_state"))) {
log.warn("master: {} for slot range: {}-{} add failed. Reason - cluster_state:fail", partition.getMasterAddress(), partition.getStartSlot(), partition.getEndSlot());
return;
}
} finally {
// 异步关闭客户端
client.shutdownAsync();
}
// 创建主从配置
MasterSlaveServersConfig config = create(cfg);
log.info("master: {} for slot range: {}-{} added", partition.getMasterAddress(), partition.getStartSlot(), partition.getEndSlot());
// 设置主节点地址
config.setMasterAddress(partition.getMasterAddress());
SingleEntry entry = new SingleEntry(partition.getStartSlot(), partition.getEndSlot(), this, config);
entries.put(partition.getEndSlot(), entry);
// 分区信息放入lastPartitions 中
lastPartitions.put(partition.getEndSlot(), partition);
}
// 监控集群状态变化
private void monitorClusterChange(final ClusterServersConfig cfg) {
// 定时去检查集群状态信息
// 确保每次任务执行完后,才会执行下一次的任务
monitorFuture = GlobalEventExecutor.INSTANCE.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
for (URI addr : cfg.getNodeAddresses()) {
final RedisClient client = createClient(addr.getHost(), addr.getPort(), cfg.getTimeout());
try {
// 连接到节点并获取集群节点信息
// redis 里面每一个节点都可以获取到完整的集群状态
RedisConnection connection = client.connect();
// 获取当前集群节点的状态信息
// CLUSTER NODES 命令返回集群中所有节点的详细信息,包括节点 ID、地址、主从角色、状态等
String nodesValue = connection.sync(RedisCommands.CLUSTER_NODES);
log.debug("cluster nodes state: {}", nodesValue);
// 解析节点信息,更新分区状态
Map<Integer, ClusterPartition> partitions = parsePartitions(nodesValue);
for (ClusterPartition newPart : partitions.values()) {
for (ClusterPartition part : lastPartitions.values()) {
if (newPart.getMasterAddress().equals(part.getMasterAddress())) {
log.debug("found endslot {} for {} fail {}", part.getEndSlot(), part.getMasterAddress(), newPart.isMasterFail());
if (newPart.isMasterFail()) {
// 匹配当前分区的主节点地址
ClusterPartition newMasterPart = partitions.get(part.getEndSlot());
if (!newMasterPart.getMasterAddress().equals(part.getMasterAddress())) {
log.debug("changing master from {} to {} for {}",
part.getMasterAddress(), newMasterPart.getMasterAddress(), newMasterPart.getEndSlot());
URI newUri = newMasterPart.getMasterAddress();
URI oldUri = part.getMasterAddress();
changeMaster(newMasterPart.getEndSlot(), newUri.getHost(), newUri.getPort());
slaveDown(newMasterPart.getEndSlot(), oldUri.getHost(), oldUri.getPort());
// 更新主节点地址
part.setMasterAddress(newMasterPart.getMasterAddress());
}
}
break;
}
}
}
// 检查槽位变化
checkSlotsChange(cfg, partitions);
break;
} catch (RedisConnectionException e) {
// 连接失败,跳过当前节点
} finally {
client.shutdownAsync();
}
}
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}, cfg.getScanInterval(), cfg.getScanInterval(), TimeUnit.MILLISECONDS);
}
// 检查槽变动,根据变化更新集群的主从节点配置
private void checkSlotsChange(ClusterServersConfig cfg, Map<Integer, ClusterPartition> partitions) {
Set<Integer> removeSlots = new HashSet<Integer>(lastPartitions.keySet());
removeSlots.removeAll(partitions.keySet());
lastPartitions.keySet().removeAll(removeSlots);
// 记录需要移除的槽位数量
if (!removeSlots.isEmpty()) {
log.info("{} slots found to remove", removeSlots.size());
}
// 移除主节点并异步关闭连接
Map<Integer, MasterSlaveEntry> removeAddrs = new HashMap<Integer, MasterSlaveEntry>();
for (Integer slot : removeSlots) {
MasterSlaveEntry entry = removeMaster(slot);
entry.shutdownMasterAsync();
removeAddrs.put(slot, entry);
}
// 记录需要添加的槽位数量....
Set<Integer> addSlots = new HashSet<Integer>(partitions.keySet());
addSlots.removeAll(lastPartitions.keySet());
if (!addSlots.isEmpty()) {
log.info("{} slots found to add", addSlots.size());
}
// 为新增的槽位添加主节点条目
for (Integer slot : addSlots) {
ClusterPartition partition = partitions.get(slot);
addMasterEntry(partition, cfg);
}
// 将从节点标记为下线
for (Entry<Integer, MasterSlaveEntry> entry : removeAddrs.entrySet()) {
InetSocketAddress url = entry.getValue().getClient().getAddr();
slaveDown(entry.getKey(), url.getHostName(), url.getPort());
}
}
// 解析Redis集群信息字符串
private Map<String, String> parseInfo(String value) {
Map<String, String> result = new HashMap<String, String>();
// 通过换行符分割字符串,然后通过:分割,构建键值对返回。
for (String entry : value.split("\r\n|\n")) {
String[] parts = entry.split(":");
result.put(parts[0], parts[1]);
}
return result;
}
// 解析字符串 Redis 集群节点信息 , 转成对象返回
private Map<Integer, ClusterPartition> parsePartitions(String nodesValue) {
Map<String, ClusterPartition> partitions = new HashMap<String, ClusterPartition>();
Map<Integer, ClusterPartition> result = new HashMap<Integer, ClusterPartition>();
// 解析节点信息字符串为 ClusterNodeInfo 集合
List<ClusterNodeInfo> nodes = parse(nodesValue);
for (ClusterNodeInfo clusterNodeInfo : nodes) {
String id = clusterNodeInfo.getNodeId();
// 如果节点是从节点,使用其主节点的ID
if (clusterNodeInfo.getFlags().contains(Flag.SLAVE)) {
id = clusterNodeInfo.getSlaveOf();
}
ClusterPartition partition = partitions.get(id);
// 分区不存在创建新的分区
if (partition == null) {
partition = new ClusterPartition();
partitions.put(id, partition);
}
// 节点标记为失败,则设置分区的主节点失效标志
if (clusterNodeInfo.getFlags().contains(Flag.FAIL)) {
partition.setMasterFail(true);
}
// 从节点,添加到分区的从节点地址列表
if (clusterNodeInfo.getFlags().contains(Flag.SLAVE)) {
partition.addSlaveAddress(clusterNodeInfo.getAddress());
} else {
// 主节点,设置分区的槽位范围和主节点地址
partition.setStartSlot(clusterNodeInfo.getStartSlot());
partition.setEndSlot(clusterNodeInfo.getEndSlot());
result.put(clusterNodeInfo.getEndSlot(), partition);
partition.setMasterAddress(clusterNodeInfo.getAddress());
}
}
return result;
}
// 集群配置转主从配置
private MasterSlaveServersConfig create(ClusterServersConfig cfg) {
MasterSlaveServersConfig c = new MasterSlaveServersConfig();
c.setRetryInterval(cfg.getRetryInterval());
c.setRetryAttempts(cfg.getRetryAttempts());
c.setTimeout(cfg.getTimeout());
c.setPingTimeout(cfg.getPingTimeout());
c.setLoadBalancer(cfg.getLoadBalancer());
c.setPassword(cfg.getPassword());
c.setDatabase(cfg.getDatabase());
c.setClientName(cfg.getClientName());
c.setMasterConnectionPoolSize(cfg.getMasterConnectionPoolSize());
c.setSlaveConnectionPoolSize(cfg.getSlaveConnectionPoolSize());
c.setSlaveSubscriptionConnectionPoolSize(cfg.getSlaveSubscriptionConnectionPoolSize());
c.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection());
return c;
}
// 解析 Redis 集群节点信息字符串,转为 ClusterNodeInfo 对象
private List<ClusterNodeInfo> parse(String nodesResponse) {
List<ClusterNodeInfo> nodes = new ArrayList<ClusterNodeInfo>();
for (String nodeInfo : nodesResponse.split("\n")) {
ClusterNodeInfo node = new ClusterNodeInfo();
String[] params = nodeInfo.split(" ");
String nodeId = params[0];
node.setNodeId(nodeId); // 设置节点ID
String addr = params[1];
node.setAddress(addr); // 设置节点地址
String flags = params[2];
for (String flag : flags.split(",")) {
// 解析并添加节点标志
String flagValue = flag.toUpperCase().replaceAll("\\?", "");
node.addFlag(ClusterNodeInfo.Flag.valueOf(flagValue));
}
String slaveOf = params[3];
if (!"-".equals(slaveOf)) {
// 节点是从节点,设置其主节点ID
node.setSlaveOf(slaveOf);
}
if (params.length > 8) {
// 设置节点的槽位范围
String slots = params[8];
String[] parts = slots.split("-");
node.setStartSlot(Integer.valueOf(parts[0]));
node.setEndSlot(Integer.valueOf(parts[1]));
}
nodes.add(node);
}
return nodes;
}
@Override
public void shutdown() {
monitorFuture.cancel(true);
super.shutdown();
for (RedisClient client : nodeClients) {
client.shutdown();
}
}
}
2. 哨兵模式
2.1 哨兵模式连接管理 SentinelConnectionManager
这个类主要用于管理Redis的哨兵模式连接。主要的作用包括:
- 管理多个 Redis 哨兵节点,接收哨兵的通知消息,比如主节点切换、从节点上下线等。
- 自动发现、配置主从节点。
- 主从节点的状态监控、主从节点的自动切换。
package org.redisson.connection;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.Config;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.SentinelServersConfig;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.misc.URIBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.util.internal.PlatformDependent;
public class SentinelConnectionManager extends MasterSlaveConnectionManager {
private final Logger log = LoggerFactory.getLogger(getClass());
// 保存哨兵节点
private final ConcurrentMap<String, RedisClient> sentinels = PlatformDependent.newConcurrentHashMap();
// 当前主节点
private final AtomicReference<String> currentMaster = new AtomicReference<String>();
// 已冻结的从节点
private final ConcurrentMap<String, Boolean> freezeSlaves = PlatformDependent.newConcurrentHashMap();
// 从节点
private final ConcurrentMap<String, Boolean> slaves = PlatformDependent.newConcurrentHashMap();
public SentinelConnectionManager(SentinelServersConfig cfg, Config config) {
init(config);
// 初始化哨兵配置
final MasterSlaveServersConfig c = new MasterSlaveServersConfig();
c.setRetryInterval(cfg.getRetryInterval());
c.setRetryAttempts(cfg.getRetryAttempts());
c.setTimeout(cfg.getTimeout());
c.setPingTimeout(cfg.getPingTimeout());
c.setLoadBalancer(cfg.getLoadBalancer());
c.setPassword(cfg.getPassword());
c.setDatabase(cfg.getDatabase());
c.setMasterConnectionPoolSize(cfg.getMasterConnectionPoolSize());
c.setSlaveConnectionPoolSize(cfg.getSlaveConnectionPoolSize());
c.setSlaveSubscriptionConnectionPoolSize(cfg.getSlaveSubscriptionConnectionPoolSize());
c.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection());
// 保存断开的节点
List<String> disconnectedSlaves = new ArrayList<String>();
for (URI addr : cfg.getSentinelAddresses()) {
RedisClient client = createClient(addr.getHost(), addr.getPort(), c.getTimeout());
try {
// 连接到每个哨兵节点
RedisConnection connection = client.connect();
// 获取主节点地址 > 拿到主节点信息 > 设置主节点
List<String> master = connection.sync(RedisCommands.SENTINEL_GET_MASTER_ADDR_BY_NAME, cfg.getMasterName());
String masterHost = master.get(0) + ":" + master.get(1);
c.setMasterAddress(masterHost);
currentMaster.set(masterHost);
log.info("master: {} added", masterHost);
// 获取从节点地址集合 > 遍历设置从节点
List<Map<String, String>> sentinelSlaves = connection.sync(RedisCommands.SENTINEL_SLAVES, cfg.getMasterName());
for (Map<String, String> map : sentinelSlaves) {
String ip = map.get("ip");
String port = map.get("port");
String flags = map.get("flags");
String host = ip + ":" + port;
c.addSlaveAddress(host);
slaves.put(host, true);
log.info("slave: {} added, params: {}", host, map);
if (flags.contains("s_down") || flags.contains("disconnected")) {
disconnectedSlaves.add(host);
}
}
break;
} catch (RedisConnectionException e) {
// 连接失败,跳过当前哨兵节点
} finally {
client.shutdownAsync();
}
}
if (currentMaster.get() == null) {
// 如果无法连接到任何主节点,抛出异常
throw new IllegalStateException("Can't connect to servers!");
}
init(c);
// 标记断开连接的从节点为下线
for (String host : disconnectedSlaves) {
String[] parts = host.split(":");
slaveDown(parts[0], parts[1]);
}
// 注册每个哨兵节点监听事件
for (URI addr : cfg.getSentinelAddresses()) {
registerSentinel(cfg, addr);
}
}
// 注册一个哨兵节点。
// 设置事件监听器,在主从节点状态变化时进行处理
private void registerSentinel(final SentinelServersConfig cfg, final URI addr) {
// 连接到指定的哨兵节点
RedisClient client = createClient(addr.getHost(), addr.getPort());
// 将新的哨兵客户端放入 sentinels 中,存在返回旧的客户端
RedisClient oldClient = sentinels.putIfAbsent(addr.getHost() + ":" + addr.getPort(), client);
// 已经存在客户端连接,则不再重复注册
if (oldClient != null) {
return;
}
try {
// 建立一个发布/订阅连接
RedisPubSubConnection pubsub = client.connectPubSub();
// 添加一个发布/订阅监听器以处理哨兵事件
pubsub.addListener(new BaseRedisPubSubListener<String>() {
@Override
public void onMessage(String channel, String msg) {
// 新的哨兵节点加入
if ("+sentinel".equals(channel)) {
onSentinelAdded(cfg, msg);
}
// 新的从节点加入
if ("+slave".equals(channel)) {
onSlaveAdded(addr, msg);
}
// 节点下线
if ("+sdown".equals(channel)) {
onSlaveDown(addr, msg);
}
// 节点上线
if ("-sdown".equals(channel)) {
onSlaveUp(addr, msg);
}
// 主节点发生变化
if ("+switch-master".equals(channel)) {
onMasterChange(cfg, addr, msg);
}
}
@Override
public boolean onStatus(PubSubType type, String channel) {
// 处理订阅状态的变化
if (type == PubSubType.SUBSCRIBE) {
log.debug("subscribed to channel: {} from Sentinel {}:{}", channel, addr.getHost(), addr.getPort());
}
return true;
}
});
// 订阅与哨兵相关的 channel
pubsub.subscribe(StringCodec.INSTANCE, "+switch-master", "+sdown", "-sdown", "+slave", "+sentinel");
log.info("sentinel: {}:{} added", addr.getHost(), addr.getPort());
} catch (RedisConnectionException e) {
log.warn("can't connect to sentinel: {}:{}", addr.getHost(), addr.getPort());
}
}
// 新的从节点加入处理
protected void onSentinelAdded(SentinelServersConfig cfg, String msg) {
// 解析消息并添加该从节点
String[] parts = msg.split(" ");
if ("sentinel".equals(parts[0])) {
String ip = parts[2];
String port = parts[3];
String addr = ip + ":" + port;
URI uri = URIBuilder.create(addr);
// 注册哨兵
registerSentinel(cfg, uri);
}
}
// 处理从节点添加
protected void onSlaveAdded(URI addr, String msg) {
String[] parts = msg.split(" ");
if (parts.length > 4
&& "slave".equals(parts[0])) {
// 提取从节点的IP地址和端口
String ip = parts[2];
String port = parts[3];
String slaveAddr = ip + ":" + port;
// 检查从节点是否已经存在于 slaves
if (slaves.putIfAbsent(slaveAddr, true) == null) {
addSlave(ip, Integer.valueOf(port));
log.info("slave: {} added", slaveAddr);
}
} else {
log.warn("onSlaveAdded. Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort());
}
}
// 处理从节点下线
private void onSlaveDown(URI sentinelAddr, String msg) {
String[] parts = msg.split(" ");
if (parts.length > 3) {
if ("slave".equals(parts[0])) {
String ip = parts[2];
String port = parts[3];
// 将从节点标记为下线。
slaveDown(ip, port);
} else if ("sentinel".equals(parts[0])) {
String ip = parts[2];
String port = parts[3];
String addr = ip + ":" + port;
// 移除该哨兵节点,并异步关闭其连接。
RedisClient sentinel = sentinels.remove(addr);
if (sentinel != null) {
sentinel.shutdownAsync();
log.info("sentinel: {} has down", addr);
}
} else if ("master".equals(parts[0])) {
// 忽略主节点下线事件
} else {
log.warn("onSlaveDown. Invalid message: {} from Sentinel {}:{}", msg, sentinelAddr.getHost(), sentinelAddr.getPort());
}
} else {
log.warn("onSlaveDown. Invalid message: {} from Sentinel {}:{}", msg, sentinelAddr.getHost(), sentinelAddr.getPort());
}
}
// 处理从节点下线
private void slaveDown(String ip, String port) {
// 将从节点标记为冻结,避免重复处理下线事件
String addr = ip + ":" + port;
if (freezeSlaves.putIfAbsent(addr, true) == null) {
slaveDown(0, ip, Integer.valueOf(port));
log.info("slave: {} has down", addr);
}
}
// 处理从节点上线的情况
protected void onSlaveUp(URI addr, String msg) {
String[] parts = msg.split(" ");
if (parts.length > 4
&& "slave".equals(parts[0])) {
String ip = parts[2];
String port = parts[3];
String slaveAddr = ip + ":" + port;
if (freezeSlaves.remove(slaveAddr) != null) {
// 将从节点标记为上线
slaveUp(ip, Integer.valueOf(port));
log.info("slave: {} has up", slaveAddr);
}
} else {
log.warn("onSlaveUp. Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort());
}
}
// 处理主节点切换
private void onMasterChange(SentinelServersConfig cfg, URI addr, String msg) {
String[] parts = msg.split(" ");
if (parts.length > 3) {
if (cfg.getMasterName().equals(parts[0])) {
String ip = parts[3];
String port = parts[4];
String current = currentMaster.get();
String newMaster = ip + ":" + port;
if (!newMaster.equals(current)
&& currentMaster.compareAndSet(current, newMaster)) {
// 执行主节点切换逻辑
changeMaster(0, ip, Integer.valueOf(port));
log.info("master has changed from {} to {}", current, newMaster);
}
}
} else {
log.warn("Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort());
}
}
// 将从节点添加到连接管理中
private void addSlave(String host, int port) {
getEntry(0).addSlave(host, port);
}
// 将从节点标记为上线
private void slaveUp(String host, int port) {
getEntry(0).slaveUp(host, port);
}
@Override
public void shutdown() {
super.shutdown();
for (RedisClient sentinel : sentinels.values()) {
sentinel.shutdown();
}
}
}